ConsumerCoordinator
ConsumerCoordinator
is a concrete AbstractCoordinator that…FIXME
KafkaConsumer
uses the following Consumer configuration properties to create a ConsumerCoordinator
.
Configuration Property | ConsumerCoordinator’s Property |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
With autoCommitEnabled enabled (i.e. enable.auto.commit is true
), ConsumerCoordinator
does:
ConsumerCoordinator
uses zero or more PartitionAssignors for the following:
-
Looking up the PartitionAssignor by name (when onJoinComplete and performAssignment)
Name | Description |
---|---|
|
Used in rejoinNeededOrPending
|
|
Java’s java.util.concurrent.ConcurrentLinkedQueue of
|
|
Default: Turned on in performAssignment and off in onJoinPrepare Used exclusively for onJoinComplete (to reset the assignmentSnapshot when off) |
|
Used in rejoinNeededOrPending and performAssignment |
Tip
|
Enable Add the following line to
Refer to Logging. |
Sending Asynchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsAsync
Method
void maybeAutoCommitOffsetsAsync(long now)
maybeAutoCommitOffsetsAsync
…FIXME
Note
|
|
Sending Synchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsSync
Internal Method
void maybeAutoCommitOffsetsSync(long timeoutMs)
maybeAutoCommitOffsetsSync
…FIXME
Note
|
maybeAutoCommitOffsetsSync is used when ConsumerCoordinator is requested to close or onJoinPrepare
|
doAutoCommitOffsetsAsync
Internal Method
void doAutoCommitOffsetsAsync()
doAutoCommitOffsetsAsync
…FIXME
Note
|
doAutoCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to maybeAutoCommitOffsetsAsync.
|
commitOffsetsAsync
Method
void commitOffsetsAsync(
final Map<TopicPartition, OffsetAndMetadata> offsets,
final OffsetCommitCallback callback)
commitOffsetsAsync
…FIXME
Note
|
|
commitOffsetsSync
Method
boolean commitOffsetsSync(
Map<TopicPartition,
OffsetAndMetadata> offsets,
long timeoutMs)
commitOffsetsSync
…FIXME
Note
|
commitOffsetsSync is used when…FIXME
|
refreshCommittedOffsetsIfNeeded
Method
void refreshCommittedOffsetsIfNeeded()
refreshCommittedOffsetsIfNeeded
…FIXME
Note
|
refreshCommittedOffsetsIfNeeded is used when…FIXME
|
onJoinComplete
Callback
void onJoinComplete(
int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer)
Note
|
onJoinComplete is part of AbstractCoordinator Contract to…FIXME.
|
onJoinComplete
…FIXME
onJoinPrepare
Method
void onJoinPrepare(int generation, String memberId)
Note
|
onJoinPrepare is part of AbstractCoordinator Contract to…FIXME.
|
onJoinPrepare
…FIXME
Performing Partition Assignment (using PartitionAssignor) — performAssignment
Method
Map<String, ByteBuffer> performAssignment(
String leaderId,
String assignmentStrategy,
Map<String, ByteBuffer> allSubscriptions)
Note
|
performAssignment is part of AbstractCoordinator Contract to perform partition assignment (i.e. assign partitions to the members of a consumer group).
|
performAssignment
tries to find the PartitionAssignor by the given assignmentStrategy
.
performAssignment
deserializes Subscriptions
(from the given allSubscriptions
) for every entry (and creates a Map<String, Subscription>
).
performAssignment
requests the SubscriptionState to add the topics to group subscription (i.e. add all the topics that the consumer group is subscribed to to receive topic metadata updates).
performAssignment
requests the Metadata to setTopics to be the groupSubscription of the SubscriptionState.
performAssignment
requests the ConsumerNetworkClient to update the cluster metadata.
performAssignment
turns the isLeader internal flag on.
performAssignment
prints out the following DEBUG message to the logs:
Performing assignment using strategy [assignor] with subscriptions [subscriptions]
performAssignment
requests the PartitionAssignor to assign partitions to the members of the consumer group (with the current cluster metadata and subscriptions) and gets the partition assignment back (as Map<String, Assignment>
).
Note
|
Here performAssignment does some customizations that a user-customized assignor could request. It is not a very interesting code to spend your time on.
|
performAssignment
sets the assignmentSnapshot to be metadataSnapshot.
performAssignment
prints out the following DEBUG message to the logs:
Finished assignment for group: [assignment]
In the end, performAssignment
takes the partition assignment, encodes the Assignment
per group member to create a groupAssignment
(i.e. the Map<String, ByteBuffer>
that is returned).
performAssignment
throws an IllegalStateException
when no PartitionAssignor could be found:
Coordinator selected invalid assignment protocol: [assignmentStrategy]
maybeLeaveGroup
Method
void maybeLeaveGroup()
maybeLeaveGroup
…FIXME
Note
|
maybeLeaveGroup is used when…FIXME
|
updatePatternSubscription
Method
void updatePatternSubscription(Cluster cluster)
updatePatternSubscription
…FIXME
Note
|
updatePatternSubscription is used when…FIXME
|
needRejoin
Method
boolean needRejoin()
Note
|
needRejoin is part of the AbstractCoordinator Contract to…FIXME.
|
needRejoin
…FIXME
timeToNextPoll
Method
long timeToNextPoll(long now)
timeToNextPoll
…FIXME
Note
|
timeToNextPoll is used when…FIXME
|
Polling for Group Coordinator Events — poll
Method
boolean poll(Timer timer)
poll
first invokeCompletedOffsetCommitCallbacks.
poll
branches off per whether the SubscriptionState is partitionsAutoAssigned or not.
Caution
|
FIXME What does partitionsAutoAssigned mean exactly?
|
In partitionsAutoAssigned, poll
pollHeartbeat.
poll
returns false
if coordinatorUnknown and ensureCoordinatorReady failed (false
).
poll
…FIXME
Note
|
poll is used exclusively when KafkaConsumer is requested to updateAssignmentMetadataIfNeeded.
|
Registering Metadata.Listener — addMetadataListener
Internal Method
void addMetadataListener()
addMetadataListener
requests the Metadata to add a new Metadata Update Listener that intercepts onMetadataUpdate events and does the following:
-
FIXME
addMetadataListener
throws a TopicAuthorizationException
for any unauthorized topics (i.e. when the given Cluster
has at least one topic in unauthorizedTopics).
FIXME
Note
|
addMetadataListener is used exclusively when ConsumerCoordinator is created.
|
fetchCommittedOffsets
Method
Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(
Set<TopicPartition> partitions)
fetchCommittedOffsets
…FIXME
Note
|
fetchCommittedOffsets is used when…FIXME
|
Creating ConsumerCoordinator Instance
ConsumerCoordinator
takes the following when created:
ConsumerCoordinator
initializes the internal registries and counters.
In the end, ConsumerCoordinator
requests the Metadata to update and addMetadataListener.
rejoinNeededOrPending
Method
boolean rejoinNeededOrPending()
Note
|
rejoinNeededOrPending is part of the AbstractCoordinator Contract to…FIXME.
|
rejoinNeededOrPending
…FIXME
Sending OffsetCommitRequest to Group Coordinator (Kafka Broker) — sendOffsetCommitRequest
Internal Method
RequestFuture<Void> sendOffsetCommitRequest(
final Map<TopicPartition, OffsetAndMetadata> offsets)
sendOffsetCommitRequest
gets the node of the consumer group.
sendOffsetCommitRequest
creates a OffsetCommitRequest.Builder for the group ID and an offset data based on the given offsets.
sendOffsetCommitRequest
prints out the following TRACE message to the logs:
Sending OffsetCommit request with [offsets] to coordinator [coordinator]
sendOffsetCommitRequest
requests the ConsumerNetworkClient to send an OffsetCommitRequest
to the group coordinator and then creates a new OffsetCommitResponseHandler to handle a response.
sendOffsetCommitRequest
returns immediately when there is no offsets to send.
sendOffsetCommitRequest
returns immediately with a COORDINATOR_NOT_AVAILABLE
failure when the consumer group coordinator is not available.
Note
|
sendOffsetCommitRequest is used when ConsumerCoordinator is requested to doCommitOffsetsAsync and commitOffsetsSync.
|
sendOffsetFetchRequest
Internal Method
RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(
Set<TopicPartition> partitions)
sendOffsetFetchRequest
…FIXME
Note
|
sendOffsetFetchRequest is used when…FIXME
|
invokeCompletedOffsetCommitCallbacks
Internal Method
void invokeCompletedOffsetCommitCallbacks()
invokeCompletedOffsetCommitCallbacks
takes (polls) every OffsetCommitCompletion
from the completedOffsetCommits internal registry and requests it to invoke OffsetCommitCallback.onComplete.
Note
|
invokeCompletedOffsetCommitCallbacks is used when ConsumerCoordinator is requested to poll, close, commitOffsetsAsync, and commitOffsetsSync.
|
Finding PartitionAssignor by Name — lookupAssignor
Internal Method
PartitionAssignor lookupAssignor(String name)
lookupAssignor
tries to find the PartitionAssignor by the given name
in the assignors.
If not found, lookupAssignor
returns null
.
Note
|
lookupAssignor is used when ConsumerCoordinator is requested to onJoinComplete and performAssignment.
|
doCommitOffsetsAsync
Internal Method
void doCommitOffsetsAsync(
final Map<TopicPartition, OffsetAndMetadata> offsets,
final OffsetCommitCallback callback)
doCommitOffsetsAsync
…FIXME
Note
|
doCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to commitOffsetsAsync.
|