KafkaStreams, StreamThreads, StreamTasks and StandbyTasks
KafkaStreams, StreamThreads and StreamsConfig.NUM_STREAM_THREADS_CONFIG
A KafkaStreams instance uses StreamThreads for stream processing. The number of StreamThreads is controlled by the StreamsConfig.NUM_STREAM_THREADS_CONFIG (num.stream.threads) configuration property.
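A minimal sketch of configuring the number of StreamThreads. It assumes plain java.util.Properties with the literal property names instead of the StreamsConfig constants, so it runs without the kafka-streams jar. The StreamsProps class, its streamsProperties helper and the localhost:9092 broker address are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: builds the configuration a KafkaStreams instance could be created with.
class StreamsProps {
    // Literal values of StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
    // and StreamsConfig.NUM_STREAM_THREADS_CONFIG (spelled out to avoid a kafka-streams dependency)
    static final String APPLICATION_ID = "application.id";
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String NUM_STREAM_THREADS = "num.stream.threads";

    static Properties streamsProperties(String applicationId, int numStreamThreads) {
        Properties props = new Properties();
        props.setProperty(APPLICATION_ID, applicationId);
        props.setProperty(BOOTSTRAP_SERVERS, "localhost:9092"); // assumed broker address
        // The KafkaStreams instance would start this many StreamThreads
        props.setProperty(NUM_STREAM_THREADS, Integer.toString(numStreamThreads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("word-count", 4);
        System.out.println(props.getProperty(NUM_STREAM_THREADS)); // prints 4
    }
}
```

In an application the same properties would be passed to new KafkaStreams(topology, props); num.stream.threads defaults to 1.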
StreamThread, Kafka Consumer and Consumer Group
When created, a StreamThread is given a Kafka Consumer (Consumer<byte[], byte[]>) by the KafkaClientSupplier.
StreamThread uses the Kafka consumer as a regular Kafka client, e.g. to subscribe to the source topics of a topology and to poll records.
When requesting a Kafka Consumer from the KafkaClientSupplier, StreamThread passes on a consumer configuration with ConsumerConfig.GROUP_ID_CONFIG (group.id) for the consumer group ID and CommonClientConfigs.CLIENT_ID_CONFIG (client.id) for the client ID.
The value of ConsumerConfig.GROUP_ID_CONFIG is exactly the value of StreamsConfig.APPLICATION_ID_CONFIG (application.id).
In summary, using the required StreamsConfig.APPLICATION_ID_CONFIG as the group ID for all StreamThreads of a Kafka Streams application makes them members of a single consumer group. Dynamic partition assignment among all the StreamThreads is controlled by the Kafka consumer group protocol, using StreamsPartitionAssignor.
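How a shared application.id turns all StreamThreads into one consumer group can be sketched as follows. This is not Kafka Streams code: the ThreadConsumerConfigs class, its helpers and the client ID format (clientId + "-StreamThread-" + n + "-consumer") are assumptions for illustration, and the literal keys stand for ConsumerConfig.GROUP_ID_CONFIG and CommonClientConfigs.CLIENT_ID_CONFIG.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the consumer configuration each StreamThread would pass to the KafkaClientSupplier.
class ThreadConsumerConfigs {
    static Map<String, Object> consumerConfig(String applicationId, String threadClientId) {
        Map<String, Object> config = new HashMap<>();
        // ConsumerConfig.GROUP_ID_CONFIG: the application ID is used as the consumer group ID
        config.put("group.id", applicationId);
        // CommonClientConfigs.CLIENT_ID_CONFIG: distinct per StreamThread (format assumed here)
        config.put("client.id", threadClientId + "-consumer");
        return config;
    }

    // Configurations for all StreamThreads of one KafkaStreams instance
    static List<Map<String, Object>> allThreads(String applicationId, String clientId, int numStreamThreads) {
        List<Map<String, Object>> configs = new ArrayList<>();
        for (int i = 1; i <= numStreamThreads; i++) {
            configs.add(consumerConfig(applicationId, clientId + "-StreamThread-" + i));
        }
        return configs;
    }

    public static void main(String[] args) {
        for (Map<String, Object> c : allThreads("word-count", "word-count-client", 2)) {
            System.out.println(c.get("group.id") + " / " + c.get("client.id"));
        }
    }
}
```

Every configuration carries the same group.id and a distinct client.id, which is what lets the group protocol (with StreamsPartitionAssignor) treat all StreamThreads as members of one group.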
StreamThread, TaskManager and StreamTasks
Every StreamThread manages its own TaskManager (with the factories to create stream and standby tasks).
When requested to poll records once and process them using the active stream tasks, StreamThread requests the TaskManager for the active stream task of every partition with records (see StreamThread.addRecordsToTasks).
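The per-partition hand-over of polled records can be sketched with simplified types. Everything here is hypothetical: partitions are "topic-partition" strings, records are plain strings, and a map from partition to task stands in for the TaskManager lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of StreamThread.addRecordsToTasks: the records polled for every
// partition are handed over to the active task that owns that partition.
class RecordRouting {
    static final class Task {
        final String id;
        // One buffer per partition, standing in for the task's RecordQueues
        final Map<String, List<String>> queues = new HashMap<>();
        Task(String id) { this.id = id; }

        void addRecords(String partition, List<String> records) {
            queues.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(records);
        }
    }

    // activeTaskByPartition plays the role of asking the TaskManager for the active task of a partition
    static void addRecordsToTasks(Map<String, List<String>> polledByPartition,
                                  Map<String, Task> activeTaskByPartition) {
        for (Map.Entry<String, List<String>> entry : polledByPartition.entrySet()) {
            Task task = activeTaskByPartition.get(entry.getKey());
            if (task == null) {
                // This sketch treats records for a partition without an active task as a bug
                throw new IllegalStateException("No active task for partition " + entry.getKey());
            }
            task.addRecords(entry.getKey(), entry.getValue());
        }
    }

    public static void main(String[] args) {
        Task t0 = new Task("0_0");
        Task t1 = new Task("0_1");
        Map<String, Task> tasks = new HashMap<>();
        tasks.put("input-0", t0);
        tasks.put("input-1", t1);
        Map<String, List<String>> polled = new HashMap<>();
        polled.put("input-0", Arrays.asList("a", "b"));
        polled.put("input-1", Arrays.asList("c"));
        addRecordsToTasks(polled, tasks);
        System.out.println(t0.queues + " " + t1.queues); // prints {input-0=[a, b]} {input-1=[c]}
    }
}
```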
A StreamTask is responsible for processing records, one at a time, from its assigned partitions, using record buffers (one RecordQueue per TopicPartition).
When created, StreamTask creates a PartitionGroup (with RecordQueues). A RecordQueue is created with the SourceNode of the ProcessorTopology for the topic of the partition.
In other words, a RecordQueue is simply a partition (with its buffered records) and a SourceNode.
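The PartitionGroup and RecordQueue relationship can be modelled as below. This is a simplified sketch with hypothetical types; it assumes the next record is taken from the non-empty RecordQueue whose head record has the smallest timestamp, which approximates the timestamp-based choice in Kafka Streams (the exact rule varies between versions).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a PartitionGroup that holds one RecordQueue per partition.
class PartitionGroupSketch {
    static final class StampedRecord {
        final long timestamp;
        final String value;
        StampedRecord(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
    }

    // A RecordQueue: a partition, the SourceNode for the partition's topic, and the buffered records
    static final class RecordQueue {
        final String partition;
        final String sourceNode;
        final Deque<StampedRecord> records = new ArrayDeque<>();
        RecordQueue(String partition, String sourceNode) { this.partition = partition; this.sourceNode = sourceNode; }
    }

    // The next record together with its queue, so the caller knows which SourceNode to use
    static final class NextRecord {
        final StampedRecord record;
        final RecordQueue queue;
        NextRecord(StampedRecord record, RecordQueue queue) { this.record = record; this.queue = queue; }
    }

    final List<RecordQueue> queues = new ArrayList<>();

    // Takes the head of the non-empty queue with the smallest head timestamp; null if all queues are empty
    NextRecord nextRecord() {
        RecordQueue best = null;
        for (RecordQueue q : queues) {
            if (q.records.isEmpty()) continue;
            if (best == null || q.records.peekFirst().timestamp < best.records.peekFirst().timestamp) {
                best = q;
            }
        }
        return best == null ? null : new NextRecord(best.records.pollFirst(), best);
    }

    public static void main(String[] args) {
        PartitionGroupSketch group = new PartitionGroupSketch();
        RecordQueue orders = new RecordQueue("orders-0", "source-orders");
        RecordQueue payments = new RecordQueue("payments-0", "source-payments");
        orders.records.add(new StampedRecord(20L, "o1"));
        payments.records.add(new StampedRecord(10L, "p1"));
        group.queues.add(orders);
        group.queues.add(payments);
        for (NextRecord next = group.nextRecord(); next != null; next = group.nextRecord()) {
            System.out.println(next.queue.sourceNode + " <- " + next.record.value);
        }
        // prints "source-payments <- p1", then "source-orders <- o1"
    }
}
```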
When requested to process a single record, StreamTask requests the PartitionGroup for the next stamped record (a record with a timestamp) together with its RecordQueue, and simply requests the RecordQueue's SourceNode to process the record.
When requested to process a record, a SourceNode simply requests the ProcessorContext to forward it down the topology, so the record visits (is pushed down to) every downstream node in the topology.
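Forwarding down the topology amounts to a depth-first walk starting at the SourceNode. In the hypothetical sketch below each node calls its children directly, standing in for ProcessorContext.forward; the node names and per-node logic are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of forwarding: a node applies its logic to a record and forwards
// the result to all its children, the way a SourceNode starts pushing a record down the topology.
class ForwardSketch {
    static final class Node {
        final String name;
        final UnaryOperator<String> logic;
        final List<Node> children = new ArrayList<>();
        Node(String name, UnaryOperator<String> logic) { this.name = name; this.logic = logic; }

        Node addChild(Node child) { children.add(child); return this; }

        // Records "name:value" for every visited node so the path of a record can be inspected
        void process(String value, List<String> visited) {
            String result = logic.apply(value);
            visited.add(name + ":" + result);
            for (Node child : children) {
                child.process(result, visited); // stands in for ProcessorContext.forward
            }
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", UnaryOperator.identity());
        Node upper = new Node("upper", String::toUpperCase);
        Node sink = new Node("sink", UnaryOperator.identity());
        source.addChild(upper);
        upper.addChild(sink);
        List<String> visited = new ArrayList<>();
        source.process("hello", visited);
        System.out.println(visited); // prints [source:hello, upper:HELLO, sink:HELLO]
    }
}
```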
StreamThread uses the TaskManager to process records using AssignedStreamsTasks (which holds the running StreamTasks).
In every loop of StreamThread (through the TaskManager and the AssignedStreamsTasks), every running StreamTask is requested to process a single record (the next StampedRecord from its PartitionGroup).
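The loop can be sketched as round-robin processing of at most one record per running task per iteration. The sketch is hypothetical and leaves out polling, punctuation and committing, which the real StreamThread interleaves with processing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the processing loop: in every iteration, every running task
// is asked to process (at most) a single record.
class ProcessingLoopSketch {
    static final class Task {
        final String id;
        final Deque<String> buffered;
        Task(String id, String... records) { this.id = id; this.buffered = new ArrayDeque<>(Arrays.asList(records)); }

        // Processes the next buffered record, if any; returns whether a record was processed
        boolean process(List<String> output) {
            String record = buffered.pollFirst();
            if (record == null) return false;
            output.add(id + ":" + record);
            return true;
        }
    }

    // Runs iterations until no task has anything left; returns the processing order
    static List<String> runUntilIdle(List<Task> runningTasks) {
        List<String> output = new ArrayList<>();
        boolean processedAny = true;
        while (processedAny) {
            processedAny = false;
            for (Task task : runningTasks) {
                processedAny |= task.process(output); // non-short-circuit: every task gets its turn
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(new Task("0_0", "a", "b"), new Task("0_1", "x"));
        System.out.println(runUntilIdle(tasks)); // prints [0_0:a, 0_1:x, 0_0:b]
    }
}
```

Because each task processes only one record per iteration, a task with a long backlog cannot starve the other tasks of the same StreamThread.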
When requested to create a StreamTask, TaskCreator is given the partitions of the task.
Note: Explore the relationship between topic groups, partitions and tasks, i.e. builder.build(taskId.topicGroupId) while creating a StreamTask (TaskManager.addStreamTask).
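The number of StreamTasks follows from the partitions assigned: the default PartitionGrouper creates one task per partition number of every topic group (sub-topology), so with a single source topic there is exactly one StreamTask per partition. Since all StreamThreads form one consumer group, every partition is consumed by one and only one consumer group member.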
Number of StreamTasks = number of TaskCreator.createTask calls = number of tasks created by AbstractTaskCreator.createTasks (called from TaskManager.addStreamTasks).
StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG (num.standby.replicas) is the number of standby replicas for each task.
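As a rough sketch of what num.standby.replicas means for the number of standby tasks, assuming a standby replica is never placed on the instance that runs the task's active copy or another replica of the same task (so at most instances - 1 replicas per task can be placed). The StandbyCount class is hypothetical and ignores capacity balancing.

```java
// Hypothetical arithmetic sketch: how many standby tasks can be placed when every replica
// must live on a different application instance than the active copy and the other replicas.
class StandbyCount {
    static int standbyTasks(int activeTasks, int numStandbyReplicas, int instances) {
        // One instance hosts the active copy, so at most instances - 1 others can host replicas
        int replicasPerTask = Math.min(numStandbyReplicas, Math.max(instances - 1, 0));
        return activeTasks * replicasPerTask;
    }

    public static void main(String[] args) {
        System.out.println(standbyTasks(6, 1, 3)); // prints 6
        System.out.println(standbyTasks(6, 2, 2)); // prints 6 (only one other instance available)
    }
}
```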
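StickyTaskAssignor.assignActive computes the number of tasks per StreamThread (the number of tasks divided by the total number of StreamThreads of all clients) and uses it to balance active tasks across clients.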
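The tasks-per-thread idea can be sketched as below. This hypothetical BalancedAssignment class only computes the same quotient (number of tasks divided by the total number of StreamThreads) and spreads tasks round-robin; unlike StickyTaskAssignor it ignores previous ownership, so it is not sticky.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of balancing active tasks over StreamThreads (no stickiness).
class BalancedAssignment {
    // Integer division: the minimum number of tasks every thread gets
    static int tasksPerThread(int numTasks, int totalThreads) {
        return numTasks / totalThreads;
    }

    // Round-robin: task i goes to thread i % totalThreads, so per-thread counts differ by at most 1
    static List<List<String>> assign(List<String> taskIds, int totalThreads) {
        List<List<String>> perThread = new ArrayList<>();
        for (int t = 0; t < totalThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            perThread.get(i % totalThreads).add(taskIds.get(i));
        }
        return perThread;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("0_0", "0_1", "0_2", "0_3", "0_4");
        System.out.println(tasksPerThread(tasks.size(), 2)); // prints 2
        System.out.println(assign(tasks, 2)); // prints [[0_0, 0_2, 0_4], [0_1, 0_3]]
    }
}
```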
StickyTaskAssignor.assign is called from StreamsPartitionAssignor.assign, which first computes the tasks: StreamsPartitionAssignor.assign -> TaskManager.builder().topicGroups() -> PartitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata)
The result of this step is Map<TaskId, Set<TopicPartition>> partitionsForTask.
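Default partition grouping can be sketched as follows, assuming one task per partition number of every topic group, with the source topic that has the most partitions determining the number of tasks. Task IDs are rendered as topicGroupId_partition strings (the format TaskId prints) and partitions as topic-partition strings; the class and method are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of default partition grouping: Map<TaskId, Set<TopicPartition>> with strings.
class PartitionGroupingSketch {
    static Map<String, Set<String>> partitionGroups(Map<Integer, Set<String>> sourceTopicsByGroup,
                                                    Map<String, Integer> partitionsPerTopic) {
        Map<String, Set<String>> partitionsForTask = new TreeMap<>();
        for (Map.Entry<Integer, Set<String>> group : sourceTopicsByGroup.entrySet()) {
            // The source topic with the most partitions determines how many tasks the group gets
            int maxPartitions = 0;
            for (String topic : group.getValue()) {
                maxPartitions = Math.max(maxPartitions, partitionsPerTopic.getOrDefault(topic, 0));
            }
            for (int p = 0; p < maxPartitions; p++) {
                Set<String> partitions = new TreeSet<>();
                for (String topic : group.getValue()) {
                    if (p < partitionsPerTopic.getOrDefault(topic, 0)) {
                        partitions.add(topic + "-" + p);
                    }
                }
                partitionsForTask.put(group.getKey() + "_" + p, partitions);
            }
        }
        return partitionsForTask;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> groups = new HashMap<>();
        groups.put(0, new HashSet<>(Arrays.asList("orders", "payments"))); // co-partitioned sources
        groups.put(1, new HashSet<>(Arrays.asList("clicks")));
        Map<String, Integer> counts = new HashMap<>();
        counts.put("orders", 3);
        counts.put("payments", 3);
        counts.put("clicks", 2);
        System.out.println(partitionGroups(groups, counts).size()); // prints 5 (tasks for 8 partitions)
    }
}
```

With co-partitioned source topics in one topic group, a single task owns several partitions, so the number of tasks can be lower than the number of assigned partitions.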
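Enable DEBUG logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger (e.g. log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=DEBUG in log4j.properties) to see the following message:
DEBUG Assigning tasks {} to clients {} with number of replicas {}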
Step 1. StreamsPartitionAssignor.onAssignment(final Assignment assignment) -> TaskManager.setAssignmentMetadata(Map<TaskId, Set<TopicPartition>> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks)
Step 2. RebalanceListener.onPartitionsAssigned(Collection<TopicPartition> assignment) -> TaskManager.createTasks(assignment)
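The two steps can be modelled as below, with hypothetical types (task IDs and partitions as strings, standby tasks left out). The sketch assumes that the task metadata arrives first and that tasks are created only once the rebalance listener reports the assigned partitions; here a task is created only when all of its partitions are in the assignment.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical two-step model: metadata is stored first, tasks are created later.
class TwoStepAssignment {
    private Map<String, Set<String>> assignedActiveTasks = new HashMap<>();
    private final Set<String> createdTasks = new TreeSet<>();

    // Step 1 (cf. TaskManager.setAssignmentMetadata): remember task IDs and their partitions
    void setAssignmentMetadata(Map<String, Set<String>> activeTasks) {
        this.assignedActiveTasks = new HashMap<>(activeTasks);
    }

    // Step 2 (cf. TaskManager.createTasks): create every active task whose partitions were all assigned
    void createTasks(Collection<String> assignment) {
        Set<String> assigned = new HashSet<>(assignment);
        for (Map.Entry<String, Set<String>> task : assignedActiveTasks.entrySet()) {
            if (assigned.containsAll(task.getValue())) {
                createdTasks.add(task.getKey());
            }
        }
    }

    Set<String> createdTasks() { return createdTasks; }

    public static void main(String[] args) {
        TwoStepAssignment manager = new TwoStepAssignment();
        Map<String, Set<String>> metadata = new HashMap<>();
        metadata.put("0_0", new HashSet<>(Arrays.asList("orders-0", "payments-0")));
        metadata.put("0_1", new HashSet<>(Arrays.asList("orders-1", "payments-1")));
        manager.setAssignmentMetadata(metadata);
        manager.createTasks(Arrays.asList("orders-0", "payments-0", "orders-1", "payments-1"));
        System.out.println(manager.createdTasks()); // prints [0_0, 0_1]
    }
}
```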
StreamsPartitionAssignor makes sure that the partitions assigned to a Kafka Streams application instance are exactly the partitions of its active tasks.