ConsumerCoordinator

ConsumerCoordinator is a concrete AbstractCoordinator that…​FIXME

ConsumerCoordinator is created exclusively when KafkaConsumer is created.

ConsumerCoordinator.png
Figure 1. ConsumerCoordinator and KafkaConsumer

KafkaConsumer uses the following Consumer configuration properties to create a ConsumerCoordinator.

Table 1. ConsumerCoordinator’s Consumer Configuration Properties
Configuration Property ConsumerCoordinator’s Property

max.poll.interval.ms

rebalanceTimeoutMs

session.timeout.ms

sessionTimeoutMs

heartbeat.interval.ms

heartbeatIntervalMs

retry.backoff.ms

retryBackoffMs

enable.auto.commit

autoCommitEnabled

auto.commit.interval.ms

autoCommitIntervalMs

exclude.internal.topics

excludeInternalTopics

internal.leave.group.on.close

leaveGroupOnClose

With autoCommitEnabled enabled (i.e. enable.auto.commit is true), ConsumerCoordinator does:

ConsumerCoordinator uses zero or more PartitionAssignors for the following:

Table 2. ConsumerCoordinator’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

assignmentSnapshot

MetadataSnapshot

completedOffsetCommits

Java’s java.util.concurrent.ConcurrentLinkedQueue of OffsetCommitCompletions with user-defined OffsetCommitCallbacks

OffsetCommitCallbacks are executed in invokeCompletedOffsetCommitCallbacks

OffsetCommitCompletion are added in commitOffsetsAsync (and also directly in doCommitOffsetsAsync)

isLeader

Flag that says whether the ConsumerCoordinator is a leader (true) or not (false).

Default: false

Turned on in performAssignment and off in onJoinPrepare

Used exclusively for onJoinComplete (to reset the assignmentSnapshot when off)

metadataSnapshot

MetadataSnapshot metadataSnapshot

MetadataSnapshot (for the SubscriptionState and the cluster metadata from the Metadata)

Tip

Enable ERROR, WARN, INFO, DEBUG, or TRACE logging level for org.apache.kafka.clients.consumer.internals.ConsumerCoordinator logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator=TRACE

Refer to Logging.

Sending Asynchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsAsync Method

void maybeAutoCommitOffsetsAsync(long now)

maybeAutoCommitOffsetsAsync…​FIXME

Note

maybeAutoCommitOffsetsAsync is used when:

Sending Synchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsSync Internal Method

void maybeAutoCommitOffsetsSync(long timeoutMs)

maybeAutoCommitOffsetsSync…​FIXME

Note
maybeAutoCommitOffsetsSync is used when ConsumerCoordinator is requested to close or onJoinPrepare

doAutoCommitOffsetsAsync Internal Method

void doAutoCommitOffsetsAsync()

doAutoCommitOffsetsAsync…​FIXME

Note
doAutoCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to maybeAutoCommitOffsetsAsync.

close Method

void close(long timeoutMs)

close…​FIXME

Note
close is used when…​FIXME

commitOffsetsAsync Method

void commitOffsetsAsync(
  final Map<TopicPartition, OffsetAndMetadata> offsets,
  final OffsetCommitCallback callback)

commitOffsetsAsync…​FIXME

Note

commitOffsetsAsync is used when:

commitOffsetsSync Method

boolean commitOffsetsSync(
  Map<TopicPartition,
  OffsetAndMetadata> offsets,
  long timeoutMs)

commitOffsetsSync…​FIXME

Note
commitOffsetsSync is used when…​FIXME

refreshCommittedOffsetsIfNeeded Method

void refreshCommittedOffsetsIfNeeded()

refreshCommittedOffsetsIfNeeded…​FIXME

Note
refreshCommittedOffsetsIfNeeded is used when…​FIXME

onJoinComplete Callback

void onJoinComplete(
  int generation,
  String memberId,
  String assignmentStrategy,
  ByteBuffer assignmentBuffer)
Note
onJoinComplete is part of AbstractCoordinator Contract to…​FIXME.

onJoinComplete…​FIXME

onJoinPrepare Method

void onJoinPrepare(int generation, String memberId)
Note
onJoinPrepare is part of AbstractCoordinator Contract to…​FIXME.

onJoinPrepare…​FIXME

Performing Partition Assignment (using PartitionAssignor) — performAssignment Method

Map<String, ByteBuffer> performAssignment(
  String leaderId,
  String assignmentStrategy,
  Map<String, ByteBuffer> allSubscriptions)
Note
performAssignment is part of AbstractCoordinator Contract to perform partition assignment (i.e. assign partitions to the members of a consumer group).

performAssignment tries to find the PartitionAssignor by the given assignmentStrategy.

performAssignment deserializes Subscriptions (from the given allSubscriptions) for every entry (and creates a Map<String, Subscription>).

performAssignment requests the SubscriptionState to add the topics to group subscription (i.e. add all the topics that the consumer group is subscribed to to receive topic metadata updates).

performAssignment requests the Metadata to setTopics to be the groupSubscription of the SubscriptionState.

performAssignment requests the ConsumerNetworkClient to update the cluster metadata.

performAssignment turns the isLeader internal flag on.

performAssignment prints out the following DEBUG message to the logs:

Performing assignment using strategy [assignor] with subscriptions [subscriptions]

