void close(final boolean clean)
AssignedTasks
AssignedTasks is the abstraction of task collections that manage the status of multiple tasks as a whole.
AssignedTasks uses the following internal registries to determine the status of a task:
AssignedTasks provides the toString method to list the tasks by their status.
Note: AssignedTasks is a Java abstract class and cannot be created directly. It is created indirectly when one of the concrete AssignedTasks is created.
| Task | Description |
|---|---|
| AssignedStandbyTasks | Manages StandbyTasks |
| AssignedStreamsTasks | Manages StreamTasks |
 Closing Zombie Task — closeZombieTask Method
RuntimeException closeZombieTask(final T task)
closeZombieTask…FIXME
Note: closeZombieTask is used when…FIXME
 Removing All Task References — clear Method
void clear()
clear simply removes all entries from the internal registries: runningByPartition, restoringByPartition, running, created, suspended and restoredPartitions.
Note: clear is used exclusively when AssignedTasks is requested to close.
 Suspending Tasks — suspendTasks Internal Method
RuntimeException suspendTasks(final Collection<T> tasks)
suspendTasks…FIXME
Note: suspendTasks is used exclusively when AssignedTasks is requested to suspend all active tasks.
 Checking If There Is At Least One Running Task — hasRunningTasks Method
boolean hasRunningTasks()
hasRunningTasks simply checks whether there is at least one task registered in the running internal registry.
 Marking Task As Ready For Execution and Processing Records — transitionToRunning Internal Method
void transitionToRunning(final T task)
transitionToRunning simply records the given task as available for processing records (transitions the task to the running state).
Internally, transitionToRunning prints out the following DEBUG message to the logs:
Transitioning [taskTypeName] [taskId] to running
transitionToRunning marks the task as running (by adding it to the running internal registry).
Note: Registering a task in the running internal registry is the only way to mark the task as running and ready for processing records.
transitionToRunning requests the task to initialize the topology of processor nodes.
transitionToRunning registers the partitions and the changelog partitions of the task (in the runningByPartition internal registry).
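The steps above can be sketched with a small, self-contained model. `SketchTask`, `RunningRegistry` and the use of plain strings for task ids and partitions are assumptions made for illustration only; they are not the Kafka Streams types, and the printed line merely imitates the DEBUG message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a task; not the Kafka Streams Task interface.
class SketchTask {
    final String id;
    final Set<String> partitions;
    final Set<String> changelogPartitions;
    boolean topologyInitialized = false;

    SketchTask(String id, Set<String> partitions, Set<String> changelogPartitions) {
        this.id = id;
        this.partitions = partitions;
        this.changelogPartitions = changelogPartitions;
    }

    void initializeTopology() {
        topologyInitialized = true;
    }
}

// Hypothetical holder of the two registries named in the text.
class RunningRegistry {
    final Map<String, SketchTask> running = new HashMap<>();
    final Map<String, SketchTask> runningByPartition = new HashMap<>();

    void transitionToRunning(SketchTask task) {
        System.out.println("Transitioning task " + task.id + " to running");
        running.put(task.id, task);                          // mark the task as running
        task.initializeTopology();                           // initialize the processor topology
        for (String partition : task.partitions) {           // input partitions
            runningByPartition.put(partition, task);
        }
        for (String partition : task.changelogPartitions) {  // changelog partitions
            runningByPartition.put(partition, task);
        }
    }

    boolean hasRunningTasks() {
        return !running.isEmpty();
    }

    SketchTask runningTaskFor(String partition) {
        return runningByPartition.get(partition);
    }
}
```

Keeping a second map keyed by partition makes the lookup of a running task for a given partition a single map access, at the cost of maintaining both registries together.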
 Adding New Task — addNewTask Method
void addNewTask(final T task)
addNewTask simply adds the input task to the created internal registry.
Note: addNewTask is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions (that in turn triggers addStandbyTasks and addStreamTasks).
 Initializing New Tasks — initializeNewTasks Method
void initializeNewTasks()
initializeNewTasks initializes new tasks (that are in the created state), i.e. moves every task from the created internal registry to either the restoring registry (when the task has state stores that may need restoring) or the running registry (otherwise).
Note: initializeNewTasks does nothing unless there is at least one task in the created internal registry.
initializeNewTasks prints out the following DEBUG message to the logs:
Initializing [taskTypeName]s [created]
initializeNewTasks walks over all tasks in the created internal registry.
initializeNewTasks requests a task to initialize state stores (of the topology).
If the above yielded true, initializeNewTasks marks the task as ready for execution and processing records.
Otherwise, when the above state store initialization yielded false, initializeNewTasks prints out the following DEBUG message to the logs and the concrete AssignedStreamsTasks is requested to register the StreamTask as restoring.
Transitioning [taskTypeName] [taskId] to restoring
In the end, initializeNewTasks removes the task (after being processed successfully) from the created internal registry.
In case of LockException, initializeNewTasks prints out the following TRACE message to the logs:
Could not create [taskTypeName] [taskId] due to [message]; will retry
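The flow above can be sketched as follows. This is a simplified model under stated assumptions: `InitTask`, `NewTaskInitializer` and `SketchLockException` are hypothetical names, the `needsRestoring` and `lockedAttempts` fields exist only to drive the example, and the printed lines imitate the log messages rather than reproduce the real logger calls.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a lock failure while initializing state stores.
class SketchLockException extends RuntimeException {
    SketchLockException(String message) {
        super(message);
    }
}

// Hypothetical task model; the two flags exist only to drive the example.
class InitTask {
    final String id;
    final boolean needsRestoring;
    int lockedAttempts;

    InitTask(String id, boolean needsRestoring, int lockedAttempts) {
        this.id = id;
        this.needsRestoring = needsRestoring;
        this.lockedAttempts = lockedAttempts;
    }

    // Returns true when the task can start processing records right away.
    boolean initializeStateStores() {
        if (lockedAttempts > 0) {
            lockedAttempts--;
            throw new SketchLockException("state directory is locked");
        }
        return !needsRestoring;
    }
}

class NewTaskInitializer {
    final Map<String, InitTask> created = new LinkedHashMap<>();
    final Map<String, InitTask> running = new LinkedHashMap<>();
    final Map<String, InitTask> restoring = new LinkedHashMap<>();

    void addNewTask(InitTask task) {
        created.put(task.id, task);
    }

    void initializeNewTasks() {
        if (created.isEmpty()) {
            return; // nothing to initialize
        }
        System.out.println("Initializing tasks " + created.keySet());
        Iterator<InitTask> it = created.values().iterator();
        while (it.hasNext()) {
            InitTask task = it.next();
            try {
                if (task.initializeStateStores()) {
                    running.put(task.id, task);
                } else {
                    System.out.println("Transitioning task " + task.id + " to restoring");
                    restoring.put(task.id, task);
                }
                it.remove(); // processed successfully
            } catch (SketchLockException e) {
                // The task stays in created and is retried on the next call.
                System.out.println("Could not create task " + task.id
                        + " due to " + e.getMessage() + "; will retry");
            }
        }
    }
}
```

Removing through the iterator only after a task has been processed successfully is what leaves a task that hit the lock failure in the created registry for the next attempt.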
Note: initializeNewTasks is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
 Closing Non-Assigned Suspended Tasks — closeNonAssignedSuspendedTasks Method
void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment)
closeNonAssignedSuspendedTasks closes suspended tasks that are either no longer assigned to the Kafka Streams instance or whose partitions do not match the partitions in the new assignment.
Internally, closeNonAssignedSuspendedTasks takes the suspended tasks and for every task checks if either condition holds:
- newAssignment does not contain the id of the suspended task
- The partitions of the suspended task are not equal to the partitions in newAssignment for the task id
If either condition holds, closeNonAssignedSuspendedTasks prints out the following DEBUG message to the logs, requests the task to closeSuspended (with the clean flag enabled) and in the end removes the task from suspended tasks.
Closing suspended and not re-assigned [taskType] [id]
In case of an Exception, closeNonAssignedSuspendedTasks prints out the following ERROR message to the logs followed by the exception message.
Failed to remove suspended [taskType] [id] due to the following error
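The check and the cleanup described above can be sketched as below. `SuspendedTask` and `SuspendedRegistry` are hypothetical names, task ids and partitions are plain strings, and removing the task in a `finally` block is an assumption about how "in the end removes the task" could be implemented so that a failed close does not leave the task registered.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical suspended task; failOnClose only simulates a failing close.
class SuspendedTask {
    final String id;
    final Set<String> partitions;
    final boolean failOnClose;
    boolean closed = false;

    SuspendedTask(String id, Set<String> partitions, boolean failOnClose) {
        this.id = id;
        this.partitions = partitions;
        this.failOnClose = failOnClose;
    }

    void closeSuspended(boolean clean) {
        if (failOnClose) {
            throw new IllegalStateException("cannot close " + id);
        }
        closed = true;
    }
}

class SuspendedRegistry {
    final Map<String, SuspendedTask> suspended = new HashMap<>();

    void closeNonAssignedSuspendedTasks(Map<String, Set<String>> newAssignment) {
        Iterator<SuspendedTask> it = suspended.values().iterator();
        while (it.hasNext()) {
            SuspendedTask task = it.next();
            Set<String> assigned = newAssignment.get(task.id);
            // Either the task id is gone or its partitions changed.
            if (assigned == null || !task.partitions.equals(assigned)) {
                System.out.println("Closing suspended and not re-assigned task " + task.id);
                try {
                    task.closeSuspended(true);
                } catch (Exception e) {
                    System.err.println("Failed to remove suspended task " + task.id
                            + " due to the following error: " + e);
                } finally {
                    it.remove(); // assumption: removed whether or not the close succeeded
                }
            }
        }
    }
}
```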
Note: closeNonAssignedSuspendedTasks is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions.
 Attempting to Resume Suspended Task — maybeResumeSuspendedTask Method
boolean maybeResumeSuspendedTask(
  final TaskId taskId,
  final Set<TopicPartition> partitions)
maybeResumeSuspendedTask returns true after successfully transitioning a task (by its taskId) from the suspended to the running state when the partitions of the suspended task and the input ones are equal. Otherwise, maybeResumeSuspendedTask reports an exception (RuntimeException or TaskMigratedException) or returns false.
Internally, maybeResumeSuspendedTask branches off per whether the task (for the given TaskId) is suspended or not.
If not, maybeResumeSuspendedTask returns false.
If the task is suspended, maybeResumeSuspendedTask prints out the following TRACE message to the logs:
found suspended [taskTypeName] [taskId]
maybeResumeSuspendedTask checks whether the partitions of the task are exactly the input partitions.
If the partitions do not match, maybeResumeSuspendedTask prints out the following WARN message to the logs:
couldn't resume task [taskId] assigned partitions [partitions], task partitions [partitions]
If however the partitions are equal, maybeResumeSuspendedTask removes the task (by the input taskId) from suspended registry and requests the task to resume.
maybeResumeSuspendedTask schedules the task for execution and prints out the following TRACE message to the logs:
resuming suspended [taskTypeName] [taskId]
maybeResumeSuspendedTask returns true.
In case of a TaskMigratedException, maybeResumeSuspendedTask closes the task as a zombie (closeZombieTask). If closing gives a RuntimeException, maybeResumeSuspendedTask re-throws it. Otherwise, maybeResumeSuspendedTask removes the task (by the input taskId) from the suspended registry (again, although the task was already removed before it was requested to resume) and re-throws the initial TaskMigratedException.
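The branching above can be sketched with a simplified model. `ResumableTask`, `ResumeRegistry` and `SketchTaskMigratedException` are hypothetical names, the `migrated` flag only simulates a task that has moved elsewhere, and putting the task into `running` stands in for scheduling it for execution.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for TaskMigratedException.
class SketchTaskMigratedException extends RuntimeException {
    SketchTaskMigratedException(String taskId) {
        super("task " + taskId + " migrated");
    }
}

class ResumableTask {
    final String id;
    final Set<String> partitions;
    final boolean migrated;
    boolean closedAsZombie = false;

    ResumableTask(String id, Set<String> partitions, boolean migrated) {
        this.id = id;
        this.partitions = partitions;
        this.migrated = migrated;
    }

    void resume() {
        if (migrated) {
            throw new SketchTaskMigratedException(id);
        }
    }
}

class ResumeRegistry {
    final Map<String, ResumableTask> suspended = new HashMap<>();
    final Map<String, ResumableTask> running = new HashMap<>();

    // Stand-in for closeZombieTask: returns an error instead of throwing it.
    RuntimeException closeZombieTask(ResumableTask task) {
        task.closedAsZombie = true;
        return null;
    }

    boolean maybeResumeSuspendedTask(String taskId, Set<String> partitions) {
        ResumableTask task = suspended.get(taskId);
        if (task == null) {
            return false; // the task is not suspended
        }
        if (!task.partitions.equals(partitions)) {
            System.out.println("couldn't resume task " + taskId + " assigned partitions "
                    + partitions + ", task partitions " + task.partitions);
            return false;
        }
        suspended.remove(taskId);
        try {
            task.resume();
            running.put(taskId, task); // stands in for scheduling the task for execution
        } catch (SketchTaskMigratedException e) {
            RuntimeException fatal = closeZombieTask(task);
            if (fatal != null) {
                throw fatal;
            }
            suspended.remove(taskId); // no-op here, mirrors the second removal in the text
            throw e;
        }
        return true;
    }
}
```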
Note: maybeResumeSuspendedTask is used when TaskManager is requested to create processor tasks for assigned topic partitions (and register new standby and stream tasks).
 Describing Itself (Textual Representation) — toString Method
String toString(String indent)
toString gives a text representation and describes the following:
FIXME toString in action
Note: toString is used exclusively when TaskManager is requested to describe itself.
 describe Internal Method
void describe(
  StringBuilder builder,
  Collection<T> tasks,
  String indent,
  String name)
describe simply requests every task in the input tasks to describe itself and uses the indent and name to create a text representation.
FIXME toString in action
Note: describe is used exclusively when AssignedTasks is requested for a text representation.
 Getting Partitions of New Tasks with State Store — uninitializedPartitions Method
Set<TopicPartition> uninitializedPartitions()
uninitializedPartitions gives the partitions of the new tasks (from the created registry) that have state stores.
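A minimal sketch of that collection step, assuming a hypothetical `StoreTask` with a `hasStateStores` flag and partitions modeled as plain strings (neither is the Kafka Streams API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical task model; hasStateStores is an illustrative flag.
class StoreTask {
    final String id;
    final Set<String> partitions;
    final boolean hasStateStores;

    StoreTask(String id, Set<String> partitions, boolean hasStateStores) {
        this.id = id;
        this.partitions = partitions;
        this.hasStateStores = hasStateStores;
    }
}

class CreatedRegistry {
    final Map<String, StoreTask> created = new HashMap<>();

    Set<String> uninitializedPartitions() {
        if (created.isEmpty()) {
            return Collections.emptySet(); // no new tasks registered
        }
        Set<String> partitions = new HashSet<>();
        for (StoreTask task : created.values()) {
            if (task.hasStateStores) {
                partitions.addAll(task.partitions);
            }
        }
        return partitions;
    }
}
```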
Note: uninitializedPartitions gives an empty set of partitions if created is empty, i.e. has no tasks registered.
Note: uninitializedPartitions is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions.
 Suspending All Active Tasks — suspend Method
RuntimeException suspend()
suspend prints out the following TRACE message to the logs and suspends the running tasks (suspendTasks with the tasks from the running registry).
Suspending running [taskTypeName] [runningTaskIds]
suspend prints out the following TRACE message to the logs and closes the restoring tasks (closeNonRunningTasks with the tasks from the restoring registry).
Close restoring [taskTypeName] [restoring]
suspend prints out the following TRACE message to the logs and closes the created tasks (closeNonRunningTasks with the tasks from the created registry).
Close created [taskTypeName] [created]
suspend removes all task ids from previousActiveTasks and adds the task ids from running.
In the end, suspend removes all entries from running, restoring, created, runningByPartition and restoringByPartition.
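The order of the steps above can be sketched as follows. `LifecycleTask` and `SuspendSketch` are hypothetical names and the task state is a plain string. Two simplifications are assumptions of this sketch: suspended tasks are placed in a `suspended` map (the text does not describe what suspendTasks does), and error collection and the per-partition registries are left out.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical task whose lifecycle is tracked as a plain string.
class LifecycleTask {
    final String id;
    String state;

    LifecycleTask(String id, String state) {
        this.id = id;
        this.state = state;
    }
}

class SuspendSketch {
    final Map<String, LifecycleTask> running = new LinkedHashMap<>();
    final Map<String, LifecycleTask> restoring = new LinkedHashMap<>();
    final Map<String, LifecycleTask> created = new LinkedHashMap<>();
    final Map<String, LifecycleTask> suspended = new LinkedHashMap<>();
    final Set<String> previousActiveTasks = new HashSet<>();

    void suspend() {
        System.out.println("Suspending running tasks " + running.keySet());
        for (LifecycleTask task : running.values()) {
            task.state = "suspended";
            suspended.put(task.id, task); // assumption: kept for a later resume
        }
        System.out.println("Close restoring tasks " + restoring.keySet());
        for (LifecycleTask task : restoring.values()) {
            task.state = "closed";
        }
        System.out.println("Close created tasks " + created.keySet());
        for (LifecycleTask task : created.values()) {
            task.state = "closed";
        }
        // Remember which tasks were running before the registries are emptied.
        previousActiveTasks.clear();
        previousActiveTasks.addAll(running.keySet());
        running.clear();
        restoring.clear();
        created.clear();
    }
}
```

The task ids are copied into previousActiveTasks before `running` is cleared; doing it in the opposite order would always record an empty set.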
Note: suspend is used exclusively when TaskManager is requested to suspend all active and standby stream tasks and state.
 closeNonRunningTasks Internal Method
RuntimeException closeNonRunningTasks(final Collection<T> tasks)
closeNonRunningTasks closes every task in the given tasks one by one (with clean and isZombie flags off).
In case of a RuntimeException, closeNonRunningTasks prints out the following ERROR to the logs followed by the exception.
Failed to close [taskTypeName], [id]
Note: closeNonRunningTasks is used exclusively when AssignedTasks is requested to suspend all active tasks (and the input tasks are restoring and created).
 Executing Task Action with Every Running Task  — applyToRunningTasks Method
void applyToRunningTasks(final TaskAction<T> action)
applyToRunningTasks applies the input action to every running task.
applyToRunningTasks re-throws the first RuntimeException thrown while applying the action, if any.
 applyToRunningTasks and TaskMigratedException
In case of a TaskMigratedException, applyToRunningTasks prints out the following INFO message to the logs:
Failed to commit [taskTypeName] [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.
applyToRunningTasks closes the task as a zombie (closeZombieTask). If closing gives a RuntimeException, applyToRunningTasks re-throws it. Otherwise, applyToRunningTasks removes the task (from the iterator, but what about running?) and re-throws the initial TaskMigratedException.
 applyToRunningTasks and RuntimeException
In case of a RuntimeException, applyToRunningTasks prints out the following ERROR message to the logs followed by the exception.
Failed to [actionName] [taskTypeName] [taskId] due to the following error:
applyToRunningTasks records the RuntimeException in order to re-throw it later.
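The "record the first failure, keep going, re-throw at the end" behavior can be sketched as below. `ActionSketch` is a hypothetical name, running tasks are modeled as a set of task ids, and `java.util.function.Consumer` stands in for TaskAction. The TaskMigratedException branch is left out of this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

class ActionSketch {
    // Running tasks modeled as task ids, in insertion order.
    final Set<String> running = new LinkedHashSet<>();

    void applyToRunningTasks(String actionName, Consumer<String> action) {
        RuntimeException firstException = null;
        for (String taskId : running) {
            try {
                action.accept(taskId);
            } catch (RuntimeException e) {
                System.err.println("Failed to " + actionName + " task " + taskId
                        + " due to the following error: " + e);
                if (firstException == null) {
                    firstException = e; // remember only the first failure
                }
            }
        }
        if (firstException != null) {
            throw firstException; // re-thrown after every task has been visited
        }
    }
}
```

Deferring the throw means one failing task does not prevent the action from being applied to the remaining running tasks.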
Creating AssignedTasks Instance
AssignedTasks takes the following when created:
AssignedTasks initializes the internal properties.
 Registering Task for (State Store) Restoring — addToRestoring Internal Method
void addToRestoring(final T task)
addToRestoring records the task’s partitions and changelogPartitions in the restoringByPartition internal registry.
Note: addToRestoring is used exclusively when AssignedTasks is requested to initialize new tasks (and the task is a StreamTask and has state stores that need restoring).
 Committing Running Tasks — commit Method
int commit()
commit…FIXME
Note: commit is used exclusively when TaskManager is requested to commitAll.
 Checking Whether All Tasks Are Running — allTasksRunning Method
boolean allTasksRunning()
allTasksRunning is positive (true) when there are neither created nor suspended tasks. Otherwise, allTasksRunning is negative (false).
Note: allTasksRunning is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
 runningTaskFor Method
T runningTaskFor(TopicPartition partition)
runningTaskFor simply looks up the task (T) for the given partition (in the runningByPartition internal registry).
Note: runningTaskFor is used when TaskManager is requested to look up the StreamTask and StandbyTask for a given partition.
 allAssignedTaskIds Method
Set<TaskId> allAssignedTaskIds()
allAssignedTaskIds…FIXME
Note: allAssignedTaskIds is used when…FIXME
Internal Properties
| Name | Description |
|---|---|
|  | Used when…FIXME Task IDs are added or removed as follows: |
|  | Kafka partitions per task (that processes records) |