ConsumerCoordinator

ConsumerCoordinator is a concrete AbstractCoordinator that…​FIXME

ConsumerCoordinator is created exclusively when KafkaConsumer is created.

ConsumerCoordinator.png
Figure 1. ConsumerCoordinator and KafkaConsumer

KafkaConsumer uses the following Consumer configuration properties to create a ConsumerCoordinator.

Table 1. ConsumerCoordinator’s Consumer Configuration Properties
Configuration Property ConsumerCoordinator’s Property

max.poll.interval.ms

rebalanceTimeoutMs

session.timeout.ms

sessionTimeoutMs

heartbeat.interval.ms

heartbeatIntervalMs

retry.backoff.ms

retryBackoffMs

enable.auto.commit

autoCommitEnabled

auto.commit.interval.ms

autoCommitIntervalMs

exclude.internal.topics

excludeInternalTopics

internal.leave.group.on.close

leaveGroupOnClose

With autoCommitEnabled enabled (i.e. enable.auto.commit is true), ConsumerCoordinator does:

ConsumerCoordinator uses zero or more PartitionAssignors for the following:

Table 2. ConsumerCoordinator’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

assignmentSnapshot

MetadataSnapshot

completedOffsetCommits

Java’s java.util.concurrent.ConcurrentLinkedQueue of OffsetCommitCompletions with user-defined OffsetCommitCallbacks

OffsetCommitCallbacks are executed in invokeCompletedOffsetCommitCallbacks

OffsetCommitCompletion are added in commitOffsetsAsync (and also directly in doCommitOffsetsAsync)

isLeader

Flag of whether the ConsumerCoordinator is a leader (true) or not (false). Defaults to false

Turned on in performAssignment and off in onJoinPrepare

Used exclusively for onJoinComplete (to reset the assignmentSnapshot when off)

metadataSnapshot

MetadataSnapshot metadataSnapshot

MetadataSnapshot (for the SubscriptionState and the cluster metadata from the Metadata)

Sending Asynchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsAsync Method

void maybeAutoCommitOffsetsAsync(long now)

maybeAutoCommitOffsetsAsync…​FIXME

Note

maybeAutoCommitOffsetsAsync is used when:

Sending Synchronous Auto-Commit of Offsets — maybeAutoCommitOffsetsSync Internal Method

void maybeAutoCommitOffsetsSync(long timeoutMs)

maybeAutoCommitOffsetsSync…​FIXME

Note
maybeAutoCommitOffsetsSync is used when ConsumerCoordinator is requested to close or onJoinPrepare

doAutoCommitOffsetsAsync Internal Method

void doAutoCommitOffsetsAsync()

doAutoCommitOffsetsAsync…​FIXME

Note
doAutoCommitOffsetsAsync is used exclusively when ConsumerCoordinator is requested to maybeAutoCommitOffsetsAsync.

close Method

void close(long timeoutMs)

close…​FIXME

Note
close is used when…​FIXME

commitOffsetsAsync Method

void commitOffsetsAsync(
  final Map<TopicPartition, OffsetAndMetadata> offsets,
  final OffsetCommitCallback callback)

commitOffsetsAsync…​FIXME

Note

commitOffsetsAsync is used when:

commitOffsetsSync Method

boolean commitOffsetsSync(
  Map<TopicPartition,
  OffsetAndMetadata> offsets,
  long timeoutMs)

commitOffsetsSync…​FIXME

Note
commitOffsetsSync is used when…​FIXME

refreshCommittedOffsetsIfNeeded Method

void refreshCommittedOffsetsIfNeeded()

refreshCommittedOffsetsIfNeeded…​FIXME

Note
refreshCommittedOffsetsIfNeeded is used when…​FIXME

onJoinComplete Callback

void onJoinComplete(
  int generation,
  String memberId,
  String assignmentStrategy,
  ByteBuffer assignmentBuffer)
Note
onJoinComplete is part of AbstractCoordinator Contract to…​FIXME.

onJoinComplete…​FIXME

onJoinPrepare Method

void onJoinPrepare(int generation, String memberId)
Note
onJoinPrepare is part of AbstractCoordinator Contract to…​FIXME.

onJoinPrepare…​FIXME

performAssignment Method

Map<String, ByteBuffer> performAssignment(
  String leaderId,
  String assignmentStrategy,
  Map<String, ByteBuffer> allSubscriptions)
Note
performAssignment is part of AbstractCoordinator Contract to…​FIXME.

performAssignment…​FIXME

maybeLeaveGroup Method

void maybeLeaveGroup()

maybeLeaveGroup…​FIXME

Note
maybeLeaveGroup is used when…​FIXME

updatePatternSubscription Method

void updatePatternSubscription(Cluster cluster)

updatePatternSubscription…​FIXME

Note
updatePatternSubscription is used when…​FIXME

needRejoin Method

boolean needRejoin()
Note
needRejoin is part of the AbstractCoordinator Contract to…​FIXME.

needRejoin…​FIXME

timeToNextPoll Method

long timeToNextPoll(long now)

timeToNextPoll…​FIXME

Note
timeToNextPoll is used when…​FIXME

Polling for Group Coordinator Events — poll Method

boolean poll(Timer timer)

poll branches off per whether the SubscriptionState is partitionsAutoAssigned or not.

Caution
FIXME What does partitionsAutoAssigned mean exactly?

poll returns false if coordinatorUnknown and ensureCoordinatorReady failed (false).

poll…​FIXME

Note
poll is used exclusively when KafkaConsumer is requested to updateAssignmentMetadataIfNeeded.

Registering Metadata.Listener — addMetadataListener Internal Method

void addMetadataListener()

addMetadataListener requests the Metadata to add a new Metadata Update Listener that intercepts onMetadataUpdate events and does the following:

  • FIXME

addMetadataListener throws a TopicAuthorizationException for any unauthorized topics (i.e. when the given Cluster has at least one topic in unauthorizedTopics).

FIXME
Note
addMetadataListener is used exclusively when ConsumerCoordinator is created.

fetchCommittedOffsets Method

Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions)

fetchCommittedOffsets…​FIXME

Note
fetchCommittedOffsets is used when…​FIXME

Creating ConsumerCoordinator Instance

ConsumerCoordinator takes the following when created:

ConsumerCoordinator initializes the internal registries and counters.

In the end, ConsumerCoordinator requests the Metadata to update and addMetadataListener.

rejoinNeededOrPending Method

boolean rejoinNeededOrPending()
Note
rejoinNeededOrPending is part of the AbstractCoordinator Contract to…​FIXME.

rejoinNeededOrPending…​FIXME

sendOffsetCommitRequest Internal Method

RequestFuture<Void> sendOffsetCommitRequest(
  final Map<TopicPartition, OffsetAndMetadata> offsets)

sendOffsetCommitRequest…​FIXME

Note
sendOffsetCommitRequest is used when…​FIXME

sendOffsetFetchRequest Internal Method

RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(
  Set<TopicPartition> partitions)

sendOffsetFetchRequest…​FIXME

Note
sendOffsetFetchRequest is used when…​FIXME

invokeCompletedOffsetCommitCallbacks Internal Method

void invokeCompletedOffsetCommitCallbacks()

invokeCompletedOffsetCommitCallbacks takes (polls) every OffsetCommitCompletion from the completedOffsetCommits internal registry and requests it to invoke OffsetCommitCallback.onComplete.

Note
invokeCompletedOffsetCommitCallbacks is used when ConsumerCoordinator is requested to poll, close, commitOffsetsAsync, and commitOffsetsSync.

results matching ""

    No results matching ""