StreamsPartitionAssignor — Dynamic Partition Assignment Strategy

StreamsPartitionAssignor is a custom PartitionAssignor (from the Kafka Consumer API) that assigns partitions dynamically to the stream processor threads of a Kafka Streams application. The application is identified by the required StreamsConfig.APPLICATION_ID_CONFIG configuration property, and the number of stream processor threads is given by the StreamsConfig.NUM_STREAM_THREADS_CONFIG configuration property.

Tip

PartitionAssignor is used for dynamic partition assignment, i.e. for distributing partition ownership across the members of a consumer group.

Read up on the Kafka Client-side Assignment Proposal about group management in Apache Kafka’s Consumer API.

Read up on PartitionAssignor Contract in The Internals of Apache Kafka.

From the Kafka Client-side Assignment Proposal (with extra formatting of my own):

To support client-side assignment, we propose to split the group management protocol into two phases: group membership and state synchronization.

Group Membership phase is used to set the active members of the group and to elect a group leader.

State Synchronization phase is used to enable the group leader to synchronize member state in the group (in other words to assign each member’s state).

From the perspective of the consumer, group membership phase is used to collect member subscriptions, while state synchronization phase is used to propagate partition assignments.

The elected leader in the join group phase is responsible for setting the assignments for the whole group.

Given the above, only one StreamsPartitionAssignor among the members of a Kafka Streams application (the one of the elected group leader) performs the group assignment, so you should see its assignment log messages from just a single stream thread.
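The two phases can be illustrated with a toy simulation. This is a minimal sketch, not Kafka code: the leader election rule (first member to join wins) and the round-robin assignment are assumptions made for illustration only, whereas in Kafka the group coordinator elects the leader and the leader's PartitionAssignor decides the assignment. The point it shows is that every member joins, but the assignment step runs exactly once, on the leader.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the two-phase client-side assignment protocol (hypothetical, not Kafka code).
class ClientSideAssignmentDemo {

    // Counts how often the leader-only assignment step runs.
    static int assignCalls = 0;

    // Phase 1 (group membership): pick a leader among the joined members.
    // Assumption: the first member to join becomes the leader.
    static String electLeader(List<String> joinOrder) {
        return joinOrder.get(0);
    }

    // Phase 2 (state synchronization): the leader computes the assignment for the whole group.
    // Assumption: plain round-robin over the sorted member IDs.
    static Map<String, List<Integer>> leaderAssign(List<String> members, int numPartitions) {
        assignCalls++;
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String member : sorted) {
            assignment.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(sorted.get(p % sorted.size())).add(p);
        }
        return assignment;
    }

    // One rebalance: every member takes part, but only the leader runs the assignment step.
    static Map<String, List<Integer>> rebalance(List<String> joinOrder, int numPartitions) {
        String leader = electLeader(joinOrder);
        Map<String, List<Integer>> assignment = null;
        for (String member : joinOrder) {
            if (member.equals(leader)) {
                assignment = leaderAssign(joinOrder, numPartitions);
            }
        }
        // The leader's result is what gets propagated to every member.
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = List.of("thread-2", "thread-1", "thread-3");
        System.out.println(rebalance(members, 7));
        // {thread-1=[0, 3, 6], thread-2=[1, 4], thread-3=[2, 5]}
        System.out.println("assign calls: " + assignCalls); // assign calls: 1
    }
}
```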

StreamsPartitionAssignor (as a fully-qualified class name) is registered under the partition.assignment.strategy (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG) configuration property when StreamsConfig is requested for the configuration of the main Kafka Consumer (i.e. when StreamThread is requested to create a new StreamThread instance and requests the KafkaClientSupplier for a Kafka Consumer).

StreamsPartitionAssignor is a Kafka Configurable, i.e. it is instantiated by reflection and then configured with configuration parameters. That is when StreamsPartitionAssignor finds the current TaskManager (using the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property that is set when the StreamThread is created).

StreamsPartitionAssignor uses stream-thread [client.id] as the logPrefix (with the value of the client.id configuration property).

StreamsPartitionAssignor uses num.standby.replicas configuration property for the number of standby replicas per processing task when performing partition assignment (and requesting StickyTaskAssignor to assign tasks).

Tip

Enable ALL logging level for org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor=ALL

subscription Method

Subscription subscription(Set<String> topics)
Note
subscription is part of Kafka Consumer’s PartitionAssignor contract to return a local member’s subscription for the topics subscribed to (e.g. using KafkaConsumer.subscribe).

subscription requests the TaskManager for the previously active tasks.

subscription requests the TaskManager for the cached task IDs and removes the previously active tasks from them. What remains are the standby tasks (aka standbyTasks).

subscription requests the TaskManager to updateSubscriptionsFromMetadata with the given topics.

In the end, subscription creates a SubscriptionInfo (with the usedSubscriptionMetadataVersion, the process ID from the TaskManager, the previously active and standby tasks, and the userEndPoint), requests the SubscriptionInfo to encode itself, and creates a Subscription (with the topics and the encoded SubscriptionInfo as the user data).
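The standby computation in subscription is a set difference, which the following sketch makes explicit. It is a hypothetical model, not the real classes: task IDs are plain strings such as 0_1 (instead of TaskId objects), the TaskManager is replaced by method parameters, and SubscriptionInfo is a simplified holder that does not encode itself to bytes.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical model of what subscription collects before encoding (not Kafka Streams code).
class SubscriptionSketch {

    // Simplified stand-in for SubscriptionInfo; the real class also encodes itself to bytes.
    static final class SubscriptionInfo {
        final int version;
        final UUID processId;
        final Set<String> prevTasks;
        final Set<String> standbyTasks;
        final String userEndPoint;

        SubscriptionInfo(int version, UUID processId, Set<String> prevTasks,
                         Set<String> standbyTasks, String userEndPoint) {
            this.version = version;
            this.processId = processId;
            this.prevTasks = prevTasks;
            this.standbyTasks = standbyTasks;
            this.userEndPoint = userEndPoint;
        }
    }

    // Task IDs are plain strings here (assumption); the parameters stand in for TaskManager lookups.
    static SubscriptionInfo subscription(int usedVersion, UUID processId,
                                         Set<String> previousActiveTasks,
                                         Set<String> cachedTasks,
                                         String userEndPoint) {
        // Standby tasks: every cached task ID that was not previously active
        Set<String> standbyTasks = new TreeSet<>(cachedTasks);
        standbyTasks.removeAll(previousActiveTasks);
        return new SubscriptionInfo(usedVersion, processId,
                new TreeSet<>(previousActiveTasks), standbyTasks, userEndPoint);
    }

    public static void main(String[] args) {
        SubscriptionInfo info = subscription(4, new UUID(0L, 1L),
                Set.of("0_0", "0_1"), Set.of("0_0", "0_1", "1_0"), "localhost:8080");
        System.out.println(info.prevTasks + " " + info.standbyTasks); // [0_0, 0_1] [1_0]
    }
}
```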

Handling Partition Assignment From Group Leader — onAssignment Method

void onAssignment(Assignment assignment)
Note

onAssignment is part of Kafka Consumer’s PartitionAssignor contract for a group member to handle assignment from the leader.

onAssignment is executed when a group member has successfully joined a group.

Figure 1. StreamsPartitionAssignor.onAssignment

onAssignment takes the partitions (from the given assignment) and sorts them by topic and partition.
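Sorting "by topic and partition" means ordering by topic name first and by partition number second. A minimal sketch of such an ordering follows; the TopicPartition class below is a hypothetical stand-in for org.apache.kafka.common.TopicPartition, and the comparator is an equivalent illustration rather than the exact one StreamsPartitionAssignor uses.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

class SortPartitionsSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Order by topic name first, then by partition number
    static final Comparator<TopicPartition> BY_TOPIC_AND_PARTITION =
            Comparator.comparing((TopicPartition tp) -> tp.topic)
                      .thenComparingInt(tp -> tp.partition);

    static List<TopicPartition> sorted(Collection<TopicPartition> assigned) {
        List<TopicPartition> partitions = new ArrayList<>(assigned);
        partitions.sort(BY_TOPIC_AND_PARTITION);
        return partitions;
    }

    public static void main(String[] args) {
        List<TopicPartition> assigned = List.of(
                new TopicPartition("orders", 1), new TopicPartition("clicks", 2),
                new TopicPartition("orders", 0), new TopicPartition("clicks", 0));
        System.out.println(sorted(assigned)); // [clicks-0, clicks-2, orders-0, orders-1]
    }
}
```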

onAssignment requests the AssignmentInfo helper object to decode the additional metadata (i.e. the userData of the given assignment).

Caution
FIXME Finish me

In the end, onAssignment requests the TaskManager for the following:

onAssignment throws a TaskAssignmentException if the number of assigned partitions is not equal to the number of active tasks:

Number of assigned partitions [partitions] is not equal to the number of active taskIds [activeTasks], assignmentInfo=[info]
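The size check only makes sense if partitions and active task IDs are paired up positionally. The sketch below assumes exactly that (the i-th sorted partition belongs to the i-th active task ID), which is an interpretation of the check rather than something stated above. TaskAssignmentException is a local stand-in for org.apache.kafka.streams.errors.TaskAssignmentException, partitions and task IDs are plain strings, and the message formats the two counts.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class AssignmentSizeCheckSketch {

    // Stand-in for org.apache.kafka.streams.errors.TaskAssignmentException
    static class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(String message) {
            super(message);
        }
    }

    // Assumption: the i-th partition belongs to the i-th active task ID,
    // so both lists must have the same length before they can be paired up.
    static Map<String, Set<String>> pairUp(List<String> partitions, List<String> activeTasks, String info) {
        if (partitions.size() != activeTasks.size()) {
            throw new TaskAssignmentException(String.format(
                    "Number of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s",
                    partitions.size(), activeTasks.size(), info));
        }
        // A task may own several partitions (e.g. one per co-partitioned input topic)
        Map<String, Set<String>> partitionsByTask = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            partitionsByTask.computeIfAbsent(activeTasks.get(i), t -> new HashSet<>()).add(partitions.get(i));
        }
        return partitionsByTask;
    }

    public static void main(String[] args) {
        System.out.println(pairUp(List.of("clicks-0", "orders-0"), List.of("0_0", "0_0"), "info"));
        try {
            pairUp(List.of("orders-0"), List.of("0_0", "0_1"), "info");
        } catch (TaskAssignmentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```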

Initialization (Configuring Partition Assignor) — configure Method

void configure(final Map<String, ?> configs)
Note
configure is part of Kafka’s Configurable contract (org.apache.kafka.common.Configurable) for classes that are instantiated by reflection and need to take configuration parameters.

configure creates an InternalStreamsConfig for the given configs.

configure initializes the logPrefix (with CLIENT_ID_CONFIG configuration property).

configure uses the UPGRADE_FROM_CONFIG configuration property for…FIXME

configure sets the TaskManager per the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property (in the given configs). If the property is missing or its value is not a TaskManager, configure throws a KafkaException with one of the following messages:

TaskManager is not specified
[className] is not an instance of org.apache.kafka.streams.processor.internals.TaskManager

configure sets the assignmentErrorCode per the ASSIGNMENT_ERROR_CODE internal property (in the given configs). If the property is missing or its value is not an AtomicInteger, configure throws a KafkaException with one of the following messages:

assignmentErrorCode is not specified
[className] is not an instance of java.util.concurrent.atomic.AtomicInteger
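Both lookups follow the same "required object, checked type" pattern, sketched below together with the logPrefix format. This is a minimal sketch under assumptions: KafkaException and TaskManager are local stand-ins (so the TaskManager class name in a message differs from the real org.apache.kafka.streams.processor.internals.TaskManager), and the property keys are hypothetical placeholders, not the real internal property names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class ConfigureLookupSketch {

    // Stand-ins for org.apache.kafka.common.KafkaException and TaskManager
    static class KafkaException extends RuntimeException {
        KafkaException(String message) {
            super(message);
        }
    }

    static class TaskManager {}

    // Hypothetical keys; the real internal property names are not reproduced here
    static final String TASK_MANAGER_KEY = "hypothetical.task.manager";
    static final String ERROR_CODE_KEY = "hypothetical.assignment.error.code";

    // logPrefix as described above: stream-thread [client.id]
    static String logPrefix(String clientId) {
        return String.format("stream-thread [%s] ", clientId);
    }

    // Looks up a required object passed in through the configs and checks its type
    static <T> T required(Map<String, ?> configs, String key, String name, Class<T> type) {
        Object value = configs.get(key);
        if (value == null) {
            throw new KafkaException(name + " is not specified");
        }
        if (!type.isInstance(value)) {
            throw new KafkaException(value.getClass().getName() + " is not an instance of " + type.getName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(TASK_MANAGER_KEY, new TaskManager());
        configs.put(ERROR_CODE_KEY, new AtomicInteger(0));

        required(configs, TASK_MANAGER_KEY, "TaskManager", TaskManager.class);
        AtomicInteger errorCode = required(configs, ERROR_CODE_KEY, "assignmentErrorCode", AtomicInteger.class);
        System.out.println(logPrefix("my-client") + "error code " + errorCode.get());

        configs.put(ERROR_CODE_KEY, "oops");
        try {
            required(configs, ERROR_CODE_KEY, "assignmentErrorCode", AtomicInteger.class);
        } catch (KafkaException e) {
            // java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger
            System.out.println(e.getMessage());
        }
    }
}
```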

configure sets the numStandbyReplicas per NUM_STANDBY_REPLICAS_CONFIG configuration property (in the given configs).

configure sets the PartitionGrouper per PARTITION_GROUPER_CLASS_CONFIG configuration property (in the given configs).

configure sets the userEndPoint if APPLICATION_SERVER_CONFIG configuration property was defined.

configure creates a new InternalTopicManager (with the Kafka AdminClient of the TaskManager and the InternalStreamsConfig created earlier).

configure creates a new CopartitionedTopicsValidator (with the logPrefix).

prepareTopic Internal Method

void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitions)

prepareTopic prints out the following DEBUG message to the logs:

Starting to validate internal topics [topicPartitions] in partition assignor.

For every InternalTopicMetadata (in the given topicPartitions collection), prepareTopic makes sure that the number of partitions is defined, i.e. is 0 or more. If not, prepareTopic throws a StreamsException:

stream-thread [client.id] Topic [[name]] number of partitions not defined
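The validation loop can be sketched as follows. Assumptions: each InternalTopicMetadata is reduced to a topic name with its partition count, a negative count means "not defined", and StreamsException is a local stand-in for org.apache.kafka.streams.errors.StreamsException. The topic names in main are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PrepareTopicSketch {

    // Stand-in for org.apache.kafka.streams.errors.StreamsException
    static class StreamsException extends RuntimeException {
        StreamsException(String message) {
            super(message);
        }
    }

    // Fails on the first internal topic whose number of partitions is not defined (negative)
    static void validatePartitionCounts(String logPrefix, Map<String, Integer> numPartitionsByTopic) {
        for (Map.Entry<String, Integer> topic : numPartitionsByTopic.entrySet()) {
            if (topic.getValue() < 0) {
                throw new StreamsException(String.format(
                        "%sTopic [%s] number of partitions not defined", logPrefix, topic.getKey()));
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("app-agg-repartition", 4);
        topics.put("app-store-changelog", -1); // not resolved yet
        try {
            validatePartitionCounts("stream-thread [my-client] ", topics);
        } catch (StreamsException e) {
            // stream-thread [my-client] Topic [app-store-changelog] number of partitions not defined
            System.out.println(e.getMessage());
        }
    }
}
```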

In the end, prepareTopic requests the InternalTopicManager to makeReady the topics and prints out the following DEBUG message to the logs:

Completed validating internal topics [topicPartitions] in partition assignor.
Note
prepareTopic is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment (and finds repartition source and changelog topics).

Performing Group Assignment (Assigning Tasks To Consumer Clients) — assign Method

Map<String, Assignment> assign(
  Cluster metadata,
  Map<String, Subscription> subscriptions)
Note
assign is part of Kafka Consumer’s PartitionAssignor contract to perform group assignment given the member subscriptions and current cluster metadata.
Note
The input Map<String, Subscription> maps consumer IDs to their Kafka Subscriptions, each with a list of topic names and encoded user data (as a java.nio.ByteBuffer).

assign constructs the client metadata (as Map<UUID, ClientMetadata>) from the decoded subscription info (from the user data).

  1. assign takes consumer IDs with subscriptions (from subscriptions).

  2. assign requests SubscriptionInfo to decode the user data (aka metadata) of the subscription and makes sure that the version is supported (i.e. not greater than the latest supported version).

  3. assign finds the client metadata (by the process ID) and creates one if not available.

  4. assign requests the ClientMetadata to addConsumer.

assign prints out the following INFO message to the logs only if the minimum version received is smaller than the latest supported version (i.e. 4).

Downgrading metadata to version [minReceivedMetadataVersion]. Latest supported version is 4.
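The grouping and version bookkeeping can be sketched as follows. This is a simplified model under assumptions: DecodedSubscription and ClientMetadata are hypothetical holders with only the fields this step needs, unsupported (newer) versions are simply rejected with the IllegalStateException message described further down, and the log line is returned as a string instead of being logged.

```java
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

class ClientMetadataSketch {

    static final int LATEST_SUPPORTED_VERSION = 4; // as stated in the INFO message above

    // Simplified decoded SubscriptionInfo: only the fields this step needs
    static final class DecodedSubscription {
        final UUID processId;
        final int version;

        DecodedSubscription(UUID processId, int version) {
            this.processId = processId;
            this.version = version;
        }
    }

    // Simplified ClientMetadata: the consumers (stream threads) of one Kafka Streams process
    static final class ClientMetadata {
        final SortedSet<String> consumers = new TreeSet<>();

        void addConsumer(String consumerId) {
            consumers.add(consumerId);
        }
    }

    // Groups consumer IDs by the process ID found in their decoded subscriptions
    static Map<UUID, ClientMetadata> clientsMetadata(Map<String, DecodedSubscription> subscriptions) {
        Map<UUID, ClientMetadata> clients = new TreeMap<>();
        for (Map.Entry<String, DecodedSubscription> entry : subscriptions.entrySet()) {
            DecodedSubscription info = entry.getValue();
            if (info.version > LATEST_SUPPORTED_VERSION) {
                throw new IllegalStateException("Unknown metadata version: " + info.version
                        + "; latest supported version: " + LATEST_SUPPORTED_VERSION);
            }
            clients.computeIfAbsent(info.processId, id -> new ClientMetadata()).addConsumer(entry.getKey());
        }
        return clients;
    }

    // Minimum metadata version across all members
    static int minReceivedVersion(Map<String, DecodedSubscription> subscriptions) {
        int min = LATEST_SUPPORTED_VERSION;
        for (DecodedSubscription info : subscriptions.values()) {
            min = Math.min(min, info.version);
        }
        return min;
    }

    // The INFO message is only produced when some member uses an older version
    static Optional<String> downgradeMessage(int minReceivedVersion) {
        if (minReceivedVersion < LATEST_SUPPORTED_VERSION) {
            return Optional.of("Downgrading metadata to version " + minReceivedVersion
                    + ". Latest supported version is " + LATEST_SUPPORTED_VERSION + ".");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        UUID p1 = new UUID(0L, 1L);
        UUID p2 = new UUID(0L, 2L);
        Map<String, DecodedSubscription> subs = new TreeMap<>();
        subs.put("consumer-1", new DecodedSubscription(p1, 4));
        subs.put("consumer-2", new DecodedSubscription(p1, 4));
        subs.put("consumer-3", new DecodedSubscription(p2, 3));
        System.out.println(clientsMetadata(subs).size() + " clients"); // 2 clients
        downgradeMessage(minReceivedVersion(subs)).ifPresent(System.out::println);
        // Downgrading metadata to version 3. Latest supported version is 4.
    }
}
```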

assign prints out the following DEBUG message to the logs:

Constructed client metadata [clientsMetadata] from the member subscriptions.
Caution
FIXME

assign throws an IllegalStateException when the subscription version is unsupported:

Unknown metadata version: [usedVersion]; latest supported version: [SubscriptionInfo.LATEST_SUPPORTED_VERSION]

Creating Empty Partition Assignment with Error Code — errorAssignment Method

Map<String, Assignment> errorAssignment(
  final Map<UUID, ClientMetadata> clientsMetadata,
  final String topic,
  final int errorCode)

errorAssignment prints out the following ERROR message to the logs:

[topic] is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.

In the end, errorAssignment returns a new map with an empty Assignment (carrying the given errorCode) for every consumer of every ClientMetadata in the input clientsMetadata, keyed by consumer ID.
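A minimal sketch of errorAssignment, assuming simplified types: clients are reduced to a map of process ID to consumer IDs, and Assignment is a hypothetical holder with an (always empty) partition list and the error code, whereas the real Assignment carries the error code inside its encoded user data. The ERROR log line goes to System.err here.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;

class ErrorAssignmentSketch {

    // Hypothetical simplified Assignment: no partitions, just the error code
    static final class Assignment {
        final List<String> partitions;
        final int errorCode;

        Assignment(List<String> partitions, int errorCode) {
            this.partitions = partitions;
            this.errorCode = errorCode;
        }
    }

    static Map<String, Assignment> errorAssignment(Map<UUID, Set<String>> consumersByClient,
                                                   String topic, int errorCode) {
        System.err.println(topic + " is unknown yet during rebalance,"
                + " please make sure they have been pre-created before starting the Streams application.");
        Map<String, Assignment> assignment = new TreeMap<>();
        for (Set<String> consumers : consumersByClient.values()) {
            for (String consumerId : consumers) {
                // Every consumer of every client gets the same empty assignment with the error code
                assignment.put(consumerId, new Assignment(Collections.emptyList(), errorCode));
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<UUID, Set<String>> clients = new TreeMap<>();
        clients.put(new UUID(0L, 1L), Set.of("consumer-1", "consumer-2"));
        clients.put(new UUID(0L, 2L), Set.of("consumer-3"));
        Map<String, Assignment> result = errorAssignment(clients, "input-topic", 1);
        System.out.println(result.keySet()); // [consumer-1, consumer-2, consumer-3]
    }
}
```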

Note
errorAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.

versionProbingAssignment Internal Method

Map<String, Assignment> versionProbingAssignment(
  final Map<UUID, ClientMetadata> clientsMetadata,
  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
  final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
  final Set<String> futureConsumers,
  final int minUserMetadataVersion)

versionProbingAssignment…FIXME

Note
versionProbingAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.

computeNewAssignment Internal Method

Map<String, Assignment> computeNewAssignment(
  final Map<UUID, ClientMetadata> clientsMetadata,
  final Map<TaskId, Set<TopicPartition>> partitionsForTask,
  final Map<HostInfo, Set<TopicPartition>> partitionsByHostState,
  final int minUserMetadataVersion)

computeNewAssignment…FIXME

Note
computeNewAssignment is used exclusively when StreamsPartitionAssignor is requested to perform partition assignment.

processVersionOneAssignment Internal Method

void processVersionOneAssignment(
  AssignmentInfo info,
  List<TopicPartition> partitions,
  Map<TaskId, Set<TopicPartition>> activeTasks)

processVersionOneAssignment…FIXME

Note
processVersionOneAssignment is used when StreamsPartitionAssignor is requested to handle partition assignment from a group leader (for version 1) and to processVersionTwoAssignment (for version 2).

processVersionTwoAssignment Internal Method

void processVersionTwoAssignment(
  AssignmentInfo info,
  List<TopicPartition> partitions,
  Map<TaskId, Set<TopicPartition>> activeTasks,
  Map<TopicPartition, PartitionInfo> topicToPartitionInfo)

processVersionTwoAssignment…FIXME

Note
processVersionTwoAssignment is used when…FIXME

Internal Properties

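assignmentErrorCode: AtomicInteger with the assignment error code, set in configure per the ASSIGNMENT_ERROR_CODE internal property

copartitionedTopicsValidator: CopartitionedTopicsValidator, created in configure (with the logPrefix)

internalTopicManager: InternalTopicManager. Initialized when StreamsPartitionAssignor is requested to configure itself. Used exclusively when StreamsPartitionAssignor is requested to prepareTopic.

partitionGrouper: PartitionGrouper, set in configure per the PARTITION_GROUPER_CLASS_CONFIG configuration property

supportedVersions: Supported versions. All bindings are removed (cleared) and then populated in assign.

Note
supportedVersions does not seem to be used except to hold the entries.

taskManager: TaskManager, set in configure per the TASK_MANAGER_FOR_PARTITION_ASSIGNOR internal property

usedSubscriptionMetadataVersion: Subscription metadata version used when subscription creates a SubscriptionInfo

userEndPoint: User-defined endpoint in the format of host:port, as configured by the application.server configuration property (default: empty). Used when StreamsPartitionAssignor is requested for a subscription (and creates a SubscriptionInfo).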
