
TaskManager manages active and standby tasks of a StreamThread.

TaskManager is created alongside a StreamThread (with empty AssignedStreamsTasks and AssignedStandbyTasks).

Figure 1. Creating TaskManager

TaskManager creates new tasks when RebalanceListener handles an onPartitionsAssigned event (and requests TaskManager to create processor tasks for the assigned topic partitions, which in turn adds new stream tasks).

TaskManager uses the StandbyTaskCreator to create standby tasks when requested to addStandbyTasks.

TaskManager is given an AssignedStandbyTasks when created for the following:

When created, TaskManager is given a StreamsMetadataState that is used exclusively to get notifications about the changes in cluster metadata. The StreamsMetadataState can then be displayed when TaskManager is requested for its text representation.

Displaying StreamsMetadataState (In Textual Representation)
Enable ALL logging level for the org.apache.kafka.streams.processor.internals.TaskManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.TaskManager=ALL

Creating TaskManager Instance

TaskManager takes the following to be created:

TaskManager initializes the internal properties.

TaskManager and AdminClient

When created, TaskManager is given a Kafka AdminClient.

TaskManager uses the AdminClient when requested to try to purge (delete) committed records.

The AdminClient is also used for the following:

Notifying StreamsMetadataState About Changes in Cluster Metadata — setPartitionsByHostState Method

void setPartitionsByHostState(Map<HostInfo, Set<TopicPartition>> partitionsByHostState)

setPartitionsByHostState simply notifies the StreamsMetadataState that cluster metadata has changed.

setPartitionsByHostState is used when StreamsPartitionAssignor is requested to assign partitions and to handle an assignment (onAssignment).

Setting Assignment Metadata with Active and Standby Tasks — setAssignmentMetadata Method

void setAssignmentMetadata(
  Map<TaskId, Set<TopicPartition>> activeTasks,
  Map<TaskId, Set<TopicPartition>> standbyTasks)

setAssignmentMetadata simply initializes the assignedActiveTasks and assignedStandbyTasks internal registries with the given activeTasks and standbyTasks, respectively.

setAssignmentMetadata is used exclusively when StreamsPartitionAssignor is requested to handle assignment from the consumer group leader.

updateSubscriptionsFromAssignment Method

void updateSubscriptionsFromAssignment(List<TopicPartition> partitions)


updateSubscriptionsFromAssignment is used exclusively when StreamsPartitionAssignor is requested to handle assignment from the consumer group leader.

updateSubscriptionsFromMetadata Method

void updateSubscriptionsFromMetadata(Set<String> topics)


updateSubscriptionsFromMetadata is used when StreamsPartitionAssignor is requested for a subscription.

Suspending All (Active and Standby) Tasks And State — suspendTasksAndState Method

void suspendTasksAndState()


suspendTasksAndState is used exclusively when RebalanceListener is requested to handle an onPartitionsRevoked event.

updateNewAndRestoringTasks Method

boolean updateNewAndRestoringTasks()

updateNewAndRestoringTasks returns true only when all the AssignedStreamsTasks tasks and all the AssignedStandbyTasks tasks are running at the end. Otherwise, updateNewAndRestoringTasks returns false.

Figure 2. TaskManager and updateNewAndRestoringTasks

Internally, updateNewAndRestoringTasks requests the AssignedStreamsTasks and the AssignedStandbyTasks tasks to initialize new tasks.

updateNewAndRestoringTasks requests the ChangelogReader to restore the AssignedStreamsTasks tasks, which gives the restored partitions.

updateNewAndRestoringTasks requests the AssignedStreamsTasks tasks to updateRestored (with the partitions restored by the ChangelogReader).

updateNewAndRestoringTasks requests the AssignedStreamsTasks tasks to check whether all StreamTasks are running.

If not all active StreamTasks are running, updateNewAndRestoringTasks simply returns false.

Otherwise, if all active StreamTasks are running, updateNewAndRestoringTasks requests the Kafka consumer for the partition assignment (using Consumer.assignment) and prints out the following TRACE message to the logs:

Resuming partitions [assignment]

updateNewAndRestoringTasks then requests the Kafka consumer to resume the partitions (using Consumer.resume), calls assignStandbyPartitions and, in the end, returns whether all the AssignedStandbyTasks tasks are running.

updateNewAndRestoringTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (while running the main record processing loop in PARTITIONS_ASSIGNED state).
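The control flow of updateNewAndRestoringTasks can be sketched as follows. This is a minimal, hypothetical model rather than the Kafka Streams code: topic partitions are plain strings, and TaskGroup, ActiveTaskGroup, FakeTaskGroup and FakeConsumer are invented stand-ins for AssignedStandbyTasks, AssignedStreamsTasks and the Kafka consumer. The ChangelogReader is reduced to a function that gives the restored partitions, and assignStandbyPartitions is not modelled.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-ins (not Kafka Streams classes). Partitions are "topic-partition" strings.
interface TaskGroup {
    void initializeNewTasks();
    boolean allTasksRunning();
}

interface ActiveTaskGroup extends TaskGroup {
    void updateRestored(Collection<String> restoredPartitions);
}

// Fake task group that counts as running once none of its partitions are left to restore.
class FakeTaskGroup implements ActiveTaskGroup {
    final Set<String> pending;
    FakeTaskGroup(Set<String> pending) { this.pending = new HashSet<>(pending); }
    public void initializeNewTasks() { }
    public void updateRestored(Collection<String> restored) { pending.removeAll(restored); }
    public boolean allTasksRunning() { return pending.isEmpty(); }
}

// Fake consumer that only tracks its assignment and the partitions it was asked to resume.
class FakeConsumer {
    final Set<String> assignment;
    final Set<String> resumed = new HashSet<>();
    FakeConsumer(Set<String> assignment) { this.assignment = assignment; }
    Set<String> assignment() { return assignment; }
    void resume(Collection<String> partitions) { resumed.addAll(partitions); }
}

class UpdateNewAndRestoringTasksSketch {
    static boolean updateNewAndRestoringTasks(ActiveTaskGroup active, TaskGroup standby,
            Function<ActiveTaskGroup, Collection<String>> changelogRestore, FakeConsumer consumer) {
        active.initializeNewTasks();
        standby.initializeNewTasks();
        // Stand-in for ChangelogReader.restore: gives the partitions restored in this round.
        Collection<String> restored = changelogRestore.apply(active);
        active.updateRestored(restored);
        if (!active.allTasksRunning()) {
            return false; // input partitions stay paused while any active task is restoring
        }
        Set<String> assignment = consumer.assignment();
        System.out.println("Resuming partitions " + assignment); // stands in for the TRACE message
        consumer.resume(assignment);
        // assignStandbyPartitions() would be called here (not modelled)
        return standby.allTasksRunning();
    }
}
```

Returning the standby check rather than a plain true keeps the sketch in line with the description above: the result is true only when both the active and the standby tasks are running.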

assignStandbyPartitions Internal Method

void assignStandbyPartitions()


assignStandbyPartitions is used when…FIXME

Looking Up Stream Task Per Partition — activeTask Method

StreamTask activeTask(TopicPartition partition)

activeTask simply requests the AssignedStreamsTasks for the running StreamTask for the input partition.

activeTask is used exclusively when StreamThread is requested to add records to active stream tasks.
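A lookup like activeTask stays cheap when running tasks are indexed by the input partitions they own. The sketch below assumes such a partition-to-task index; RunningTasksByPartition and its methods are hypothetical, and task ids and partitions are plain strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical index of running tasks by input partition (not the Kafka Streams class).
class RunningTasksByPartition {
    private final Map<String, String> taskIdByPartition = new HashMap<>();

    // Assumed to be called when a task transitions to running.
    void addRunning(String taskId, Set<String> partitions) {
        for (String partition : partitions) {
            taskIdByPartition.put(partition, taskId);
        }
    }

    // Gives null when no running task owns the partition (e.g. its task is still restoring).
    String runningTaskFor(String partition) {
        return taskIdByPartition.get(partition);
    }
}
```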

hasActiveRunningTasks Method

boolean hasActiveRunningTasks()

hasActiveRunningTasks simply asks the AssignedStreamsTasks whether it has any running stream tasks or not.

Figure 3. TaskManager and AssignedStreamsTasks
hasActiveRunningTasks is used exclusively when StreamThread is requested to poll records once and process them using active stream tasks (and there are records to be processed).

hasStandbyRunningTasks Method

boolean hasStandbyRunningTasks()

hasStandbyRunningTasks simply asks the AssignedStandbyTasks whether it has any running standby tasks or not.

Figure 4. TaskManager and AssignedStandbyTasks
hasStandbyRunningTasks is used exclusively when StreamThread is requested to maybeUpdateStandbyTasks (while polling records once and processing them using active stream tasks).

Creating Tasks for Assigned Partitions — createTasks Method

void createTasks(final Collection<TopicPartition> assignment)
Figure 5. TaskManager.createTasks

createTasks requests the AssignedStandbyTasks and the AssignedStreamsTasks to closeNonAssignedSuspendedTasks (with the assignedStandbyTasks and the assignedActiveTasks, respectively).

createTasks (re)creates the stream tasks for the input assignment partitions.

createTasks then adds the assigned standby tasks (addStandbyTasks).

createTasks prints out the following TRACE message to the logs:

Pausing partitions: [assignment]

In the end, createTasks requests the Kafka consumer to pause the assignment partitions.

Consumer.pause suspends fetching records from the partitions until they are resumed using Consumer.resume, which updateNewAndRestoringTasks does once all active stream tasks are running.

createTasks throws an IllegalStateException if the Kafka consumer is not defined (null):

stream-thread [threadClientId] consumer has not been initialized while adding stream tasks. This should not happen.

createTasks is used exclusively when RebalanceListener is requested to handle an onPartitionsAssigned event.
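The order of the steps in createTasks can be sketched as below. This is a hypothetical model: PausingConsumer and CreateTasksSketch are invented names, the task-related steps are only recorded by name, and checking the consumer before any other step is an assumption of the sketch (the text above does not say at which point the check happens).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical consumer stand-in that only supports pausing partitions.
interface PausingConsumer {
    void pause(Collection<String> partitions);
}

class CreateTasksSketch {
    final List<String> steps = new ArrayList<>(); // records the order in which the steps run
    private final PausingConsumer consumer;       // may be null to show the failure path
    private final String threadClientId;

    CreateTasksSketch(PausingConsumer consumer, String threadClientId) {
        this.consumer = consumer;
        this.threadClientId = threadClientId;
    }

    void createTasks(Collection<String> assignment) {
        if (consumer == null) {
            throw new IllegalStateException("stream-thread [" + threadClientId + "] consumer has not been"
                    + " initialized while adding stream tasks. This should not happen.");
        }
        // Placeholders for the real steps (for standby and active tasks).
        steps.add("closeNonAssignedSuspendedTasks");
        steps.add("addStreamTasks");
        steps.add("addStandbyTasks");
        System.out.println("Pausing partitions: " + assignment); // stands in for the TRACE message
        consumer.pause(assignment); // fetching stays paused until the partitions are resumed
        steps.add("pause");
    }
}
```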

(Re)Creating Stream Tasks Per Assigned Partitions — addStreamTasks Internal Method

void addStreamTasks(
  Collection<TopicPartition> assignment)

addStreamTasks registers new stream tasks.

addStreamTasks does nothing (and simply exits) unless assignedActiveTasks has at least one task id.
Figure 6. TaskManager.addStreamTasks

addStreamTasks prints out the following DEBUG message to the logs:

Adding assigned tasks as active: [assignedActiveTasks]

For every task id in assignedActiveTasks whose partitions are all included in the input assignment partitions, addStreamTasks requests AssignedStreamsTasks to maybeResumeSuspendedTask (passing in the task id and partitions). If the task could not be resumed, addStreamTasks records the task and its partitions in a local registry of new tasks to be created.

If the partitions of a task are not all included in the input assignment partitions, addStreamTasks prints out the following WARN message to the logs:

Task [taskId] owned partitions [partitions] are not contained in the assignment [assignment]

When there are new tasks to be created, addStreamTasks prints out the following TRACE message to the logs:

New active tasks to be created: [newTasks]

addStreamTasks then requests the StreamThread.AbstractTaskCreator<StreamTask> to createTasks for the new tasks (with the Kafka consumer) and requests AssignedStreamsTasks to register every new task.

addStreamTasks is used exclusively when TaskManager is requested to create tasks for assigned topic partitions.
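The per-task decision in addStreamTasks (resume a suspended task, schedule a new one, or warn) can be sketched as below. It is a hypothetical model: task ids and partitions are strings, the BiPredicate stands in for AssignedStreamsTasks.maybeResumeSuspendedTask, log messages go to stdout, and the sketch stops at computing the tasks to create rather than creating and registering them.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;

class AddStreamTasksSketch {
    // Gives the tasks that must be created from scratch (task id -> partitions).
    // maybeResumeSuspendedTask is a hypothetical stand-in: true when a suspended task was resumed.
    static Map<String, Set<String>> newTasksToCreate(Map<String, Set<String>> assignedActiveTasks,
                                                     Collection<String> assignment,
                                                     BiPredicate<String, Set<String>> maybeResumeSuspendedTask) {
        Map<String, Set<String>> newTasks = new HashMap<>();
        if (assignedActiveTasks.isEmpty()) {
            return newTasks; // no active tasks assigned: nothing to do
        }
        System.out.println("Adding assigned tasks as active: " + assignedActiveTasks);
        for (Map.Entry<String, Set<String>> entry : assignedActiveTasks.entrySet()) {
            String taskId = entry.getKey();
            Set<String> partitions = entry.getValue();
            if (assignment.containsAll(partitions)) {
                // Prefer resuming a suspended task; otherwise schedule a new one.
                if (!maybeResumeSuspendedTask.test(taskId, partitions)) {
                    newTasks.put(taskId, partitions);
                }
            } else {
                System.out.println("WARN Task " + taskId + " owned partitions " + partitions
                        + " are not contained in the assignment " + assignment);
            }
        }
        if (!newTasks.isEmpty()) {
            System.out.println("New active tasks to be created: " + newTasks);
        }
        return newTasks;
    }
}
```

Checking containsAll first means that a task whose partitions are only partly assigned is neither resumed nor created; it only produces the warning.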

Adding Assigned Standby Tasks — addStandbyTasks Internal Method

void addStandbyTasks()

addStandbyTasks registers new standby tasks.

addStandbyTasks does nothing and simply exits when the assignedStandbyTasks internal registry has no standby tasks assigned.
Figure 7. TaskManager.addStandbyTasks

addStandbyTasks prints out the following DEBUG message to the logs:

Adding assigned standby tasks [assignedStandbyTasks]

For every task (id and partitions) in the assignedStandbyTasks registry, addStandbyTasks requests AssignedStandbyTasks to maybeResumeSuspendedTask and, if the task could not be resumed, adds it to the new tasks to be created in standby mode.

If there are no new standby tasks to create, addStandbyTasks simply exits.

When there are new standby tasks to create, addStandbyTasks prints out the following TRACE message to the logs:

New standby tasks to be created: [newStandbyTasks]

addStandbyTasks then requests the StreamThread.AbstractTaskCreator<StandbyTask> to createTasks for the new standby tasks (with the Kafka consumer) and requests AssignedStandbyTasks to register every new task.

addStandbyTasks is used exclusively when TaskManager is requested to create tasks for assigned partitions.

Describing Itself (Textual Representation) — toString Method

String toString(final String indent)

toString gives a text representation with the following:

toString is used exclusively when StreamThread is requested to describe itself.

Attempting to Purge (Delete) Committed Records of Repartition Topics — maybePurgeCommitedRecords Method

void maybePurgeCommitedRecords()

In essence, maybePurgeCommitedRecords requests the AssignedStreamsTasks for the purgable offsets of the repartition topics (of a topology) and then the AdminClient to delete the records (whose offset is smaller than the given offset of the corresponding partition).


maybePurgeCommitedRecords is used exclusively when StreamThread is requested to commit all tasks once the commit interval has elapsed (while polling records once and processing them using active stream tasks in the main record processing loop).
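maybePurgeCommitedRecords turns purgeable offsets into a delete request. In kafka-clients, such a request is made with AdminClient.deleteRecords, which takes a Map<TopicPartition, RecordsToDelete> (built with RecordsToDelete.beforeOffset) and gives a DeleteRecordsResult. The sketch below avoids the Kafka dependency: the purgeable offsets and the delete call are hypothetical function parameters, a CompletableFuture plays the DeleteRecordsResult, and not sending a new request while the previous one is still pending is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

class PurgeCommittedRecordsSketch {
    // Hypothetical stand-ins: purgeableOffsets plays the AssignedStreamsTasks query and
    // deleteRecordsBefore plays AdminClient.deleteRecords (partition -> "delete before offset").
    private final Supplier<Map<String, Long>> purgeableOffsets;
    private final Function<Map<String, Long>, CompletableFuture<Void>> deleteRecordsBefore;
    private CompletableFuture<Void> lastResult; // plays the role of the last DeleteRecordsResult

    PurgeCommittedRecordsSketch(Supplier<Map<String, Long>> purgeableOffsets,
                                Function<Map<String, Long>, CompletableFuture<Void>> deleteRecordsBefore) {
        this.purgeableOffsets = purgeableOffsets;
        this.deleteRecordsBefore = deleteRecordsBefore;
    }

    // Gives true when a new delete request was sent.
    boolean maybePurgeCommittedRecords() {
        if (lastResult != null && !lastResult.isDone()) {
            return false; // assumption: at most one delete request in flight
        }
        // Records whose offsets are smaller than the given offset of each partition get deleted.
        Map<String, Long> deleteBefore = new HashMap<>(purgeableOffsets.get());
        lastResult = deleteRecordsBefore.apply(deleteBefore);
        return true;
    }
}
```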

Processing Records by Running Stream Tasks (One Record Per Task) — process Method

int process()

process requests the running stream tasks to process one record each and, in the end, gives the number of stream tasks that processed a record.

process is used exclusively when StreamThread is requested to process records with an optional commit (while polling records once and processing them using active stream tasks).
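The "one record per task" pass can be sketched as below. OneRecordTask, BufferedTask and ProcessSketch are hypothetical; anything beyond taking at most one record per task (such as error handling) is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a running StreamTask: process() handles at most one
// buffered record and reports whether it did.
interface OneRecordTask {
    boolean process();
}

// Fake task with an in-memory record buffer.
class BufferedTask implements OneRecordTask {
    final Deque<String> buffer = new ArrayDeque<>();
    BufferedTask(String... records) { for (String r : records) buffer.add(r); }
    public boolean process() { return buffer.poll() != null; }
}

class ProcessSketch {
    // One pass over the running tasks: each task gets one chance to process one record.
    static int process(List<? extends OneRecordTask> runningTasks) {
        int processed = 0;
        for (OneRecordTask task : runningTasks) {
            if (task.process()) {
                processed++;
            }
        }
        return processed;
    }
}
```

One plausible reason for taking a single record per task in each pass is that it spreads processing across tasks instead of letting one busy task drain its buffer first.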

Committing Active Running Stream Tasks that Requested It — maybeCommitActiveTasks Method

int maybeCommitActiveTasks()

maybeCommitActiveTasks simply requests AssignedStreamsTasks to commit running stream tasks that requested it.

In the end, maybeCommitActiveTasks gives the number of running stream tasks that needed a commit.

maybeCommitActiveTasks is used exclusively when StreamThread is requested to processAndMaybeCommit.

Punctuating Stream Tasks — punctuate Method

int punctuate()

punctuate is used exclusively when StreamThread is requested to attempt to punctuate.

Committing All Active (Stream and Standby) Tasks — commitAll Method

int commitAll()


commitAll is used exclusively when StreamThread is requested to commit all tasks (when the commit interval has elapsed).

All Active Tasks — activeTaskIds Method

Set<TaskId> activeTaskIds()

activeTaskIds simply requests the AssignedStreamsTasks for the assigned task IDs.


activeTaskIds is used when:

standbyTaskIds Method

Set<TaskId> standbyTaskIds()


standbyTaskIds is used when…FIXME

cachedTasksIds Method

Set<TaskId> cachedTasksIds()

cachedTasksIds requests the StreamTask creator for the StateDirectory that is in turn requested for the task directories.

cachedTasksIds collects the TaskIds (parsing the names of the directories) that correspond to task directories with a .checkpoint file.

cachedTasksIds is used exclusively when StreamsPartitionAssignor is requested for a subscription.
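The directory scan in cachedTasksIds can be sketched with java.io.File. The sketch assumes that task directories sit directly under the given state directory and are named <topicGroupId>_<partition> (for example 0_1). SimpleTaskId and CachedTasksIdsSketch are hypothetical stand-ins for TaskId and the StateDirectory lookup, and directory names that do not parse as task ids are skipped.

```java
import java.io.File;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical minimal stand-in for a task id: "<topicGroupId>_<partition>".
final class SimpleTaskId {
    final int topicGroupId;
    final int partition;

    SimpleTaskId(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    // Gives null for directory names that are not task ids.
    static SimpleTaskId parse(String name) {
        int sep = name.indexOf('_');
        if (sep <= 0) {
            return null;
        }
        try {
            return new SimpleTaskId(Integer.parseInt(name.substring(0, sep)),
                                    Integer.parseInt(name.substring(sep + 1)));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof SimpleTaskId)) return false;
        SimpleTaskId other = (SimpleTaskId) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override public int hashCode() { return Objects.hash(topicGroupId, partition); }

    @Override public String toString() { return topicGroupId + "_" + partition; }
}

class CachedTasksIdsSketch {
    static Set<SimpleTaskId> cachedTasksIds(File stateDir) {
        Set<SimpleTaskId> ids = new HashSet<>();
        File[] subDirs = stateDir.listFiles(File::isDirectory);
        if (subDirs == null) {
            return ids; // not a directory, or it could not be read
        }
        for (File dir : subDirs) {
            SimpleTaskId id = SimpleTaskId.parse(dir.getName());
            // Only task directories with a .checkpoint file are reported.
            if (id != null && new File(dir, ".checkpoint").exists()) {
                ids.add(id);
            }
        }
        return ids;
    }
}
```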

maybeCommitActiveTasksPerUserRequested Method

int maybeCommitActiveTasksPerUserRequested()

maybeCommitActiveTasksPerUserRequested simply requests the AssignedStreamsTasks to maybeCommitPerUserRequested.

maybeCommitActiveTasksPerUserRequested is used when StreamThread is requested to poll records once and process them using active stream tasks (when any of the active stream tasks has processed a record) and to commit all tasks (when the commit interval has elapsed).

Shutting Down — shutdown Method

void shutdown(boolean clean)


shutdown is used when…FIXME

Looking Up StandbyTask Per Partition — standbyTask Method

StandbyTask standbyTask(TopicPartition partition)

standbyTask simply requests the AssignedStandbyTasks for the running StandbyTask for the input partition.

standbyTask is used exclusively when StreamThread is requested to attempt to update running StandbyTasks.

Previously Active Tasks — prevActiveTaskIds Method

Set<TaskId> prevActiveTaskIds()

prevActiveTaskIds simply requests the AssignedStreamsTasks for the previously active tasks.

prevActiveTaskIds is used when…FIXME

Internal Properties

Name Description


Map<TaskId, Set<TopicPartition>> assignedActiveTasks

Assigned active tasks with the partitions per task id

Used exclusively when TaskManager is requested to create tasks for the assigned partitions


Map<TaskId, Set<TopicPartition>> assignedStandbyTasks

Assigned standby tasks (as Kafka TopicPartitions per task id)


Cluster metadata, i.e. Kafka Cluster with topic partitions


Kafka Consumer (Consumer<byte[], byte[]>)

Assigned right when StreamThread is created (and corresponds to the Kafka consumer from the KafkaClientSupplier that was used to create the KafkaStreams)


Kafka’s DeleteRecordsResult (after maybePurgeCommitedRecords and requesting the AdminClient to delete records)

Used in maybePurgeCommitedRecords for informational purposes

