StreamsPartitionAssignor — Dynamic Partition Assignment Strategy
StreamsPartitionAssignor is a custom PartitionAssignor (from the Kafka Consumer API) that is used to assign partitions dynamically to the stream processor threads of a Kafka Streams application (identified by the required StreamsConfig.APPLICATION_ID_CONFIG configuration property, with the number of stream processor threads given by the StreamsConfig.NUM_STREAM_THREADS_CONFIG configuration property).
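As a minimal sketch, assuming a plain java.util.Properties object, the two settings above can be expressed with their literal keys, application.id and num.stream.threads (the values of StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.NUM_STREAM_THREADS_CONFIG). The class name StreamsAppConfig and the values wordcount-app and 4 are made up for illustration.

```java
import java.util.Properties;

class StreamsAppConfig {
    // Literal keys behind StreamsConfig.APPLICATION_ID_CONFIG and StreamsConfig.NUM_STREAM_THREADS_CONFIG
    static final String APPLICATION_ID = "application.id";
    static final String NUM_STREAM_THREADS = "num.stream.threads";

    static Properties build(String applicationId, int numThreads) {
        Properties props = new Properties();
        // All instances sharing this ID form one consumer group, so
        // StreamsPartitionAssignor spreads partitions across all of their threads.
        props.setProperty(APPLICATION_ID, applicationId);
        // Each stream thread runs its own main consumer, i.e. one group member.
        props.setProperty(NUM_STREAM_THREADS, Integer.toString(numThreads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("wordcount-app", 4); // hypothetical values
        System.out.println(props);
    }
}
```

In a real application these Properties (plus at least bootstrap.servers) would be passed to new KafkaStreams(topology, props).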
Tip
Read up on Kafka Client-side Assignment Proposal on the group management in Apache Kafka’s Consumer API.
From Kafka Client-side Assignment Proposal (with extra formatting of mine):
To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization.
Group Membership phase is used to set the active members of the group and to elect a group leader.
State Synchronization phase is used to enable the group leader to synchronize member state in the group (in other words to assign each member’s state).
From the perspective of the consumer, group membership phase is used to collect member subscriptions, while state synchronization phase is used to propagate partition assignments.
The elected leader in the join group phase is responsible for setting the assignments for the whole group.
Given the above, only the StreamsPartitionAssignor of the elected group leader performs the group assignment among the members of a Kafka Streams application (so you should see the assignment log messages from just a single thread).
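The two phases can be modelled in a few lines. This is a hypothetical simulation, not Kafka code: TwoPhaseGroupSketch, electLeader and leaderAssign are made-up names, it assumes the first member to join becomes the leader, and it uses plain round-robin where the real leader would run StreamsPartitionAssignor.assign.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TwoPhaseGroupSketch {
    // Phase 1 (group membership): members are listed in join order and a leader is elected.
    // Assumption: the first member to join becomes the leader.
    static String electLeader(List<String> joinedMembers) {
        if (joinedMembers.isEmpty()) {
            throw new IllegalArgumentException("empty group");
        }
        return joinedMembers.get(0);
    }

    // Phase 2 (state synchronization): only the leader computes the assignment for
    // the whole group. Round-robin is a stand-in, not the Kafka Streams algorithm.
    static Map<String, List<Integer>> leaderAssign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = List.of("thread-1", "thread-2", "thread-3");
        String leader = electLeader(members);
        // Only the leader computes this; the other members just receive their share.
        Map<String, List<Integer>> assignment = leaderAssign(members, 5);
        System.out.println("leader=" + leader + " assignment=" + assignment);
    }
}
```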
StreamsPartitionAssignor (as a fully-qualified class name) is registered under the partition.assignment.strategy (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) configuration property when StreamsConfig is requested for the configuration of the main Kafka Consumer (when StreamThread is requested to create a new StreamThread instance and requests the KafkaClientSupplier for a Kafka Consumer).
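A minimal sketch of that registration, assuming the main consumer configuration is a plain Map. The helper mainConsumerConfig is hypothetical (it is not the StreamsConfig API); only the key partition.assignment.strategy and the fully-qualified class name come from the text. In this sketch the assignor is set last, so it wins over any user-supplied value.

```java
import java.util.HashMap;
import java.util.Map;

class MainConsumerConfigSketch {
    // Literal key behind ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
    static final String ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
    static final String STREAMS_ASSIGNOR =
        "org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor";

    // Hypothetical helper: copy the user's consumer settings, then set the Streams assignor.
    static Map<String, Object> mainConsumerConfig(Map<String, Object> userConsumerProps) {
        Map<String, Object> props = new HashMap<>(userConsumerProps); // leave the input untouched
        props.put(ASSIGNMENT_STRATEGY, STREAMS_ASSIGNOR);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> user = new HashMap<>();
        user.put("max.poll.records", 500);
        System.out.println(mainConsumerConfig(user).get(ASSIGNMENT_STRATEGY));
    }
}
```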
StreamsPartitionAssignor is a Kafka Configurable that is instantiated by reflection and then given its configuration parameters. When configured, StreamsPartitionAssignor finds the current TaskManager (using the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property set when the StreamThread is created).
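The reflective instantiation followed by configure can be sketched with the JDK alone. ConfigurableSketch mirrors the shape of Kafka's Configurable, DemoAssignor stands in for StreamsPartitionAssignor, and the key demo.task.manager is a made-up substitute for the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property; all three names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class ReflectiveInstantiation {
    // Instantiate by class name, then hand over the configuration.
    static ConfigurableSketch create(String className, Map<String, ?> configs) {
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            ConfigurableSketch configurable = (ConfigurableSketch) instance;
            // Configuration arrives only after construction.
            configurable.configure(configs);
            return configurable;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(DemoAssignor.TASK_MANAGER_KEY, "task manager of stream-thread-1");
        DemoAssignor assignor = (DemoAssignor) create(DemoAssignor.class.getName(), configs);
        System.out.println(assignor.taskManager);
    }
}

// Mirrors the shape of Kafka's Configurable: configure(Map<String, ?>).
interface ConfigurableSketch {
    void configure(Map<String, ?> configs);
}

// Hypothetical stand-in for StreamsPartitionAssignor.
class DemoAssignor implements ConfigurableSketch {
    // Hypothetical key; the real one is the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property.
    static final String TASK_MANAGER_KEY = "demo.task.manager";
    Object taskManager;

    public DemoAssignor() { } // reflection uses this public no-arg constructor

    @Override
    public void configure(Map<String, ?> configs) {
        taskManager = configs.get(TASK_MANAGER_KEY);
    }
}
```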
StreamsPartitionAssignor uses stream-thread [client.id] for the logPrefix (where client.id is the value of the client.id configuration property).
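A one-method sketch of how such a prefix could be built. LogPrefixSketch and logPrefix are hypothetical names, the client ID in main is made up, and the trailing space separating the prefix from the message is an assumption based on the error message format shown for prepareTopic on this page.

```java
class LogPrefixSketch {
    // Builds the "stream-thread [client.id] " prefix (trailing space assumed).
    static String logPrefix(String clientId) {
        return String.format("stream-thread [%s] ", clientId);
    }

    public static void main(String[] args) {
        String prefix = logPrefix("wordcount-app-StreamThread-1-consumer"); // made-up client.id
        System.out.println(prefix + "Starting to validate internal topics");
    }
}
```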
StreamsPartitionAssignor uses the num.standby.replicas configuration property for the number of standby replicas per processing task when performing partition assignment (and requesting StickyTaskAssignor to assign tasks).
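To make the meaning of num.standby.replicas concrete, here is a deliberately simplified placement: the active copy of each task goes round-robin over clients and its standbys go on the following clients, so no client ever hosts both the active and a standby copy of the same task. This is not StickyTaskAssignor (which also balances load and favours previous owners); StandbyPlacementSketch and the task and client IDs are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StandbyPlacementSketch {
    // Returns the standby tasks per client under the simplified round-robin scheme.
    static Map<String, List<String>> standbys(List<String> tasks, List<String> clients,
                                              int numStandbyReplicas) {
        Map<String, List<String>> standbyByClient = new LinkedHashMap<>();
        for (String client : clients) {
            standbyByClient.put(client, new ArrayList<>());
        }
        if (clients.isEmpty()) {
            return standbyByClient;
        }
        // At most clients.size() - 1 standbys fit without sharing a client with the active copy.
        int replicas = Math.min(numStandbyReplicas, clients.size() - 1);
        for (int t = 0; t < tasks.size(); t++) {
            int active = t % clients.size();
            for (int r = 1; r <= replicas; r++) {
                String client = clients.get((active + r) % clients.size());
                standbyByClient.get(client).add(tasks.get(t));
            }
        }
        return standbyByClient;
    }

    public static void main(String[] args) {
        // num.standby.replicas = 1 with two clients and three tasks (made-up IDs)
        System.out.println(standbys(List.of("0_0", "0_1", "0_2"), List.of("A", "B"), 1));
    }
}
```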
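Tip
Enable ALL logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger to see what happens inside.
Add the following line to your log4j configuration:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL
Refer to Application Logging Using log4j.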
subscription
Method
Subscription subscription(Set<String> topics)
Note
subscription is part of Kafka Consumer’s PartitionAssignor contract to return a local member’s subscription for the topics subscribed to (e.g. using KafkaConsumer.subscribe).
subscription requests the TaskManager for the previously active tasks.
subscription requests the TaskManager for the cached task IDs (aka standbyTasks) and removes all of the previously active tasks from them.
subscription requests the TaskManager to updateSubscriptionsFromMetadata with the given topics.
In the end, subscription creates a SubscriptionInfo (with the usedSubscriptionMetadataVersion, the process ID from the TaskManager, the previously active and standby tasks, and the userEndPoint), requests the SubscriptionInfo for its encoded form, and creates a Subscription (with the topics and the encoded SubscriptionInfo as user data).
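The set arithmetic of subscription can be sketched as follows. SubscriptionSketch and SubscriptionData are hypothetical stand-ins for SubscriptionInfo, task IDs are plain strings in TaskId's topicGroupId_partition form, and the text encoding in encode() is purely illustrative (the real SubscriptionInfo uses its own versioned binary format).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

class SubscriptionSketch {
    // Hypothetical stand-in for SubscriptionInfo.
    static final class SubscriptionData {
        final int version;
        final UUID processId;
        final Set<String> prevActiveTasks;
        final Set<String> standbyTasks;
        final String userEndPoint;

        SubscriptionData(int version, UUID processId, Set<String> prevActiveTasks,
                         Set<String> standbyTasks, String userEndPoint) {
            this.version = version;
            this.processId = processId;
            this.prevActiveTasks = prevActiveTasks;
            this.standbyTasks = standbyTasks;
            this.userEndPoint = userEndPoint;
        }

        // Illustrative text encoding only; not the real SubscriptionInfo wire format.
        ByteBuffer encode() {
            String text = version + "|" + processId + "|" + prevActiveTasks + "|"
                + standbyTasks + "|" + userEndPoint;
            return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        }
    }

    static SubscriptionData subscription(int version, UUID processId, Set<String> prevActiveTasks,
                                         Set<String> cachedTaskIds, String userEndPoint) {
        // Copy before mutating so the caller's sets stay untouched.
        Set<String> standbyTasks = new TreeSet<>(cachedTaskIds);
        // Tasks that were active last time are reported as active, not as standby.
        standbyTasks.removeAll(prevActiveTasks);
        return new SubscriptionData(version, processId, new TreeSet<>(prevActiveTasks),
            standbyTasks, userEndPoint);
    }

    public static void main(String[] args) {
        SubscriptionData data = subscription(4, UUID.randomUUID(),
            Set.of("0_0"), Set.of("0_0", "0_1", "1_0"), "localhost:8080");
        System.out.println("active=" + data.prevActiveTasks + " standby=" + data.standbyTasks
            + " bytes=" + data.encode().remaining());
    }
}
```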
Handling Partition Assignment From Group Leader — onAssignment
Method
void onAssignment(Assignment assignment)
onAssignment takes the partitions (from the given assignment) and sorts them by topic and partition.
onAssignment requests the AssignmentInfo helper object to decode the additional metadata (i.e. the userData of the input assignment).
Caution
FIXME Finish me
In the end, onAssignment requests the TaskManager for the following:
- Notifying StreamsMetadataState about Cluster Metadata Changes
- Setting task assignment metadata with active and standby tasks
- updateSubscriptionsFromAssignment with the assigned partitions
onAssignment throws a TaskAssignmentException if the number of partitions and the number of active tasks are not equal:
Number of assigned partitions [partitions] is not equal to the number of active taskIds [activeTasks], assignmentInfo=[info]
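A minimal sketch of the sorting and the size check, assuming a hypothetical TP class in place of Kafka's TopicPartition and an IllegalStateException in place of TaskAssignmentException. The sort matters because, as we understand it, the i-th sorted partition is paired with the i-th active task ID from the decoded AssignmentInfo.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class OnAssignmentSketch {
    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    static final class TP {
        final String topic;
        final int partition;

        TP(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Sort by topic, then by partition number.
    static List<TP> sortPartitions(List<TP> assigned) {
        List<TP> sorted = new ArrayList<>(assigned);
        sorted.sort(Comparator.comparing((TP tp) -> tp.topic).thenComparingInt(tp -> tp.partition));
        return sorted;
    }

    // Mirrors the consistency check; the real code throws TaskAssignmentException.
    static void checkSizes(List<TP> partitions, List<String> activeTasks) {
        if (partitions.size() != activeTasks.size()) {
            throw new IllegalStateException(String.format(
                "Number of assigned partitions %d is not equal to the number of active taskIds %d",
                partitions.size(), activeTasks.size()));
        }
    }

    public static void main(String[] args) {
        List<TP> sorted = sortPartitions(
            List.of(new TP("orders", 1), new TP("clicks", 0), new TP("orders", 0)));
        System.out.println(sorted);
        checkSizes(sorted, List.of("0_0", "1_0", "1_1")); // sizes match, so no exception
    }
}
```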
Initialization (Configuring Partition Assignor) — configure
Method
void configure(final Map<String, ?> configs)
Note
configure is part of Kafka’s Configurable contract (org.apache.kafka.common.Configurable) for classes that are instantiated by reflection and need to take configuration parameters.
configure creates an InternalStreamsConfig for the given configs.
configure initializes the logPrefix (with the CLIENT_ID_CONFIG configuration property).
configure uses the UPGRADE_FROM_CONFIG configuration property for…FIXME
configure sets the TaskManager per the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property (in the given configs). If the property is not found or its value is not a TaskManager, configure throws a KafkaException:
TaskManager is not specified
[className] is not an instance of org.apache.kafka.streams.processor.internals.TaskManager
configure sets the assignmentErrorCode per the ASSIGNMENT_ERROR_CODE internal property (in the given configs). If the property is not found or its value is not an AtomicInteger, configure throws a KafkaException:
assignmentErrorCode is not specified
[className] is not an instance of java.util.concurrent.atomic.AtomicInteger
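Both lookups follow the same pattern: fetch an object (not a string) from the configs, fail if it is missing, and fail if it has the wrong type. A generic sketch, with the hypothetical helper TypedConfigLookup.require, a made-up key, and IllegalArgumentException standing in for KafkaException:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class TypedConfigLookup {
    // Fetches an object-valued config entry and checks its type, with messages
    // shaped like the ones configure reports.
    static <T> T require(Map<String, ?> configs, String key, String what, Class<T> type) {
        Object value = configs.get(key);
        if (value == null) {
            throw new IllegalArgumentException(what + " is not specified");
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException(
                value.getClass().getName() + " is not an instance of " + type.getName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("demo.assignment.error.code", new AtomicInteger(0)); // made-up key
        AtomicInteger errorCode =
            require(configs, "demo.assignment.error.code", "assignmentErrorCode", AtomicInteger.class);
        System.out.println("assignmentErrorCode=" + errorCode.get());
    }
}
```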
configure sets the numStandbyReplicas per the NUM_STANDBY_REPLICAS_CONFIG configuration property (in the given configs).
configure sets the PartitionGrouper per the PARTITION_GROUPER_CLASS_CONFIG configuration property (in the given configs).
configure sets the userEndPoint if the APPLICATION_SERVER_CONFIG configuration property is defined.
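application.server (APPLICATION_SERVER_CONFIG) takes a host:port pair. A hypothetical parser for such a value, splitting at the last colon and validating the port, could look like this (UserEndPointSketch and HostPort are made-up names; Kafka does its own parsing, with its own error messages):

```java
class UserEndPointSketch {
    // Hypothetical holder for a parsed application.server value.
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Splits "host:port" at the last colon.
    static HostPort parse(String endPoint) {
        int colon = endPoint.lastIndexOf(':');
        if (colon <= 0 || colon == endPoint.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got " + endPoint);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) for non-numeric ports.
        int port = Integer.parseInt(endPoint.substring(colon + 1));
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return new HostPort(endPoint.substring(0, colon), port);
    }

    public static void main(String[] args) {
        HostPort hp = parse("localhost:8080");
        System.out.println(hp.host + " / " + hp.port);
    }
}
```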
configure creates a new InternalTopicManager (with the Kafka AdminClient of the TaskManager and the InternalStreamsConfig created earlier).
configure creates a new CopartitionedTopicsValidator (with the logPrefix).
prepareTopic
Internal Method
void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions)
prepareTopic
prints out the following DEBUG message to the logs:
Starting to validate internal topics [topicPartitions] in partition assignor.
For every InternalTopicMetadata (in the given topicPartitions collection), prepareTopic makes sure that the number of partitions is defined, i.e. is 0 or more. If not, prepareTopic throws a StreamsException:
stream-thread [client.id] Topic [[name]] number of partitions not defined
In the end, prepareTopic requests the InternalTopicManager to makeReady the topics and prints out the following DEBUG message to the logs:
Completed validating internal topics [topicPartitions] in partition assignor.
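The validation loop can be sketched with a map from internal topic name to partition count, where a negative count (NOT_DEFINED, a made-up constant) means the count was never resolved. PrepareTopicSketch is a hypothetical name, IllegalStateException stands in for StreamsException, and the topic names are invented:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PrepareTopicSketch {
    // Hypothetical marker for "number of partitions not resolved yet".
    static final int NOT_DEFINED = -1;

    // Fails fast on the first internal topic whose partition count is undefined.
    static void validate(String logPrefix, Map<String, Integer> numPartitionsByTopic) {
        for (Map.Entry<String, Integer> e : numPartitionsByTopic.entrySet()) {
            if (e.getValue() < 0) {
                throw new IllegalStateException(String.format(
                    "%sTopic [%s] number of partitions not defined", logPrefix, e.getKey()));
            }
        }
        // Past this point the real prepareTopic asks InternalTopicManager.makeReady to set the topics up.
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("app-store-changelog", 4);
        topics.put("app-agg-repartition", 4);
        validate("stream-thread [c1] ", topics);
        System.out.println("all internal topics have a partition count");
    }
}
```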
Note
prepareTopic is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment (and finds repartition source and changelog topics).
Performing Group Assignment (Assigning Tasks To Consumer Clients) — assign
Method
Map<String, Assignment> assign(
Cluster metadata,
Map<String, Subscription> subscriptions)
Note
assign is part of Kafka Consumer’s PartitionAssignor contract to perform group assignment given the member subscriptions and current cluster metadata.
Note
The input Map<String, Subscription> maps consumer IDs to Kafka Subscriptions, each with a list of topic names and encoded user data (as a java.nio.ByteBuffer).
assign constructs the client metadata (as Map<UUID, ClientMetadata>) from the decoded subscription info (from the user data):
- assign takes the consumer IDs with their subscriptions (from subscriptions).
- assign requests SubscriptionInfo to decode the user data of every subscription (aka metadata) and checks its version against the latest supported version (SubscriptionInfo.LATEST_SUPPORTED_VERSION).
- assign finds the client metadata (by the process ID) and creates one if not available.
- assign requests the ClientMetadata to addConsumer.
assign prints out the following INFO message to the logs only if the minimal version received is smaller than the latest supported version (i.e. 4):
Downgrading metadata to version [minReceivedMetadataVersion]. Latest supported version is 4.
assign prints out the following DEBUG message to the logs:
Constructed client metadata [clientsMetadata] from the member subscriptions.
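Grouping consumers into clients by process ID (one process ID per Kafka Streams instance, shared by all its stream threads) and computing the minimum received metadata version can be sketched as below. ClientMetadataSketch and DecodedSubscription are hypothetical stand-ins for the decoded SubscriptionInfo and ClientMetadata, and LATEST_SUPPORTED_VERSION = 4 is taken from the INFO message above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class ClientMetadataSketch {
    static final int LATEST_SUPPORTED_VERSION = 4; // per the INFO message above

    // Hypothetical decoded subscription: owning process and metadata version used.
    static final class DecodedSubscription {
        final UUID processId;
        final int version;

        DecodedSubscription(UUID processId, int version) {
            this.processId = processId;
            this.version = version;
        }
    }

    // One entry per Kafka Streams instance (process ID), listing its consumers.
    static Map<UUID, List<String>> groupByProcess(Map<String, DecodedSubscription> subs) {
        Map<UUID, List<String>> clients = new LinkedHashMap<>();
        for (Map.Entry<String, DecodedSubscription> e : subs.entrySet()) {
            clients.computeIfAbsent(e.getValue().processId, id -> new ArrayList<>()).add(e.getKey());
        }
        return clients;
    }

    // Starts at the latest supported version and lowers it for older members.
    static int minReceivedVersion(Map<String, DecodedSubscription> subs) {
        int min = LATEST_SUPPORTED_VERSION;
        for (DecodedSubscription s : subs.values()) {
            min = Math.min(min, s.version);
        }
        return min;
    }

    public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        Map<String, DecodedSubscription> subs = new LinkedHashMap<>();
        subs.put("consumer-1", new DecodedSubscription(a, 4));
        subs.put("consumer-2", new DecodedSubscription(a, 4));
        subs.put("consumer-3", new DecodedSubscription(b, 3)); // an older instance
        System.out.println(groupByProcess(subs));
        int min = minReceivedVersion(subs);
        if (min < LATEST_SUPPORTED_VERSION) {
            System.out.println("Downgrading metadata to version " + min
                + ". Latest supported version is " + LATEST_SUPPORTED_VERSION + ".");
        }
    }
}
```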
Caution
FIXME
assign throws an IllegalStateException when the subscription version is unsupported:
Unknown metadata version: [usedVersion]; latest supported version: [SubscriptionInfo.LATEST_SUPPORTED_VERSION]
Creating Empty Partition Assignment with Error Code — errorAssignment
Method
Map<String, Assignment> errorAssignment(
final Map<UUID, ClientMetadata> clientsMetadata,
final String topic,
final int errorCode)
errorAssignment prints out the following ERROR message to the logs:
[topic] is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.
In the end, errorAssignment returns a new map of consumer IDs to empty Assignments (no partitions, with the given errorCode) for every consumer of every ClientMetadata in the input clientsMetadata.
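A minimal sketch of errorAssignment, assuming a hypothetical EmptyAssignment class in place of Kafka's Assignment and a plain map of process ID to consumer IDs in place of ClientMetadata. Here the error code is a plain field; in Kafka Streams it travels inside the encoded AssignmentInfo user data, and the message goes to the ERROR logger rather than System.err.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class ErrorAssignmentSketch {
    // Hypothetical stand-in for an Assignment: no partitions, only an error code.
    static final class EmptyAssignment {
        final List<String> partitions = Collections.emptyList();
        final int errorCode;

        EmptyAssignment(int errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Every consumer of every client gets no partitions and the same error code.
    static Map<String, EmptyAssignment> errorAssignment(Map<UUID, List<String>> consumersByClient,
                                                        String topic, int errorCode) {
        System.err.println(topic + " is unknown yet during rebalance, please make sure they have"
            + " been pre-created before starting the Streams application.");
        Map<String, EmptyAssignment> result = new HashMap<>();
        for (List<String> consumers : consumersByClient.values()) {
            for (String consumerId : consumers) {
                result.put(consumerId, new EmptyAssignment(errorCode));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<UUID, List<String>> clients = new HashMap<>();
        clients.put(UUID.randomUUID(), List.of("consumer-1", "consumer-2"));
        Map<String, EmptyAssignment> result = errorAssignment(clients, "input-topic", 1);
        System.out.println(result.keySet());
    }
}
```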
Note
errorAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.
versionProbingAssignment
Internal Method
Map<String, Assignment> versionProbingAssignment(
final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Set<String> futureConsumers,
final int minUserMetadataVersion)
versionProbingAssignment
…FIXME
Note
versionProbingAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.
computeNewAssignment
Internal Method
Map<String, Assignment> computeNewAssignment(
final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final int minUserMetadataVersion)
computeNewAssignment
…FIXME
Note
computeNewAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.
processVersionOneAssignment
Internal Method
void processVersionOneAssignment(
AssignmentInfo info,
List<TopicPartition> partitions,
Map<TaskId, Set<TopicPartition>> activeTasks)
processVersionOneAssignment
…FIXME
Note
processVersionOneAssignment is used when StreamsPartitionAssignor is requested to handle partition assignment from the group leader (for version 1) and to process a version 2 assignment (processVersionTwoAssignment).
processVersionTwoAssignment
Internal Method
void processVersionTwoAssignment(
AssignmentInfo info,
List<TopicPartition> partitions,
Map<TaskId, Set<TopicPartition>> activeTasks,
Map<TopicPartition, PartitionInfo> topicToPartitionInfo)
processVersionTwoAssignment
…FIXME
Note
processVersionTwoAssignment is used when…FIXME
Internal Properties
Name | Description
---|---
… | Initialized when … Used exclusively when …
… | All bindings removed (cleared) and then populated in assign
userEndPoint | User-defined endpoint in the format of … Used when …