AssignedTasks

AssignedTasks is the abstraction of a collection of tasks that manages the status of multiple tasks as a whole.

AssignedTasks uses the following internal registries to determine the status of a task:

  • running for tasks that are considered running (that AssignedStreamsTasks uses when requested to process)

  • created for tasks that are considered new

  • suspended for tasks that are considered suspended

  • restoring for tasks that are considered restoring
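The status lookup implied by these registries can be sketched as follows. This is a minimal illustration, not the Kafka Streams source: RegistriesSketch and statusOf are hypothetical names, and plain String values stand in for TaskId and for the task type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for AssignedTasks: one map per status, keyed by task id.
class RegistriesSketch {
    final Map<String, String> running = new LinkedHashMap<>();
    final Map<String, String> created = new LinkedHashMap<>();
    final Map<String, String> suspended = new LinkedHashMap<>();
    final Map<String, String> restoring = new LinkedHashMap<>();

    // The status of a task is the name of the registry that holds its id.
    String statusOf(String taskId) {
        if (running.containsKey(taskId)) return "running";
        if (created.containsKey(taskId)) return "created";
        if (suspended.containsKey(taskId)) return "suspended";
        if (restoring.containsKey(taskId)) return "restoring";
        return "unknown";
    }

    public static void main(String[] args) {
        RegistriesSketch tasks = new RegistriesSketch();
        tasks.created.put("0_0", "task 0_0");
        tasks.running.put("0_1", "task 0_1");
        System.out.println(tasks.statusOf("0_0"));
        System.out.println(tasks.statusOf("0_1"));
    }
}
```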

AssignedTasks provides the toString method that lists the tasks by their status.

Note
AssignedTasks is a Java abstract class and cannot be created directly. It is created indirectly when one of the concrete AssignedTasks is created.

Table 1. AssignedTasks

Task | Description

AssignedStandbyTasks | Manages StandbyTasks

AssignedStreamsTasks | Manages StreamTasks

close Method

void close(final boolean clean)

close…​FIXME

Note
close is used when…​FIXME

Closing Zombie Task — closeZombieTask Method

RuntimeException closeZombieTask(final T task)

closeZombieTask…​FIXME

Note
closeZombieTask is used when…​FIXME

Removing All Task References — clear Method

void clear()

clear simply removes all entries from the internal registries: runningByPartition, restoringByPartition, running, created, suspended and restoredPartitions.

Note
clear is used exclusively when AssignedTasks is requested to close.

Suspending Tasks — suspendTasks Internal Method

RuntimeException suspendTasks(final Collection<T> tasks)

suspendTasks…​FIXME

Note
suspendTasks is used exclusively when AssignedTasks is requested to suspend all active tasks.

Checking If There Is At Least One Running Task — hasRunningTasks Method

boolean hasRunningTasks()

hasRunningTasks simply checks whether there is at least one task registered in the running internal registry.

Note
hasRunningTasks is used exclusively when TaskManager is requested to check if there are any running or standby tasks registered.

Marking Task As Ready For Execution and Processing Records — transitionToRunning Internal Method

void transitionToRunning(final T task)

transitionToRunning simply records the given task as available for processing records (transitions the task to the running state).

Internally, transitionToRunning prints out the following DEBUG message to the logs:

Transitioning [taskTypeName] [taskId] to running

transitionToRunning marks the task as running (by adding it to the running internal registry).

Note
Registering a task in the running internal registry is the only way to mark the task as running and ready for processing records.

transitionToRunning requests the task to initialize the topology of processor nodes.

transitionToRunning registers the partitions and the changelog partitions of the task (in the runningByPartition internal registry).
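The steps above can be sketched in order as follows. This is an illustration under stated assumptions rather than the actual implementation: TransitionSketch and SketchTask are hypothetical, String values stand in for TaskId and TopicPartition, and a println stands in for the DEBUG log line.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TransitionSketch {
    // Hypothetical, minimal task; the real task types expose far more than this.
    static class SketchTask {
        final String id;
        final List<String> partitions;
        final List<String> changelogPartitions;
        boolean topologyInitialized = false;

        SketchTask(String id, List<String> partitions, List<String> changelogPartitions) {
            this.id = id;
            this.partitions = partitions;
            this.changelogPartitions = changelogPartitions;
        }

        void initializeTopology() {
            topologyInitialized = true;
        }
    }

    final Map<String, SketchTask> running = new HashMap<>();
    final Map<String, SketchTask> runningByPartition = new HashMap<>();

    void transitionToRunning(SketchTask task) {
        // Stand-in for the DEBUG log line.
        System.out.println("Transitioning task " + task.id + " to running");
        // Mark the task as running.
        running.put(task.id, task);
        // Request the task to initialize its topology.
        task.initializeTopology();
        // Register both the input partitions and the changelog partitions.
        for (String partition : task.partitions) {
            runningByPartition.put(partition, task);
        }
        for (String changelog : task.changelogPartitions) {
            runningByPartition.put(changelog, task);
        }
    }

    public static void main(String[] args) {
        TransitionSketch tasks = new TransitionSketch();
        SketchTask task = new SketchTask(
            "0_0", Arrays.asList("input-0"), Arrays.asList("store-changelog-0"));
        tasks.transitionToRunning(task);
        System.out.println(tasks.runningByPartition.keySet());
    }
}
```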

Note

transitionToRunning is used when:

Adding New Task — addNewTask Method

void addNewTask(final T task)

addNewTask simply adds the given task to the created internal registry.

Figure 1. AssignedTasks and Adding New Task
Note
addNewTask is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions (that in turn triggers addStandbyTasks and addStreamTasks).

Initializing New Tasks — initializeNewTasks Method

void initializeNewTasks()

initializeNewTasks initializes new tasks (that are in the created state), i.e. moves every task from the created internal registry to either the restoring or the running registry, depending on whether the task has state stores that may need restoring or not, respectively.

Figure 2. AssignedTasks.initializeNewTasks
Note
initializeNewTasks does nothing unless there is at least one task in the created internal registry.

initializeNewTasks prints out the following DEBUG message to the logs:

Initializing [taskTypeName]s [created]

initializeNewTasks walks over all tasks in the created internal registry.

initializeNewTasks requests a task to initialize state stores (of the topology).

If the state store initialization yields true, initializeNewTasks marks the task as ready for execution and processing records (transitionToRunning).

Otherwise, when the state store initialization yields false, initializeNewTasks prints out the following DEBUG message to the logs and requests the concrete AssignedStreamsTasks to register the StreamTask as restoring.

Transitioning [taskTypeName] [taskId] to restoring

In the end, initializeNewTasks removes the task (after being processed successfully) from the created internal registry.

In case of a LockException, initializeNewTasks prints out the following TRACE message to the logs and leaves the task in the created internal registry:

Could not create [taskTypeName] [taskId] due to [message]; will retry
Note
initializeNewTasks is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
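The flow of initializeNewTasks can be sketched as a single loop over the created registry. This is a simplified illustration, not the actual source: InitializeSketch, SketchTask and SketchLockException are hypothetical stand-ins, String ids replace TaskId, and println replaces logging.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class InitializeSketch {
    // Hypothetical stand-in for Kafka's LockException.
    static class SketchLockException extends RuntimeException {
        SketchLockException(String message) {
            super(message);
        }
    }

    // Hypothetical task contract: true means no state store needs restoring.
    interface SketchTask {
        boolean initializeStateStores();
    }

    final Map<String, SketchTask> created = new LinkedHashMap<>();
    final Map<String, SketchTask> restoring = new LinkedHashMap<>();
    final Map<String, SketchTask> running = new LinkedHashMap<>();

    void initializeNewTasks() {
        if (created.isEmpty()) {
            return; // nothing to do
        }
        Iterator<Map.Entry<String, SketchTask>> it = created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SketchTask> entry = it.next();
            try {
                if (entry.getValue().initializeStateStores()) {
                    running.put(entry.getKey(), entry.getValue());
                } else {
                    restoring.put(entry.getKey(), entry.getValue());
                }
                // Only reached when initialization did not throw.
                it.remove();
            } catch (SketchLockException e) {
                // The task stays in created, so a later call retries it.
                System.out.println("Could not create task " + entry.getKey()
                    + " due to " + e.getMessage() + "; will retry");
            }
        }
    }
}
```

A task that fails with the lock exception is deliberately not removed, which is what makes the "will retry" in the log message possible.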

Closing Non-Assigned Suspended Tasks — closeNonAssignedSuspendedTasks Method

void closeNonAssignedSuspendedTasks(final Map<TaskId, Set<TopicPartition>> newAssignment)

closeNonAssignedSuspendedTasks closes suspended tasks that are either no longer assigned to the Kafka Streams instance or whose partitions do not match the partitions in the new assignment.

Internally, closeNonAssignedSuspendedTasks takes the suspended tasks and for every task checks if either condition holds:

  1. newAssignment does not contain the id of the suspended task

  2. The partitions of the suspended task are not equal to the partitions in newAssignment for the task id

If either condition holds, closeNonAssignedSuspendedTasks prints out the following DEBUG message to the logs, requests the task to closeSuspended (with the clean flag enabled) and in the end removes the task from suspended tasks.

Closing suspended and not re-assigned [taskType] [id]

In case of an Exception, closeNonAssignedSuspendedTasks prints out the following ERROR message to the logs followed by the exception message.

Failed to remove suspended [taskType] [id] due to the following error
Note
closeNonAssignedSuspendedTasks is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions.
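The two conditions can be sketched as follows. This is a minimal illustration with hypothetical types (CloseNonAssignedSketch, SketchTask) and String stand-ins for TaskId and TopicPartition. Removing the task even when closing failed is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class CloseNonAssignedSketch {
    // Hypothetical suspended task that only remembers whether it was closed.
    static class SketchTask {
        final Set<String> partitions;
        boolean closed = false;

        SketchTask(Set<String> partitions) {
            this.partitions = partitions;
        }

        void closeSuspended(boolean clean) {
            closed = true;
        }
    }

    final Map<String, SketchTask> suspended = new HashMap<>();

    void closeNonAssignedSuspendedTasks(Map<String, Set<String>> newAssignment) {
        Iterator<Map.Entry<String, SketchTask>> it = suspended.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SketchTask> entry = it.next();
            String id = entry.getKey();
            SketchTask task = entry.getValue();
            // Condition 1: the task id is missing from the new assignment.
            boolean notAssigned = !newAssignment.containsKey(id);
            // Condition 2: the partitions differ from the newly assigned ones.
            boolean partitionsDiffer = !task.partitions.equals(newAssignment.get(id));
            if (notAssigned || partitionsDiffer) {
                System.out.println("Closing suspended and not re-assigned task " + id);
                try {
                    task.closeSuspended(true);
                } catch (Exception e) {
                    System.out.println("Failed to remove suspended task " + id
                        + " due to the following error: " + e);
                }
                // Assumption: the task is dropped whether or not closing succeeded.
                it.remove();
            }
        }
    }
}
```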

Attempting to Resume Suspended Task — maybeResumeSuspendedTask Method

boolean maybeResumeSuspendedTask(
  final TaskId taskId,
  final Set<TopicPartition> partitions)

maybeResumeSuspendedTask returns true after successfully transitioning a task (by its taskId) from the suspended to the running state, which happens only when the partitions of the suspended task are equal to the input partitions. Otherwise, maybeResumeSuspendedTask reports an exception (RuntimeException or TaskMigratedException) or returns false.

Internally, maybeResumeSuspendedTask branches depending on whether the task (for the given TaskId) is suspended or not.

If not, maybeResumeSuspendedTask returns false.

If the task is suspended, maybeResumeSuspendedTask prints out the following TRACE message to the logs:

found suspended [taskTypeName] [taskId]

maybeResumeSuspendedTask checks whether the partitions of the task are exactly the input partitions.

If the partitions do not match, maybeResumeSuspendedTask prints out the following WARN message to the logs:

couldn't resume task [taskId] assigned partitions [partitions], task partitions [partitions]

If, however, the partitions are equal, maybeResumeSuspendedTask removes the task (by the input taskId) from the suspended registry and requests the task to resume.

maybeResumeSuspendedTask schedules the task for execution and prints out the following TRACE message to the logs:

resuming suspended [taskTypeName] [taskId]

maybeResumeSuspendedTask returns true.

In case of a TaskMigratedException, maybeResumeSuspendedTask closes the task as a zombie (closeZombieTask). If closeZombieTask gives a RuntimeException, maybeResumeSuspendedTask re-throws it. Otherwise, maybeResumeSuspendedTask removes the task (by the input taskId) from the suspended registry (again?!) and re-throws the initial TaskMigratedException.

Note
maybeResumeSuspendedTask is used when TaskManager is requested to create processor tasks for assigned topic partitions (and register new standby and stream tasks).
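The branches of maybeResumeSuspendedTask can be sketched as follows. All names are hypothetical stand-ins (ResumeSketch, SketchTask, SketchTaskMigratedException), String values replace TaskId and TopicPartition, and closeZombieTask is a stub that reports no failure, since its behaviour is not described here. Where exactly the migration exception originates is also an assumption; the sketch raises it from resume.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class ResumeSketch {
    // Hypothetical stand-in for TaskMigratedException.
    static class SketchTaskMigratedException extends RuntimeException {
    }

    static class SketchTask {
        final Set<String> partitions;
        final boolean migrated; // when true, resuming fails
        boolean resumed = false;

        SketchTask(Set<String> partitions, boolean migrated) {
            this.partitions = partitions;
            this.migrated = migrated;
        }

        void resume() {
            if (migrated) throw new SketchTaskMigratedException();
            resumed = true;
        }
    }

    final Map<String, SketchTask> suspended = new HashMap<>();
    final Map<String, SketchTask> running = new HashMap<>();

    // Hypothetical stub: returns the failure of closing the zombie, or null.
    RuntimeException closeZombieTask(SketchTask task) {
        return null;
    }

    boolean maybeResumeSuspendedTask(String taskId, Set<String> partitions) {
        SketchTask task = suspended.get(taskId);
        if (task == null) {
            return false; // not suspended
        }
        if (!task.partitions.equals(partitions)) {
            System.out.println("couldn't resume task " + taskId);
            return false;
        }
        suspended.remove(taskId);
        try {
            task.resume();
        } catch (SketchTaskMigratedException e) {
            RuntimeException fatal = closeZombieTask(task);
            if (fatal != null) throw fatal;
            suspended.remove(taskId); // second removal, as in the description
            throw e;
        }
        running.put(taskId, task); // schedule the task for execution
        return true;
    }
}
```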

Describing Itself (Textual Representation) — toString Method

String toString(String indent)

toString gives a text representation and describes the following:

FIXME toString in action
Note
toString is used exclusively when TaskManager is requested to describe itself.

describe Internal Method

void describe(
  StringBuilder builder,
  Collection<T> tasks,
  String indent,
  String name)

describe simply requests every task in the input tasks to describe itself and uses the indent and name to create a text representation.

FIXME toString in action
Note
describe is used exclusively when AssignedTasks is requested for a text representation.

Getting Partitions of New Tasks with State Store — uninitializedPartitions Method

Set<TopicPartition> uninitializedPartitions()

uninitializedPartitions gives the partitions of the new tasks (from the created registry) that have state stores.

Note
uninitializedPartitions gives an empty set of partitions if created is empty, i.e. has no tasks registered.
Note
uninitializedPartitions is used exclusively when TaskManager is requested to create processor tasks for assigned topic partitions.
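A minimal sketch of this collection step, assuming a hypothetical SketchTask that exposes a hasStateStores flag and its partitions as String values:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class UninitializedPartitionsSketch {
    // Hypothetical task with just the two properties this method needs.
    static class SketchTask {
        final boolean hasStateStores;
        final Set<String> partitions;

        SketchTask(boolean hasStateStores, Set<String> partitions) {
            this.hasStateStores = hasStateStores;
            this.partitions = partitions;
        }
    }

    final Map<String, SketchTask> created = new LinkedHashMap<>();

    Set<String> uninitializedPartitions() {
        if (created.isEmpty()) {
            return Collections.emptySet();
        }
        Set<String> partitions = new HashSet<>();
        for (SketchTask task : created.values()) {
            // Stateless tasks contribute nothing.
            if (task.hasStateStores) {
                partitions.addAll(task.partitions);
            }
        }
        return partitions;
    }
}
```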

Suspending All Active Tasks — suspend Method

RuntimeException suspend()

suspend prints out the following TRACE message to the logs and suspends the running tasks (suspendTasks).

Suspending running [taskTypeName] [runningTaskIds]

suspend prints out the following TRACE message to the logs and closes the restoring tasks (closeNonRunningTasks).

Close restoring [taskTypeName] [restoring]

suspend prints out the following TRACE message to the logs and closes the created tasks (closeNonRunningTasks).

Close created [taskTypeName] [created]

Close created [taskTypeName] [created]

suspend removes all task ids from previousActiveTasks and adds the task ids from running.

In the end, suspend removes all entries from running, restoring, created, runningByPartition and restoringByPartition.

Note
suspend is used exclusively when TaskManager is requested to suspend all active and standby stream tasks and state.
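The bookkeeping order in suspend can be sketched as follows. SuspendSketch is hypothetical, String values stand in for task ids and tasks, and suspendTasks and closeNonRunningTasks are stubs that report no failure. Returning the first failure is an assumption suggested by the RuntimeException return type.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

class SuspendSketch {
    final Map<String, String> running = new LinkedHashMap<>();
    final Map<String, String> restoring = new LinkedHashMap<>();
    final Map<String, String> created = new LinkedHashMap<>();
    final Map<String, String> runningByPartition = new LinkedHashMap<>();
    final Map<String, String> restoringByPartition = new LinkedHashMap<>();
    final Set<String> previousActiveTasks = new HashSet<>();

    // Hypothetical stubs: they do nothing and report no failure.
    RuntimeException suspendTasks(Collection<String> tasks) {
        return null;
    }

    RuntimeException closeNonRunningTasks(Collection<String> tasks) {
        return null;
    }

    RuntimeException suspend() {
        // Assumption: the first failure wins and is handed back to the caller.
        RuntimeException first = suspendTasks(running.values());
        RuntimeException fromRestoring = closeNonRunningTasks(restoring.values());
        if (first == null) first = fromRestoring;
        RuntimeException fromCreated = closeNonRunningTasks(created.values());
        if (first == null) first = fromCreated;

        // Remember the running task ids before the registries are emptied.
        previousActiveTasks.clear();
        previousActiveTasks.addAll(running.keySet());

        running.clear();
        restoring.clear();
        created.clear();
        runningByPartition.clear();
        restoringByPartition.clear();
        return first;
    }
}
```

The order matters in this sketch: previousActiveTasks has to be refreshed before running is cleared, otherwise the ids would already be gone.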

closeNonRunningTasks Internal Method

RuntimeException closeNonRunningTasks(final Collection<T> tasks)

closeNonRunningTasks closes every task in the given tasks one by one (with clean and isZombie flags off).

In case of a RuntimeException, closeNonRunningTasks prints out the following ERROR message to the logs followed by the exception.

Failed to close [taskTypeName], [id]
Note
closeNonRunningTasks is used exclusively when AssignedTasks is requested to suspend all active tasks (and the input tasks are restoring and created).
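A sketch of this close loop, with a hypothetical SketchTask whose close can be made to fail. Keeping and returning only the first failure is an assumption based on the RuntimeException return type.

```java
import java.util.Collection;

class CloseNonRunningSketch {
    // Hypothetical task that records the attempt and optionally fails.
    static class SketchTask {
        final String id;
        final RuntimeException failure; // null means close succeeds
        boolean closeAttempted = false;

        SketchTask(String id, RuntimeException failure) {
            this.id = id;
            this.failure = failure;
        }

        void close(boolean clean, boolean isZombie) {
            closeAttempted = true;
            if (failure != null) throw failure;
        }
    }

    static RuntimeException closeNonRunningTasks(Collection<SketchTask> tasks) {
        RuntimeException first = null;
        for (SketchTask task : tasks) {
            try {
                // Both the clean and the isZombie flags are off.
                task.close(false, false);
            } catch (RuntimeException e) {
                System.out.println("Failed to close task, " + task.id);
                // Assumption: only the first failure is kept.
                if (first == null) first = e;
            }
        }
        return first;
    }
}
```

A failing task does not stop the loop, so every task in the collection gets a close attempt.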

Executing Task Action with Every Running Task  — applyToRunningTasks Method

void applyToRunningTasks(final TaskAction<T> action)

applyToRunningTasks applies the input action to every running task.

In the end, applyToRunningTasks re-throws the first RuntimeException recorded while applying the action, if there was one.

Note

applyToRunningTasks is used when:

  • AssignedStreamsTasks is requested to maybeCommit

  • AssignedTasks is requested to commit

applyToRunningTasks and TaskMigratedException

In case of a TaskMigratedException, applyToRunningTasks prints out the following INFO message to the logs:

Failed to commit [taskTypeName] [taskId] since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.

applyToRunningTasks closes the task as a zombie (closeZombieTask). If closeZombieTask gives a RuntimeException, applyToRunningTasks re-throws it. Otherwise, applyToRunningTasks removes the task (from the iterator but what about running?) and re-throws the initial TaskMigratedException.

applyToRunningTasks and RuntimeException

In case of a RuntimeException, applyToRunningTasks prints out the following ERROR message to the logs followed by the exception.

Failed to [actionName] [taskTypeName] [taskId] due to the following error:

applyToRunningTasks records the RuntimeException so that it can be re-thrown later.
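The two failure paths can be sketched together as follows. ApplySketch and SketchTaskMigratedException are hypothetical, tasks are plain String values, a java.util.function.Consumer stands in for TaskAction, and closeZombieTask is a stub that reports no failure. In this sketch the iterator comes from running.values(), so removing through the iterator also removes the entry from running; whether the real code iterates the same way is an assumption.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

class ApplySketch {
    // Hypothetical stand-in for TaskMigratedException.
    static class SketchTaskMigratedException extends RuntimeException {
    }

    final Map<String, String> running = new LinkedHashMap<>();

    // Hypothetical stub: returns the failure of closing the zombie, or null.
    RuntimeException closeZombieTask(String task) {
        return null;
    }

    void applyToRunningTasks(Consumer<String> action) {
        RuntimeException firstException = null;
        Iterator<String> it = running.values().iterator();
        while (it.hasNext()) {
            String task = it.next();
            try {
                action.accept(task);
            } catch (SketchTaskMigratedException e) {
                // Migrated task: close it as a zombie and stop immediately.
                RuntimeException fatal = closeZombieTask(task);
                if (fatal != null) throw fatal;
                it.remove(); // backed by the map, so running loses the entry too
                throw e;
            } catch (RuntimeException e) {
                System.out.println("Failed to apply action to task " + task
                    + " due to the following error: " + e);
                // Keep going, but remember the first failure.
                if (firstException == null) firstException = e;
            }
        }
        if (firstException != null) throw firstException;
    }
}
```

An ordinary RuntimeException lets the remaining tasks run before it surfaces, whereas a migrated task aborts the loop at once.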

Creating AssignedTasks Instance

AssignedTasks takes the following when created:

  • LogContext

  • taskTypeName

AssignedTasks initializes the internal properties.

Registering Task for (State Store) Restoring — addToRestoring Internal Method

void addToRestoring(final T task)

addToRestoring records the input task in the restoring internal registry.

addToRestoring records the task’s partitions and changelogPartitions in the restoringByPartition internal registry.

Note
addToRestoring is used exclusively when AssignedTasks is requested to initialize new tasks (and the task is a StreamTask and has state stores that need restoring).

Committing Running Tasks — commit Method

int commit()

commit…​FIXME

Note
commit is used exclusively when TaskManager is requested to commitAll.

Checking Whether All Tasks Are Running — allTasksRunning Method

boolean allTasksRunning()

allTasksRunning is positive (true) when there are neither created nor suspended tasks. Otherwise, allTasksRunning is negative (false).

Note
allTasksRunning is used exclusively when TaskManager is requested to updateNewAndRestoringTasks.
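The check can be sketched in one line. AllTasksRunningSketch is a hypothetical stand-in with String-keyed registries, and it consults only the two registries named in the description above.

```java
import java.util.HashMap;
import java.util.Map;

class AllTasksRunningSketch {
    final Map<String, String> created = new HashMap<>();
    final Map<String, String> suspended = new HashMap<>();

    // True only when both registries are empty.
    boolean allTasksRunning() {
        return created.isEmpty() && suspended.isEmpty();
    }
}
```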

runningTaskFor Method

T runningTaskFor(TopicPartition partition)

runningTaskFor simply looks up the task (T) for the given partition (in the runningByPartition internal registry).

Note
runningTaskFor is used when TaskManager is requested to look up the StreamTask and StandbyTask for a given partition.

allTasks Method

List<T> allTasks()

allTasks…​FIXME

Note
allTasks is used when…​FIXME

allAssignedTaskIds Method

Set<TaskId> allAssignedTaskIds()

allAssignedTaskIds…​FIXME

Note
allAssignedTaskIds is used when…​FIXME

Internal Properties

Name Description

commitAction

TaskAction that requests running tasks to commit at commit

created

New tasks by their ids

previousActiveTasks

Previously active tasks

restoring

log

running

Running tasks by TaskId (Map<TaskId, T>)

running is a java.util.concurrent.ConcurrentHashMap, i.e. ConcurrentHashMap<TaskId, Task>.

Used when…​FIXME

Task IDs are added or removed as follows:

runningByPartition

Kafka partitions per task (that processes records)

suspended
