TaskManager

TaskManager manages active and standby tasks of a StreamThread.

TaskManager is created alongside a StreamThread (with empty AssignedStreamsTasks and AssignedStandbyTasks).

kafka streams TaskManager.png
Figure 1. Creating TaskManager

TaskManager creates new tasks when RebalanceListener handles onPartitionsAssigned: TaskManager is requested to create tasks for the assigned topic partitions, which in turn adds new stream tasks.

TaskManager uses the StandbyTaskCreator to create standby tasks when requested to addStandbyTasks.

TaskManager is given an AssignedStandbyTasks when created for the following:

When created, TaskManager is given a StreamsMetadataState that is used exclusively to get notifications about the changes in cluster metadata. The StreamsMetadataState can then be displayed when TaskManager is requested for text representation.

Displaying StreamsMetadataState (In Textual Representation)
// FIXME: TaskManager.toString
Tip

Enable ALL logging level for org.apache.kafka.streams.processor.internals.TaskManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.TaskManager=ALL

Creating TaskManager Instance

TaskManager takes the following to be created:

TaskManager initializes the internal properties.

TaskManager and AdminClient

When created, TaskManager is given a Kafka AdminClient.

TaskManager uses the AdminClient when requested to try to purge (delete) committed records.

The AdminClient is also used for the following:

Notifying StreamsMetadataState About Changes in Cluster Metadata — setPartitionsByHostState Method

void setPartitionsByHostState(Map<HostInfo, Set<TopicPartition>> partitionsByHostState)

setPartitionsByHostState simply notifies the StreamsMetadataState that cluster metadata has changed.

Note
setPartitionsByHostState is used when StreamsPartitionAssignor is requested to assign and to handle onAssignment.

Setting Assignment Metadata with Active and Standby Tasks — setAssignmentMetadata Method

void setAssignmentMetadata(
  Map<TaskId, Set<TopicPartition>> activeTasks,
  Map<TaskId, Set<TopicPartition>> standbyTasks)

setAssignmentMetadata simply initializes the assignedActiveTasks and assignedStandbyTasks internal registries with the given activeTasks and standbyTasks, respectively.

Note
setAssignmentMetadata is used exclusively when StreamsPartitionAssignor is requested to handle assignment from the consumer group leader.

updateSubscriptionsFromAssignment Method

void updateSubscriptionsFromAssignment(List<TopicPartition> partitions)

updateSubscriptionsFromAssignment…​FIXME

Note
updateSubscriptionsFromAssignment is used exclusively when StreamsPartitionAssignor is requested to handle assignment from the leader.

updateSubscriptionsFromMetadata Method

void updateSubscriptionsFromMetadata(Set<String> topics)

updateSubscriptionsFromMetadata…​FIXME

Note
updateSubscriptionsFromMetadata is used when StreamsPartitionAssignor is requested for a subscription.

Suspending All (Active and Standby) Tasks And State — suspendTasksAndState Method

void suspendTasksAndState()

suspendTasksAndState…​FIXME

Note
suspendTasksAndState is used exclusively when RebalanceListener is requested to handle an onPartitionsRevoked event.

updateNewAndRestoringTasks Method

boolean updateNewAndRestoringTasks()

updateNewAndRestoringTasks returns true only when all the AssignedStreamsTasks tasks and all the AssignedStandbyTasks tasks are running in the end. Otherwise, updateNewAndRestoringTasks returns false.

kafka streams TaskManager updateNewAndRestoringTasks.png
Figure 2. TaskManager and updateNewAndRestoringTasks

Internally, updateNewAndRestoringTasks requests the AssignedStreamsTasks and the AssignedStandbyTasks tasks to initialize new tasks.

updateNewAndRestoringTasks requests the ChangelogReader to restore the AssignedStreamsTasks tasks (restored).

updateNewAndRestoringTasks requests the AssignedStreamsTasks tasks to updateRestored (from the ChangelogReader).

updateNewAndRestoringTasks requests the AssignedStreamsTasks tasks to check whether all StreamTasks are running.

If not all active StreamTasks are running, updateNewAndRestoringTasks simply returns false.

Otherwise, if all active StreamTasks are running, updateNewAndRestoringTasks requests the Kafka consumer for the partition assignment (using Consumer.assignment) and prints out the following TRACE message to the logs:

Resuming partitions [assignment]

updateNewAndRestoringTasks then requests the Kafka consumer to resume the partitions (using Consumer.resume), calls assignStandbyPartitions, and returns true.

Note
updateNewAndRestoringTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (while running the main record processing loop in PARTITIONS_ASSIGNED state).
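
The steps above can be sketched as a small model. This is not Kafka's code: UpdateTasksSketch is a hypothetical class, the collaborators are reduced to recorded step names, and the two boolean parameters stand in for the answers that AssignedStreamsTasks and AssignedStandbyTasks would give about their tasks. The final return value follows the summary at the top of this section (true only when standby tasks are all running as well), which is an assumption about how the two descriptions fit together.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the control flow described above; not Kafka's actual code. */
final class UpdateTasksSketch {
    /** Steps in call order, recorded so the sequence can be inspected. */
    final List<String> steps = new ArrayList<>();

    boolean updateNewAndRestoringTasks(boolean allActiveRunning, boolean allStandbyRunning) {
        steps.add("active.initializeNewTasks");
        steps.add("standby.initializeNewTasks");
        steps.add("changelogReader.restore");
        steps.add("active.updateRestored");

        if (!allActiveRunning) {
            // Some active tasks are still restoring, so partitions stay paused.
            return false;
        }

        steps.add("consumer.resume(consumer.assignment())");
        steps.add("assignStandbyPartitions");
        // Assumption: the result also depends on the standby tasks.
        return allStandbyRunning;
    }
}
```

Calling the method with allActiveRunning set to false records only the first four steps, which mirrors the early return described above.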

assignStandbyPartitions Internal Method

void assignStandbyPartitions()

assignStandbyPartitions…​FIXME

Note
assignStandbyPartitions is used when…​FIXME

Looking Up Stream Task Per Partition — activeTask Method

StreamTask activeTask(TopicPartition partition)

activeTask simply requests the AssignedStreamsTasks for the running StreamTask for the input partition.

Note
activeTask is used exclusively when StreamThread is requested to add records to active stream tasks.

hasActiveRunningTasks Method

boolean hasActiveRunningTasks()

hasActiveRunningTasks simply asks the AssignedStreamsTasks whether it has any running stream tasks or not.

kafka streams TaskManager hasActiveRunningTasks.png
Figure 3. TaskManager and AssignedStreamsTasks
Note
hasActiveRunningTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (and there are records to be processed).

hasStandbyRunningTasks Method

boolean hasStandbyRunningTasks()

hasStandbyRunningTasks simply asks the AssignedStandbyTasks whether it has any running standby tasks or not.

kafka streams TaskManager hasStandbyRunningTasks.png
Figure 4. TaskManager and AssignedStandbyTasks
Note
hasStandbyRunningTasks is used exclusively when StreamThread is requested to maybeUpdateStandbyTasks (while polling records once and processing them using active stream tasks).

Creating Tasks for Assigned Partitions — createTasks Method

void createTasks(final Collection<TopicPartition> assignment)
kafka streams TaskManager createTasks.png
Figure 5. TaskManager.createTasks

createTasks requests the AssignedStandbyTasks and the AssignedStreamsTasks to closeNonAssignedSuspendedTasks (with the assignedStandbyTasks and the assignedActiveTasks, respectively).

createTasks (re)creates the stream tasks for the input assignment partitions.

createTasks then adds the standby tasks (addStandbyTasks).

createTasks prints out the following TRACE message to the logs:

Pausing partitions: [assignment]

In the end, createTasks requests the Kafka consumer to pause the assignment partitions.

Note
createTasks triggers the Consumer.pause method, which suspends fetching records from the partitions until they are resumed using Consumer.resume.

createTasks reports an IllegalStateException if the consumer is not defined (null):

stream-thread [threadClientId] consumer has not been initialized while adding stream tasks. This should not happen.
Note
createTasks is used exclusively when RebalanceListener is requested to handle an onPartitionsAssigned event.
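
The order of operations in createTasks can be summarized with the following sketch. CreateTasksSketch is a hypothetical class, not Kafka's implementation: each collaborator call is reduced to a recorded step name, partitions are plain strings, and the consumerInitialized flag stands in for the consumer being non-null. Checking the consumer before any other work is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Simplified model of the createTasks sequence; not Kafka's actual code. */
final class CreateTasksSketch {
    /** Steps in call order, recorded so the sequence can be inspected. */
    final List<String> steps = new ArrayList<>();
    private final boolean consumerInitialized; // stands in for "consumer != null"

    CreateTasksSketch(boolean consumerInitialized) {
        this.consumerInitialized = consumerInitialized;
    }

    void createTasks(Collection<String> assignment) {
        // Assumption: the consumer check happens before any other work.
        if (!consumerInitialized) {
            throw new IllegalStateException(
                "consumer has not been initialized while adding stream tasks. This should not happen.");
        }
        steps.add("closeNonAssignedSuspendedTasks");
        steps.add("addStreamTasks");
        steps.add("addStandbyTasks");
        // Fetching stays paused until the tasks are running and the partitions are resumed.
        steps.add("consumer.pause(" + assignment + ")");
    }
}
```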

(Re)Creating Stream Tasks Per Assigned Partitions — addStreamTasks Internal Method

void addStreamTasks(
  Collection<TopicPartition> assignment)

addStreamTasks registers new stream tasks.

Note
addStreamTasks does nothing (and simply exits) unless assignedActiveTasks has at least one task id.
kafka streams TaskManager addStreamTasks.png
Figure 6. TaskManager.addStreamTasks

addStreamTasks prints out the following DEBUG message to the logs:

Adding assigned tasks as active: [assignedActiveTasks]

For every task id in assignedActiveTasks, if the partitions of the task are all included in the input assignment partitions, addStreamTasks requests AssignedStreamsTasks to maybeResumeSuspendedTask (passing in the task id and partitions). If the task could not be resumed, addStreamTasks records the task and partitions in a local registry of new tasks to be created.

If the partitions of a task are not all included in the input assignment partitions, addStreamTasks prints out the following WARN message to the logs:

Task [taskId] owned partitions [partitions] are not contained in the assignment [assignment]

When there are new tasks to be created, addStreamTasks prints out the following TRACE message to the logs:

New active tasks to be created: [newTasks]

addStreamTasks then requests StreamThread.AbstractTaskCreator<StreamTask> to createTasks for every new task (with the Kafka Consumer) and requests AssignedStreamsTasks to register a new task.

Note
addStreamTasks is used exclusively when TaskManager is requested to create tasks for assigned topic partitions.
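
The selection logic of addStreamTasks (resume a suspended task if possible, otherwise schedule it for creation, and warn about tasks whose partitions are not in the assignment) can be sketched as follows. This is a simplified model under stated assumptions: AddStreamTasksSketch and newTasksToCreate are hypothetical names, task ids and partitions are plain strings instead of TaskId and TopicPartition, and the maybeResumeSuspendedTask predicate stands in for the call to AssignedStreamsTasks.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

/** Simplified model of how new active tasks are selected; not Kafka's actual code. */
final class AddStreamTasksSketch {
    /** Returns the tasks (id to partitions) that have to be created from scratch. */
    static Map<String, Set<String>> newTasksToCreate(
            Map<String, Set<String>> assignedActiveTasks,
            Collection<String> assignment,
            BiPredicate<String, Set<String>> maybeResumeSuspendedTask) {
        Map<String, Set<String>> newTasks = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : assignedActiveTasks.entrySet()) {
            String taskId = entry.getKey();
            Set<String> partitions = entry.getValue();
            if (assignment.containsAll(partitions)) {
                // Try to resume a suspended task first; create a new one only if that fails.
                if (!maybeResumeSuspendedTask.test(taskId, partitions)) {
                    newTasks.put(taskId, partitions);
                }
            } else {
                System.out.printf(
                    "WARN Task %s owned partitions %s are not contained in the assignment %s%n",
                    taskId, partitions, assignment);
            }
        }
        return newTasks;
    }
}
```

A task is therefore created only when both conditions hold: its partitions are fully covered by the assignment, and no suspended task could be resumed for it.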

Adding Assigned Standby Tasks — addStandbyTasks Internal Method

void addStandbyTasks()

addStandbyTasks registers new standby tasks.

Note
addStandbyTasks does nothing and simply exits when the assignedStandbyTasks internal registry has no standby tasks assigned.
kafka streams TaskManager addStandbyTasks.png
Figure 7. TaskManager.addStandbyTasks

addStandbyTasks prints out the following DEBUG message to the logs:

Adding assigned standby tasks [assignedStandbyTasks]

For every task (id and partitions) in the assignedStandbyTasks registry, addStandbyTasks requests AssignedStandbyTasks to maybeResumeSuspendedTask and, if the task could not be resumed, adds it to the tasks to be created in standby mode.

If no new tasks should be in standby mode, addStandbyTasks simply exits.

When there are new tasks to be in standby mode, addStandbyTasks prints out the following TRACE message to the logs:

New standby tasks to be created: [newStandbyTasks]

addStandbyTasks then requests StreamThread.AbstractTaskCreator<StandbyTask> to createTasks for every new standby task (with the Kafka Consumer) and requests AssignedStandbyTasks to register a new task.

Note
addStandbyTasks is used exclusively when TaskManager is requested to create tasks for assigned partitions.

Describing Itself (Textual Representation) — toString Method

String toString(final String indent)

toString gives a text representation with the following:

FIXME toString in action
Note
toString is used exclusively when StreamThread is requested to describe itself.

Attempting to Purge (Delete) Committed Records of Repartition Topics — maybePurgeCommitedRecords Method

void maybePurgeCommitedRecords()

In essence, maybePurgeCommitedRecords requests the AssignedStreamsTasks for the purgeable offsets of the repartition topics (of a topology) and then requests the AdminClient to delete the records (whose offsets are smaller than the given offset of the corresponding partition).

maybePurgeCommitedRecords…​FIXME

Note
maybePurgeCommitedRecords is used exclusively when StreamThread is requested to commit all tasks (when the commit interval has elapsed), which happens while StreamThread polls records once and processes them using active stream tasks in the main record processing loop.
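
The "delete records whose offset is smaller than the given offset" semantics can be illustrated with a tiny in-memory model. In Kafka the request is expressed through AdminClient.deleteRecords with RecordsToDelete.beforeOffset; the sketch below does not call Kafka at all. PurgeSketch and purgeBefore are hypothetical names, and a sorted map from offset to record value stands in for a single repartition topic partition.

```java
import java.util.NavigableMap;

/** In-memory illustration of "delete before offset"; not Kafka's actual code. */
final class PurgeSketch {
    /** Deletes all records whose offset is smaller than beforeOffset and returns how many were removed. */
    static int purgeBefore(NavigableMap<Long, String> partitionLog, long beforeOffset) {
        // headMap(..., false) is a live view of the entries strictly below beforeOffset.
        NavigableMap<Long, String> purgeable = partitionLog.headMap(beforeOffset, false);
        int deleted = purgeable.size();
        purgeable.clear(); // clearing the view removes the entries from partitionLog
        return deleted;
    }
}
```

The record at the given offset itself is kept, since only strictly smaller offsets are purged.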

Processing Records by Running Stream Tasks (One Record Per Task) — process Method

int process()

In the end, process gives the number of stream tasks that processed a record.

Note
process is used exclusively when StreamThread is requested to process records (with optional commit), which happens when it is requested to poll records once and process them using active stream tasks.
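
The "one record per task" behaviour and the returned count can be sketched as follows. This is a minimal model, not Kafka's implementation: ProcessSketch is a hypothetical class, and each Deque of strings stands in for the buffered records of one running StreamTask.

```java
import java.util.Deque;
import java.util.List;

/** Simplified model of one processing pass over running tasks; not Kafka's actual code. */
final class ProcessSketch {
    /** Processes at most one buffered record per task and returns how many tasks processed one. */
    static int process(List<Deque<String>> runningTasks) {
        int processed = 0;
        for (Deque<String> task : runningTasks) {
            // poll() removes and returns the head record, or null if the task has nothing buffered.
            if (task.poll() != null) {
                processed++;
            }
        }
        return processed;
    }
}
```

A task with several buffered records contributes at most one to the count per call, so draining a backlog takes repeated calls.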

Committing Active Running Stream Tasks that Requested It — maybeCommitActiveTasks Method

int maybeCommitActiveTasks()

maybeCommitActiveTasks simply requests AssignedStreamsTasks to commit running stream tasks that requested it.

In the end, maybeCommitActiveTasks gives the number of running stream tasks that needed a commit.

Note
maybeCommitActiveTasks is used exclusively when StreamThread is requested to processAndMaybeCommit.

Punctuating Stream Tasks — punctuate Method

int punctuate()
Note
punctuate is used exclusively when StreamThread is requested to attempt to punctuate.

Committing All Active (Stream and Standby) Tasks — commitAll Method

int commitAll()

commitAll…​FIXME

Note
commitAll is used exclusively when StreamThread is requested to commit all tasks (when commit interval elapsed).

All Active Tasks — activeTaskIds Method

Set<TaskId> activeTaskIds()

activeTaskIds simply requests the AssignedStreamsTasks for the assigned task IDs.

Note

activeTaskIds is used when:

standbyTaskIds Method

Set<TaskId> standbyTaskIds()

standbyTaskIds…​FIXME

Note
standbyTaskIds is used when…​FIXME

cachedTasksIds Method

Set<TaskId> cachedTasksIds()

cachedTasksIds requests the StreamTask creator for the StateDirectory that is in turn requested for the task directories.

cachedTasksIds collects the TaskIds (parsing the names of the directories) that correspond to task directories with a .checkpoint file.

Note
cachedTasksIds is used exclusively when StreamsPartitionAssignor is requested for a subscription.
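
The directory scan behind cachedTasksIds can be sketched with plain java.nio. This is an illustration under stated assumptions rather than Kafka's code: CachedTaskIdsSketch, cachedTaskIds and demo are hypothetical names, the StateDirectory is replaced by a plain Path, and the directory names are returned as strings instead of being parsed into TaskIds.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;

/** Simplified model of collecting cached task ids; not Kafka's actual code. */
final class CachedTaskIdsSketch {
    /** Returns the names of the task directories under stateDir that contain a .checkpoint file. */
    static Set<String> cachedTaskIds(Path stateDir) {
        Set<String> taskIds = new TreeSet<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(stateDir, Files::isDirectory)) {
            for (Path dir : dirs) {
                if (Files.exists(dir.resolve(".checkpoint"))) {
                    taskIds.add(dir.getFileName().toString());
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return taskIds;
    }

    /** Usage example: builds a throwaway state directory with two task directories. */
    static Set<String> demo() {
        try {
            Path stateDir = Files.createTempDirectory("state");
            Path withCheckpoint = Files.createDirectories(stateDir.resolve("0_0"));
            Files.createFile(withCheckpoint.resolve(".checkpoint"));
            Files.createDirectories(stateDir.resolve("0_1")); // no checkpoint file here
            return cachedTaskIds(stateDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the demo, only the directory that holds a .checkpoint file is reported, so a task directory without a checkpoint does not count as cached.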

maybeCommitActiveTasksPerUserRequested Method

int maybeCommitActiveTasksPerUserRequested()

maybeCommitActiveTasksPerUserRequested simply requests the AssignedStreamsTasks to maybeCommitPerUserRequested.

Note
maybeCommitActiveTasksPerUserRequested is used when StreamThread is requested to poll records once and process them using active stream tasks (and any of the active stream tasks have processed a record) and commit all tasks (when commit interval elapsed).

Shutting Down — shutdown Method

void shutdown(boolean clean)

shutdown…​FIXME

Note
shutdown is used when…​FIXME

Looking Up StandbyTask Per Partition — standbyTask Method

StandbyTask standbyTask(TopicPartition partition)

standbyTask simply requests the AssignedStandbyTasks for the running StandbyTask for the input partition.

Note
standbyTask is used exclusively when StreamThread is requested to attempt to update running StandbyTasks.

Previously Active Tasks — prevActiveTaskIds Method

Set<TaskId> prevActiveTaskIds()

prevActiveTaskIds simply requests the AssignedStreamsTasks for the previously active tasks.

Note
prevActiveTaskIds is used when…​FIXME

Internal Properties

Name Description

assignedActiveTasks

Map<TaskId, Set<TopicPartition>> assignedActiveTasks

Assigned active tasks with the partitions per task id

Used exclusively when TaskManager is requested to create tasks for the assigned partitions

assignedStandbyTasks

Map<TaskId, Set<TopicPartition>> assignedStandbyTasks

Assigned standby tasks (as Kafka TopicPartitions per task id)

cluster

Cluster metadata, i.e. Kafka Cluster with topic partitions

consumer

Kafka Consumer (Consumer<byte[], byte[]>)

Assigned right when StreamThread is created (and corresponds to the Kafka consumer from the KafkaClientSupplier that was used to create the KafkaStreams)

deleteRecordsResult

Kafka’s DeleteRecordsResult (after maybePurgeCommitedRecords and requesting the AdminClient to delete records)

Used in maybePurgeCommitedRecords for informational purposes