performAssignment requests the PartitionAssignor to assign partitions to the members of the consumer group (with the current cluster metadata and subscriptions) and gets the partition assignment back (as Map<String, Assignment>).

Note
Here performAssignment does some customizations that a user-customized assignor could request. It is not a very interesting code to spend your time on.

performAssignment sets the assignmentSnapshot to be metadataSnapshot.

performAssignment prints out the following DEBUG message to the logs:

Finished assignment for group: [assignment]

In the end, performAssignment takes the partition assignment, encodes the Assignment per group member to create a groupAssignment (i.e. the Map<String, ByteBuffer> that is returned).

performAssignment throws an IllegalStateException when no PartitionAssignor could be found:

Coordinator selected invalid assignment protocol: [assignmentStrategy]

maybeLeaveGroup Method

void maybeLeaveGroup()

maybeLeaveGroup…​FIXME

Note
maybeLeaveGroup is used when…​FIXME

updatePatternSubscription Method

void updatePatternSubscription(Cluster cluster)

updatePatternSubscription…​FIXME

Note
updatePatternSubscription is used when…​FIXME

needRejoin Method

boolean needRejoin()
Note
needRejoin is part of the AbstractCoordinator Contract to…​FIXME.

needRejoin…​FIXME

timeToNextPoll Method

long timeToNextPoll(long now)

timeToNextPoll…​FIXME

Note
timeToNextPoll is used when…​FIXME

Polling for Group Coordinator Events — poll Method

boolean poll(Timer timer)

poll branches off per whether the SubscriptionState is partitionsAutoAssigned or not.

Caution
FIXME What does partitionsAutoAssigned mean exactly?

poll returns false if coordinatorUnknown and ensureCoordinatorReady failed (false).

poll…​FIXME

Note
poll is used exclusively when KafkaConsumer is requested to updateAssignmentMetadataIfNeeded.

Registering Metadata.Listener — addMetadataListener Internal Method

void addMetadataListener()

addMetadataListener requests the Metadata to add a new Metadata Update Listener that intercepts onMetadataUpdate events and does the following:

  • FIXME

addMetadataListener throws a TopicAuthorizationException for any unauthorized topics (i.e. when the given Cluster has at least one topic in unauthorizedTopics).

FIXME
Note
addMetadataListener is used exclusively when ConsumerCoordinator is created.

fetchCommittedOffsets Method

Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(
  Set<TopicPartition> partitions)

fetchCommittedOffsets…​FIXME

Note
fetchCommittedOffsets is used when…​FIXME

Creating ConsumerCoordinator Instance

ConsumerCoordinator takes the following when created:

ConsumerCoordinator initializes the internal registries and counters.

In the end, ConsumerCoordinator requests the Metadata to update and addMetadataListener.

rejoinNeededOrPending Method

boolean rejoinNeededOrPending()
Note
rejoinNeededOrPending is part of the AbstractCoordinator Contract to…​FIXME.

rejoinNeededOrPending…​FIXME

Sending OffsetCommitRequest to Group Coordinator (Kafka Broker) — sendOffsetCommitRequest Internal Method

RequestFuture<Void> sendOffsetCommitRequest(
  final Map<TopicPartition, OffsetAndMetadata> offsets)

sendOffsetCommitRequest gets the node of the consumer group.

sendOffsetCommitRequest creates a OffsetCommitRequest.Builder for the group ID and an offset data based on the given offsets.

sendOffsetCommitRequest prints out the following TRACE message to the logs:

Sending OffsetCommit request with [offsets] to coordinator [coordinator]

sendOffsetCommitRequest requests the ConsumerNetworkClient to send an OffsetCommitRequest to the group coordinator and then creates a new OffsetCommitResponseHandler to handle a response.

sendOffsetCommitRequest returns immediately when there is no offsets to send.

sendOffsetCommitRequest returns immediately with a COORDINATOR_NOT_AVAILABLE failure when the consumer group coordinator is not available.

Note
sendOffsetCommitRequest is used when ConsumerCoordinator is requested to doCommitOffsetsAsync and commitOffsetsSync.

sendOffsetFetchRequest Internal Method

RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(
  Set<TopicPartition> partitions)

sendOffsetFetchRequest…​FIXME

Note
sendOffsetFetchRequest is used when…​FIXME

invokeCompletedOffsetCommitCallbacks Internal Method

void invokeCompletedOffsetCommitCallbacks()

invokeCompletedOffsetCommitCallbacks takes (polls) every OffsetCommitCompletion from the completedOffsetCommits internal registry and requests it to invoke OffsetCommitCallback.onComplete.

Note
invokeCompletedOffsetCommitCallbacks is used when ConsumerCoordinator is requested to poll, close, commitOffsetsAsync, and commitOffsetsSync.

Finding PartitionAssignor by Name — lookupAssignor Internal Method

PartitionAssignor lookupAssignor(String name)

lookupAssignor tries to find the PartitionAssignor by the given name in the assignors.

If not found, lookupAssignor returns null.

Note
lookupAssignor is used when ConsumerCoordinator is requested to onJoinComplete and performAssignment.

doCommitOffsetsAsync Internal Method

void doCommitOffsetsAsync(
  final Map<TopicPartition, OffsetAndMetadata> offsets,
  final OffsetCommitCallback callback)

doCommitOffsetsAsync…​FIXME

Note
doCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to commitOffsetsAsync.

results matching ""

    No results matching ""