void close(final boolean clean)
AssignedTasks
AssignedTasks
is the abstraction of collections of tasks to manage status of multiple tasks as a whole.
AssignedTasks
uses the following internal registries to determine the status of a task:
AssignedTasks
gives the toString method to list the tasks by their status.
Note
|
AssignedTasks is a Java abstract class and cannot be created directly. It is created indirectly when the concrete AssignedTasks are.
|
Task | Description |
---|---|
|
Manages StandbyTasks |
|
Manages StreamTasks |
Closing Zombie Task — closeZombieTask
Method
RuntimeException closeZombieTask(final T task)
closeZombieTask
…FIXME
Note
|
closeZombieTask is used when…FIXME
|
Removing All Task References — clear
Method
void clear()
clear
simply removes all entries from the internal registries: runningByPartition, restoringByPartition, running, created, suspended and restoredPartitions.
Note
|
clear is used exclusively when AssignedTasks is requested to close.
|
Suspending Tasks — suspendTasks
Internal Method
RuntimeException suspendTasks(final Collection<T> tasks)
suspendTasks
…FIXME
Note
|
suspendTasks is used exclusively when AssignedTasks is requested to suspend all active tasks.
|
Checking If There Is At Least One Running Task — hasRunningTasks
Method
boolean hasRunningTasks()
hasRunningTasks
simply checks whether there is at least one task registered in running internal registry or not.
Marking Task As Ready For Execution and Processing Records — transitionToRunning
Internal Method
void transitionToRunning(final T task)
transitionToRunning
simply records the given task as available for processing records (transitions the task to the running state).
Internally, transitionToRunning
prints out the following DEBUG message to the logs:
Transitioning [taskTypeName] [taskId] to running
transitionToRunning
marks the task as running (by adding it to the running internal registry).
Note
|
Registering a task in the running internal registry is the only way to mark the task as running and ready for processing records. |
transitionToRunning
requests the task to initialize the topology of processor nodes.
transitionToRunning
registers the partitions and the changelog partitions of the task (in the runningByPartition internal registry).
Note
|
|
Adding New Task — addNewTask
Method
void addNewTask(final T task)
addNewTask
just adds the input task
in created internal registry.
Note
|
addNewTask is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions (that in turn triggers addStandbyTasks and addStreamTasks).
|
Initializing New Tasks — initializeNewTasks
Method
void initializeNewTasks()
initializeNewTasks
initializes new tasks (that are in the created state), i.e. moves tasks from the created internal registry to either restoring or running registries per whether a task has state stores that may need restoring or not, respectively.
Note
|
initializeNewTasks does nothing unless there is at least one task in the created internal registry.
|
initializeNewTasks
prints out the following DEBUG message to the logs:
Initializing [taskTypeName]s [created]
initializeNewTasks
walks over all tasks in the created internal registry.
initializeNewTasks
requests a task to initialize state stores (of the topology).
If the above yielded true
, initializeNewTasks
marks the task as ready for execution and processing records.
Otherwise, when the above state store initialization yielded false
, initializeNewTasks
prints out the following DEBUG message to the logs and the concrete AssignedStreamsTasks
is requested to register the StreamTask as restoring.
Transitioning [taskTypeName] [taskId] to restoring
In the end, initializeNewTasks
removes the task (after being processed successfully) from the created internal registry.
In case of LockException
, initializeNewTasks
prints out the following TRACE message to the logs:
Could not create [taskTypeName] [taskId] due to [message]; will retry
Note
|
initializeNewTasks is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
|
Closing Non-Assigned Suspended Tasks — closeNonAssignedSuspendedTasks
Method
void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment)
closeNonAssignedSuspendedTasks
closes non-assigned tasks that were suspended, but are no longer assigned to the Kafka Streams instance or the partitions of the task and the assignment do not match.
Internally, closeNonAssignedSuspendedTasks
takes the suspended tasks and for every task checks if either condition holds:
-
newAssignment
does not contain the id of the suspended task -
The partitions of the suspended task are not equal the partitions in
newAssignment
for the task id
If either condition holds, closeNonAssignedSuspendedTasks
prints out the following DEBUG message to the logs, requests the task to closeSuspended (with the clean
flag enabled) and in the end removes the task from suspended tasks.
Closing suspended and not re-assigned [taskType] [id]
In case of a Exception
, closeNonAssignedSuspendedTasks
prints out the following ERROR message to the logs followed by the exception message.
Failed to remove suspended [taskType] [id] due to the following error
Note
|
closeNonAssignedSuspendedTasks is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions.
|
Attempting to Resume Suspended Task — maybeResumeSuspendedTask
Method
boolean maybeResumeSuspendedTask(
final TaskId taskId,
final Set<TopicPartition> partitions)
maybeResumeSuspendedTask
returns true
after successful transitioning a task (by its taskId
) from suspended to running state when the partitions of the suspended task and the input ones are equal. Otherwise, maybeResumeSuspendedTask
reports an exception (RuntimeException
or TaskMigratedException
) or returns false
.
Internally, maybeResumeSuspendedTask
branches off per whether the task (for the given TaskId) is suspended or not.
If not, maybeResumeSuspendedTask
returns false
.
If the task is suspended, maybeResumeSuspendedTask
prints out the following TRACE message to the logs:
found suspended [taskTypeName] [taskId]
maybeResumeSuspendedTask
checks whether the partitions of the task are exactly the input partitions
.
If the partitions do not match, maybeResumeSuspendedTask
prints out the following WARN message to the logs:
couldn't resume task [taskId] assigned partitions [partitions], task partitions [partitions]
If however the partitions are equal, maybeResumeSuspendedTask
removes the task (by the input taskId
) from suspended registry and requests the task to resume.
maybeResumeSuspendedTask
schedules the task for execution and prints out the following TRACE message to the logs:
resuming suspended [taskTypeName] [taskId]
maybeResumeSuspendedTask
returns true
.
In case of TaskMigratedException
, maybeResumeSuspendedTask
closeZombieTask. If it gives a RuntimeException
, maybeResumeSuspendedTask
re-throws it. Otherwise, maybeResumeSuspendedTask
removes the task (by the input taskId
) from suspended registry (again?!) and re-throws the initial TaskMigratedException
.
Note
|
maybeResumeSuspendedTask is used when TaskManager is requested to create processor tasks for assigned topic partitions (and register new standby and stream tasks).
|
Describing Itself (Textual Representation) — toString
Method
String toString(String indent)
toString
gives a text representation and describes the following:
FIXME toString in action
Note
|
toString is used exclusively when TaskManager is requested to describe itself.
|
describe
Internal Method
void describe(
StringBuilder builder,
Collection<T> tasks,
String indent,
String name)
describe
simply requests every task in the input tasks
to describe itself and uses the indent
and name
to create a text representation.
FIXME toString in action
Note
|
describe is used exclusively when AssignedTasks is requested for a text representation.
|
Getting Partitions of New Tasks with State Store — uninitializedPartitions
Method
Set<TopicPartition> uninitializedPartitions()
uninitializedPartitions
gives the partitions of the new tasks (from created registry) that have state store.
Note
|
uninitializedPartitions gives an empty set of partitions if created is empty, i.e. has no tasks registered.
|
Note
|
uninitializedPartitions is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions.
|
Suspending All Active Tasks — suspend
Method
RuntimeException suspend()
suspend
prints out the following TRACE message to the logs and suspendTasks (from running).
Suspending running [taskTypeName] [runningTaskIds]
suspend
prints out the following TRACE message to the logs and closeNonRunningTasks (from restoring).
Close restoring [taskTypeName] [restoring]
suspend
prints out the following TRACE message to the logs and closeNonRunningTasks (from created).
Close created [taskTypeName] [created]
suspend
removes all task ids from previousActiveTasks and adds the task ids from running.
In the end, suspend
removes all entries from running, restoring, created, runningByPartition and restoringByPartition.
Note
|
suspend is used exclusively when TaskManager is requested to suspend all active and standby stream tasks and state.
|
closeNonRunningTasks
Internal Method
RuntimeException closeNonRunningTasks(final Collection<T> tasks)
closeNonRunningTasks
closes every task in the given tasks
one by one (with clean
and isZombie
flags off).
In case of a RuntimeException
, closeNonRunningTasks
prints out the following ERROR to the logs followed by the exception.
Failed to close [taskTypeName], [id]"
Note
|
closeNonRunningTasks is used exclusively when AssignedTasks is requested to suspend all active tasks (and the input tasks are restoring and created).
|
Executing Task Action with Every Running Task — applyToRunningTasks
Method
void applyToRunningTasks(final TaskAction<T> action)
applyToRunningTasks
applies the input action
to every running task.
applyToRunningTasks
throws the first RuntimeException
if thrown.
Note
|
|
applyToRunningTasks
and TaskMigratedException
In case of a TaskMigratedException
, applyToRunningTasks
prints out the following INFO message to the logs:
Failed to commit [taskTypeName] [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
applyToRunningTasks
closeZombieTask. If it gives a RuntimeException
, applyToRunningTasks
re-throws it. Otherwise, applyToRunningTasks
removes the task (from the iterator but what about running?) and re-throws the initial TaskMigratedException
.
applyToRunningTasks
and RuntimeException
In case of a RuntimeException
, applyToRunningTasks
prints out the following ERROR message to the logs followed by the exception.
Failed to [actionName] [taskTypeName] [taskId] due to the following error:
applyToRunningTasks
records the RuntimeException
for a later re-throwing.
Creating AssignedTasks Instance
AssignedTasks
takes the following when created:
AssignedTasks
initializes the internal properties.
Registering Task for (State Store) Restoring — addToRestoring
Internal Method
void addToRestoring(final T task)
addToRestoring
records the task’s partitions and changelogPartitions in the restoringByPartition internal registry.
Note
|
addToRestoring is used exclusively when AssignedTasks is requested to initialize new tasks (and the task is a StreamTask and has state stores that need restoring).
|
Committing Running Tasks — commit
Method
int commit()
commit
…FIXME
Note
|
commit is used exclusively when TaskManager is requested to commitAll.
|
Checking Whether All Tasks Are Running — allTasksRunning
Method
boolean allTasksRunning()
allTasksRunning
is positive (true
) when there are no created and suspended tasks. Otherwise, allTasksRunning
is negative (false
).
Note
|
allTasksRunning is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
|
runningTaskFor
Method
T runningTaskFor(TopicPartition partition)
runningTaskFor
simply looks up the task (T
) for the given partition (in the runningByPartition internal registry).
Note
|
runningTaskFor is used when TaskManager is requested to look up the StreamTask and StandbyTask for a given partition.
|
allAssignedTaskIds
Method
Set<TaskId> allAssignedTaskIds()
allAssignedTaskIds
…FIXME
Note
|
allAssignedTaskIds is used when…FIXME
|
Internal Properties
Name | Description |
---|---|
|
|
|
|
|
|
|
|
|
|
|
Used when…FIXME Tasks IDs are added or removed as follows:
|
|
Kafka partitions per task (that processes records) |
|