AssignedStreamsTasks — AssignedTasks For StreamTasks
AssignedStreamsTasks is a concrete AssignedTasks of StreamTasks that…FIXME
AssignedStreamsTasks is created for a StreamThread (when KafkaStreams is created).
AssignedStreamsTasks is a RestoringTasks that…FIXME
It appears that AssignedStreamsTasks simply operates on the running tasks (i.e. the tasks in the running internal registry). When requested to process or punctuate, AssignedStreamsTasks walks over the running internal registry and triggers execution of every task.
AssignedStreamsTasks uses the maybeCommit task action (TaskAction<StreamTask>) that is used in maybeCommit. The task action takes a stream task and checks if the task needs a commit. If so, the action does the following:
AssignedStreamsTasks takes a LogContext when created.
AssignedStreamsTasks uses stream task for taskTypeName.
|
Tip
|
Enable Add the following line to
Refer to Application Logging Using log4j. |
Processing Records Using Running Stream Tasks (One Record per Task) — process Method
int process()
process requests every running stream task (that is processable) to process a single record.
In the end, process returns how many stream tasks processed a single record.
|
Note
|
process is used exclusively when TaskManager is requested to process records by the running stream tasks (one record per task).
|
process and TaskMigratedException
In case of a TaskMigratedException, process prints out the following INFO message to the logs:
Failed to process stream task [id] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
process then closes the task (treating it as a zombie). If closing the task reports a RuntimeException, process re-throws it.
Otherwise, process removes the task from running and re-throws the TaskMigratedException.
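The control flow of process described above can be sketched as follows. This is a minimal model under stated assumptions, not the Kafka Streams source: ProcessSketchTask, MigratedSketchException and ProcessLoopSketch are hypothetical stand-ins, a task is reduced to a counter of pending records, and the log line goes to standard output.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for TaskMigratedException (not the Kafka class).
class MigratedSketchException extends RuntimeException {
    MigratedSketchException(String taskId) {
        super("Task " + taskId + " got migrated");
    }
}

// Hypothetical, heavily simplified model of a stream task.
final class ProcessSketchTask {
    final String id;
    int pendingRecords;
    final boolean migrated;
    boolean closedAsZombie;

    ProcessSketchTask(String id, int pendingRecords, boolean migrated) {
        this.id = id;
        this.pendingRecords = pendingRecords;
        this.migrated = migrated;
    }

    boolean isProcessable() {
        return pendingRecords > 0;
    }

    // Processes at most one record and reports whether it did.
    boolean process() {
        if (migrated) {
            throw new MigratedSketchException(id);
        }
        pendingRecords--;
        return true;
    }

    void closeAsZombie() {
        closedAsZombie = true;
    }
}

final class ProcessLoopSketch {
    final List<ProcessSketchTask> running = new ArrayList<>();

    // One record per processable running task; returns how many tasks processed a record.
    int process() {
        int processed = 0;
        Iterator<ProcessSketchTask> it = running.iterator();
        while (it.hasNext()) {
            ProcessSketchTask task = it.next();
            try {
                if (task.isProcessable() && task.process()) {
                    processed++;
                }
            } catch (MigratedSketchException e) {
                System.out.println("Failed to process stream task " + task.id
                    + " since it got migrated to another thread already."
                    + " Closing it as zombie before triggering a new rebalance.");
                task.closeAsZombie(); // a RuntimeException thrown here would propagate as-is
                it.remove();          // drop the zombie from the running registry
                throw e;              // re-throw so the caller can trigger a rebalance
            }
        }
        return processed;
    }
}
```

Removing through the iterator keeps the loop safe against concurrent modification, which is why the sketch does not call running.remove(task) directly.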
Committing Running Stream Tasks that Requested It — maybeCommit Method
int maybeCommit()
maybeCommit resets the committed internal counter (to 0) and applies the maybeCommitAction task action to every running task (the action updates committed).
In the end, maybeCommit gives the number of running stream tasks that needed a commit.
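A minimal sketch of the reset-then-apply pattern follows. It assumes that the task action commits a task that needs a commit and increments committed; the classes CommitSketchTask and MaybeCommitSketch are hypothetical, and a plain java.util.function.Consumer stands in for TaskAction<StreamTask>.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a stream task that may need a commit.
final class CommitSketchTask {
    final String id;
    boolean commitNeeded;
    int commits;

    CommitSketchTask(String id, boolean commitNeeded) {
        this.id = id;
        this.commitNeeded = commitNeeded;
    }

    void commit() {
        commits++;
        commitNeeded = false;
    }
}

final class MaybeCommitSketch {
    final List<CommitSketchTask> running = new ArrayList<>();
    private int committed;

    // Stand-in for the maybeCommitAction task action (assumed behaviour):
    // commit only the tasks that need it and count them.
    private final Consumer<CommitSketchTask> maybeCommitAction = task -> {
        if (task.commitNeeded) {
            committed++;
            task.commit();
        }
    };

    int maybeCommit() {
        committed = 0;                      // reset the counter first
        running.forEach(maybeCommitAction); // apply the action to every running task
        return committed;
    }
}
```

Because the counter is reset on every call, a second call right after the first reports 0 unless some task has asked for a commit in the meantime.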
|
Note
|
maybeCommit is used exclusively when TaskManager is requested to maybeCommitActiveTasks.
|
Punctuating Running Stream Tasks (by Stream and System Time) — punctuate Method
int punctuate()
punctuate walks over the running stream tasks and requests them to attempt to punctuate by stream and system time.
For every successfully executed punctuation, punctuate increments an internal punctuated counter.
In the end, punctuate returns the internal punctuated counter.
In case of a TaskMigratedException, punctuate prints out the following INFO message to the logs, closes the zombie task, and possibly removes the task from the running stream tasks.
Failed to punctuate stream task [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
In case of a KafkaException, punctuate prints out the following ERROR message to the logs and re-throws the exception.
Failed to punctuate stream task [taskId] due to the following error:
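The counting and error handling of punctuate can be illustrated with the following sketch. It is a simplified model, not the actual implementation: PunctuateSketchTask and PunctuateSketch are hypothetical, RuntimeException stands in for KafkaException, the failing flag exists only to provoke the error path, and the TaskMigratedException branch is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a stream task with two kinds of punctuation.
final class PunctuateSketchTask {
    final String id;
    final boolean streamTimeDue;
    final boolean systemTimeDue;
    final boolean failing;

    PunctuateSketchTask(String id, boolean streamTimeDue, boolean systemTimeDue, boolean failing) {
        this.id = id;
        this.streamTimeDue = streamTimeDue;
        this.systemTimeDue = systemTimeDue;
        this.failing = failing;
    }

    // Returns true when a stream-time punctuation was executed.
    boolean maybePunctuateStreamTime() {
        if (failing) {
            throw new IllegalStateException("punctuator failed for " + id);
        }
        return streamTimeDue;
    }

    // Returns true when a system-time punctuation was executed.
    boolean maybePunctuateSystemTime() {
        return systemTimeDue;
    }
}

final class PunctuateSketch {
    final List<PunctuateSketchTask> running = new ArrayList<>();

    int punctuate() {
        int punctuated = 0;
        for (PunctuateSketchTask task : running) {
            try {
                // Each successful punctuation counts once, so a task can add up to 2.
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            } catch (RuntimeException e) {
                System.err.println("Failed to punctuate stream task " + task.id
                    + " due to the following error: " + e);
                throw e; // log and re-throw
            }
        }
        return punctuated;
    }
}
```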
|
Note
|
punctuate is used exclusively when TaskManager is requested to punctuate stream tasks.
|
updateRestored Method
void updateRestored(Collection<TopicPartition> restored)
updateRestored prints out the following TRACE message to the logs:
Stream task changelog partitions that have completed restoring so far: [restored]
updateRestored adds all of the restored partitions to the restoredPartitions internal registry unless they are already present.
For every pair (of TaskId and StreamTask) in the restoring internal registry, updateRestored requests the changelog partitions of the StreamTask and checks whether they are all in the restoredPartitions internal registry or not.
If the changelog partitions are all in the restoredPartitions registry, updateRestored transitions the task to running (transitionToRunning), removes the task from the restoring registry and then prints out the following TRACE message to the logs:
Stream task [id] completed restoration as all its changelog partitions [changelogPartitions] have been applied to restore state
If the changelog partitions are not all in the restoredPartitions registry, updateRestored simply prints out the following TRACE message to the logs:
Stream task [id] cannot resume processing yet since some of its changelog partitions have not completed restoring: [outstandingPartitions]
In the end, updateRestored removes all of the elements from the restoredPartitions internal registry when allTasksRunning.
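The bookkeeping of updateRestored can be sketched as below. The sketch makes several assumptions: partitions and task IDs are plain strings, RestoringSketchTask and UpdateRestoredSketch are hypothetical classes, moving a task into a running map stands in for transitionToRunning, and allTasksRunning is reduced to "nothing left to restore".

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of a task that is restoring its state.
final class RestoringSketchTask {
    final String id;
    final Set<String> changelogPartitions;

    RestoringSketchTask(String id, String... changelogPartitions) {
        this.id = id;
        this.changelogPartitions = new HashSet<>(Arrays.asList(changelogPartitions));
    }
}

final class UpdateRestoredSketch {
    final Map<String, RestoringSketchTask> restoring = new HashMap<>();
    final Map<String, RestoringSketchTask> running = new HashMap<>();
    final Set<String> restoredPartitions = new HashSet<>();

    void updateRestored(Collection<String> restored) {
        // A set ignores partitions that are already present.
        restoredPartitions.addAll(restored);

        Iterator<Map.Entry<String, RestoringSketchTask>> it = restoring.entrySet().iterator();
        while (it.hasNext()) {
            RestoringSketchTask task = it.next().getValue();
            if (restoredPartitions.containsAll(task.changelogPartitions)) {
                running.put(task.id, task); // stand-in for transitionToRunning
                it.remove();
            } else {
                Set<String> outstanding = new HashSet<>(task.changelogPartitions);
                outstanding.removeAll(restoredPartitions);
                System.out.println("Stream task " + task.id
                    + " cannot resume processing yet since some of its changelog partitions"
                    + " have not completed restoring: " + outstanding);
            }
        }

        if (allTasksRunning()) {
            restoredPartitions.clear();
        }
    }

    // Simplification: the parent's own check is omitted in this sketch.
    boolean allTasksRunning() {
        return restoring.isEmpty();
    }
}
```

Keeping restoredPartitions across calls is what lets a task with several changelog partitions complete over more than one invocation.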
|
Note
|
updateRestored is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
|
addToRestoring Method
void addToRestoring(StreamTask task)
addToRestoring adds the given StreamTask to the restoring internal registry.
addToRestoring adds the partitions of the given StreamTask to the restoringByPartition internal registry.
addToRestoring adds the changelog partitions of the given StreamTask to the restoringByPartition internal registry.
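The three registrations can be pictured with the following sketch, which assumes string identifiers in place of TaskId and TopicPartition. RestoreSketchTask and AddToRestoringSketch are hypothetical names used for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a stream task with input and changelog partitions.
final class RestoreSketchTask {
    final String id;
    final List<String> partitions;
    final List<String> changelogPartitions;

    RestoreSketchTask(String id, List<String> partitions, List<String> changelogPartitions) {
        this.id = id;
        this.partitions = partitions;
        this.changelogPartitions = changelogPartitions;
    }
}

final class AddToRestoringSketch {
    final Map<String, RestoreSketchTask> restoring = new HashMap<>();
    final Map<String, RestoreSketchTask> restoringByPartition = new HashMap<>();

    void addToRestoring(RestoreSketchTask task) {
        // Lookup by task ID.
        restoring.put(task.id, task);
        // Lookup by input partition.
        for (String partition : task.partitions) {
            restoringByPartition.put(partition, task);
        }
        // Lookup by changelog partition (same registry as above).
        for (String partition : task.changelogPartitions) {
            restoringByPartition.put(partition, task);
        }
    }
}
```

With both kinds of partitions in one registry, a restoring task can be found from either an input partition or a changelog partition.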
|
Note
|
addToRestoring is used exclusively when AssignedTasks is requested to initialize new tasks (when TaskManager is requested to updateNewAndRestoringTasks).
|
Checking Whether All StreamTasks Are Running — allTasksRunning Method
boolean allTasksRunning()
|
Note
|
allTasksRunning is part of the AssignedTasks Contract to check whether all tasks are running or not.
|
allTasksRunning is positive (true) when all StreamTasks of the parent AssignedTasks are running and there are no restoring tasks. Otherwise, allTasksRunning is negative (false).
maybeCommitPerUserRequested Method
int maybeCommitPerUserRequested()
maybeCommitPerUserRequested returns how many running stream tasks have been requested to commit.
Internally, maybeCommitPerUserRequested walks over the running stream tasks and requests them to commit when the commit was requested or needed.
For every commit, maybeCommitPerUserRequested increments an internal committed counter and prints out the following DEBUG message to the logs:
Committed active task [taskId] per user request in
In the end, maybeCommitPerUserRequested returns the internal committed counter.
In case of a TaskMigratedException, maybeCommitPerUserRequested prints out the following INFO message to the logs, closes the zombie task, and possibly removes the task from the running stream tasks.
Failed to commit [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
In case of a RuntimeException, maybeCommitPerUserRequested prints out the following ERROR message to the logs and re-throws the exception.
Failed to commit StreamTask [taskId] due to the following error:
|
Note
|
maybeCommitPerUserRequested is used exclusively when TaskManager is requested to maybeCommitActiveTasksPerUserRequested.
|
Purgable Offsets of Repartition Topics (of Topology) — recordsToDelete Method
Map<TopicPartition, Long> recordsToDelete()
recordsToDelete simply requests all the running StreamTasks for the purgable offsets of the repartition topics (of a topology).
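A minimal sketch of this aggregation follows, assuming each task exposes its offsets as a map keyed by a partition name. PurgeSketchTask and RecordsToDeleteSketch are hypothetical, and strings stand in for TopicPartition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a running task that knows which
// repartition-topic offsets are safe to purge.
final class PurgeSketchTask {
    private final Map<String, Long> offsets = new HashMap<>();

    PurgeSketchTask with(String partition, long offset) {
        offsets.put(partition, offset);
        return this;
    }

    Map<String, Long> purgableOffsets() {
        return offsets;
    }
}

final class RecordsToDeleteSketch {
    final List<PurgeSketchTask> running = new ArrayList<>();

    // Merges the per-task maps into a single map for the caller.
    Map<String, Long> recordsToDelete() {
        Map<String, Long> recordsToDelete = new HashMap<>();
        for (PurgeSketchTask task : running) {
            recordsToDelete.putAll(task.purgableOffsets());
        }
        return recordsToDelete;
    }
}
```

The sketch relies on every partition belonging to exactly one task, so putAll never has to resolve a conflict between two tasks.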
|
Note
|
recordsToDelete is used exclusively when TaskManager is requested to attempt to purge (delete) committed records.
|
closeAllRestoringTasks Method
RuntimeException closeAllRestoringTasks()
closeAllRestoringTasks…FIXME
|
Note
|
closeAllRestoringTasks is used exclusively when TaskManager is requested to suspend all (active and standby) stream tasks and state.
|
clear Method
void clear()
clear requests the parent AssignedTasks to clear.
In the end, clear clears up (removes all elements from) the restoring, restoringByPartition, and restoredPartitions internal registries.
|
Note
|
clear is used exclusively when AssignedTasks is requested to close.
|
Describing Itself (Textual Representation) — toString Method
String toString(String indent)
|
Note
|
toString is part of the AssignedTasks Contract to describe itself.
|
toString requests the parent AssignedTasks to describe itself and then describes the StreamTasks from the restoring registry (under the Restoring: name).
FIXME toString in action
allTasks Method
List<StreamTask> allTasks()
|
Note
|
allTasks is part of the AssignedTasks Contract to get all stream processor tasks.
|
allAssignedTaskIds Method
Set<TaskId> allAssignedTaskIds()
|
Note
|
allAssignedTaskIds is part of the AssignedTasks Contract to get all assigned TaskIds.
|
allAssignedTaskIds requests the parent AssignedTasks for the assigned task IDs and then adds the restoring task IDs.
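The override can be sketched as follows. ParentTasksSketch and StreamsTasksSketch are hypothetical classes, and which registries the parent consults (created and running here) is an assumption made for illustration only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the parent AssignedTasks.
class ParentTasksSketch {
    final Map<String, String> created = new HashMap<>();
    final Map<String, String> running = new HashMap<>();

    Set<String> allAssignedTaskIds() {
        // A fresh mutable set, so subclasses can extend the result safely.
        Set<String> taskIds = new HashSet<>();
        taskIds.addAll(created.keySet());
        taskIds.addAll(running.keySet());
        return taskIds;
    }
}

// Hypothetical stand-in for AssignedStreamsTasks.
final class StreamsTasksSketch extends ParentTasksSketch {
    final Map<String, String> restoring = new HashMap<>();

    @Override
    Set<String> allAssignedTaskIds() {
        Set<String> taskIds = super.allAssignedTaskIds();
        taskIds.addAll(restoring.keySet()); // restoring tasks are assigned too
        return taskIds;
    }
}
```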
Internal Properties
| Name | Description |
|---|---|
| | Lookup table of StreamTasks by TaskId ( |