AssignedStreamsTasks — AssignedTasks For StreamTasks
AssignedStreamsTasks is a concrete AssignedTasks of StreamTasks that…FIXME
AssignedStreamsTasks is created for a StreamThread (when KafkaStreams is created).
AssignedStreamsTasks is a RestoringTasks that…FIXME
It appears that AssignedStreamsTasks simply operates on the running tasks (i.e. the tasks that are in the running internal registry). When requested to process or punctuate, AssignedStreamsTasks simply walks over the running internal registry and triggers execution of every task.
AssignedStreamsTasks uses the maybeCommit task action (TaskAction<StreamTask>) that is used in maybeCommit. The task action takes a stream task and checks if the task needs a commit. If so, the action increments the committed internal counter and requests the task to commit.
AssignedStreamsTasks takes a LogContext when created.
AssignedStreamsTasks uses stream task for taskTypeName.
Tip
|
Enable logging for AssignedStreamsTasks to see what happens inside.
Refer to Application Logging Using log4j. |
Processing Records Using Running Stream Tasks (One Record per Task) — process Method
int process()
process requests every running stream task (that is processable) to process a single record.
In the end, process returns how many stream tasks processed a single record.
Note
|
process is used exclusively when TaskManager is requested to process records by the running stream tasks (one record per task).
|
process and TaskMigratedException
In case of a TaskMigratedException, process prints out the following INFO message to the logs:
Failed to process stream task [id] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
process then closes the task (considering the task a zombie). If closing the task reports a RuntimeException, process re-throws it.
Otherwise, process removes the task from running and throws the TaskMigratedException.
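The flow above can be sketched in a few lines of Java. This is a minimal illustration only, not the Kafka Streams source: ProcessableTask, MigratedException, closeAsZombie and ProcessSketch are hypothetical stand-ins for StreamTask, TaskMigratedException and the zombie-closing step, and logging is left out. One simplification to keep in mind: here a failure while closing the zombie simply propagates as an exception.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for TaskMigratedException.
class MigratedException extends RuntimeException {}

// Hypothetical stand-in for the parts of StreamTask used by process.
interface ProcessableTask {
    boolean isProcessable();
    boolean process();      // true when a single record was processed
    void closeAsZombie();   // assumed name for "close the task as a zombie"
}

final class ProcessSketch {
    // Stand-in for the running internal registry (task id -> task).
    final Map<String, ProcessableTask> running = new LinkedHashMap<>();

    int process() {
        int processed = 0;
        Iterator<Map.Entry<String, ProcessableTask>> it = running.entrySet().iterator();
        while (it.hasNext()) {
            ProcessableTask task = it.next().getValue();
            try {
                // One record per task, and only for processable tasks.
                if (task.isProcessable() && task.process()) {
                    processed++;
                }
            } catch (MigratedException e) {
                // The INFO message would be logged here.
                task.closeAsZombie();   // a RuntimeException from closing propagates
                it.remove();            // drop the zombie from running
                throw e;                // let the caller trigger a rebalance
            }
        }
        return processed;
    }
}
```

Removing through the iterator keeps the loop safe while the registry is being modified.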
Committing Running Stream Tasks that Requested It — maybeCommit Method
int maybeCommit()
maybeCommit resets the committed internal counter (to 0) and executes the maybeCommitAction task action on every running task (which modifies committed).
In the end, maybeCommit returns the number of running stream tasks that needed a commit.
Note
|
maybeCommit is used exclusively when TaskManager is requested to maybeCommitActiveTasks.
|
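A rough sketch of the counter-plus-action pattern follows. The types CommittableTask, TaskActionSketch and MaybeCommitSketch are hypothetical stand-ins, and the body of the action (increment the counter, then commit) is an assumption based on the description above rather than a copy of the library code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the parts of StreamTask used when committing.
interface CommittableTask {
    boolean commitNeeded();
    void commit();
}

// Hypothetical stand-in for TaskAction<StreamTask>.
interface TaskActionSketch<T> {
    void apply(T task);
}

final class MaybeCommitSketch {
    final List<CommittableTask> running = new ArrayList<>();
    private int committed;

    // The action mutates the committed counter as a side effect (assumed behavior).
    private final TaskActionSketch<CommittableTask> maybeCommitAction = task -> {
        if (task.commitNeeded()) {
            committed++;
            task.commit();
        }
    };

    int maybeCommit() {
        committed = 0;                       // reset before every pass
        for (CommittableTask task : running) {
            maybeCommitAction.apply(task);
        }
        return committed;                    // tasks that needed a commit
    }
}
```

Keeping the counter as a field is what lets a reusable action report back without returning a value, at the cost of making maybeCommit unsafe to call from more than one thread.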
Punctuating Running Stream Tasks (by Stream and System Time) — punctuate Method
int punctuate()
punctuate walks over the running stream tasks and requests them to attempt to punctuate by stream and system time.
For every successfully executed punctuation, punctuate increments an internal punctuated counter.
In the end, punctuate returns the internal punctuated counter.
In case of a TaskMigratedException, punctuate prints out the following INFO message to the logs, closes the zombie task, and possibly removes the task from the running stream tasks.
Failed to punctuate stream task [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
In case of a KafkaException, punctuate prints out the following ERROR message to the logs and re-throws the exception.
Failed to punctuate stream task [taskId] due to the following error:
Note
|
punctuate is used exclusively when TaskManager is requested to punctuate stream tasks.
|
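The counting logic can be illustrated as below. PunctuatableTask and PunctuateSketch are hypothetical stand-ins, the two method names on the interface are assumptions chosen for this sketch, and the exception handling described above is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the punctuation side of StreamTask.
interface PunctuatableTask {
    boolean maybePunctuateStreamTime();   // true when a stream-time punctuation ran
    boolean maybePunctuateSystemTime();   // true when a system-time punctuation ran
}

final class PunctuateSketch {
    final List<PunctuatableTask> running = new ArrayList<>();

    int punctuate() {
        int punctuated = 0;
        for (PunctuatableTask task : running) {
            // Each kind of punctuation is attempted and counted separately.
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
            if (task.maybePunctuateSystemTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }
}
```

Note that a single task can contribute up to two to the result, so the return value counts punctuations rather than tasks.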
updateRestored Method
void updateRestored(Collection<TopicPartition> restored)
updateRestored prints out the following TRACE message to the logs:
Stream task changelog partitions that have completed restoring so far: [restored]
updateRestored adds all of the restored partitions to the restoredPartitions internal registry unless they are already present.
For every pair (of TaskId and StreamTask) in the restoring internal registry, updateRestored requests the changelog partitions of the StreamTask and checks whether they are all in the restoredPartitions internal registry.
If the changelog partitions are all in the restoredPartitions registry, updateRestored transitions the task to running (transitionToRunning), removes the task from the restoring registry, and then prints out the following TRACE message to the logs:
Stream task [id] completed restoration as all its changelog partitions [changelogPartitions] have been applied to restore state
If the changelog partitions are not all in the restoredPartitions registry, updateRestored simply prints out the following TRACE message to the logs:
Stream task [id] cannot resume processing yet since some of its changelog partitions have not completed restoring: [outstandingPartitions]
In the end, updateRestored removes all of the elements from the restoredPartitions internal registry when allTasksRunning is true.
Note
|
updateRestored is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
|
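The bookkeeping in updateRestored is easier to follow in code. The sketch below is a simplified model, not the library implementation: RestoreTrackerSketch is a hypothetical class, task IDs and partitions are plain strings, a task is reduced to its set of changelog partitions, and allTasksRunning only checks that nothing is restoring.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class RestoreTrackerSketch {
    // Stand-in for restoring: task id -> changelog partitions of that task.
    final Map<String, Set<String>> restoring = new LinkedHashMap<>();
    // Stand-in for the running registry (task ids only).
    final Set<String> running = new LinkedHashSet<>();
    final Set<String> restoredPartitions = new HashSet<>();

    void updateRestored(Collection<String> restored) {
        restoredPartitions.addAll(restored);   // a Set ignores partitions already present
        Iterator<Map.Entry<String, Set<String>>> it = restoring.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> entry = it.next();
            if (restoredPartitions.containsAll(entry.getValue())) {
                running.add(entry.getKey());   // models transitionToRunning
                it.remove();                   // no longer restoring
            }
            // Otherwise the task waits for its outstanding partitions.
        }
        if (allTasksRunning()) {
            restoredPartitions.clear();        // nothing left to track
        }
    }

    boolean allTasksRunning() {
        return restoring.isEmpty();            // simplified check for this sketch
    }
}
```

Because restoredPartitions accumulates across calls, a task whose changelog partitions finish in different calls is still promoted once the last one arrives.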
addToRestoring Method
void addToRestoring(StreamTask task)
addToRestoring adds the given StreamTask to the restoring internal registry.
addToRestoring adds the partitions of the given StreamTask to the restoringByPartition internal registry.
addToRestoring adds the changelog partitions of the given StreamTask to the restoringByPartition internal registry.
Note
|
addToRestoring is used exclusively when AssignedTasks is requested to initialize new tasks (when TaskManager is requested to updateNewAndRestoringTasks).
|
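As an illustration of the two registries, here is a small model in which both the input partitions and the changelog partitions of a task point back to the same task. RestoringTaskSketch and RestoringRegistrySketch are hypothetical names, and partitions are plain strings rather than TopicPartition instances.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a StreamTask with its id and partitions.
final class RestoringTaskSketch {
    final String id;
    final List<String> partitions;
    final List<String> changelogPartitions;

    RestoringTaskSketch(String id, List<String> partitions, List<String> changelogPartitions) {
        this.id = id;
        this.partitions = partitions;
        this.changelogPartitions = changelogPartitions;
    }
}

final class RestoringRegistrySketch {
    final Map<String, RestoringTaskSketch> restoring = new HashMap<>();
    final Map<String, RestoringTaskSketch> restoringByPartition = new HashMap<>();

    void addToRestoring(RestoringTaskSketch task) {
        restoring.put(task.id, task);
        // Both kinds of partitions resolve to the owning task.
        for (String partition : task.partitions) {
            restoringByPartition.put(partition, task);
        }
        for (String partition : task.changelogPartitions) {
            restoringByPartition.put(partition, task);
        }
    }
}
```

Indexing by partition makes it cheap to find the restoring task that owns a given partition without scanning every task.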
Checking Whether All StreamTasks Are Running — allTasksRunning Method
boolean allTasksRunning()
Note
|
allTasksRunning is part of the AssignedTasks Contract to check whether all tasks are running or not.
|
allTasksRunning is positive (true) when all the StreamTasks of the parent AssignedTasks are running and there are no restoring tasks. Otherwise, allTasksRunning is negative (false).
maybeCommitPerUserRequested Method
int maybeCommitPerUserRequested()
maybeCommitPerUserRequested returns how many running stream tasks have been requested to commit.
Internally, maybeCommitPerUserRequested walks over the running stream tasks and requests them to commit when the commit was requested or needed.
For every commit, maybeCommitPerUserRequested increments an internal committed counter and prints out the following DEBUG message to the logs:
Committed active task [taskId] per user request in
In the end, maybeCommitPerUserRequested returns the internal committed counter.
In case of a TaskMigratedException, maybeCommitPerUserRequested prints out the following INFO message to the logs, closes the zombie task, and possibly removes the task from the running stream tasks.
Failed to commit [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
In case of a RuntimeException, maybeCommitPerUserRequested prints out the following ERROR message to the logs and re-throws the exception.
Failed to commit StreamTask [taskId] due to the following error:
Note
|
maybeCommitPerUserRequested is used exclusively when TaskManager is requested to maybeCommitActiveTasksPerUserRequested.
|
Purgable Offsets of Repartition Topics (of Topology) — recordsToDelete Method
Map<TopicPartition, Long> recordsToDelete()
recordsToDelete simply requests all the running StreamTasks for the purgable offsets of the repartition topics (of a topology).
Note
|
recordsToDelete is used exclusively when TaskManager is requested to attempt to purge (delete) committed records.
|
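A minimal sketch of collecting the offsets could look as follows. PurgableTask and RecordsToDeleteSketch are hypothetical stand-ins, partitions are plain strings, and merging the per-task maps into one result is an assumption about how the individual answers are combined.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the purgable-offsets side of StreamTask.
interface PurgableTask {
    Map<String, Long> purgableOffsets();   // repartition topic partition -> offset
}

final class RecordsToDeleteSketch {
    final List<PurgableTask> running = new ArrayList<>();

    Map<String, Long> recordsToDelete() {
        Map<String, Long> result = new HashMap<>();
        for (PurgableTask task : running) {
            // Every running task contributes its own partitions.
            result.putAll(task.purgableOffsets());
        }
        return result;
    }
}
```

Since each partition is expected to belong to a single task, putAll should not overwrite entries in practice.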
closeAllRestoringTasks Method
RuntimeException closeAllRestoringTasks()
closeAllRestoringTasks…FIXME
Note
|
closeAllRestoringTasks is used exclusively when TaskManager is requested to suspend all (active and standby) stream tasks and state.
|
clear Method
void clear()
clear requests the parent AssignedTasks to clear.
In the end, clear clears up (removes all elements from) the restoring, restoringByPartition, and restoredPartitions internal registries.
Note
|
clear is used exclusively when AssignedTasks is requested to close.
|
Describing Itself (Textual Representation) — toString Method
String toString(String indent)
Note
|
toString is part of the AssignedTasks Contract to describe itself.
|
toString requests the parent AssignedTasks to describe itself and then describes the StreamTasks from the restoring registry (under the Restoring: name).
FIXME toString in action
allTasks Method
List<StreamTask> allTasks()
Note
|
allTasks is part of the AssignedTasks Contract to get all stream processor tasks.
|
allAssignedTaskIds Method
Set<TaskId> allAssignedTaskIds()
Note
|
allAssignedTaskIds is part of the AssignedTasks Contract to get all assigned TaskIds.
|
allAssignedTaskIds requests the parent AssignedTasks for the assigned task IDs and then adds the restoring task IDs.
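The delegate-then-extend pattern can be sketched with a tiny class hierarchy. ParentTasksSketch and StreamsTasksSketch are hypothetical stand-ins for AssignedTasks and AssignedStreamsTasks, and the parent here tracks only running tasks, which is a simplification made for this example.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the parent AssignedTasks.
class ParentTasksSketch {
    final Map<String, Object> running = new LinkedHashMap<>();

    Set<String> allAssignedTaskIds() {
        // A fresh, mutable copy so that subclasses can extend it safely.
        return new HashSet<>(running.keySet());
    }
}

// Hypothetical stand-in for AssignedStreamsTasks.
final class StreamsTasksSketch extends ParentTasksSketch {
    final Map<String, Object> restoring = new LinkedHashMap<>();

    @Override
    Set<String> allAssignedTaskIds() {
        Set<String> ids = super.allAssignedTaskIds();
        ids.addAll(restoring.keySet());   // restoring tasks are assigned too
        return ids;
    }
}
```

Returning a copy from the parent matters: adding the restoring IDs to a live key-set view would fail or corrupt the parent's registry.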
Internal Properties
Name | Description |
---|---|
restoring | Lookup table of StreamTasks by TaskId |