TaskManager
TaskManager
manages active and standby tasks of a StreamThread.
TaskManager
is created alongside a StreamThread (with empty AssignedStreamsTasks and AssignedStandbyTasks).
TaskManager
creates new tasks when RebalanceListener
handles onPartitionsAssigned (and requests to create processor tasks for assigned topic partitions that in turn adds new stream tasks).
TaskManager
uses the StandbyTaskCreator to create standby tasks when requested to addStandbyTasks.
TaskManager
is given an AssignedStandbyTasks when created.
When created, TaskManager
is given a StreamsMetadataState that is used exclusively to get notifications about the changes in cluster metadata. The StreamsMetadataState can then be displayed when TaskManager
is requested for text representation.
// FIXME: TaskManager.toString
Tip
|
Enable TRACE logging level for the TaskManager logger to see what happens inside.
Refer to Application Logging Using log4j. |
Creating TaskManager Instance
TaskManager
takes the following to be created:
- Kafka "restore" Consumer (Consumer<byte[], byte[]>)
- AbstractTaskCreator of StreamTasks (StreamThread.AbstractTaskCreator<StreamTask>)
- AbstractTaskCreator of StandbyTasks (StreamThread.AbstractTaskCreator<StandbyTask>)
TaskManager
initializes the internal properties.
TaskManager and AdminClient
When created, TaskManager
is given a Kafka AdminClient.
TaskManager
uses the AdminClient
when requested to try to purge (delete) committed records.
The AdminClient
is also used for the following:
- StreamsPartitionAssignor is requested to configure (and creates an InternalTopicManager)
- StreamThread is requested for the adminClientMetrics
Notifying StreamsMetadataState About Changes in Cluster Metadata — setPartitionsByHostState
Method
void setPartitionsByHostState(Map<HostInfo, Set<TopicPartition>> partitionsByHostState)
setPartitionsByHostState
simply notifies the StreamsMetadataState that cluster metadata has changed.
Note
|
setPartitionsByHostState is used when StreamsPartitionAssignor is requested to assign and to handle an assignment (onAssignment).
|
Setting Assignment Metadata with Active and Standby Tasks — setAssignmentMetadata
Method
void setAssignmentMetadata(
Map<TaskId, Set<TopicPartition>> activeTasks,
Map<TaskId, Set<TopicPartition>> standbyTasks)
setAssignmentMetadata simply initializes the assignedActiveTasks and assignedStandbyTasks internal registries with the given activeTasks and standbyTasks, respectively.
Note
|
setAssignmentMetadata is used exclusively when StreamsPartitionAssignor is requested to handle assignment from the consumer group leader.
|
updateSubscriptionsFromAssignment
Method
void updateSubscriptionsFromAssignment(List<TopicPartition> partitions)
updateSubscriptionsFromAssignment
…FIXME
Note
|
updateSubscriptionsFromAssignment is used exclusively when StreamsPartitionAssignor is requested to handle assignment from the leader
|
updateSubscriptionsFromMetadata
Method
void updateSubscriptionsFromMetadata(Set<String> topics)
updateSubscriptionsFromMetadata
…FIXME
Note
|
updateSubscriptionsFromMetadata is used when StreamsPartitionAssignor is requested for a subscription.
|
Suspending All (Active and Standby) Tasks And State — suspendTasksAndState
Method
void suspendTasksAndState()
suspendTasksAndState
…FIXME
Note
|
suspendTasksAndState is used exclusively when RebalanceListener is requested to handle an onPartitionsRevoked event.
|
updateNewAndRestoringTasks
Method
boolean updateNewAndRestoringTasks()
updateNewAndRestoringTasks returns true only when the AssignedStreamsTasks tasks are all running and AssignedStandbyTasks tasks are all running in the end. Otherwise, updateNewAndRestoringTasks returns false.
Internally, updateNewAndRestoringTasks
requests the AssignedStreamsTasks and the AssignedStandbyTasks tasks to initialize new tasks.
updateNewAndRestoringTasks
requests the ChangelogReader to restore the AssignedStreamsTasks tasks (restored).
updateNewAndRestoringTasks
requests the AssignedStreamsTasks tasks to updateRestored (from the ChangelogReader).
updateNewAndRestoringTasks
requests the AssignedStreamsTasks tasks to check whether all StreamTasks are running.
If not all active StreamTasks are running, updateNewAndRestoringTasks simply returns false.
Otherwise, if all active StreamTasks are running, updateNewAndRestoringTasks
requests the Kafka consumer for the partition assignment (using Consumer.assignment) and prints out the following TRACE message to the logs:
Resuming partitions [assignment]
updateNewAndRestoringTasks then requests the Kafka consumer to resume the partitions (using Consumer.resume), assigns the standby partitions (assignStandbyPartitions) and returns true.
Note
|
updateNewAndRestoringTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (while running the main record processing loop in PARTITIONS_ASSIGNED state).
|
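The pause-until-restored behaviour described above can be pictured with a small, self-contained model. This is only a sketch under simplifying assumptions: `UpdateTasksSketch`, `addTask`, `markRestored` and `pausedPartitions` are hypothetical names, partitions are plain strings, there is exactly one active task per partition, and standby tasks are left out. It is not the Kafka Streams implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified model: one active task per partition, no standby tasks.
final class UpdateTasksSketch {
    // partition -> true once the task that owns it has restored its state
    private final Map<String, Boolean> restoredByPartition = new LinkedHashMap<>();
    // partitions the (imaginary) consumer is currently not fetching from
    private final Set<String> pausedPartitions = new TreeSet<>();

    // Mirrors task creation: a new task starts out restoring, with its partition paused.
    void addTask(String partition) {
        restoredByPartition.put(partition, false);
        pausedPartitions.add(partition);
    }

    // Stand-in for the ChangelogReader reporting that restoration completed.
    void markRestored(String partition) {
        restoredByPartition.put(partition, true);
    }

    // Returns true (and resumes every paused partition) only once all tasks are running.
    boolean updateNewAndRestoringTasks() {
        if (restoredByPartition.containsValue(false)) {
            return false; // some task is still restoring: keep the partitions paused
        }
        pausedPartitions.clear(); // stands in for Consumer.resume(assignment)
        return true;
    }

    Set<String> pausedPartitions() {
        return new TreeSet<>(pausedPartitions);
    }

    public static void main(String[] args) {
        UpdateTasksSketch sketch = new UpdateTasksSketch();
        sketch.addTask("orders-0");
        sketch.addTask("orders-1");
        sketch.markRestored("orders-0");
        System.out.println(sketch.updateNewAndRestoringTasks()); // one task still restoring
        sketch.markRestored("orders-1");
        System.out.println(sketch.updateNewAndRestoringTasks()); // all tasks running
    }
}
```

The point of the model is the ordering: partitions stay paused while any task is restoring, so no records are fetched for tasks that cannot process them yet.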
assignStandbyPartitions
Internal Method
void assignStandbyPartitions()
assignStandbyPartitions
…FIXME
Note
|
assignStandbyPartitions is used when…FIXME
|
Looking Up Stream Task Per Partition — activeTask
Method
StreamTask activeTask(TopicPartition partition)
activeTask
simply requests the AssignedStreamsTasks for the running StreamTask for the input partition.
Note
|
activeTask is used exclusively when StreamThread is requested to add records to active stream tasks.
|
hasActiveRunningTasks
Method
boolean hasActiveRunningTasks()
hasActiveRunningTasks
simply asks the AssignedStreamsTasks whether it has any running stream tasks or not.
Note
|
hasActiveRunningTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (and there are records to be processed).
|
hasStandbyRunningTasks
Method
boolean hasStandbyRunningTasks()
hasStandbyRunningTasks
simply asks the AssignedStandbyTasks whether it has any running standby tasks or not.
Note
|
hasStandbyRunningTasks is used exclusively when StreamThread is requested to maybeUpdateStandbyTasks (while polling records once and processing them using active stream tasks).
|
Creating Tasks for Assigned Partitions — createTasks
Method
void createTasks(final Collection<TopicPartition> assignment)
createTasks
requests the AssignedStandbyTasks and the AssignedStreamsTasks to closeNonAssignedSuspendedTasks (with the assignedStandbyTasks and the assignedActiveTasks, respectively).
createTasks
(re)creates the stream tasks for the input assignment
partitions.
createTasks then adds the assigned standby tasks (addStandbyTasks).
createTasks
prints out the following TRACE message to the logs:
Pausing partitions: [assignment]
In the end, createTasks
requests the Kafka consumer to pause the assignment
partitions.
Note
|
createTasks triggers Consumer.pause method that suspends fetching records from partitions until they have been resumed using Consumer.resume .
|
createTasks
reports an IllegalStateException
if the consumer is not defined (null
):
stream-thread [threadClientId] consumer has not been initialized while adding stream tasks. This should not happen.
Note
|
createTasks is used exclusively when RebalanceListener is requested to handle an onPartitionsAssigned event.
|
(Re)Creating Stream Tasks Per Assigned Partitions — addStreamTasks
Internal Method
void addStreamTasks(
Collection<TopicPartition> assignment)
addStreamTasks
registers new stream tasks.
Note
|
addStreamTasks does nothing (and simply exits) unless assignedActiveTasks has at least one task id.
|
addStreamTasks
prints out the following DEBUG message to the logs:
Adding assigned tasks as active: [assignedActiveTasks]
For every task id in assignedActiveTasks, if the partitions of a task are all included in the input assignment partitions, addStreamTasks requests AssignedStreamsTasks to maybeResumeSuspendedTask (passing in the task id and partitions). If the task could not be resumed, addStreamTasks records the task and partitions in a local registry of new tasks to be created.
If the partitions of a task are not all included in the input assignment
partitions addStreamTasks
prints out the following WARN message to the logs:
Task [taskId] owned partitions [partitions] are not contained in the assignment [assignment]
When there are new tasks to be created, addStreamTasks
prints out the following TRACE message to the logs:
New active tasks to be created: [newTasks]
addStreamTasks
then requests StreamThread.AbstractTaskCreator<StreamTask> to createTasks for every new task (with the Kafka Consumer) and requests AssignedStreamsTasks to register a new task.
Note
|
addStreamTasks is used exclusively when TaskManager is requested to create tasks for assigned topic partitions.
|
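The per-task decision described above (resume a suspended task, schedule a new one, or warn) can be sketched as follows. The names here are illustrative assumptions: `AddStreamTasksSketch`, `tasksToCreate` and the `SuspendedTasks` interface are hypothetical stand-ins, and task ids and partitions are plain strings rather than TaskId and TopicPartition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class AddStreamTasksSketch {
    // Hypothetical stand-in for AssignedStreamsTasks.maybeResumeSuspendedTask.
    interface SuspendedTasks {
        boolean maybeResume(String taskId, Set<String> partitions);
    }

    // Returns the tasks that still have to be created; warnings collect the WARN messages.
    static Map<String, Set<String>> tasksToCreate(
            Map<String, Set<String>> assignedActiveTasks,
            Collection<String> assignment,
            SuspendedTasks suspended,
            List<String> warnings) {
        Map<String, Set<String>> newTasks = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : assignedActiveTasks.entrySet()) {
            String taskId = entry.getKey();
            Set<String> partitions = entry.getValue();
            if (assignment.containsAll(partitions)) {
                // Prefer resuming a suspended task; create a new one only if that fails.
                if (!suspended.maybeResume(taskId, partitions)) {
                    newTasks.put(taskId, partitions);
                }
            } else {
                warnings.add("Task " + taskId + " owned partitions " + partitions
                        + " are not contained in the assignment " + assignment);
            }
        }
        return newTasks;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> assigned = new LinkedHashMap<>();
        assigned.put("0_0", Collections.singleton("in-0"));
        assigned.put("0_1", Collections.singleton("in-1"));
        assigned.put("0_2", Collections.singleton("in-2"));
        List<String> warnings = new ArrayList<>();
        // Pretend that only task 0_0 was suspended earlier and can be resumed.
        Map<String, Set<String>> newTasks = tasksToCreate(
                assigned, Arrays.asList("in-0", "in-1"),
                (taskId, partitions) -> taskId.equals("0_0"), warnings);
        System.out.println("New active tasks to be created: " + newTasks);
        System.out.println(warnings);
    }
}
```

In this run task 0_0 is resumed, task 0_1 ends up in the registry of new tasks, and task 0_2 only produces a warning because its partition is not part of the assignment.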
Adding Assigned Standby Tasks — addStandbyTasks
Internal Method
void addStandbyTasks()
addStandbyTasks
registers new standby tasks.
Note
|
addStandbyTasks does nothing and simply exits when the assignedStandbyTasks internal registry has no standby tasks assigned.
|
addStandbyTasks
prints out the following DEBUG message to the logs:
Adding assigned standby tasks [assignedStandbyTasks]
For every task (id and partitions) in the assignedStandbyTasks registry, addStandbyTasks
requests AssignedStandbyTasks to maybeResumeSuspendedTask and, if negative, adds the task to tasks to be created in standby mode.
If no new tasks should be in standby mode, addStandbyTasks
simply exits.
When there are new tasks to be in standby mode, addStandbyTasks
prints out the following TRACE message to the logs:
New standby tasks to be created: [newStandbyTasks]
addStandbyTasks
then requests StreamThread.AbstractTaskCreator<StandbyTask> to createTasks for every new standby task (with the Kafka Consumer) and requests AssignedStandbyTasks to register a new task.
Note
|
addStandbyTasks is used exclusively when TaskManager is requested to create tasks for assigned partitions.
|
Describing Itself (Textual Representation) — toString
Method
String toString(final String indent)
toString
gives a text representation with the following:
- "Active tasks:" followed by the text representation of AssignedStreamsTasks
- "Standby tasks:" followed by the text representation of AssignedStandbyTasks
FIXME toString in action
Note
|
toString is used exclusively when StreamThread is requested to describe itself.
|
Attempting to Purge (Delete) Committed Records of Repartition Topics — maybePurgeCommitedRecords
Method
void maybePurgeCommitedRecords()
In essence, maybePurgeCommitedRecords
requests the AssignedStreamsTasks for the purgeable offsets of the repartition topics (of a topology) and then requests the AdminClient to delete the records (whose offsets are smaller than the given offset of the corresponding partition).
maybePurgeCommitedRecords
…FIXME
Note
|
maybePurgeCommitedRecords is used exclusively when StreamThread is requested to commit all tasks (when commit interval elapsed) (when StreamThread is requested to poll records once and process them using active stream tasks in the main record processing loop).
|
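The "delete everything before the given offset" semantics can be illustrated without a broker. The sketch below is an in-memory model only: `PurgeSketch` and `purge` are hypothetical names, and each partition log is a sorted map from offset to record. With a real cluster the deletion would presumably go through `AdminClient.deleteRecords` with `RecordsToDelete.beforeOffset`, which this example does not call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PurgeSketch {
    // Removes, per partition, every record whose offset is smaller than the purgeable offset.
    static void purge(Map<String, NavigableMap<Long, String>> logs,
                      Map<String, Long> purgeableOffsets) {
        for (Map.Entry<String, Long> entry : purgeableOffsets.entrySet()) {
            NavigableMap<Long, String> log = logs.get(entry.getKey());
            if (log != null) {
                // headMap(offset, false) is a live view of all strictly smaller offsets
                log.headMap(entry.getValue(), false).clear();
            }
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> log = new TreeMap<>();
        for (long offset = 0; offset < 5; offset++) {
            log.put(offset, "record-" + offset);
        }
        Map<String, NavigableMap<Long, String>> logs = new HashMap<>();
        logs.put("app-repartition-0", log);

        Map<String, Long> purgeableOffsets = new HashMap<>();
        purgeableOffsets.put("app-repartition-0", 3L);

        purge(logs, purgeableOffsets);
        System.out.println(log.keySet()); // offsets 3 and 4 remain
    }
}
```

The record at the purgeable offset itself is kept; only strictly smaller offsets are removed, matching the description above.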
Processing Records by Running Stream Tasks (One Record Per Task) — process
Method
int process()
process
simply requests AssignedStreamsTasks to request the running stream tasks to process a single record (per task).
In the end, process
gives the number of stream tasks that processed a record.
Note
|
process is used exclusively when StreamThread is requested to process records (with optional commit) (when requested to poll records once and process them using active stream tasks).
|
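A minimal sketch of the "one record per task" round, assuming each running task can be modelled as a queue of buffered records. `ProcessSketch` and the `processed` list are hypothetical and only make the behaviour observable; the real tasks do far more than poll a queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;

final class ProcessSketch {
    // Lets every running task process at most one buffered record and
    // returns how many tasks actually processed something.
    static int process(Collection<? extends Queue<String>> runningTasks, List<String> processed) {
        int tasksThatProcessed = 0;
        for (Queue<String> task : runningTasks) {
            String record = task.poll(); // null when the task has nothing buffered
            if (record != null) {
                processed.add(record);
                tasksThatProcessed++;
            }
        }
        return tasksThatProcessed;
    }

    public static void main(String[] args) {
        List<Queue<String>> tasks = new ArrayList<>();
        tasks.add(new ArrayDeque<>(Arrays.asList("a", "b")));
        tasks.add(new ArrayDeque<>());
        tasks.add(new ArrayDeque<>(Arrays.asList("c")));

        List<String> processed = new ArrayList<>();
        System.out.println(process(tasks, processed)); // two tasks had a record
        System.out.println(process(tasks, processed)); // only the first task has one left
        System.out.println(processed);
    }
}
```

Calling process repeatedly until it returns 0 drains all buffered records while giving every task a turn in each round.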
Committing Active Running Stream Tasks that Requested It — maybeCommitActiveTasks
Method
int maybeCommitActiveTasks()
maybeCommitActiveTasks
simply requests AssignedStreamsTasks to commit running stream tasks that requested it.
In the end, maybeCommitActiveTasks
gives the number of running stream tasks that needed a commit.
Note
|
maybeCommitActiveTasks is used exclusively when StreamThread is requested to processAndMaybeCommit.
|
Punctuating Stream Tasks — punctuate
Method
int punctuate()
punctuate
simply requests the AssignedStreamsTasks to punctuate running stream tasks (by stream and system time).
Note
|
punctuate is used exclusively when StreamThread is requested to attempt to punctuate.
|
Committing All Active (Stream and Standby) Tasks — commitAll
Method
int commitAll()
commitAll
…FIXME
Note
|
commitAll is used exclusively when StreamThread is requested to commit all tasks (when commit interval elapsed).
|
All Active Tasks — activeTaskIds
Method
Set<TaskId> activeTaskIds()
activeTaskIds
simply requests the AssignedStreamsTasks for the assigned task IDs.
standbyTaskIds
Method
Set<TaskId> standbyTaskIds()
standbyTaskIds
…FIXME
Note
|
standbyTaskIds is used when…FIXME
|
cachedTasksIds
Method
Set<TaskId> cachedTasksIds()
cachedTasksIds
requests the StreamTask creator for the StateDirectory that is in turn requested for the task directories.
cachedTasksIds
collects the TaskIds (parsing the names of the directories) that correspond to task directories that contain a .checkpoint file.
Note
|
cachedTasksIds is used exclusively when StreamsPartitionAssignor is requested for a subscription.
|
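The directory scan can be sketched with plain `java.nio.file`. This is an approximation under stated assumptions: `CachedTaskIdsSketch` and `demo` are hypothetical, task ids are returned as strings, and a task directory is assumed to be named like `0_1` (two numbers joined by an underscore). The real code goes through the StateDirectory rather than listing files directly.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;

final class CachedTaskIdsSketch {
    // Collects the names of task directories that contain a .checkpoint file.
    static Set<String> cachedTaskIds(Path stateDir) {
        Set<String> taskIds = new TreeSet<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(stateDir)) {
            for (Path child : children) {
                String name = child.getFileName().toString();
                // Assumption: task directories are named <number>_<number>
                boolean looksLikeTaskDir = Files.isDirectory(child) && name.matches("\\d+_\\d+");
                if (looksLikeTaskDir && Files.exists(child.resolve(".checkpoint"))) {
                    taskIds.add(name);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return taskIds;
    }

    // Builds a throwaway state directory and scans it.
    static Set<String> demo() {
        try {
            Path stateDir = Files.createTempDirectory("state-dir-sketch");
            Files.createFile(Files.createDirectories(stateDir.resolve("0_0")).resolve(".checkpoint"));
            Files.createDirectories(stateDir.resolve("0_1")); // task directory without a checkpoint
            Files.createFile(Files.createDirectories(stateDir.resolve("global")).resolve(".checkpoint"));
            return cachedTaskIds(stateDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // only the 0_0 directory qualifies
    }
}
```

Directories without a checkpoint file, and directories whose names do not parse as task ids, are skipped.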
maybeCommitActiveTasksPerUserRequested
Method
int maybeCommitActiveTasksPerUserRequested()
maybeCommitActiveTasksPerUserRequested
simply requests the AssignedStreamsTasks to maybeCommitPerUserRequested.
Note
|
maybeCommitActiveTasksPerUserRequested is used when StreamThread is requested to poll records once and process them using active stream tasks (and any of the active stream tasks have processed a record) and commit all tasks (when commit interval elapsed).
|
Shutting Down — shutdown
Method
void shutdown(boolean clean)
shutdown
…FIXME
Note
|
shutdown is used when…FIXME
|
Looking Up StandbyTask Per Partition — standbyTask
Method
StandbyTask standbyTask(TopicPartition partition)
standbyTask
simply requests the AssignedStandbyTasks for the running StandbyTask for the input partition.
Note
|
standbyTask is used exclusively when StreamThread is requested to attempt to update running StandbyTasks.
|
Previously Active Tasks — prevActiveTaskIds
Method
Set<TaskId> prevActiveTaskIds()
prevActiveTaskIds
simply requests the AssignedStreamsTasks for the previously active tasks.
Note
|
prevActiveTaskIds is used when…FIXME
|
Internal Properties
Name | Description |
---|---|
assignedActiveTasks | Assigned active tasks with the partitions per task id. Initialized when setting assignment metadata with active and standby tasks. Used exclusively when |
assignedStandbyTasks | Assigned standby tasks (as Kafka TopicPartitions per task id) |
 | Cluster metadata, i.e. Kafka Cluster with topic partitions |
 | Kafka Consumer ( Assigned right when |
 | Kafka’s Used in maybePurgeCommitedRecords for informational purposes |