log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL
StreamsPartitionAssignor — Dynamic Partition Assignment Strategy
StreamsPartitionAssignor is a custom PartitionAssignor (from the Kafka Consumer API) that dynamically assigns partitions to the stream processor threads of a Kafka Streams application. The application is identified by the required StreamsConfig.APPLICATION_ID_CONFIG configuration property, and the number of stream processor threads is set by the StreamsConfig.NUM_STREAM_THREADS_CONFIG configuration property.
|
Tip
|
Read up on the Kafka Client-side Assignment Proposal for the group management in Apache Kafka’s Consumer API.
|
From Kafka Client-side Assignment Proposal (with extra formatting of mine):
To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization.
Group Membership phase is used to set the active members of the group and to elect a group leader.
State Synchronization phase is used to enable the group leader to synchronize member state in the group (in other words to assign each member’s state).
From the perspective of the consumer, group membership phase is used to collect member subscriptions, while state synchronization phase is used to propagate partition assignments.
The elected leader in the join group phase is responsible for setting the assignments for the whole group.
Given the above, there will be just one elected StreamsPartitionAssignor among the members of a Kafka Streams application (so you should see the log messages from just a single thread).
StreamsPartitionAssignor (as a fully-qualified class name) is registered under partition.assignment.strategy (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) configuration property when StreamsConfig is requested for the configuration for the main Kafka Consumer (when StreamThread is requested to create a new StreamThread instance and requests the KafkaClientSupplier for a Kafka Consumer).
StreamsPartitionAssignor is a Kafka Configurable that is instantiated by reflection with configuration parameters. That is when StreamsPartitionAssignor finds the current TaskManager (using TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property set when the StreamThread is created).
StreamsPartitionAssignor uses stream-thread [client.id] for the logPrefix (that uses client.id configuration property).
StreamsPartitionAssignor uses num.standby.replicas configuration property for the number of standby replicas per processing task when performing partition assignment (and requesting StickyTaskAssignor to assign tasks).
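The configuration properties mentioned so far can be set as plain string keys. The following is a minimal sketch, not taken from the original text: it uses java.util.Properties without a Kafka dependency, and the class name and the values (demo-app, 2, 1) are hypothetical. Note that partition.assignment.strategy is deliberately not set, since the text above says Kafka Streams registers StreamsPartitionAssignor itself.

```java
import java.util.Properties;

// Sketch only: plain string keys, no Kafka dependency. All values are made up.
final class StreamsAssignorConfigSketch {

    // Collects the properties that, per the text above, influence partition assignment.
    static Properties assignmentRelatedProperties() {
        Properties props = new Properties();
        props.put("application.id", "demo-app");   // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("num.stream.threads", "2");      // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        props.put("num.standby.replicas", "1");    // StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG
        return props;
    }

    public static void main(String[] args) {
        assignmentRelatedProperties()
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```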
|
Tip
|
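Enable ALL logging level for the org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger.
Add the following line to the log4j properties file:
log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL
Refer to Application Logging Using log4j.
|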
subscription Method
Subscription subscription(Set<String> topics)
|
Note
|
subscription is part of Kafka Consumer’s PartitionAssignor contract to return a local member’s subscription for the topics subscribed to (e.g. using KafkaConsumer.subscribe).
|
subscription requests the TaskManager for the previously active tasks.
subscription requests the TaskManager for the cached task IDs (aka standbyTasks) and removes all of the previously active tasks.
subscription requests the TaskManager to updateSubscriptionsFromMetadata with the given topics.
In the end, subscription creates a SubscriptionInfo (with the usedSubscriptionMetadataVersion, the process ID from the TaskManager, previously active and standby tasks and the userEndPoint), requests the SubscriptionInfo for the encoded version and creates a Subscription (with the topics and the encoded version).
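The set arithmetic behind the standby tasks can be sketched as follows. This is an illustration only, assuming task IDs are represented as plain strings; the class and method names are hypothetical and the real code works with the TaskManager and TaskId types.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: task IDs are plain strings here, not Kafka's TaskId.
final class SubscriptionTasksSketch {

    // Standby tasks are the cached task IDs minus the previously active tasks.
    static Set<String> standbyTasks(Set<String> cachedTaskIds, Set<String> prevActiveTasks) {
        Set<String> standby = new HashSet<>(cachedTaskIds); // copy so the input is not mutated
        standby.removeAll(prevActiveTasks);
        return standby;
    }

    public static void main(String[] args) {
        Set<String> prevActive = Set.of("0_0", "0_1");
        Set<String> cached = Set.of("0_0", "0_1", "0_2", "1_0");
        System.out.println(standbyTasks(cached, prevActive));
    }
}
```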
Handling Partition Assignment From Group Leader — onAssignment Method
void onAssignment(Assignment assignment)
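|
Note
|
onAssignment is part of Kafka Consumer’s PartitionAssignor contract and is invoked when a group member receives its assignment from the group leader.
|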
onAssignment takes the partitions (from the given assignment) and sorts them by topic and partition.
onAssignment requests AssignmentInfo helper object to decode the additional metadata (i.e. the userData from the input assignment).
|
Caution
|
FIXME Finish me |
In the end, onAssignment requests the TaskManager for the following:
-
Notifying StreamsMetadataState about Cluster Metadata Changes
-
Setting task assignment metadata with active and standby tasks
-
updateSubscriptionsFromAssignment with the assigned partitions
onAssignment reports a TaskAssignmentException if the numbers of partitions and active tasks are not equal.
Number of assigned partitions [partitions] is not equal to the number of active taskIds [activeTasks], assignmentInfo=[info]
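The sorting step and the size check of onAssignment can be sketched as follows. This is a simplified illustration, not the actual implementation: Tp is a hypothetical stand-in for Kafka’s TopicPartition, IllegalStateException stands in for TaskAssignmentException, and the assignmentInfo part of the message is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch only: no Kafka types are used.
final class OnAssignmentSketch {

    // Hypothetical stand-in for Kafka's TopicPartition.
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Sorts by topic, then by partition, and verifies one active task per partition.
    static List<Tp> sortAndValidate(List<Tp> partitions, List<String> activeTasks) {
        List<Tp> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator.comparing((Tp tp) -> tp.topic)
                              .thenComparingInt(tp -> tp.partition));
        if (sorted.size() != activeTasks.size()) {
            // The real code throws TaskAssignmentException.
            throw new IllegalStateException(String.format(
                "Number of assigned partitions %d is not equal to the number of active taskIds %d",
                sorted.size(), activeTasks.size()));
        }
        return sorted;
    }

    public static void main(String[] args) {
        List<Tp> partitions = List.of(new Tp("b", 0), new Tp("a", 1), new Tp("a", 0));
        System.out.println(sortAndValidate(partitions, List.of("0_0", "0_1", "1_0")));
    }
}
```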
Initialization (Configuring Partition Assignor) — configure Method
void configure(final Map<String, ?> configs)
|
Note
|
configure is part of Kafka Consumer’s Configurable contract for classes that are instantiated by reflection and need to take configuration parameters.
|
configure creates an InternalStreamsConfig for the given configs.
configure initializes the logPrefix (with CLIENT_ID_CONFIG configuration property).
configure uses UPGRADE_FROM_CONFIG configuration property for…FIXME
configure sets the TaskManager per TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property (in the given configs). If not found or of a different type than TaskManager, configure throws a KafkaException:
TaskManager is not specified
[className] is not an instance of org.apache.kafka.streams.processor.internals.TaskManager
configure sets the assignmentErrorCode per ASSIGNMENT_ERROR_CODE internal property (in the given configs). If not found or of a different type than AtomicInteger, configure throws a KafkaException:
assignmentErrorCode is not specified
[className] is not an instance of java.util.concurrent.atomic.AtomicInteger
configure sets the numStandbyReplicas per NUM_STANDBY_REPLICAS_CONFIG configuration property (in the given configs).
configure sets the PartitionGrouper per PARTITION_GROUPER_CLASS_CONFIG configuration property (in the given configs).
configure sets the userEndPoint if APPLICATION_SERVER_CONFIG configuration property was defined.
configure creates a new InternalTopicManager (with the Kafka AdminClient of the TaskManager and the InternalStreamsConfig created earlier).
configure creates a new CopartitionedTopicsValidator (with the logPrefix).
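The "present and of the expected type" checks that configure applies to the TaskManager and assignmentErrorCode entries can be sketched as follows. This is an illustration under assumptions: the helper name requireInstance and the key "error.code.key" are hypothetical, and IllegalStateException stands in for KafkaException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: mirrors the two failure messages quoted above.
final class ConfigureLookupSketch {

    // Looks up the key and checks the runtime type of the value.
    static <T> T requireInstance(Map<String, ?> configs, String key, Class<T> type, String label) {
        Object value = configs.get(key);
        if (value == null) {
            // The real code throws KafkaException.
            throw new IllegalStateException(label + " is not specified");
        }
        if (!type.isInstance(value)) {
            throw new IllegalStateException(
                value.getClass().getName() + " is not an instance of " + type.getName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("error.code.key", new AtomicInteger(0)); // hypothetical key name
        AtomicInteger errorCode =
            requireInstance(configs, "error.code.key", AtomicInteger.class, "assignmentErrorCode");
        System.out.println("assignmentErrorCode = " + errorCode.get());
    }
}
```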
prepareTopic Internal Method
void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions)
prepareTopic prints out the following DEBUG message to the logs:
Starting to validate internal topics [topicPartitions] in partition assignor.
For every InternalTopicMetadata (in the given topicPartitions collection), prepareTopic makes sure that the number of partitions is defined, i.e. is 0 or more. If not, prepareTopic throws a StreamsException:
stream-thread [client.id] Topic [[name]] number of partitions not defined
In the end, prepareTopic requests the InternalTopicManager to makeReady the topics and prints out the following DEBUG message to the logs:
Completed validating internal topics [topicPartitions] in partition assignor.
|
Note
|
prepareTopic is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment (and finds repartition source and changelog topics).
|
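The validation step of prepareTopic can be sketched as follows. This is an illustration only: the map of topic names to partition counts replaces InternalTopicMetadata, a negative count is assumed to mean "not defined", and IllegalStateException stands in for StreamsException. The makeReady step of InternalTopicManager is not modelled.

```java
import java.util.Map;

// Sketch only: topic metadata is reduced to a partition count per topic name.
final class PrepareTopicSketch {

    // Fails on the first topic whose partition count is undefined (negative here, by assumption).
    static void validate(String logPrefix, Map<String, Integer> partitionsByTopic) {
        for (Map.Entry<String, Integer> entry : partitionsByTopic.entrySet()) {
            if (entry.getValue() < 0) {
                // The real code throws StreamsException.
                throw new IllegalStateException(String.format(
                    "%sTopic [%s] number of partitions not defined", logPrefix, entry.getKey()));
            }
        }
    }

    public static void main(String[] args) {
        // "stream-thread [c1] " plays the role of the logPrefix; c1 is a made-up client.id.
        validate("stream-thread [c1] ", Map.of("app-repartition", 4, "app-changelog", 0));
        System.out.println("all internal topics have a defined number of partitions");
    }
}
```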
Performing Group Assignment (Assigning Tasks To Consumer Clients) — assign Method
Map<String, Assignment> assign(
Cluster metadata,
Map<String, Subscription> subscriptions)
|
Note
|
assign is part of Kafka Consumer’s PartitionAssignor contract to perform group assignment given the member subscriptions and current cluster metadata.
|
|
Note
|
The input Map<String, Subscription> maps a consumer ID to its Kafka Subscription, i.e. a list of topic names and the encoded user data (as a java.nio.ByteBuffer).
|
assign constructs the client metadata (as Map<UUID, ClientMetadata>) from the decoded subscription info (from the user data).
-
assign takes consumer IDs with subscriptions (from subscriptions).
-
assign requests SubscriptionInfo to decode the user data of the subscription (aka metadata) and makes sure that the version is supported, i.e. not newer than the latest supported version.
-
assign finds the client metadata (by the process ID) and creates one if not available.
-
assign requests the ClientMetadata to addConsumer.
assign prints out the following INFO message to the logs only if the minimal version received is smaller than the latest supported version (i.e. 4).
Downgrading metadata to version [minReceivedMetadataVersion]. Latest supported version is 4.
assign prints out the following DEBUG message to the logs:
Constructed client metadata [clientsMetadata] from the member subscriptions.
|
Caution
|
FIXME |
assign reports an IllegalStateException when the subscription version is unsupported.
Unknown metadata version: [usedVersion]; latest supported version: [SubscriptionInfo.LATEST_SUPPORTED_VERSION]
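The first part of assign (grouping consumers by process ID, checking versions and tracking the minimal received version) can be sketched as follows. This is a simplified illustration: DecodedSubscription is a hypothetical stand-in for the decoded SubscriptionInfo, the client metadata is reduced to a set of consumer IDs, and the latest supported version of 4 is taken from the INFO message quoted above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

// Sketch only: no Kafka types are used.
final class ClientMetadataSketch {

    static final int LATEST_SUPPORTED_VERSION = 4; // value quoted in the text above

    // Hypothetical stand-in for a decoded SubscriptionInfo.
    static final class DecodedSubscription {
        final UUID processId;
        final int version;

        DecodedSubscription(UUID processId, int version) {
            this.processId = processId;
            this.version = version;
        }
    }

    // Groups consumer IDs by process ID, creating the per-client entry on first use.
    static Map<UUID, Set<String>> consumersByProcess(Map<String, DecodedSubscription> subscriptions) {
        Map<UUID, Set<String>> clients = new HashMap<>();
        for (Map.Entry<String, DecodedSubscription> entry : subscriptions.entrySet()) {
            DecodedSubscription info = entry.getValue();
            if (info.version > LATEST_SUPPORTED_VERSION) {
                throw new IllegalStateException("Unknown metadata version: " + info.version
                    + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
            }
            clients.computeIfAbsent(info.processId, id -> new TreeSet<>()).add(entry.getKey());
        }
        return clients;
    }

    // A result below LATEST_SUPPORTED_VERSION corresponds to the "Downgrading metadata" case.
    static int minReceivedVersion(Map<String, DecodedSubscription> subscriptions) {
        int min = LATEST_SUPPORTED_VERSION;
        for (DecodedSubscription info : subscriptions.values()) {
            min = Math.min(min, info.version);
        }
        return min;
    }

    public static void main(String[] args) {
        UUID process = UUID.randomUUID();
        Map<String, DecodedSubscription> subscriptions = Map.of(
            "consumer-1", new DecodedSubscription(process, 4),
            "consumer-2", new DecodedSubscription(process, 3));
        System.out.println(consumersByProcess(subscriptions));
        System.out.println("min version: " + minReceivedVersion(subscriptions));
    }
}
```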
Creating Empty Partition Assignment with Error Code — errorAssignment Method
Map<String, Assignment> errorAssignment(
final Map<UUID, ClientMetadata> clientsMetadata,
final String topic,
final int errorCode)
errorAssignment prints out the following ERROR message to the logs:
[topic] is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.
In the end, errorAssignment returns a new map of client IDs, with an empty Assignment for every consumer of every ClientMetadata in the input clientsMetadata.
|
Note
|
errorAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.
|
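The shape of the result of errorAssignment can be sketched as follows. This is an illustration under assumptions: EmptyAssignment is a hypothetical stand-in for Kafka’s Assignment, the client metadata is reduced to a set of consumer IDs per process ID, and carrying the error code inside each empty assignment is inferred from the section title rather than stated in the text.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Sketch only: no Kafka types are used.
final class ErrorAssignmentSketch {

    // Hypothetical stand-in for an Assignment with no partitions and an error code.
    static final class EmptyAssignment {
        final List<String> partitions = Collections.emptyList();
        final int errorCode;

        EmptyAssignment(int errorCode) {
            this.errorCode = errorCode;
        }
    }

    // Gives every consumer of every client an empty assignment.
    static Map<String, EmptyAssignment> errorAssignment(
            Map<UUID, Set<String>> consumersByClient, int errorCode) {
        Map<String, EmptyAssignment> assignment = new HashMap<>();
        for (Set<String> consumers : consumersByClient.values()) {
            for (String consumerId : consumers) {
                assignment.put(consumerId, new EmptyAssignment(errorCode));
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<UUID, Set<String>> clients = Map.of(
            UUID.randomUUID(), Set.of("consumer-1", "consumer-2"),
            UUID.randomUUID(), Set.of("consumer-3"));
        System.out.println(errorAssignment(clients, 1).keySet());
    }
}
```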
versionProbingAssignment Internal Method
Map<String, Assignment> versionProbingAssignment(
final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final Set<String> futureConsumers,
final int minUserMetadataVersion)
versionProbingAssignment…FIXME
|
Note
|
versionProbingAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.
|
computeNewAssignment Internal Method
Map<String, Assignment> computeNewAssignment(
final Map<UUID, ClientMetadata> clientsMetadata,
final Map<TaskId, Set<TopicPartition>> partitionsForTask,
final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
final int minUserMetadataVersion)
computeNewAssignment…FIXME
|
Note
|
computeNewAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.
|
processVersionOneAssignment Internal Method
void processVersionOneAssignment(
AssignmentInfo info,
List<TopicPartition> partitions,
Map<TaskId, Set<TopicPartition>> activeTasks)
processVersionOneAssignment…FIXME
|
Note
|
processVersionOneAssignment is used when StreamsPartitionAssignor is requested to handle partition assignment from a group leader (for version 1) and to process a version 2 partition assignment.
|
processVersionTwoAssignment Internal Method
void processVersionTwoAssignment(
AssignmentInfo info,
List<TopicPartition> partitions,
Map<TaskId, Set<TopicPartition>> activeTasks,
Map<TopicPartition, PartitionInfo> topicToPartitionInfo)
processVersionTwoAssignment…FIXME
|
Note
|
processVersionTwoAssignment is used when…FIXME
|
Internal Properties
| Name | Description |
|---|---|
| | Initialized when… Used exclusively when… |
| | All bindings removed (cleared) and then populated in assign |
| userEndPoint | User-defined endpoint in the format of… Used when… |