AssignedStreamsTasks — AssignedTasks For StreamTasks

AssignedStreamsTasks is a concrete AssignedTasks of StreamTasks that…​FIXME

AssignedStreamsTasks is created for a StreamThread (when KafkaStreams is created).

AssignedStreamsTasks is a RestoringTasks that…​FIXME

It appears that AssignedStreamsTasks simply operates on the running tasks (i.e. the tasks in the running internal registry). When requested to process or punctuate, AssignedStreamsTasks walks over the running internal registry and triggers execution of every task.

AssignedStreamsTasks uses the maybeCommitAction task action (TaskAction<StreamTask>) in maybeCommit. The task action takes a stream task and checks whether the task needs a commit. If so, the action does the following:

  1. Increments the committed internal counter

  2. Requests the stream task to commit

  3. Prints out the following DEBUG message to the logs:

    Committed active task [id] per user request in
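
The three steps above can be sketched as follows. This is a minimal illustration and not the Kafka Streams source: SketchTaskAction, CommitSketchTask, and MaybeCommitSketch are hypothetical stand-ins, the commitNeeded flag is an assumed simplification of the "needs a commit" check, and the printed line only stands in for the DEBUG log entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task action; not the Kafka Streams interface.
interface SketchTaskAction<T> {
    void apply(T task);
}

// Hypothetical stand-in for a stream task.
class CommitSketchTask {
    final String id;
    final boolean commitNeeded; // assumed flag for "the task needs a commit"
    boolean committed = false;

    CommitSketchTask(String id, boolean commitNeeded) {
        this.id = id;
        this.commitNeeded = commitNeeded;
    }

    void commit() {
        committed = true;
    }
}

class MaybeCommitSketch {
    private int committed = 0;
    private final List<CommitSketchTask> running = new ArrayList<>();

    // The action commits only the tasks that need it and counts them.
    private final SketchTaskAction<CommitSketchTask> maybeCommitAction = task -> {
        if (task.commitNeeded) {
            committed++;   // step 1: increment the counter
            task.commit(); // step 2: request the task to commit
            // step 3: stand-in for the DEBUG log entry
            System.out.println("Committed active task " + task.id + " per user request");
        }
    };

    void addRunning(CommitSketchTask task) {
        running.add(task);
    }

    // Assumed driver loop: reset the counter, apply the action to every running task.
    int maybeCommit() {
        committed = 0;
        for (CommitSketchTask task : running) {
            maybeCommitAction.apply(task);
        }
        return committed;
    }

    public static void main(String[] args) {
        MaybeCommitSketch tasks = new MaybeCommitSketch();
        tasks.addRunning(new CommitSketchTask("0_0", true));
        tasks.addRunning(new CommitSketchTask("0_1", false));
        System.out.println("committed: " + tasks.maybeCommit());
    }
}
```

Keeping the counter outside the action lets the caller report how many tasks were committed without the action having to return a value.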

AssignedStreamsTasks takes a LogContext when created.

AssignedStreamsTasks uses stream task for taskTypeName.

Tip

Enable ALL logging level for org.apache.kafka.streams.processor.internals.AssignedStreamsTasks logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.AssignedStreamsTasks=ALL

Processing Records Using Running Stream Tasks (One Record per Task) — process Method

int process()

process requests every running stream task (that is processable) to process a single record.

In the end, process returns how many stream tasks processed a single record.
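
A rough sketch of the "one record per processable task" loop, under the assumption that a task is processable when it has buffered records. ProcessSketchTask and ProcessSketch are hypothetical names used only for illustration; they are not Kafka Streams classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a stream task with a buffer of pending records.
class ProcessSketchTask {
    private final Deque<String> buffered = new ArrayDeque<>();

    ProcessSketchTask(String... records) {
        for (String record : records) {
            buffered.add(record);
        }
    }

    // Assumption: a task is processable when it has at least one buffered record.
    boolean isProcessable() {
        return !buffered.isEmpty();
    }

    // Processes at most one record; returns true when a record was processed.
    boolean process() {
        return buffered.poll() != null;
    }
}

class ProcessSketch {
    // Asks every processable task for a single record and counts the successes.
    static int process(List<ProcessSketchTask> running) {
        int processed = 0;
        for (ProcessSketchTask task : running) {
            if (task.isProcessable() && task.process()) {
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<ProcessSketchTask> running = new ArrayList<>();
        running.add(new ProcessSketchTask("a", "b"));
        running.add(new ProcessSketchTask());
        System.out.println("processed: " + process(running));
    }
}
```

Because each call handles at most one record per task, a caller would invoke it repeatedly until it returns 0.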

Note
process is used exclusively when TaskManager is requested to process records by the running stream tasks (one record per task).

process and TaskMigratedException

In case of a TaskMigratedException, process prints out the following INFO message to the logs:

Failed to process stream task [id] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.

process then closes the task (considering the task a zombie). If this reports a RuntimeException, process re-throws it.

process removes the task from running and throws the TaskMigratedException.

process and RuntimeException

In case of a RuntimeException, process prints out the following ERROR message to the logs followed by the exception.

Failed to process stream task [id] due to the following error:

process re-throws the RuntimeException.
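
The two failure paths described above can be sketched like this. All types are hypothetical stand-ins (SketchTaskMigratedException is not the Kafka exception class), and the printed lines merely stand in for the INFO and ERROR log entries.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the migration exception.
class SketchTaskMigratedException extends RuntimeException {
    SketchTaskMigratedException(String message) {
        super(message);
    }
}

// Hypothetical task that optionally fails when asked to process.
class ZombieSketchTask {
    final String id;
    final RuntimeException failure; // thrown by process(), or null for success
    boolean closedAsZombie = false;

    ZombieSketchTask(String id, RuntimeException failure) {
        this.id = id;
        this.failure = failure;
    }

    void process() {
        if (failure != null) {
            throw failure;
        }
    }

    void closeZombie() {
        closedAsZombie = true;
    }
}

class ZombieHandlingSketch {
    final Map<String, ZombieSketchTask> running = new LinkedHashMap<>();

    int process() {
        int processed = 0;
        Iterator<ZombieSketchTask> it = running.values().iterator();
        while (it.hasNext()) {
            ZombieSketchTask task = it.next();
            try {
                task.process();
                processed++;
            } catch (SketchTaskMigratedException e) {
                System.out.println("INFO Failed to process stream task " + task.id
                        + " since it got migrated to another thread already.");
                task.closeZombie(); // a RuntimeException thrown here would propagate as-is
                it.remove();        // drop the zombie from the running registry
                throw e;
            } catch (RuntimeException e) {
                System.out.println("ERROR Failed to process stream task " + task.id
                        + " due to the following error: " + e);
                throw e;
            }
        }
        return processed;
    }
}
```

The more specific catch clause comes first, so a migrated task is closed and removed, while any other RuntimeException leaves the registry untouched.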

Committing Running Stream Tasks that Requested It — maybeCommit Method

int maybeCommit()

maybeCommit resets the committed internal counter (to 0) and applies the maybeCommitAction task action to every running task (the action increments committed for every task it commits).

In the end, maybeCommit returns the number of running stream tasks that needed a commit.

Note
maybeCommit is used exclusively when TaskManager is requested to maybeCommitActiveTasks.

Punctuating Running Stream Tasks (by Stream and System Time) — punctuate Method

int punctuate()

punctuate walks over the running stream tasks and requests them to attempt to punctuate by stream and system time.

For every successfully executed punctuation, punctuate increments an internal punctuated counter.

In the end, punctuate returns the internal punctuated counter.

In case of a TaskMigratedException, punctuate prints out the following INFO message to the logs, closes the zombie task, and possibly removes the task from the running stream tasks.

Failed to punctuate stream task [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.

In case of a KafkaException, punctuate prints out the following ERROR message to the logs and re-throws the exception.

Failed to punctuate stream task [taskId] due to the following error:

Note
punctuate is used exclusively when TaskManager is requested to punctuate stream tasks.

updateRestored Method

void updateRestored(Collection<TopicPartition> restored)

updateRestored prints out the following TRACE message to the logs:

Stream task changelog partitions that have completed restoring so far: [restored]

updateRestored adds all of the restored partitions to the restoredPartitions internal registry unless they are already present.

For every pair (of TaskId and StreamTask) in the restoring internal registry, updateRestored requests the changelog partitions of the StreamTask and checks whether they are all in the restoredPartitions internal registry or not.

If the changelog partitions are all in the restoredPartitions registry, updateRestored transitions the task to running (transitionToRunning), removes the task from the restoring registry, and then prints out the following TRACE message to the logs:

Stream task [id] completed restoration as all its changelog partitions [changelogPartitions] have been applied to restore state

If the changelog partitions are not all in the restoredPartitions registry, updateRestored simply prints out the following TRACE message to the logs:

Stream task [id] cannot resume processing yet since some of its changelog partitions have not completed restoring: [outstandingPartitions]

In the end, updateRestored removes all of the elements from the restoredPartitions internal registry when allTasksRunning.
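
The bookkeeping in updateRestored can be sketched roughly as below. This is an illustration under simplifying assumptions: partitions are plain strings, moving a task into a running map stands in for transitionToRunning, and "no restoring tasks left" stands in for allTasksRunning. RestoreSketchTask and UpdateRestoredSketch are hypothetical names.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a stream task that knows its changelog partitions.
class RestoreSketchTask {
    final String id;
    final Set<String> changelogPartitions;

    RestoreSketchTask(String id, Collection<String> changelogPartitions) {
        this.id = id;
        this.changelogPartitions = new HashSet<>(changelogPartitions);
    }
}

class UpdateRestoredSketch {
    final Map<String, RestoreSketchTask> restoring = new LinkedHashMap<>();
    final Map<String, RestoreSketchTask> running = new LinkedHashMap<>();
    final Set<String> restoredPartitions = new HashSet<>();

    void updateRestored(Collection<String> restored) {
        // A set ignores partitions that are already present.
        restoredPartitions.addAll(restored);

        Iterator<RestoreSketchTask> it = restoring.values().iterator();
        while (it.hasNext()) {
            RestoreSketchTask task = it.next();
            if (restoredPartitions.containsAll(task.changelogPartitions)) {
                running.put(task.id, task); // stand-in for transitionToRunning
                it.remove();
            } else {
                Set<String> outstanding = new HashSet<>(task.changelogPartitions);
                outstanding.removeAll(restoredPartitions);
                System.out.println("Task " + task.id + " still waiting for " + outstanding);
            }
        }

        // Simplified stand-in for the allTasksRunning check.
        if (restoring.isEmpty()) {
            restoredPartitions.clear();
        }
    }
}
```

Accumulating restored partitions across calls matters because a task with several changelog partitions may see them complete in different invocations.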

Note
updateRestored is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.

addToRestoring Method

void addToRestoring(StreamTask task)

addToRestoring adds the given StreamTask to the restoring internal registry.

addToRestoring adds the partitions of the given StreamTask to the restoringByPartition internal registry.

addToRestoring adds the changelog partitions of the given StreamTask to the restoringByPartition internal registry.
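
A minimal sketch of the three registrations above. It assumes restoringByPartition maps each partition to its owning task (the type is not stated in this document) and uses strings in place of TaskId and TopicPartition. PartitionedSketchTask and AddToRestoringSketch are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a stream task with input and changelog partitions.
class PartitionedSketchTask {
    final String id;
    final List<String> partitions;
    final List<String> changelogPartitions;

    PartitionedSketchTask(String id, List<String> partitions, List<String> changelogPartitions) {
        this.id = id;
        this.partitions = partitions;
        this.changelogPartitions = changelogPartitions;
    }
}

class AddToRestoringSketch {
    final Map<String, PartitionedSketchTask> restoring = new HashMap<>();
    // Assumption: a partition-to-task lookup table.
    final Map<String, PartitionedSketchTask> restoringByPartition = new HashMap<>();

    void addToRestoring(PartitionedSketchTask task) {
        restoring.put(task.id, task);
        for (String partition : task.partitions) {
            restoringByPartition.put(partition, task);
        }
        for (String partition : task.changelogPartitions) {
            restoringByPartition.put(partition, task);
        }
    }

    public static void main(String[] args) {
        AddToRestoringSketch sketch = new AddToRestoringSketch();
        sketch.addToRestoring(new PartitionedSketchTask(
                "0_0", Arrays.asList("input-0"), Arrays.asList("store-changelog-0")));
        System.out.println(sketch.restoringByPartition.keySet());
    }
}
```

Indexing by both kinds of partition would let a caller find the restoring task from either an input partition or a changelog partition.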

Note
addToRestoring is used exclusively when AssignedTasks is requested to initialize new tasks (when TaskManager is requested to updateNewAndRestoringTasks).

Checking Whether All StreamTasks Are Running — allTasksRunning Method

boolean allTasksRunning()

Note
allTasksRunning is part of the AssignedTasks Contract to check whether all tasks are running or not.

allTasksRunning is positive (true) when the parent AssignedTasks reports that all of its StreamTasks are running and there are no restoring tasks. Otherwise, allTasksRunning is negative (false).

maybeCommitPerUserRequested Method

int maybeCommitPerUserRequested()

maybeCommitPerUserRequested returns how many running stream tasks have been requested to commit.

Internally, maybeCommitPerUserRequested walks over the running stream tasks and requests them to commit when the commit was requested or needed.

For every commit, maybeCommitPerUserRequested increments an internal committed counter and prints out the following DEBUG message to the logs:

Committed active task [taskId] per user request in

In the end, maybeCommitPerUserRequested returns the internal committed counter.

In case of a TaskMigratedException, maybeCommitPerUserRequested prints out the following INFO message to the logs, closes the zombie task, and possibly removes the task from the running stream tasks.

Failed to commit [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.

In case of a RuntimeException, maybeCommitPerUserRequested prints out the following ERROR message to the logs and re-throws the exception.

Failed to commit StreamTask [taskId] due to the following error:

Note
maybeCommitPerUserRequested is used exclusively when TaskManager is requested to maybeCommitActiveTasksPerUserRequested.

Purgable Offsets of Repartition Topics (of Topology) — recordsToDelete Method

Map<TopicPartition, Long> recordsToDelete()

recordsToDelete simply requests all the running StreamTasks for the purgable offsets of the repartition topics (of a topology).

Note
recordsToDelete is used exclusively when TaskManager is requested to attempt to purge (delete) committed records.

closeAllRestoringTasks Method

RuntimeException closeAllRestoringTasks()

closeAllRestoringTasks…​FIXME

Note
closeAllRestoringTasks is used exclusively when TaskManager is requested to suspend all (active and standby) stream tasks and state.

clear Method

void clear()

clear requests the parent AssignedTasks to clear.

In the end, clear clears up (removes all elements from) the restoring, restoringByPartition, and restoredPartitions internal registries.

Note
clear is used exclusively when AssignedTasks is requested to close.

Describing Itself (Textual Representation) — toString Method

String toString(String indent)

Note
toString is part of the AssignedTasks Contract to describe itself.

toString requests the parent AssignedTasks to describe itself and then describes the StreamTasks from the restoring registry (under the Restoring: name).

FIXME toString in action

allTasks Method

List<StreamTask> allTasks()

Note
allTasks is part of the AssignedTasks Contract to get all stream processor tasks.

allTasks requests the parent AssignedTasks for all tasks and then adds the restoring tasks.

allAssignedTaskIds Method

Set<TaskId> allAssignedTaskIds()

Note
allAssignedTaskIds is part of the AssignedTasks Contract to get all assigned TaskIds.

allAssignedTaskIds requests the parent AssignedTasks for the assigned task IDs and then adds the restoring task IDs.
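
The "ask the parent, then add the restoring ones" pattern can be sketched as a simple override. ParentTasksSketch and StreamsTasksSketch are hypothetical classes, and the parent is assumed to track only a running registry here; the real parent may track more.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical parent that knows only about running tasks (task id -> task name).
class ParentTasksSketch {
    protected final Map<String, String> running = new LinkedHashMap<>();

    Set<String> allAssignedTaskIds() {
        // Return a copy so that callers and subclasses can extend it safely.
        return new HashSet<>(running.keySet());
    }
}

// Hypothetical subclass that also tracks restoring tasks.
class StreamsTasksSketch extends ParentTasksSketch {
    final Map<String, String> restoring = new LinkedHashMap<>();

    @Override
    Set<String> allAssignedTaskIds() {
        Set<String> ids = super.allAssignedTaskIds();
        ids.addAll(restoring.keySet());
        return ids;
    }

    public static void main(String[] args) {
        StreamsTasksSketch tasks = new StreamsTasksSketch();
        tasks.running.put("0_0", "running task");
        tasks.restoring.put("0_1", "restoring task");
        System.out.println(tasks.allAssignedTaskIds());
    }
}
```

Returning a fresh set from the parent keeps the subclass from mutating the parent's registry when it adds the restoring IDs.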

Internal Properties

Name                    Description

committed               Number of…​FIXME

log

restoredPartitions      Restored partitions (Set<TopicPartition>)

restoring               Lookup table of StreamTasks by TaskId (Map<TaskId, StreamTask>)

restoringByPartition
