List<ProtocolMetadata> metadata()
AbstractCoordinator Contract
AbstractCoordinator is the base of Coordinators that FIXME.
|
Note
|
AbstractCoordinator is a Java abstract class and cannot be created directly. It is created indirectly for the concrete Coordinators.
|
|
Note
|
ConsumerCoordinator is the one and only known implementation of the AbstractCoordinator Contract in Apache Kafka’s Consumer API. |
| Method | Description |
|---|---|
|
Used exclusively when |
|
Used exclusively when |
|
Used exclusively when |
|
Performs partition assignment (i.e. assigns partitions to the members of a consumer group) Used exclusively when |
|
Used exclusively when |
| Name | Description |
|---|---|
|
|
|
RequestFuture<Void> for sendFindCoordinatorRequest when lookupCoordinator Reset in clearFindCoordinatorFuture Used in lookupCoordinator and when |
|
|
|
Initialized and immediately started when |
|
RequestFuture<ByteBuffer> for sending a JoinGroupRequest to the group coordinator when initiateJoinGroup Reset in resetJoinGroupFuture Used in rejoinNeededOrPending and initiateJoinGroup |
|
Flag to control whether to execute onJoinPrepare while performing joinGroupIfNeeded |
|
Flag to control whether to rejoinNeededOrPending. Enabled ( Turned off ( Turned on ( |
|
|
|
Possible states:
Default: |
|
Tip
|
Enable Add the following line to
Refer to Logging. |
Creating AbstractCoordinator Instance
AbstractCoordinator takes the following when created:
|
Note
|
AbstractCoordinator is a Java abstract class and cannot be created directly. It is created indirectly for the concrete Coordinators.
|
joinGroupIfNeeded Method
boolean joinGroupIfNeeded(
Timer timer)
joinGroupIfNeeded…FIXME
|
Note
|
joinGroupIfNeeded is used exclusively when AbstractCoordinator is requested to ensureActiveGroup.
|
Attempting to Join Consumer Group — initiateJoinGroup Internal Method
RequestFuture<ByteBuffer> initiateJoinGroup()
initiateJoinGroup disables the HeartbeatThread.
initiateJoinGroup sets the state as REBALANCING.
initiateJoinGroup sends a JoinGroupRequest to the group coordinator (and initializes the joinFuture).
initiateJoinGroup registers a RequestFutureListener to handle a response.
When the response is successful, initiateJoinGroup does the following:
-
Prints out the following INFO message to the logs:
Successfully joined group with generation [generationId] -
Sets the state as
STABLE(and the rejoinNeeded flag off) -
Requests the HeartbeatThread to enable itself.
When the response is a failure, initiateJoinGroup simply sets the state as UNJOINED
initiateJoinGroup exits immediately when the joinFuture has already been initialized (is not null) and gives it back.
|
Note
|
initiateJoinGroup is used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded.
|
ensureActiveGroup Method
void ensureActiveGroup() (1)
boolean ensureActiveGroup(final Timer timer)
-
Calls the other
ensureActiveGroupin an infinitewhileloop with an unexpiring timer
ensureActiveGroup…FIXME
|
Note
|
ensureActiveGroup is used exclusively when ConsumerCoordinator is requested to poll for coordinator events.
|
Discovering Current Coordinator for Consumer Group — lookupCoordinator Method
RequestFuture<Void> lookupCoordinator()
lookupCoordinator uses the RequestFuture internal registry to know whether it was requested earlier but have not finished yet. In other words, the RequestFuture is available when looking up a coordinator of a consumer group is in progress.
If the RequestFuture internal registry is available (not a null), lookupCoordinator returns it immediately.
Otherwise, when the RequestFuture internal registry is uninitialized (null), lookupCoordinator requests the ConsumerNetworkClient for the least-loaded Kafka broker. lookupCoordinator then sends a FindCoordinatorRequest to the Kafka broker.
lookupCoordinator prints out the following DEBUG message to the logs when no nodes are available and finishes with a RequestFuture.noBrokersAvailable failure.
No broker available to send FindCoordinator request
|
Note
|
|
ensureCoordinatorReady Method
boolean ensureCoordinatorReady(
Timer timer)
ensureCoordinatorReady…FIXME
|
Note
|
|
Sending JoinGroupRequest to Group Coordinator (Kafka Broker) — sendJoinGroupRequest Method
RequestFuture<ByteBuffer> sendJoinGroupRequest()
sendJoinGroupRequest prints out the following INFO message to the logs:
(Re-)joining group
sendJoinGroupRequest creates a new JoinGroupRequest.Builder (for the groupId, the sessionTimeoutMs, the member ID, consumer protocol type, the supported ProtocolMetadata, and the rebalanceTimeoutMs).
sendJoinGroupRequest prints out the following DEBUG message to the logs:
Sending JoinGroup ([requestBuilder]) to coordinator [coordinator]
In the end, sendJoinGroupRequest requests the ConsumerNetworkClient to send the JoinGroupRequest to the group coordinator node and then creates a new JoinGroupResponseHandler to handle a response.
sendJoinGroupRequest returns immediately with a RequestFuture.coordinatorNotAvailable() when the coordinator has not been discovered yet.
|
Note
|
sendJoinGroupRequest is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.
|
Sending SyncGroupRequest (to Kafka Broker) — sendSyncGroupRequest Internal Method
RequestFuture<ByteBuffer> sendSyncGroupRequest(
SyncGroupRequest.Builder requestBuilder)
sendSyncGroupRequest…FIXME
|
Note
|
sendSyncGroupRequest is used when…FIXME
|
Sending FindCoordinatorRequest (to Kafka Broker) — sendFindCoordinatorRequest Internal Method
RequestFuture<Void> sendFindCoordinatorRequest(Node node)
sendFindCoordinatorRequest prints out the following DEBUG message to the logs:
Sending FindCoordinator request to broker [node]
sendFindCoordinatorRequest creates a new FindCoordinatorRequest with GROUP type and the groupId.
In the end, sendFindCoordinatorRequest requests the ConsumerNetworkClient to send the FindCoordinatorRequest (to the given Kafka node) and then creates a new FindCoordinatorResponseHandler to handle a response.
|
Note
|
sendFindCoordinatorRequest is used exclusively when AbstractCoordinator is requested to lookupCoordinator.
|
maybeLeaveGroup Method
void maybeLeaveGroup()
maybeLeaveGroup…FIXME
|
Note
|
maybeLeaveGroup is used when…FIXME
|
Sending HeartbeatRequest to Group Coordinator (Kafka Broker) — sendHeartbeatRequest Method
RequestFuture<Void> sendHeartbeatRequest()
sendHeartbeatRequest prints out the following DEBUG message to the logs:
Sending Heartbeat request to coordinator [coordinator]
sendHeartbeatRequest creates a new HeartbeatRequest.Builder (for the groupId, generation and member IDs).
In the end, sendHeartbeatRequest requests the ConsumerNetworkClient to send the HeartbeatRequest to the group coordinator node and then creates a new HeartbeatResponseHandler to handle a response.
|
Note
|
sendHeartbeatRequest is used exclusively when HeartbeatThread is requested to run.
|
Performing Partition Assignment and Notifying Group Coordinator — onJoinLeader Internal Method
RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse)
onJoinLeader performs partition assignment with the leader ID, the group protocol, and the member metadata (all from the given JoinGroupResponse).
onJoinLeader creates a new SyncGroupRequest.Builder (with the group, generation, and member IDs as well as the group partition assignment).
onJoinLeader prints out the following DEBUG message to the logs:
Sending leader SyncGroup to coordinator [coordinator]: [requestBuilder]
In the end, onJoinLeader sends a SyncGroupRequest (to the group coordinator node).
|
Note
|
onJoinLeader is used exclusively when JoinGroupResponseHandler is requested to handle a successful response from the group coordinator (and the consumer is the leader of the consumer group).
|
resetJoinGroupFuture Internal Method
void resetJoinGroupFuture()
resetJoinGroupFuture simply resets the joinFuture internal registry (i.e. sets it to null).
|
Note
|
resetJoinGroupFuture is used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded.
|
Clearing FindCoordinatorFuture — clearFindCoordinatorFuture Internal Method
void clearFindCoordinatorFuture()
clearFindCoordinatorFuture simply resets the findCoordinatorFuture internal registry (to be null).
rejoinNeededOrPending Internal Method
boolean rejoinNeededOrPending()
rejoinNeededOrPending is positive (true) when rejoinNeeded and joinFuture is initialized (i.e. not null).
|
Note
|
|
pollHeartbeat Method
void pollHeartbeat(long now)
pollHeartbeat…FIXME
|
Note
|
pollHeartbeat is used exclusively when ConsumerCoordinator is requested to poll for Coordinator events.
|
Starting HeartbeatThread — startHeartbeatThreadIfNeeded Internal Method
void startHeartbeatThreadIfNeeded()
startHeartbeatThreadIfNeeded…FIXME
|
Note
|
startHeartbeatThreadIfNeeded is used exclusively when AbstractCoordinator is requested to ensureActiveGroup.
|
needRejoin Method
boolean needRejoin()
needRejoin simply returns the rejoinNeeded flag.
|
Note
|
needRejoin is used when…FIXME
|
requestRejoin Method
void requestRejoin()
requestRejoin simply turns the rejoinNeeded flag on.
|
Note
|
|
resetGeneration Method
void resetGeneration()
resetGeneration simply resets the following internal registries:
-
generation becomes
NO_GENERATION -
rejoinNeeded is turned on
-
state is
UNJOINED
|
Note
|
|
coordinatorUnknown Method
boolean coordinatorUnknown()
coordinatorUnknown…FIXME
|
Note
|
coordinatorUnknown is used when…FIXME
|
disableHeartbeatThread Internal Method
void disableHeartbeatThread()
disableHeartbeatThread simply requests the HeartbeatThread to disable (if available, i.e. not null).
|
Note
|
disableHeartbeatThread is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.
|