KafkaStreams, StreamThreads, StreamTasks and StandbyTasks

KafkaStreams, StreamThreads and StreamsConfig.NUM_STREAM_THREADS_CONFIG

A KafkaStreams instance uses StreamThreads for stream processing.

The number of StreamThreads is controlled by the StreamsConfig.NUM_STREAM_THREADS_CONFIG (num.stream.threads) configuration property (default: 1).
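The following is a minimal sketch of setting the number of StreamThreads. It uses plain java.util.Properties so that it runs without the Kafka jars. The literal keys application.id, bootstrap.servers and num.stream.threads are the values of StreamsConfig.APPLICATION_ID_CONFIG, StreamsConfig.BOOTSTRAP_SERVERS_CONFIG and StreamsConfig.NUM_STREAM_THREADS_CONFIG. The application ID and broker address are placeholders.

```java
import java.util.Properties;

class StreamsThreadsConfig {
    // Builds a Kafka Streams configuration with an explicit number of StreamThreads.
    // The literal keys are the values of the StreamsConfig constants named in the comments.
    static Properties build(int numStreamThreads) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");       // StreamsConfig.APPLICATION_ID_CONFIG (placeholder value)
        props.put("bootstrap.servers", "localhost:9092");    // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG (placeholder value)
        props.put("num.stream.threads", String.valueOf(numStreamThreads)); // StreamsConfig.NUM_STREAM_THREADS_CONFIG (default: 1)
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(4);
        // With kafka-streams on the classpath, these props would be passed to
        // new KafkaStreams(topology, props), which would then run 4 StreamThreads.
        System.out.println(props.getProperty("num.stream.threads"));
    }
}
```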

StreamThread, Kafka Consumer and Consumer Group

When created, a StreamThread is given a Kafka Consumer (Consumer<byte[], byte[]>) that it gets from a KafkaClientSupplier.

A StreamThread uses the Kafka consumer the same way a regular Kafka consumer application would, for example to subscribe to the source topics of a topology and to poll records. It therefore simply becomes a Kafka client.

When a StreamThread requests a Kafka Consumer from the KafkaClientSupplier, it passes on the consumer configuration. That configuration includes ConsumerConfig.GROUP_ID_CONFIG (group.id) for the consumer group ID and CommonClientConfigs.CLIENT_ID_CONFIG (client.id) for the client ID.

The value of ConsumerConfig.GROUP_ID_CONFIG is exactly the value of StreamsConfig.APPLICATION_ID_CONFIG (application.id).

In summary, all StreamThreads of a Kafka Streams application, across all of its instances, use the required StreamsConfig.APPLICATION_ID_CONFIG as their group ID. As a result, they form a single consumer group. Dynamic partition assignment among all the StreamThreads is then handled by the Kafka consumer group protocol, with StreamsPartitionAssignor as the partition assignor.
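The sketch below mimics the consumer configuration that each StreamThread ends up with. Every thread gets the same group.id, taken from application.id, but a distinct client.id. The helper mainConsumerConfig and the client ID format appId + "-StreamThread-" + n are hypothetical and chosen only for illustration. The real derivation happens inside Kafka Streams and may differ between versions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConsumerGroupSketch {
    // Hypothetical helper: the consumer configuration a single StreamThread would pass to a KafkaClientSupplier.
    static Map<String, String> mainConsumerConfig(Map<String, String> streamsConfig, int threadIdx) {
        String appId = streamsConfig.get("application.id");
        Map<String, String> consumerConfig = new HashMap<>();
        consumerConfig.put("group.id", appId);                                 // group.id is the application.id
        consumerConfig.put("client.id", appId + "-StreamThread-" + threadIdx); // illustrative format, one per thread
        return consumerConfig;
    }

    // One consumer configuration per StreamThread of this application instance.
    static List<Map<String, String>> allThreads(Map<String, String> streamsConfig) {
        int threads = Integer.parseInt(streamsConfig.getOrDefault("num.stream.threads", "1"));
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 1; i <= threads; i++) {
            configs.add(mainConsumerConfig(streamsConfig, i));
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> streamsConfig = Map.of("application.id", "my-streams-app", "num.stream.threads", "3");
        for (Map<String, String> c : allThreads(streamsConfig)) {
            System.out.println(c.get("group.id") + " " + c.get("client.id"));
        }
    }
}
```

All consumers share one group.id, so the group coordinator treats every StreamThread as a member of the same consumer group.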

StreamThread, TaskManager and StreamTasks

Every StreamThread manages its own TaskManager (with the factories to create stream and standby tasks).

When requested to poll records once and process them with the active stream tasks, a StreamThread asks the TaskManager for the active stream task of every partition that has records (see StreamThread.addRecordsToTasks).
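The routing step can be sketched as follows. This is a simplified model built from hypothetical types. A polled record carries its topic and partition, and a map from partition to task stands in for the TaskManager lookup. This is not the actual StreamThread.addRecordsToTasks code. Failing on a missing task is a choice made in this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RecordRoutingSketch {
    // Hypothetical stand-ins for TopicPartition, ConsumerRecord and StreamTask.
    record Partition(String topic, int partition) {}
    record PolledRecord(Partition partition, String value) {}

    static final class Task {
        final String id;
        final List<PolledRecord> buffered = new ArrayList<>();
        Task(String id) { this.id = id; }
    }

    // Hands every polled record to the active task that owns the record's partition.
    static void addRecordsToTasks(List<PolledRecord> polled, Map<Partition, Task> activeTaskByPartition) {
        for (PolledRecord r : polled) {
            Task task = activeTaskByPartition.get(r.partition());
            if (task == null) {
                // Sketch choice: a record for a partition without an active task is an error.
                throw new IllegalStateException("No active task for " + r.partition());
            }
            task.buffered.add(r);
        }
    }

    public static void main(String[] args) {
        Partition p0 = new Partition("orders", 0);
        Task task = new Task("0_0");
        addRecordsToTasks(List.of(new PolledRecord(p0, "v1")), Map.of(p0, task));
        System.out.println(task.id + " buffered " + task.buffered.size() + " record(s)");
    }
}
```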

A StreamTask is responsible for processing records, one at a time, from its assigned partitions. The records are buffered in partition queues, with one RecordQueue per TopicPartition.

When created, a StreamTask creates a PartitionGroup (with the RecordQueues). Every RecordQueue is created with the SourceNode of the ProcessorTopology for the topic of its partition.

A RecordQueue is essentially a partition (with its buffered records) and the SourceNode for that partition's topic.

When requested to process a single record, a StreamTask requests the PartitionGroup for the next stamped record (a record with its timestamp) and the RecordQueue it came from. The StreamTask then simply requests the corresponding SourceNode to process the record.
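The following is a simplified model of this step. It assumes that the PartitionGroup returns the buffered record with the smallest timestamp among its non-empty RecordQueues. StampedRecord, RecordQueue and the processOne method below are hypothetical stand-ins. The real RecordQueue also deserializes records and extracts their timestamps with a TimestampExtractor.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class PartitionGroupSketch {
    // Hypothetical stand-in for StampedRecord: a value plus its (already extracted) timestamp.
    record StampedRecord(long timestamp, String value) {}

    // Hypothetical RecordQueue: the buffered records of one partition plus the name of its SourceNode.
    static final class RecordQueue {
        final String partition;
        final String sourceNode;
        final Deque<StampedRecord> buffer = new ArrayDeque<>();
        RecordQueue(String partition, String sourceNode) { this.partition = partition; this.sourceNode = sourceNode; }
    }

    final List<RecordQueue> queues = new ArrayList<>();

    // Returns the queue whose head record has the smallest timestamp, or null if all queues are empty.
    RecordQueue nextQueue() {
        RecordQueue best = null;
        for (RecordQueue q : queues) {
            if (q.buffer.isEmpty()) continue;
            if (best == null || q.buffer.peekFirst().timestamp() < best.buffer.peekFirst().timestamp()) {
                best = q;
            }
        }
        return best;
    }

    // One "process a single record" step: take the next record and hand it to its queue's SourceNode.
    String processOne() {
        RecordQueue q = nextQueue();
        if (q == null) return null;                 // nothing buffered
        StampedRecord r = q.buffer.pollFirst();
        return q.sourceNode + " <- " + r.value();   // stands in for the SourceNode processing the record
    }

    public static void main(String[] args) {
        PartitionGroupSketch group = new PartitionGroupSketch();
        RecordQueue orders = new RecordQueue("orders-0", "KSTREAM-SOURCE-orders");
        orders.buffer.add(new StampedRecord(5L, "o1"));
        group.queues.add(orders);
        System.out.println(group.processOne());
    }
}
```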

When requested to process a record, a SourceNode simply requests the ProcessorContext to forward it down the topology. The record is therefore pushed to, and visits, every downstream node in the topology.
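The push-based flow can be sketched with a hypothetical Node class standing in for ProcessorNode and ProcessorContext.forward. Each node transforms the record and then synchronously calls every child. The traversal is therefore depth-first, and a record has reached every downstream node before processing returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

class ForwardSketch {
    // Hypothetical processor node: applies a function and forwards the result to every child.
    static final class Node {
        final String name;
        final UnaryOperator<String> fn;
        final List<Node> children = new ArrayList<>();
        Node(String name, UnaryOperator<String> fn) { this.name = name; this.fn = fn; }

        Node addChild(Node child) { children.add(child); return child; }

        // Processes a record, then forwards it down the topology; every visit is recorded in trace.
        void process(String value, List<String> trace) {
            String out = fn.apply(value);
            trace.add(name + ":" + out);
            for (Node child : children) {
                child.process(out, trace);   // synchronous call, hence a depth-first traversal
            }
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", v -> v);
        source.addChild(new Node("upper", String::toUpperCase)).addChild(new Node("sink", v -> v));
        List<String> trace = new ArrayList<>();
        source.process("hi", trace);
        System.out.println(trace);
    }
}
```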

A StreamThread uses the TaskManager to process records with AssignedStreamsTasks, which uses the running StreamTasks.

In every iteration of the StreamThread's loop (through the TaskManager and the AssignedStreamsTasks), every running StreamTask is requested to process a single record: the next StampedRecord from its PartitionGroup.
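The round-robin pattern can be sketched as follows. The Task type and the processOneRound and drain methods are hypothetical. The real loop also polls, punctuates and commits, and those steps are omitted here. Each round asks every running task to process at most one record, so the tasks interleave instead of one task draining its buffer first.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RunLoopSketch {
    // Hypothetical running task with its buffered records.
    static final class Task {
        final String id;
        final Deque<String> buffered = new ArrayDeque<>();
        Task(String id) { this.id = id; }

        // Processes a single record if one is buffered; returns whether a record was processed.
        boolean process(List<String> processed) {
            String r = buffered.pollFirst();
            if (r == null) return false;
            processed.add(id + ":" + r);
            return true;
        }
    }

    // One loop iteration: every running task is asked to process (at most) one record.
    static int processOneRound(List<Task> running, List<String> processed) {
        int count = 0;
        for (Task t : running) {
            if (t.process(processed)) count++;
        }
        return count;
    }

    // Keeps iterating until no task has anything left to process.
    static List<String> drain(List<Task> running) {
        List<String> processed = new ArrayList<>();
        while (processOneRound(running, processed) > 0) { }
        return processed;
    }

    public static void main(String[] args) {
        Task t1 = new Task("0_0");
        t1.buffered.addAll(List.of("a", "b"));
        Task t2 = new Task("0_1");
        t2.buffered.add("x");
        System.out.println(drain(List.of(t1, t2)));
    }
}
```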

When requested to create a StreamTask, the TaskCreator is given the partitions of the task.

Note
Explore the relationship between topic groups, partitions and tasks: see builder.build(taskId.topicGroupId) while a StreamTask is created in TaskManager.addStreamTask.

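The number of StreamTasks is determined by the partitions of the source topics. For every topic group (sub-topology) there is one StreamTask per partition number. That means as many StreamTasks as the largest number of partitions among the source topics of the topic group; with a single source topic, this is exactly its number of partitions. The StreamThreads (not the StreamTasks) form a consumer group, so every partition is consumed by exactly one consumer group member at a time. Every StreamTask is therefore active in exactly one StreamThread.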

  • Number of StreamTasks = # of TaskCreator.createTask calls = # of tasks created by AbstractTaskCreator.createTasks = # of tasks added by TaskManager.addStreamTasks

StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG (num.standby.replicas) is the number of standby replicas for each task (default: 0).
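The following sketch shows what the setting means for placement. It uses hypothetical client names and a plain rotating choice; this is not the StickyTaskAssignor algorithm. Each task gets up to num.standby.replicas standby copies. Every copy is placed on a client other than the one running the active task, with at most one copy per client.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StandbySketch {
    // For each task, picks up to numStandbyReplicas distinct clients that do not run the active task.
    static Map<String, List<String>> assignStandbys(Map<String, String> activeClientByTask,
                                                    List<String> clients,
                                                    int numStandbyReplicas) {
        Map<String, List<String>> standbys = new LinkedHashMap<>();
        int next = 0;  // rotating start position to spread standbys across clients
        for (Map.Entry<String, String> e : activeClientByTask.entrySet()) {
            List<String> chosen = new ArrayList<>();
            for (int i = 0; i < clients.size() && chosen.size() < numStandbyReplicas; i++) {
                String candidate = clients.get((next + i) % clients.size());
                if (!candidate.equals(e.getValue())) {   // never on the client running the active task
                    chosen.add(candidate);
                }
            }
            next++;
            standbys.put(e.getKey(), chosen);
        }
        return standbys;
    }

    public static void main(String[] args) {
        Map<String, String> active = new LinkedHashMap<>();
        active.put("0_0", "A");
        active.put("0_1", "B");
        System.out.println(assignStandbys(active, List.of("A", "B", "C"), 1));
    }
}
```

With only two clients, asking for two standby replicas still yields one standby per task in this sketch, since no client may hold two copies of the same task.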

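  • StickyTaskAssignor.assignActive computes the number of active tasks per StreamThread: the number of tasks divided by the total capacity (the total number of StreamThreads)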

  • StickyTaskAssignor.assign
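The per-thread share is a simple division. The sketch below assumes that the number of active tasks is divided by the total capacity, meaning the number of StreamThreads summed over all clients, using integer division. The class and method names are hypothetical. The ceiling is added only to show the most any thread would carry in an evenly balanced assignment.

```java
class TasksPerThreadSketch {
    // Total capacity: the number of StreamThreads summed over all clients (application instances).
    static int totalCapacity(int[] threadsPerClient) {
        int total = 0;
        for (int threads : threadsPerClient) total += threads;
        if (total == 0) throw new IllegalArgumentException("no StreamThreads");
        return total;
    }

    // Active tasks per StreamThread, rounded down.
    static int tasksPerThread(int numTasks, int[] threadsPerClient) {
        return numTasks / totalCapacity(threadsPerClient);
    }

    // Most tasks a thread gets in an evenly balanced assignment (rounded up).
    static int maxTasksPerThread(int numTasks, int[] threadsPerClient) {
        int capacity = totalCapacity(threadsPerClient);
        return (numTasks + capacity - 1) / capacity;
    }

    public static void main(String[] args) {
        int[] threadsPerClient = {2, 3};  // two instances with 2 and 3 StreamThreads
        System.out.println(tasksPerThread(12, threadsPerClient) + ".." + maxTasksPerThread(12, threadsPerClient));
    }
}
```

For example, 12 tasks over instances with 2 and 3 StreamThreads gives a capacity of 5. Each thread therefore runs 2 or 3 active tasks.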

StreamsPartitionAssignor.assign —> TaskManager.builder().topicGroups() —> PartitionGrouper.partitionGroups(sourceTopicsByGroup, fullMetadata)

At this step, the result is a Map<TaskId, Set<TopicPartition>> (partitionsForTask).
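The following sketch shows how such a grouping can produce partitionsForTask. It is modelled on the assumed behaviour of the default partition grouper. For every topic group there is one task per partition number, up to the largest partition count among the group's source topics. Each task takes that partition number from every source topic that has it. TaskId and TopicPartition are replaced by hypothetical records so that the code runs without Kafka on the classpath, and a topic-to-partition-count map stands in for the cluster metadata.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class PartitionGrouperSketch {
    record TaskId(int topicGroupId, int partition) {}
    record TopicPartition(String topic, int partition) {}

    // sourceTopicsByGroup: topic group (sub-topology) id -> its source topics
    // partitionCounts: topic -> number of partitions (stands in for the cluster metadata)
    static Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> sourceTopicsByGroup,
                                                            Map<String, Integer> partitionCounts) {
        Map<TaskId, Set<TopicPartition>> partitionsForTask = new LinkedHashMap<>();
        for (Map.Entry<Integer, Set<String>> e : sourceTopicsByGroup.entrySet()) {
            int maxPartitions = 0;
            for (String topic : e.getValue()) {
                maxPartitions = Math.max(maxPartitions, partitionCounts.get(topic));
            }
            // One task per partition number in this topic group.
            for (int p = 0; p < maxPartitions; p++) {
                Set<TopicPartition> group = new LinkedHashSet<>();
                for (String topic : e.getValue()) {
                    if (p < partitionCounts.get(topic)) {
                        group.add(new TopicPartition(topic, p));
                    }
                }
                partitionsForTask.put(new TaskId(e.getKey(), p), group);
            }
        }
        return partitionsForTask;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> groups = Map.of(0, Set.of("orders"), 1, Set.of("clicks", "users"));
        Map<String, Integer> counts = Map.of("orders", 3, "clicks", 2, "users", 2);
        partitionGroups(groups, counts).forEach((task, parts) -> System.out.println(task + " -> " + parts));
    }
}
```

With these numbers the application gets 5 StreamTasks: 3 for topic group 0 and 2 for topic group 1. This count does not depend on how many StreamThreads run them.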

Enable DEBUG logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger to see the following message in the logs:

DEBUG Assigning tasks {} to clients {} with number of replicas {}

Step 1. StreamsPartitionAssignor.onAssignment(final Assignment assignment) —> TaskManager.setAssignmentMetadata(Map<TaskId, Set<TopicPartition>> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks)

Step 2. RebalanceListener.onPartitionsAssigned(Collection<TopicPartition> assignment) —> TaskManager.createTasks(assignment)
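The two steps can be sketched with a simplified, TaskManager-like class. In step 1, the class only records which TaskIds, with their partitions, this consumer should own. In step 2, it creates the active tasks whose partitions are all contained in the newly assigned set. The class, fields and string-based IDs are hypothetical. The method names mirror the text but are not the real Kafka Streams signatures, and the containsAll check is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class TwoStepAssignmentSketch {
    // TaskId (e.g. "0_1") -> partitions of that task (e.g. "orders-1"), remembered in step 1.
    private Map<String, Set<String>> assignedActiveTasks = new HashMap<>();
    final Set<String> createdTasks = new TreeSet<>();

    // Step 1 (cf. TaskManager.setAssignmentMetadata): only remember the assignment, create nothing yet.
    void setAssignmentMetadata(Map<String, Set<String>> activeTasks) {
        assignedActiveTasks = new HashMap<>(activeTasks);
    }

    // Step 2 (cf. TaskManager.createTasks): create a task for every remembered TaskId
    // whose partitions were all handed over by the consumer's rebalance listener.
    void createTasks(Set<String> assignedPartitions) {
        for (Map.Entry<String, Set<String>> e : assignedActiveTasks.entrySet()) {
            if (assignedPartitions.containsAll(e.getValue())) {
                createdTasks.add(e.getKey());
            }
        }
    }

    public static void main(String[] args) {
        TwoStepAssignmentSketch tm = new TwoStepAssignmentSketch();
        tm.setAssignmentMetadata(Map.of("0_0", Set.of("orders-0"), "0_1", Set.of("orders-1")));
        tm.createTasks(Set.of("orders-0", "orders-1"));
        System.out.println(tm.createdTasks);
    }
}
```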
