AbstractCoordinator Contract

AbstractCoordinator is the base of Coordinators that FIXME.

Note
AbstractCoordinator is a Java abstract class and cannot be instantiated directly. Instances are created indirectly through the concrete Coordinators.
Note
ConsumerCoordinator is the one and only known implementation of the AbstractCoordinator Contract in Apache Kafka’s Consumer API.
Table 1. AbstractCoordinator Contract (Abstract Methods Only)
Method Description

metadata

List<ProtocolMetadata> metadata()

Used exclusively when AbstractCoordinator is requested to send a JoinGroupRequest to the group coordinator

onJoinComplete

void onJoinComplete(
  int generation,
  String memberId,
  String protocol,
  ByteBuffer memberAssignment)

Used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded (and the request to initiateJoinGroup succeeded)

onJoinPrepare

void onJoinPrepare(
  int generation,
  String memberId)

Used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded (and the needsJoinPrepare flag is on)

performAssignment

Map<String, ByteBuffer> performAssignment(
  String leaderId,
  String protocol,
  Map<String, ByteBuffer> allMemberMetadata)

Performs partition assignment (i.e. assigns partitions to the members of a consumer group)

Used exclusively when AbstractCoordinator is requested to perform partition assignment and notify the group coordinator

protocolType

String protocolType()

Used exclusively when AbstractCoordinator is requested to send a JoinGroupRequest to the group coordinator

Table 2. AbstractCoordinator’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

coordinator

Group Coordinator (as a Kafka Node)

Initialized when FindCoordinatorResponseHandler is requested to onSuccess

findCoordinatorFuture

RequestFuture<Void> for sendFindCoordinatorRequest when lookupCoordinator

Reset in clearFindCoordinatorFuture

Used in lookupCoordinator and when HeartbeatThread is requested to run

generation

Defaults to NO_GENERATION

heartbeatThread

Initialized and immediately started when AbstractCoordinator is requested to start a HeartbeatThread

joinFuture

RequestFuture<ByteBuffer> for sending a JoinGroupRequest to the group coordinator when initiateJoinGroup

Reset in resetJoinGroupFuture

Used in rejoinNeededOrPending and initiateJoinGroup

needsJoinPrepare

Flag to control whether to execute onJoinPrepare while performing joinGroupIfNeeded

rejoinNeeded

boolean rejoinNeeded

Flag that indicates whether the member needs to rejoin the consumer group (consulted by rejoinNeededOrPending). Enabled (true) by default.

Turned off (false) when the joinFuture completed successfully in initiateJoinGroup

Turned on (true) in resetGeneration and requestRejoin

sensors

state

State of the member of a consumer group

Possible states: UNJOINED, REBALANCING, STABLE

Default: UNJOINED

Tip

Enable WARN, INFO, or DEBUG logging level for org.apache.kafka.clients.consumer.internals.AbstractCoordinator logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.consumer.internals.AbstractCoordinator=DEBUG

Refer to Logging.

Creating AbstractCoordinator Instance

AbstractCoordinator takes the following when created:

  • LogContext

  • ConsumerNetworkClient

  • Group ID

  • rebalanceTimeoutMs

  • sessionTimeoutMs

  • heartbeatIntervalMs (or simply a Heartbeat instance)

  • Metrics

  • Metric group prefix

  • Time

  • retryBackoffMs

  • leaveGroupOnClose flag

joinGroupIfNeeded Method

boolean joinGroupIfNeeded(final Timer timer)

joinGroupIfNeeded…​FIXME

Note
joinGroupIfNeeded is used exclusively when AbstractCoordinator is requested to ensureActiveGroup.

Attempting to Join Consumer Group — initiateJoinGroup Internal Method

RequestFuture<ByteBuffer> initiateJoinGroup()

initiateJoinGroup disables the HeartbeatThread.

initiateJoinGroup sets the state as REBALANCING.

initiateJoinGroup sends a JoinGroupRequest to the group coordinator (and initializes the joinFuture).

initiateJoinGroup registers a RequestFutureListener to handle a response.

When the response is successful, initiateJoinGroup does the following:

  1. Prints out the following INFO message to the logs:

    Successfully joined group with generation [generationId]
  2. Sets the state as STABLE (and the rejoinNeeded flag off)

  3. Requests the HeartbeatThread to enable itself.

When the response is a failure, initiateJoinGroup simply sets the state as UNJOINED.

initiateJoinGroup skips all of the above and returns the joinFuture immediately when it has already been initialized (i.e. is not null).
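The state transitions above can be sketched in plain Java. This is a minimal illustration, not Kafka's actual code: JoinFlowSketch, the heartbeatEnabled field, and the stubbed sendJoinGroupRequest are hypothetical, and CompletableFuture stands in for Kafka's RequestFuture.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the join flow described above; not Kafka's actual code.
class JoinFlowSketch {
    enum MemberState { UNJOINED, REBALANCING, STABLE }

    MemberState state = MemberState.UNJOINED;
    boolean rejoinNeeded = true;                 // enabled by default
    boolean heartbeatEnabled = false;            // stand-in for the HeartbeatThread switch
    CompletableFuture<ByteBuffer> joinFuture;    // stand-in for RequestFuture<ByteBuffer>

    // Stub: the caller completes the returned future to simulate the broker's response.
    CompletableFuture<ByteBuffer> sendJoinGroupRequest() {
        return new CompletableFuture<>();
    }

    CompletableFuture<ByteBuffer> initiateJoinGroup() {
        if (joinFuture != null) {
            return joinFuture;                   // a join attempt is already in flight
        }
        heartbeatEnabled = false;                // disable heartbeats while rebalancing
        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest();
        joinFuture.whenComplete((assignment, error) -> {
            if (error == null) {
                state = MemberState.STABLE;      // success: stable, no rejoin needed
                rejoinNeeded = false;
                heartbeatEnabled = true;
            } else {
                state = MemberState.UNJOINED;    // failure: back to unjoined
            }
        });
        return joinFuture;
    }
}
```

Calling initiateJoinGroup twice before the future completes returns the same future, which is the point of the early return on a non-null joinFuture.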

Note
initiateJoinGroup is used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded.

ensureActiveGroup Method

void ensureActiveGroup() (1)
boolean ensureActiveGroup(final Timer timer)
  1. Calls the other ensureActiveGroup in an infinite while loop with an unexpiring timer
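
The relationship between the two overloads can be sketched as follows. This is a rough illustration only: EnsureActiveGroupSketch and attemptsUntilActive are hypothetical, and a plain long timeout stands in for Kafka's Timer.

```java
// Hypothetical sketch of the two ensureActiveGroup overloads; not Kafka's actual code.
class EnsureActiveGroupSketch {
    private final int attemptsUntilActive;  // hypothetical: attempts needed before the group is active
    int attempts = 0;

    EnsureActiveGroupSketch(int attemptsUntilActive) {
        this.attemptsUntilActive = attemptsUntilActive;
    }

    // Timed variant: returns false when the group did not become active in this attempt.
    boolean ensureActiveGroup(long timeoutMs) {
        attempts++;
        return attempts >= attemptsUntilActive;
    }

    // Untimed variant: loops over the timed variant with a timeout that never expires.
    void ensureActiveGroup() {
        while (!ensureActiveGroup(Long.MAX_VALUE)) {
            // keep waiting until the timed variant reports success
        }
    }
}
```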

ensureActiveGroup…​FIXME

Note
ensureActiveGroup is used exclusively when ConsumerCoordinator is requested to poll for coordinator events.

Discovering Current Coordinator for Consumer Group — lookupCoordinator Method

RequestFuture<Void> lookupCoordinator()

lookupCoordinator uses the findCoordinatorFuture internal registry to know whether a lookup was requested earlier but has not finished yet. In other words, the RequestFuture is available while looking up the coordinator of a consumer group is in progress.

If the findCoordinatorFuture internal registry is available (not null), lookupCoordinator returns it immediately.

Otherwise, when the RequestFuture internal registry is uninitialized (null), lookupCoordinator requests the ConsumerNetworkClient for the least-loaded Kafka broker. lookupCoordinator then sends a FindCoordinatorRequest to the Kafka broker.

When no nodes are available, lookupCoordinator prints out the following DEBUG message to the logs and finishes with a RequestFuture.noBrokersAvailable failure:

No broker available to send FindCoordinator request
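
The "reuse the in-flight future" behaviour can be sketched in plain Java. This is a minimal illustration under stated assumptions: CoordinatorLookupSketch, leastLoadedNode as an Optional<String>, and the requestsSent counter are hypothetical, and CompletableFuture stands in for Kafka's RequestFuture.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the lookup memoization described above; not Kafka's actual code.
class CoordinatorLookupSketch {
    CompletableFuture<Void> findCoordinatorFuture;        // null when no lookup is in progress
    Optional<String> leastLoadedNode = Optional.empty();  // hypothetical: broker id, or empty
    int requestsSent = 0;                                 // counts simulated FindCoordinator requests

    CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture != null) {
            return findCoordinatorFuture;                 // lookup already in progress
        }
        if (!leastLoadedNode.isPresent()) {
            // No broker to ask: fail right away; the failed future is not stored in this sketch.
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("No broker available to send FindCoordinator request"));
            return failed;
        }
        requestsSent++;                                   // stand-in for sendFindCoordinatorRequest
        findCoordinatorFuture = new CompletableFuture<>();
        return findCoordinatorFuture;
    }

    // Stand-in for clearFindCoordinatorFuture: allows the next lookup to send a new request.
    void clearFindCoordinatorFuture() {
        findCoordinatorFuture = null;
    }
}
```

Repeated calls return the same future until clearFindCoordinatorFuture resets the registry, so only one request is outstanding at a time.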
Note

lookupCoordinator is used when:

ensureCoordinatorReady Method

boolean ensureCoordinatorReady(final Timer timer)

ensureCoordinatorReady…​FIXME

Note

ensureCoordinatorReady is used when:

Sending JoinGroupRequest to Group Coordinator (Kafka Broker) — sendJoinGroupRequest Method

RequestFuture<ByteBuffer> sendJoinGroupRequest()

sendJoinGroupRequest prints out the following INFO message to the logs:

(Re-)joining group

sendJoinGroupRequest creates a new JoinGroupRequest.Builder (for the groupId, the sessionTimeoutMs, the member ID, consumer protocol type, the supported ProtocolMetadata, and the rebalanceTimeoutMs).

sendJoinGroupRequest prints out the following DEBUG message to the logs:

Sending JoinGroup ([requestBuilder]) to coordinator [coordinator]

In the end, sendJoinGroupRequest requests the ConsumerNetworkClient to send the JoinGroupRequest to the group coordinator node and then creates a new JoinGroupResponseHandler to handle a response.

sendJoinGroupRequest returns immediately with a RequestFuture.coordinatorNotAvailable() when the coordinator has not been discovered yet.

Note
sendJoinGroupRequest is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.

Sending SyncGroupRequest (to Kafka Broker) — sendSyncGroupRequest Internal Method

RequestFuture<ByteBuffer> sendSyncGroupRequest(
  SyncGroupRequest.Builder requestBuilder)

sendSyncGroupRequest…​FIXME

Note
sendSyncGroupRequest is used when…​FIXME

Sending FindCoordinatorRequest (to Kafka Broker) — sendFindCoordinatorRequest Internal Method

RequestFuture<Void> sendFindCoordinatorRequest(Node node)

sendFindCoordinatorRequest prints out the following DEBUG message to the logs:

Sending FindCoordinator request to broker [node]

sendFindCoordinatorRequest creates a new FindCoordinatorRequest with GROUP type and the groupId.

In the end, sendFindCoordinatorRequest requests the ConsumerNetworkClient to send the FindCoordinatorRequest (to the given Kafka node) and then creates a new FindCoordinatorResponseHandler to handle a response.

Note
sendFindCoordinatorRequest is used exclusively when AbstractCoordinator is requested to lookupCoordinator.

maybeLeaveGroup Method

void maybeLeaveGroup()

maybeLeaveGroup…​FIXME

Note
maybeLeaveGroup is used when…​FIXME

Sending HeartbeatRequest to Group Coordinator (Kafka Broker) — sendHeartbeatRequest Method

RequestFuture<Void> sendHeartbeatRequest()

sendHeartbeatRequest prints out the following DEBUG message to the logs:

Sending Heartbeat request to coordinator [coordinator]

sendHeartbeatRequest creates a new HeartbeatRequest.Builder (for the groupId, generation and member IDs).

In the end, sendHeartbeatRequest requests the ConsumerNetworkClient to send the HeartbeatRequest to the group coordinator node and then creates a new HeartbeatResponseHandler to handle a response.

Note
sendHeartbeatRequest is used exclusively when HeartbeatThread is requested to run.

Performing Partition Assignment and Notifying Group Coordinator — onJoinLeader Internal Method

RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse)

onJoinLeader performs partition assignment with the leader ID, the group protocol, and the member metadata (all from the given JoinGroupResponse).

onJoinLeader creates a new SyncGroupRequest.Builder (with the group, generation, and member IDs as well as the group partition assignment).

onJoinLeader prints out the following DEBUG message to the logs:

Sending leader SyncGroup to coordinator [coordinator]: [requestBuilder]

In the end, onJoinLeader sends a SyncGroupRequest (to the group coordinator node).
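
To make the shape of the performAssignment result concrete, here is a minimal sketch of one possible implementation. Everything in it is an assumption made for illustration: RoundRobinAssignmentSketch, the fixed PARTITION_COUNT, and the encoding of an assignment as a sequence of ints are hypothetical and do not reflect how ConsumerCoordinator serializes assignments.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical round-robin assignment; not Kafka's actual assignor or wire format.
class RoundRobinAssignmentSketch {
    static final int PARTITION_COUNT = 4;  // hypothetical: partitions of a single topic

    static Map<String, ByteBuffer> performAssignment(
            String leaderId, String protocol, Map<String, ByteBuffer> allMemberMetadata) {
        Map<String, ByteBuffer> result = new HashMap<>();
        // Sort member IDs so that the assignment is deterministic.
        List<String> members = new ArrayList<>(new TreeSet<>(allMemberMetadata.keySet()));
        if (members.isEmpty()) {
            return result;
        }
        Map<String, List<Integer>> owned = new HashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        // Hand out partitions one by one, cycling through the members.
        for (int p = 0; p < PARTITION_COUNT; p++) {
            owned.get(members.get(p % members.size())).add(p);
        }
        // Encode each member's partitions as consecutive ints (illustrative encoding only).
        for (Map.Entry<String, List<Integer>> e : owned.entrySet()) {
            ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * e.getValue().size());
            for (int p : e.getValue()) {
                buf.putInt(p);
            }
            buf.flip();
            result.put(e.getKey(), buf);
        }
        return result;
    }
}
```

The returned map (member ID to serialized assignment) is what the leader would place in the SyncGroupRequest for the group coordinator to distribute.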

Note
onJoinLeader is used exclusively when JoinGroupResponseHandler is requested to handle a successful response from the group coordinator (and the consumer is the leader of the consumer group).

resetJoinGroupFuture Internal Method

void resetJoinGroupFuture()

resetJoinGroupFuture simply resets the joinFuture internal registry (i.e. sets it to null).

Note
resetJoinGroupFuture is used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded.

Clearing FindCoordinatorFuture — clearFindCoordinatorFuture Internal Method

void clearFindCoordinatorFuture()

clearFindCoordinatorFuture simply resets the findCoordinatorFuture internal registry (to be null).

Note
clearFindCoordinatorFuture is used exclusively when FindCoordinatorResponseHandler is requested to onSuccess and onFailure.

rejoinNeededOrPending Internal Method

boolean rejoinNeededOrPending()

rejoinNeededOrPending is positive (true) when the rejoinNeeded flag is on or the joinFuture is initialized (i.e. not null).
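
A minimal sketch of how the flag and the future combine, assuming the check is a logical OR of the two conditions, as the method name suggests. RejoinFlagSketch is hypothetical, and CompletableFuture stands in for Kafka's RequestFuture.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the rejoin bookkeeping; not Kafka's actual code.
class RejoinFlagSketch {
    boolean rejoinNeeded = true;           // enabled by default
    CompletableFuture<Void> joinFuture;    // null unless a join is in flight

    boolean needRejoin() {
        return rejoinNeeded;
    }

    void requestRejoin() {
        rejoinNeeded = true;
    }

    // True when a rejoin was requested or a join attempt is still pending.
    boolean rejoinNeededOrPending() {
        return needRejoin() || joinFuture != null;
    }
}
```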

Note

rejoinNeededOrPending is used when:

pollHeartbeat Method

void pollHeartbeat(long now)

pollHeartbeat…​FIXME

Note
pollHeartbeat is used exclusively when ConsumerCoordinator is requested to poll for Coordinator events.

Starting HeartbeatThread — startHeartbeatThreadIfNeeded Internal Method

void startHeartbeatThreadIfNeeded()

startHeartbeatThreadIfNeeded…​FIXME

Note
startHeartbeatThreadIfNeeded is used exclusively when AbstractCoordinator is requested to ensureActiveGroup.

needRejoin Method

boolean needRejoin()

needRejoin simply returns the rejoinNeeded flag.

Note
needRejoin is used when…​FIXME

requestRejoin Method

void requestRejoin()

requestRejoin simply turns the rejoinNeeded flag on.

Note

requestRejoin is used when:

  • HeartbeatResponseHandler is requested to handle a response that the consumer group is rebalancing

  • SyncGroupResponseHandler is requested to handle a response with an error

resetGeneration Method

void resetGeneration()

resetGeneration simply resets the following internal registries:

Note

resetGeneration is used when:

coordinatorUnknown Method

boolean coordinatorUnknown()

coordinatorUnknown…​FIXME

Note
coordinatorUnknown is used when…​FIXME

disableHeartbeatThread Internal Method

void disableHeartbeatThread()

disableHeartbeatThread simply requests the HeartbeatThread to disable (if available, i.e. not null).

Note
disableHeartbeatThread is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.

checkAndGetCoordinator Method

Node checkAndGetCoordinator()

checkAndGetCoordinator…​FIXME

Note
checkAndGetCoordinator is used when…​FIXME
