TaskManager
TaskManager manages active and standby tasks of a StreamThread.
TaskManager is created alongside a StreamThread (with empty AssignedStreamsTasks and AssignedStandbyTasks).
TaskManager creates new tasks when RebalanceListener handles an onPartitionsAssigned event (by requesting TaskManager to create tasks for the assigned topic partitions, which in turn adds new stream tasks).
TaskManager uses the StandbyTaskCreator to create standby tasks when requested to addStandbyTasks.
TaskManager is given an AssignedStandbyTasks when created.
When created, TaskManager is given a StreamsMetadataState that is used exclusively to get notifications about the changes in cluster metadata. The StreamsMetadataState can then be displayed when TaskManager is requested for text representation.
// FIXME: TaskManager.toString
Tip: Enable logging for the TaskManager logger by adding a logger entry for it to your log4j configuration. Refer to Application Logging Using log4j.
Creating TaskManager Instance
TaskManager takes the following to be created:
- Kafka "restore" Consumer (Consumer<byte[], byte[]>)
- AbstractTaskCreator of StreamTasks (StreamThread.AbstractTaskCreator<StreamTask>)
- AbstractTaskCreator of StandbyTasks (StreamThread.AbstractTaskCreator<StandbyTask>)
TaskManager initializes the internal properties.
TaskManager and AdminClient
When created, TaskManager is given a Kafka AdminClient.
TaskManager uses the AdminClient when requested to try to purge (delete) committed records (maybePurgeCommitedRecords).
The AdminClient is also used for the following:
- StreamsPartitionAssignor is requested to configure (and creates an InternalTopicManager)
- StreamThread is requested for the adminClientMetrics
Notifying StreamsMetadataState About Changes in Cluster Metadata — setPartitionsByHostState Method
void setPartitionsByHostState(Map<HostInfo, Set<TopicPartition>> partitionsByHostState)
setPartitionsByHostState simply notifies the StreamsMetadataState that cluster metadata has changed.
Note: setPartitionsByHostState is used when StreamsPartitionAssignor is requested to assign and to handle onAssignment.
Setting Assignment Metadata with Active and Standby Tasks — setAssignmentMetadata Method
void setAssignmentMetadata(
Map<TaskId, Set<TopicPartition>> activeTasks,
Map<TaskId, Set<TopicPartition>> standbyTasks)
setAssignmentMetadata simply initializes the assignedActiveTasks and assignedStandbyTasks internal registries with the given activeTasks and standbyTasks, respectively.
Note: setAssignmentMetadata is used exclusively when StreamsPartitionAssignor is requested to handle the assignment from the consumer group leader.
updateSubscriptionsFromAssignment Method
void updateSubscriptionsFromAssignment(List<TopicPartition> partitions)
updateSubscriptionsFromAssignment…FIXME
Note: updateSubscriptionsFromAssignment is used exclusively when StreamsPartitionAssignor is requested to handle the assignment from the leader.
updateSubscriptionsFromMetadata Method
void updateSubscriptionsFromMetadata(Set<String> topics)
updateSubscriptionsFromMetadata…FIXME
Note: updateSubscriptionsFromMetadata is used when StreamsPartitionAssignor is requested for a subscription.
Suspending All (Active and Standby) Tasks And State — suspendTasksAndState Method
void suspendTasksAndState()
suspendTasksAndState…FIXME
Note: suspendTasksAndState is used exclusively when RebalanceListener is requested to handle an onPartitionsRevoked event.
updateNewAndRestoringTasks Method
boolean updateNewAndRestoringTasks()
updateNewAndRestoringTasks returns true only when, in the end, all the AssignedStreamsTasks tasks and all the AssignedStandbyTasks tasks are running. Otherwise, updateNewAndRestoringTasks returns false.
Internally, updateNewAndRestoringTasks requests the AssignedStreamsTasks and the AssignedStandbyTasks tasks to initialize new tasks.
updateNewAndRestoringTasks requests the ChangelogReader to restore the AssignedStreamsTasks tasks, which gives the restored partitions.
updateNewAndRestoringTasks requests the AssignedStreamsTasks tasks to updateRestored with the restored partitions (from the ChangelogReader).
updateNewAndRestoringTasks requests the AssignedStreamsTasks tasks to check whether all StreamTasks are running.
If not all active StreamTasks are running, updateNewAndRestoringTasks simply returns false.
Otherwise, if all active StreamTasks are running, updateNewAndRestoringTasks requests the Kafka consumer for the partition assignment (using Consumer.assignment) and prints out the following TRACE message to the logs:
Resuming partitions [assignment]
updateNewAndRestoringTasks then requests the Kafka consumer to resume the partitions (using Consumer.resume) and calls assignStandbyPartitions. In the end, updateNewAndRestoringTasks returns whether all the AssignedStandbyTasks tasks are running.
Note: updateNewAndRestoringTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (while running the main record processing loop in PARTITIONS_ASSIGNED state).
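The restore-then-resume steps above can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions, not the Kafka Streams implementation. Partitions are plain strings. SimpleTaskGroup, SimpleChangelogReader, SimpleConsumer and UpdateTasksSketch are hypothetical stand-ins for AssignedStreamsTasks/AssignedStandbyTasks, the ChangelogReader, the Kafka consumer and TaskManager. The stand-in changelog reader restores one partition per call to mimic incremental restoration. The final return value follows the description above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for AssignedStreamsTasks / AssignedStandbyTasks:
// the group counts as running once initialized and no partition awaits restoration.
class SimpleTaskGroup {
    final Set<String> pendingRestore = new LinkedHashSet<>();
    boolean initialized;

    void initializeNewTasks() { initialized = true; }

    void updateRestored(Collection<String> restored) { pendingRestore.removeAll(restored); }

    boolean allTasksRunning() { return initialized && pendingRestore.isEmpty(); }
}

// Hypothetical ChangelogReader stand-in that restores one pending partition per call.
class SimpleChangelogReader {
    Collection<String> restore(SimpleTaskGroup active) {
        List<String> restored = new ArrayList<>();
        Iterator<String> pending = active.pendingRestore.iterator();
        if (pending.hasNext()) {
            restored.add(pending.next());
        }
        return restored;
    }
}

// Hypothetical consumer stand-in that tracks assigned and paused partitions.
class SimpleConsumer {
    final Set<String> assigned = new LinkedHashSet<>();
    final Set<String> paused = new LinkedHashSet<>();

    Set<String> assignment() { return assigned; }

    void resume(Collection<String> partitions) { paused.removeAll(partitions); }
}

class UpdateTasksSketch {
    final SimpleTaskGroup active = new SimpleTaskGroup();
    final SimpleTaskGroup standby = new SimpleTaskGroup();
    final SimpleChangelogReader changelogReader = new SimpleChangelogReader();
    final SimpleConsumer consumer = new SimpleConsumer();
    int standbyAssignments; // how many times assignStandbyPartitions ran

    boolean updateNewAndRestoringTasks() {
        active.initializeNewTasks();
        standby.initializeNewTasks();
        Collection<String> restored = changelogReader.restore(active);
        active.updateRestored(restored);
        if (!active.allTasksRunning()) {
            return false; // some active tasks are still restoring; input stays paused
        }
        Set<String> assignment = consumer.assignment();
        System.out.println("Resuming partitions " + assignment);
        consumer.resume(assignment);
        assignStandbyPartitions();
        return standby.allTasksRunning();
    }

    // Placeholder for the real assignStandbyPartitions
    void assignStandbyPartitions() { standbyAssignments++; }
}
```

When called repeatedly, as the main loop does in PARTITIONS_ASSIGNED state, the sketch keeps returning false while any changelog partition is pending. It resumes the input partitions only once all active tasks are running.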
assignStandbyPartitions Internal Method
void assignStandbyPartitions()
assignStandbyPartitions…FIXME
Note: assignStandbyPartitions is used when TaskManager is requested to updateNewAndRestoringTasks.
Looking Up Stream Task Per Partition — activeTask Method
StreamTask activeTask(TopicPartition partition)
activeTask simply requests the AssignedStreamsTasks for the running StreamTask for the input partition.
Note: activeTask is used exclusively when StreamThread is requested to add records to active stream tasks.
hasActiveRunningTasks Method
boolean hasActiveRunningTasks()
hasActiveRunningTasks simply asks the AssignedStreamsTasks whether it has any running stream tasks or not.
Note: hasActiveRunningTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (and there are records to be processed).
hasStandbyRunningTasks Method
boolean hasStandbyRunningTasks()
hasStandbyRunningTasks simply asks the AssignedStandbyTasks whether it has any running standby tasks or not.
Note: hasStandbyRunningTasks is used exclusively when StreamThread is requested to maybeUpdateStandbyTasks (while polling records once and processing them using active stream tasks).
Creating Tasks for Assigned Partitions — createTasks Method
void createTasks(final Collection<TopicPartition> assignment)
createTasks requests the AssignedStandbyTasks and the AssignedStreamsTasks to closeNonAssignedSuspendedTasks (with the assignedStandbyTasks and the assignedActiveTasks, respectively).
createTasks (re)creates the stream tasks for the input assignment partitions.
createTasks then adds the assigned standby tasks (addStandbyTasks).
createTasks prints out the following TRACE message to the logs:
Pausing partitions: [assignment]
In the end, createTasks requests the Kafka consumer to pause the assignment partitions.
Note: createTasks triggers Consumer.pause, which suspends fetching records from the partitions until they are resumed using Consumer.resume.
createTasks throws an IllegalStateException if the consumer is not defined (null):
stream-thread [threadClientId] consumer has not been initialized while adding stream tasks. This should not happen.
Note: createTasks is used exclusively when RebalanceListener is requested to handle an onPartitionsAssigned event.
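The order of operations in createTasks can be sketched as follows. CreateTasksSketch and PausableConsumer are hypothetical names. Partitions are plain strings. The close and add steps are placeholders that only record their names. This sketch checks for a missing consumer before doing anything else, which is an assumption about ordering.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical consumer stand-in that only tracks paused partitions (cf. Consumer.pause).
class PausableConsumer {
    final Set<String> paused = new LinkedHashSet<>();

    void pause(Collection<String> partitions) { paused.addAll(partitions); }
}

class CreateTasksSketch {
    private final String threadClientId;
    PausableConsumer consumer; // stays null until the owning thread sets it
    final List<String> steps = new ArrayList<>(); // order in which the steps ran

    CreateTasksSketch(String threadClientId) { this.threadClientId = threadClientId; }

    void createTasks(Collection<String> assignment) {
        if (consumer == null) {
            throw new IllegalStateException("stream-thread [" + threadClientId
                    + "] consumer has not been initialized while adding stream tasks. This should not happen.");
        }
        closeNonAssignedSuspendedTasks();
        addStreamTasks(assignment);
        addStandbyTasks();
        System.out.println("Pausing partitions: " + assignment);
        // Keep the partitions paused until updateNewAndRestoringTasks resumes them
        consumer.pause(assignment);
    }

    // Placeholders that only record that they ran
    void closeNonAssignedSuspendedTasks() { steps.add("closeNonAssignedSuspendedTasks"); }

    void addStreamTasks(Collection<String> assignment) { steps.add("addStreamTasks"); }

    void addStandbyTasks() { steps.add("addStandbyTasks"); }
}
```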
(Re)Creating Stream Tasks Per Assigned Partitions — addStreamTasks Internal Method
void addStreamTasks(
Collection<TopicPartition> assignment)
addStreamTasks registers new stream tasks.
Note: addStreamTasks does nothing (and simply exits) unless assignedActiveTasks has at least one task id.
addStreamTasks prints out the following DEBUG message to the logs:
Adding assigned tasks as active: [assignedActiveTasks]
For every task id in assignedActiveTasks, if the partitions of the task are all included in the input assignment partitions, addStreamTasks requests AssignedStreamsTasks to maybeResumeSuspendedTask (passing in the task id and partitions). If the task could not be resumed, addStreamTasks records the task and its partitions in a local registry of new tasks to be created.
If the partitions of a task are not all included in the input assignment partitions, addStreamTasks prints out the following WARN message to the logs:
Task [taskId] owned partitions [partitions] are not contained in the assignment [assignment]
When there are new tasks to be created, addStreamTasks prints out the following TRACE message to the logs:
New active tasks to be created: [newTasks]
addStreamTasks then requests the StreamThread.AbstractTaskCreator<StreamTask> to createTasks for the new tasks (with the Kafka Consumer) and requests AssignedStreamsTasks to register every new task.
Note: addStreamTasks is used exclusively when TaskManager is requested to create tasks for assigned topic partitions.
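The following is a minimal sketch of the resume-or-create decision in addStreamTasks. It assumes task ids and partitions are plain strings. AddStreamTasksSketch and its maybeResumeSuspendedTask are hypothetical stand-ins for the TaskManager and AssignedStreamsTasks logic. Resuming only when the partitions match exactly is an assumption of the sketch. "Creating" a task here only records its id.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class AddStreamTasksSketch {
    // Task id -> partitions, as set by setAssignmentMetadata (assignedActiveTasks)
    final Map<String, Set<String>> assignedActiveTasks = new LinkedHashMap<>();
    // Suspended tasks that may be resumed (stand-in for AssignedStreamsTasks)
    final Map<String, Set<String>> suspended = new HashMap<>();
    final Set<String> resumed = new LinkedHashSet<>();
    final Set<String> created = new LinkedHashSet<>();

    // Assumption: resumes a suspended task only if it owned exactly the same partitions
    boolean maybeResumeSuspendedTask(String taskId, Set<String> partitions) {
        Set<String> previous = suspended.get(taskId);
        if (previous == null || !previous.equals(partitions)) {
            return false;
        }
        suspended.remove(taskId);
        resumed.add(taskId);
        return true;
    }

    void addStreamTasks(Collection<String> assignment) {
        if (assignedActiveTasks.isEmpty()) {
            return;
        }
        System.out.println("Adding assigned tasks as active: " + assignedActiveTasks);
        Map<String, Set<String>> newTasks = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : assignedActiveTasks.entrySet()) {
            String taskId = entry.getKey();
            Set<String> partitions = entry.getValue();
            if (assignment.containsAll(partitions)) {
                if (!maybeResumeSuspendedTask(taskId, partitions)) {
                    newTasks.put(taskId, partitions);
                }
            } else {
                System.out.println("Task " + taskId + " owned partitions " + partitions
                        + " are not contained in the assignment " + assignment);
            }
        }
        if (newTasks.isEmpty()) {
            return;
        }
        System.out.println("New active tasks to be created: " + newTasks);
        // Stand-in for AbstractTaskCreator.createTasks plus registering each new task
        created.addAll(newTasks.keySet());
    }
}
```

A task whose partitions are only partly assigned is neither resumed nor created. It only produces the WARN message.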
Adding Assigned Standby Tasks — addStandbyTasks Internal Method
void addStandbyTasks()
addStandbyTasks registers new standby tasks.
Note: addStandbyTasks does nothing and simply exits when the assignedStandbyTasks internal registry has no standby tasks assigned.
addStandbyTasks prints out the following DEBUG message to the logs:
Adding assigned standby tasks [assignedStandbyTasks]
For every task (id and partitions) in the assignedStandbyTasks registry, addStandbyTasks requests AssignedStandbyTasks to maybeResumeSuspendedTask and, if the task could not be resumed, records it as a new standby task to be created.
If there are no new standby tasks to be created, addStandbyTasks simply exits.
When there are new standby tasks to be created, addStandbyTasks prints out the following TRACE message to the logs:
New standby tasks to be created: [newStandbyTasks]
addStandbyTasks then requests the StreamThread.AbstractTaskCreator<StandbyTask> to createTasks for the new standby tasks (with the Kafka Consumer) and requests AssignedStandbyTasks to register every new task.
Note: addStandbyTasks is used exclusively when TaskManager is requested to create tasks for assigned partitions.
Describing Itself (Textual Representation) — toString Method
String toString(final String indent)
toString gives a text representation with the following:
- "Active tasks:" followed by the text representation of AssignedStreamsTasks
- "Standby tasks:" followed by the text representation of AssignedStandbyTasks
FIXME toString in action
Note: toString is used exclusively when StreamThread is requested to describe itself.
Attempting to Purge (Delete) Committed Records of Repartition Topics — maybePurgeCommitedRecords Method
void maybePurgeCommitedRecords()
In essence, maybePurgeCommitedRecords requests the AssignedStreamsTasks for the purgeable offsets of the repartition topics (of a topology). It then requests the AdminClient to delete the records whose offsets are smaller than the given offset of the corresponding partition.
maybePurgeCommitedRecords…FIXME
Note: maybePurgeCommitedRecords is used exclusively when StreamThread is requested to commit all tasks when the commit interval has elapsed (while polling records once and processing them using active stream tasks in the main record processing loop).
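The purge step can be sketched without Kafka, under stated assumptions. Partitions are strings. The deleteRecords function is a hypothetical stand-in for the AdminClient call. In the real Kafka API that call is AdminClient.deleteRecords, which takes a Map<TopicPartition, RecordsToDelete> built with RecordsToDelete.beforeOffset. PurgeSketch is a hypothetical name. Skipping a purge while the previous delete request is still in flight is an assumption of this sketch, not something stated above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class PurgeSketch {
    // Hypothetical stand-in for AdminClient.deleteRecords: partition -> "delete records
    // before this offset"; the future completes when the deletion has finished.
    private final Function<Map<String, Long>, CompletableFuture<Void>> deleteRecords;
    private CompletableFuture<Void> inFlight; // last delete request (assumption)

    PurgeSketch(Function<Map<String, Long>, CompletableFuture<Void>> deleteRecords) {
        this.deleteRecords = deleteRecords;
    }

    // purgeableOffsets: per repartition topic partition, the offset before which
    // records have been committed and can be deleted
    void maybePurgeCommitedRecords(Map<String, Long> purgeableOffsets) {
        if (inFlight != null && !inFlight.isDone()) {
            return; // assumption: do not pile up delete requests
        }
        // Records with offsets smaller than the given offset are deleted
        Map<String, Long> deleteBefore = new HashMap<>(purgeableOffsets);
        inFlight = deleteRecords.apply(deleteBefore);
    }
}
```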
Processing Records by Running Stream Tasks (One Record Per Task) — process Method
int process()
process simply requests AssignedStreamsTasks to request the running stream tasks to process a single record (per task).
In the end, process gives the number of stream tasks that processed a record.
Note: process is used exclusively when StreamThread is requested to process records (with an optional commit) while polling records once and processing them using active stream tasks.
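The one-record-per-task contract of process can be sketched as follows. SketchTask is a hypothetical stand-in for a running StreamTask that buffers records in a queue. ProcessSketch is a hypothetical stand-in for the loop in AssignedStreamsTasks. Record processing is reduced to moving a record from the buffer to a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a running StreamTask with buffered input records.
class SketchTask {
    final Deque<String> buffered = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    // Processes at most one buffered record and reports whether it did.
    boolean process() {
        String record = buffered.poll();
        if (record == null) {
            return false;
        }
        processed.add(record);
        return true;
    }
}

class ProcessSketch {
    final List<SketchTask> runningTasks = new ArrayList<>();

    // Gives every running task a chance to process one record and
    // returns how many tasks actually processed a record.
    int process() {
        int processedTasks = 0;
        for (SketchTask task : runningTasks) {
            if (task.process()) {
                processedTasks++;
            }
        }
        return processedTasks;
    }
}
```

Processing a single record per task per call keeps one busy task from starving the others. The returned count tells the caller whether any work was done.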
Committing Active Running Stream Tasks that Requested It — maybeCommitActiveTasks Method
int maybeCommitActiveTasks()
maybeCommitActiveTasks simply requests AssignedStreamsTasks to commit running stream tasks that requested it.
In the end, maybeCommitActiveTasks gives the number of running stream tasks that needed a commit.
Note: maybeCommitActiveTasks is used exclusively when StreamThread is requested to processAndMaybeCommit.
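The counting done by maybeCommitActiveTasks can be sketched in the same way. CommitSketchTask, its commitRequested flag, and CommitSketch are hypothetical stand-ins for a running StreamTask that asked to be committed and for AssignedStreamsTasks. Clearing the flag on commit is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a running StreamTask that may have requested a commit.
class CommitSketchTask {
    boolean commitRequested;
    int commits;

    void commit() {
        commits++;
        commitRequested = false; // assumption: the request is served by this commit
    }
}

class CommitSketch {
    final List<CommitSketchTask> runningTasks = new ArrayList<>();

    // Commits only the tasks that requested it and returns how many were committed.
    int maybeCommitActiveTasks() {
        int committed = 0;
        for (CommitSketchTask task : runningTasks) {
            if (task.commitRequested) {
                task.commit();
                committed++;
            }
        }
        return committed;
    }
}
```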
Punctuating Stream Tasks — punctuate Method
int punctuate()
punctuate simply requests the AssignedStreamsTasks to punctuate running stream tasks (by stream and system time).
Note: punctuate is used exclusively when StreamThread is requested to attempt to punctuate.
Committing All Active (Stream and Standby) Tasks — commitAll Method
int commitAll()
commitAll…FIXME
Note: commitAll is used exclusively when StreamThread is requested to commit all tasks (when the commit interval has elapsed).
All Active Tasks — activeTaskIds Method
Set<TaskId> activeTaskIds()
activeTaskIds simply requests the AssignedStreamsTasks for the assigned task IDs.
standbyTaskIds Method
Set<TaskId> standbyTaskIds()
standbyTaskIds…FIXME
Note: standbyTaskIds is used when…FIXME
cachedTasksIds Method
Set<TaskId> cachedTasksIds()
cachedTasksIds requests the StreamTask creator for the StateDirectory that is in turn requested for the task directories.
cachedTasksIds collects the TaskIds (by parsing the names of the directories) of the task directories that contain a .checkpoint file.
Note: cachedTasksIds is used exclusively when StreamsPartitionAssignor is requested for a subscription.
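The directory scan in cachedTasksIds can be sketched with java.io.File. SketchTaskId and CachedTaskIdsSketch are hypothetical names, and SketchTaskId stands in for TaskId. Two things are assumptions standing in for the StateDirectory task directory listing: the task directory naming (topic group id, underscore, partition, e.g. 1_0) and the `\d+_\d+` name filter.

```java
import java.io.File;
import java.io.FileFilter;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for TaskId: topic group id and partition.
final class SketchTaskId {
    final int topicGroupId;
    final int partition;

    SketchTaskId(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    // Parses a task directory name such as "1_0"
    static SketchTaskId parse(String name) {
        int sep = name.indexOf('_');
        if (sep < 0) {
            throw new IllegalArgumentException("Not a task directory name: " + name);
        }
        return new SketchTaskId(Integer.parseInt(name.substring(0, sep)),
                Integer.parseInt(name.substring(sep + 1)));
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof SketchTaskId)) return false;
        SketchTaskId other = (SketchTaskId) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override public int hashCode() { return Objects.hash(topicGroupId, partition); }

    @Override public String toString() { return topicGroupId + "_" + partition; }
}

class CachedTaskIdsSketch {
    // Task ids whose directories under stateDir contain a .checkpoint file
    static Set<SketchTaskId> cachedTasksIds(File stateDir) {
        Set<SketchTaskId> tasks = new HashSet<>();
        // Assumed stand-in for the task directory listing: subdirectories named like "1_0"
        File[] taskDirs = stateDir.listFiles(
                (FileFilter) f -> f.isDirectory() && f.getName().matches("\\d+_\\d+"));
        if (taskDirs == null) {
            return tasks; // not a directory or an I/O error
        }
        for (File dir : taskDirs) {
            if (new File(dir, ".checkpoint").exists()) {
                tasks.add(SketchTaskId.parse(dir.getName()));
            }
        }
        return tasks;
    }
}
```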
maybeCommitActiveTasksPerUserRequested Method
int maybeCommitActiveTasksPerUserRequested()
maybeCommitActiveTasksPerUserRequested simply requests the AssignedStreamsTasks to maybeCommitPerUserRequested.
Note: maybeCommitActiveTasksPerUserRequested is used when StreamThread is requested to poll records once and process them using active stream tasks (when any of the active stream tasks have processed a record) and to commit all tasks (when the commit interval has elapsed).
Shutting Down — shutdown Method
void shutdown(boolean clean)
shutdown…FIXME
Note: shutdown is used when…FIXME
Looking Up StandbyTask Per Partition — standbyTask Method
StandbyTask standbyTask(TopicPartition partition)
standbyTask simply requests the AssignedStandbyTasks for the running StandbyTask for the input partition.
Note: standbyTask is used exclusively when StreamThread is requested to attempt to update running StandbyTasks.
Previously Active Tasks — prevActiveTaskIds Method
Set<TaskId> prevActiveTaskIds()
prevActiveTaskIds simply requests the AssignedStreamsTasks for the previously active tasks.
Note: prevActiveTaskIds is used when…FIXME
Internal Properties
| Name | Description |
|---|---|
| assignedActiveTasks | Assigned active tasks with the partitions per task id. Initialized when setting assignment metadata with active and standby tasks. Used exclusively when … |
| assignedStandbyTasks | Assigned standby tasks (as Kafka TopicPartitions per task id) |
| | Cluster metadata, i.e. Kafka Cluster with topic partitions |
| | Kafka Consumer. Assigned right when … |
| | Kafka's … Used in maybePurgeCommitedRecords for informatory purposes |