ConsumerCoordinator
ConsumerCoordinator is a concrete AbstractCoordinator that…FIXME
KafkaConsumer uses the following Consumer configuration properties to create a ConsumerCoordinator.
| Configuration Property | ConsumerCoordinator’s Property |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
With autoCommitEnabled enabled (i.e. enable.auto.commit is true), ConsumerCoordinator does:
ConsumerCoordinator uses zero or more PartitionAssignors for the following:
-
Looking up the PartitionAssignor by name (when onJoinComplete and performAssignment)
| Name | Description |
|---|---|
|
Used in rejoinNeededOrPending
|
|
Java’s java.util.concurrent.ConcurrentLinkedQueue of
|
|
Default: Turned on in performAssignment and off in onJoinPrepare Used exclusively for onJoinComplete (to reset the assignmentSnapshot when off) |
|
Used in rejoinNeededOrPending and performAssignment |
|
Tip
|
Enable Add the following line to
Refer to Logging. |
Sending Asynchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsAsync Method
void maybeAutoCommitOffsetsAsync(long now)
maybeAutoCommitOffsetsAsync…FIXME
|
Note
|
|
Sending Synchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsSync Internal Method
void maybeAutoCommitOffsetsSync(long timeoutMs)
maybeAutoCommitOffsetsSync…FIXME
|
Note
|
maybeAutoCommitOffsetsSync is used when ConsumerCoordinator is requested to close or onJoinPrepare
|
doAutoCommitOffsetsAsync Internal Method
void doAutoCommitOffsetsAsync()
doAutoCommitOffsetsAsync…FIXME
|
Note
|
doAutoCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to maybeAutoCommitOffsetsAsync.
|
commitOffsetsAsync Method
void commitOffsetsAsync(
final Map<TopicPartition, OffsetAndMetadata> offsets,
final OffsetCommitCallback callback)
commitOffsetsAsync…FIXME
|
Note
|
|
commitOffsetsSync Method
boolean commitOffsetsSync(
Map<TopicPartition,
OffsetAndMetadata> offsets,
long timeoutMs)
commitOffsetsSync…FIXME
|
Note
|
commitOffsetsSync is used when…FIXME
|
refreshCommittedOffsetsIfNeeded Method
void refreshCommittedOffsetsIfNeeded()
refreshCommittedOffsetsIfNeeded…FIXME
|
Note
|
refreshCommittedOffsetsIfNeeded is used when…FIXME
|
onJoinComplete Callback
void onJoinComplete(
int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer)
|
Note
|
onJoinComplete is part of AbstractCoordinator Contract to…FIXME.
|
onJoinComplete…FIXME
onJoinPrepare Method
void onJoinPrepare(int generation, String memberId)
|
Note
|
onJoinPrepare is part of AbstractCoordinator Contract to…FIXME.
|
onJoinPrepare…FIXME
Performing Partition Assignment (using PartitionAssignor) — performAssignment Method
Map<String, ByteBuffer> performAssignment(
String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions)
|
Note
|
performAssignment is part of AbstractCoordinator Contract to perform partition assignment (i.e. assign partitions to the members of a consumer group).
|
performAssignment tries to find the PartitionAssignor by the given assignmentStrategy.
performAssignment deserializes Subscriptions (from the given allSubscriptions) for every entry (and creates a Map<String, Subscription>).
performAssignment requests the SubscriptionState to add the topics to group subscription (i.e. add all the topics that the consumer group is subscribed to to receive topic metadata updates).
performAssignment requests the Metadata to setTopics to be the groupSubscription of the SubscriptionState.
performAssignment requests the ConsumerNetworkClient to update the cluster metadata.
performAssignment turns the isLeader internal flag on.
performAssignment prints out the following DEBUG message to the logs:
Performing assignment using strategy [assignor] with subscriptions [subscriptions]
performAssignment requests the PartitionAssignor to assign partitions to the members of the consumer group (with the current cluster metadata and subscriptions) and gets the partition assignment back (as Map<String, Assignment>).
|
Note
|
Here performAssignment does some customizations that a user-customized assignor could request. It is not a very interesting code to spend your time on.
|
performAssignment sets the assignmentSnapshot to be metadataSnapshot.
performAssignment prints out the following DEBUG message to the logs:
Finished assignment for group: [assignment]
In the end, performAssignment takes the partition assignment, encodes the Assignment per group member to create a groupAssignment (i.e. the Map<String, ByteBuffer> that is returned).
performAssignment throws an IllegalStateException when no PartitionAssignor could be found:
Coordinator selected invalid assignment protocol: [assignmentStrategy]
maybeLeaveGroup Method
void maybeLeaveGroup()
maybeLeaveGroup…FIXME
|
Note
|
maybeLeaveGroup is used when…FIXME
|
updatePatternSubscription Method
void updatePatternSubscription(Cluster cluster)
updatePatternSubscription…FIXME
|
Note
|
updatePatternSubscription is used when…FIXME
|
needRejoin Method
boolean needRejoin()
|
Note
|
needRejoin is part of the AbstractCoordinator Contract to…FIXME.
|
needRejoin…FIXME
timeToNextPoll Method
long timeToNextPoll(long now)
timeToNextPoll…FIXME
|
Note
|
timeToNextPoll is used when…FIXME
|
Polling for Group Coordinator Events — poll Method
boolean poll(Timer timer)
poll first invokeCompletedOffsetCommitCallbacks.
poll branches off per whether the SubscriptionState is partitionsAutoAssigned or not.
|
Caution
|
FIXME What does partitionsAutoAssigned mean exactly?
|
In partitionsAutoAssigned, poll pollHeartbeat.
poll returns false if coordinatorUnknown and ensureCoordinatorReady failed (false).
poll…FIXME
|
Note
|
poll is used exclusively when KafkaConsumer is requested to updateAssignmentMetadataIfNeeded.
|
Registering Metadata.Listener — addMetadataListener Internal Method
void addMetadataListener()
addMetadataListener requests the Metadata to add a new Metadata Update Listener that intercepts onMetadataUpdate events and does the following:
-
FIXME
addMetadataListener throws a TopicAuthorizationException for any unauthorized topics (i.e. when the given Cluster has at least one topic in unauthorizedTopics).
FIXME
|
Note
|
addMetadataListener is used exclusively when ConsumerCoordinator is created.
|
fetchCommittedOffsets Method
Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(
Set<TopicPartition> partitions)
fetchCommittedOffsets…FIXME
|
Note
|
fetchCommittedOffsets is used when…FIXME
|
Creating ConsumerCoordinator Instance
ConsumerCoordinator takes the following when created:
ConsumerCoordinator initializes the internal registries and counters.
In the end, ConsumerCoordinator requests the Metadata to update and addMetadataListener.
rejoinNeededOrPending Method
boolean rejoinNeededOrPending()
|
Note
|
rejoinNeededOrPending is part of the AbstractCoordinator Contract to…FIXME.
|
rejoinNeededOrPending…FIXME
Sending OffsetCommitRequest to Group Coordinator (Kafka Broker) — sendOffsetCommitRequest Internal Method
RequestFuture<Void> sendOffsetCommitRequest(
final Map<TopicPartition, OffsetAndMetadata> offsets)
sendOffsetCommitRequest gets the node of the consumer group.
sendOffsetCommitRequest creates a OffsetCommitRequest.Builder for the group ID and an offset data based on the given offsets.
sendOffsetCommitRequest prints out the following TRACE message to the logs:
Sending OffsetCommit request with [offsets] to coordinator [coordinator]
sendOffsetCommitRequest requests the ConsumerNetworkClient to send an OffsetCommitRequest to the group coordinator and then creates a new OffsetCommitResponseHandler to handle a response.
sendOffsetCommitRequest returns immediately when there is no offsets to send.
sendOffsetCommitRequest returns immediately with a COORDINATOR_NOT_AVAILABLE failure when the consumer group coordinator is not available.
|
Note
|
sendOffsetCommitRequest is used when ConsumerCoordinator is requested to doCommitOffsetsAsync and commitOffsetsSync.
|
sendOffsetFetchRequest Internal Method
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(
Set<TopicPartition> partitions)
sendOffsetFetchRequest…FIXME
|
Note
|
sendOffsetFetchRequest is used when…FIXME
|
invokeCompletedOffsetCommitCallbacks Internal Method
void invokeCompletedOffsetCommitCallbacks()
invokeCompletedOffsetCommitCallbacks takes (polls) every OffsetCommitCompletion from the completedOffsetCommits internal registry and requests it to invoke OffsetCommitCallback.onComplete.
|
Note
|
invokeCompletedOffsetCommitCallbacks is used when ConsumerCoordinator is requested to poll, close, commitOffsetsAsync, and commitOffsetsSync.
|
Finding PartitionAssignor by Name — lookupAssignor Internal Method
PartitionAssignor lookupAssignor(String name)
lookupAssignor tries to find the PartitionAssignor by the given name in the assignors.
If not found, lookupAssignor returns null.
|
Note
|
lookupAssignor is used when ConsumerCoordinator is requested to onJoinComplete and performAssignment.
|
doCommitOffsetsAsync Internal Method
void doCommitOffsetsAsync(
final Map<TopicPartition, OffsetAndMetadata> offsets,
final OffsetCommitCallback callback)
doCommitOffsetsAsync…FIXME
|
Note
|
doCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to commitOffsetsAsync.
|