List<ProtocolMetadata> metadata()
AbstractCoordinator Contract
AbstractCoordinator
is the base of Coordinators that FIXME.
Note
|
AbstractCoordinator is a Java abstract class and cannot be created directly. It is created indirectly for the concrete Coordinators.
|
Note
|
ConsumerCoordinator is the one and only known implementation of the AbstractCoordinator Contract in Apache Kafka’s Consumer API. |
Method | Description |
---|---|
|
Used exclusively when |
|
Used exclusively when |
|
Used exclusively when |
|
Performs partition assignment (i.e. assigns partitions to the members of a consumer group) Used exclusively when |
|
Used exclusively when |
Name | Description |
---|---|
|
|
|
RequestFuture<Void> for sendFindCoordinatorRequest when lookupCoordinator Reset in clearFindCoordinatorFuture Used in lookupCoordinator and when |
|
|
|
Initialized and immediately started when |
|
RequestFuture<ByteBuffer> for sending a JoinGroupRequest to the group coordinator when initiateJoinGroup Reset in resetJoinGroupFuture Used in rejoinNeededOrPending and initiateJoinGroup |
|
Flag to control whether to execute onJoinPrepare while performing joinGroupIfNeeded |
|
Flag to control whether to rejoinNeededOrPending. Enabled ( Turned off ( Turned on ( |
|
|
|
Possible states:
Default: |
Tip
|
Enable Add the following line to
Refer to Logging. |
Creating AbstractCoordinator Instance
AbstractCoordinator
takes the following when created:
Note
|
AbstractCoordinator is a Java abstract class and cannot be created directly. It is created indirectly for the concrete Coordinators.
|
joinGroupIfNeeded
Method
boolean joinGroupIfNeeded(
Timer timer)
joinGroupIfNeeded
…FIXME
Note
|
joinGroupIfNeeded is used exclusively when AbstractCoordinator is requested to ensureActiveGroup.
|
Attempting to Join Consumer Group — initiateJoinGroup
Internal Method
RequestFuture<ByteBuffer> initiateJoinGroup()
initiateJoinGroup
disables the HeartbeatThread.
initiateJoinGroup
sets the state as REBALANCING
.
initiateJoinGroup
sends a JoinGroupRequest to the group coordinator (and initializes the joinFuture).
initiateJoinGroup
registers a RequestFutureListener to handle a response.
When the response is successful, initiateJoinGroup
does the following:
-
Prints out the following INFO message to the logs:
Successfully joined group with generation [generationId]
-
Sets the state as
STABLE
(and the rejoinNeeded flag off) -
Requests the HeartbeatThread to enable itself.
When the response is a failure, initiateJoinGroup
simply sets the state as UNJOINED
initiateJoinGroup
exits immediately when the joinFuture has already been initialized (is not null
) and gives it back.
Note
|
initiateJoinGroup is used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded.
|
ensureActiveGroup
Method
void ensureActiveGroup() (1)
boolean ensureActiveGroup(final Timer timer)
-
Calls the other
ensureActiveGroup
in an infinitewhile
loop with an unexpiring timer
ensureActiveGroup
…FIXME
Note
|
ensureActiveGroup is used exclusively when ConsumerCoordinator is requested to poll for coordinator events.
|
Discovering Current Coordinator for Consumer Group — lookupCoordinator
Method
RequestFuture<Void> lookupCoordinator()
lookupCoordinator
uses the RequestFuture internal registry to know whether it was requested earlier but have not finished yet. In other words, the RequestFuture is available when looking up a coordinator of a consumer group is in progress.
If the RequestFuture internal registry is available (not a null
), lookupCoordinator
returns it immediately.
Otherwise, when the RequestFuture internal registry is uninitialized (null
), lookupCoordinator
requests the ConsumerNetworkClient for the least-loaded Kafka broker. lookupCoordinator
then sends a FindCoordinatorRequest to the Kafka broker.
lookupCoordinator
prints out the following DEBUG message to the logs when no nodes are available and finishes with a RequestFuture.noBrokersAvailable
failure.
No broker available to send FindCoordinator request
Note
|
|
ensureCoordinatorReady
Method
boolean ensureCoordinatorReady(
Timer timer)
ensureCoordinatorReady
…FIXME
Note
|
|
Sending JoinGroupRequest to Group Coordinator (Kafka Broker) — sendJoinGroupRequest
Method
RequestFuture<ByteBuffer> sendJoinGroupRequest()
sendJoinGroupRequest
prints out the following INFO message to the logs:
(Re-)joining group
sendJoinGroupRequest
creates a new JoinGroupRequest.Builder (for the groupId, the sessionTimeoutMs, the member ID, consumer protocol type, the supported ProtocolMetadata, and the rebalanceTimeoutMs).
sendJoinGroupRequest
prints out the following DEBUG message to the logs:
Sending JoinGroup ([requestBuilder]) to coordinator [coordinator]
In the end, sendJoinGroupRequest
requests the ConsumerNetworkClient to send the JoinGroupRequest
to the group coordinator node and then creates a new JoinGroupResponseHandler to handle a response.
sendJoinGroupRequest
returns immediately with a RequestFuture.coordinatorNotAvailable()
when the coordinator has not been discovered yet.
Note
|
sendJoinGroupRequest is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.
|
Sending SyncGroupRequest (to Kafka Broker) — sendSyncGroupRequest
Internal Method
RequestFuture<ByteBuffer> sendSyncGroupRequest(
SyncGroupRequest.Builder requestBuilder)
sendSyncGroupRequest
…FIXME
Note
|
sendSyncGroupRequest is used when…FIXME
|
Sending FindCoordinatorRequest (to Kafka Broker) — sendFindCoordinatorRequest
Internal Method
RequestFuture<Void> sendFindCoordinatorRequest(Node node)
sendFindCoordinatorRequest
prints out the following DEBUG message to the logs:
Sending FindCoordinator request to broker [node]
sendFindCoordinatorRequest
creates a new FindCoordinatorRequest with GROUP
type and the groupId.
In the end, sendFindCoordinatorRequest
requests the ConsumerNetworkClient to send the FindCoordinatorRequest
(to the given Kafka node) and then creates a new FindCoordinatorResponseHandler to handle a response.
Note
|
sendFindCoordinatorRequest is used exclusively when AbstractCoordinator is requested to lookupCoordinator.
|
maybeLeaveGroup
Method
void maybeLeaveGroup()
maybeLeaveGroup
…FIXME
Note
|
maybeLeaveGroup is used when…FIXME
|
Sending HeartbeatRequest to Group Coordinator (Kafka Broker) — sendHeartbeatRequest
Method
RequestFuture<Void> sendHeartbeatRequest()
sendHeartbeatRequest
prints out the following DEBUG message to the logs:
Sending Heartbeat request to coordinator [coordinator]
sendHeartbeatRequest
creates a new HeartbeatRequest.Builder (for the groupId, generation and member IDs).
In the end, sendHeartbeatRequest
requests the ConsumerNetworkClient to send the HeartbeatRequest
to the group coordinator node and then creates a new HeartbeatResponseHandler to handle a response.
Note
|
sendHeartbeatRequest is used exclusively when HeartbeatThread is requested to run.
|
Performing Partition Assignment and Notifying Group Coordinator — onJoinLeader
Internal Method
RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse)
onJoinLeader
performs partition assignment with the leader ID, the group protocol, and the member metadata (all from the given JoinGroupResponse).
onJoinLeader
creates a new SyncGroupRequest.Builder (with the group, generation, and member IDs as well as the group partition assignment).
onJoinLeader
prints out the following DEBUG message to the logs:
Sending leader SyncGroup to coordinator [coordinator]: [requestBuilder]
In the end, onJoinLeader
sends a SyncGroupRequest (to the group coordinator node).
Note
|
onJoinLeader is used exclusively when JoinGroupResponseHandler is requested to handle a successful response from the group coordinator (and the consumer is the leader of the consumer group).
|
resetJoinGroupFuture
Internal Method
void resetJoinGroupFuture()
resetJoinGroupFuture
simply resets the joinFuture internal registry (i.e. sets it to null
).
Note
|
resetJoinGroupFuture is used exclusively when AbstractCoordinator is requested to joinGroupIfNeeded.
|
Clearing FindCoordinatorFuture — clearFindCoordinatorFuture
Internal Method
void clearFindCoordinatorFuture()
clearFindCoordinatorFuture
simply resets the findCoordinatorFuture internal registry (to be null
).
rejoinNeededOrPending
Internal Method
boolean rejoinNeededOrPending()
rejoinNeededOrPending
is positive (true
) when rejoinNeeded and joinFuture is initialized (i.e. not null
).
Note
|
|
pollHeartbeat
Method
void pollHeartbeat(long now)
pollHeartbeat
…FIXME
Note
|
pollHeartbeat is used exclusively when ConsumerCoordinator is requested to poll for Coordinator events.
|
Starting HeartbeatThread — startHeartbeatThreadIfNeeded
Internal Method
void startHeartbeatThreadIfNeeded()
startHeartbeatThreadIfNeeded
…FIXME
Note
|
startHeartbeatThreadIfNeeded is used exclusively when AbstractCoordinator is requested to ensureActiveGroup.
|
needRejoin
Method
boolean needRejoin()
needRejoin
simply returns the rejoinNeeded flag.
Note
|
needRejoin is used when…FIXME
|
requestRejoin
Method
void requestRejoin()
requestRejoin
simply turns the rejoinNeeded flag on.
Note
|
|
resetGeneration
Method
void resetGeneration()
resetGeneration
simply resets the following internal registries:
-
generation becomes
NO_GENERATION
-
rejoinNeeded is turned on
-
state is
UNJOINED
Note
|
|
coordinatorUnknown
Method
boolean coordinatorUnknown()
coordinatorUnknown
…FIXME
Note
|
coordinatorUnknown is used when…FIXME
|
disableHeartbeatThread
Internal Method
void disableHeartbeatThread()
disableHeartbeatThread
simply requests the HeartbeatThread to disable (if available, i.e. not null
).
Note
|
disableHeartbeatThread is used exclusively when AbstractCoordinator is requested to initiateJoinGroup.
|