GroupCoordinator
GroupCoordinator is used (in KafkaApis) for handleGroupImmigration, handleGroupEmigration, handleDeletedPartitions, handleCommitOffsets, handleFetchOffsets, handleDescribeGroup, handleListGroups, handleJoinGroup, handleSyncGroup, handleDeleteGroups, handleLeaveGroup, handleHeartbeat, handleTxnCommitOffsets and scheduleHandleTxnCompletion (all of which rely on the GroupMetadataManager to handle them).
GroupCoordinator is also used for partitionFor.
GroupCoordinator is created and immediately started when KafkaServer is requested to start up, for the sole purpose of creating the KafkaApis.
GroupCoordinator manages the GroupMetadataManager.
GroupCoordinator uses the isActive flag to record whether the GroupMetadataManager was requested to start up (which happens when GroupCoordinator itself is started). The flag is used in handleListGroups and validateGroupStatus.
GroupCoordinator uses [GroupCoordinator [brokerId]] as the logging prefix (aka logIdent).
Tip: Enable logging for GroupCoordinator. Refer to Logging.
Creating GroupCoordinator Instance — apply
Factory Method
apply(
config: KafkaConfig,
zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
time: Time): GroupCoordinator (1)
apply(
config: KafkaConfig,
zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
time: Time): GroupCoordinator
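(1) Creates a DelayedOperationPurgatory for heartbeats and a DelayedOperationPurgatory for joins (the heartbeatPurgatory and joinPurgatory parameters of the second variant)
apply …FIXME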
Note: apply is used exclusively when KafkaServer is requested to start up.
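
The relationship between the two apply variants can be illustrated with a minimal sketch. It assumes the first variant only builds the two purgatories and then delegates to the second one. Purgatory, Coordinator, the purgatory names and the reduced parameter lists are hypothetical stand-ins, not the actual Kafka classes.

```scala
object ApplySketch {
  // Hypothetical stand-in for DelayedOperationPurgatory (not the real Kafka class).
  final case class Purgatory(name: String)

  // Hypothetical, heavily reduced stand-in for GroupCoordinator.
  final class Coordinator(
      val brokerId: Int,
      val heartbeatPurgatory: Purgatory,
      val joinPurgatory: Purgatory)

  object Coordinator {
    // Variant (1): creates the two purgatories itself, then delegates.
    def apply(brokerId: Int): Coordinator = {
      val heartbeatPurgatory = Purgatory("Heartbeat") // illustrative name
      val joinPurgatory = Purgatory("Rebalance")      // illustrative name
      apply(brokerId, heartbeatPurgatory, joinPurgatory)
    }

    // Variant (2): takes ready-made purgatories, which is handy in tests.
    def apply(
        brokerId: Int,
        heartbeatPurgatory: Purgatory,
        joinPurgatory: Purgatory): Coordinator =
      new Coordinator(brokerId, heartbeatPurgatory, joinPurgatory)
  }
}
```

Keeping the purgatories as explicit parameters of the second variant lets callers inject their own instances, while the first variant stays convenient for the regular startup path.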
Creating GroupCoordinator Instance
GroupCoordinator takes the following to be created:
GroupCoordinator initializes the internal registries and counters.
Starting Up — startup
Method
startup(enableMetadataExpiration: Boolean = true): Unit
startup first prints out the following INFO message to the logs:
Starting up.
startup requests the GroupMetadataManager to startup (with the given enableMetadataExpiration flag).
startup turns the isActive internal flag on.
In the end, startup prints out the following INFO message to the logs:
Startup complete.
Note: startup is used exclusively when KafkaServer is requested to start up.
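
The startup sequence described above can be sketched as follows. This is an illustration only: ManagerStub and CoordinatorStub are hypothetical stand-ins, the flag is assumed to be an AtomicBoolean, and log messages are collected in a buffer instead of going through a real logger.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

object StartupSketch {
  // Hypothetical stand-in for GroupMetadataManager; records how it was started.
  final class ManagerStub {
    var startedWithExpiration: Option[Boolean] = None
    def startup(enableMetadataExpiration: Boolean): Unit =
      startedWithExpiration = Some(enableMetadataExpiration)
  }

  // Hypothetical stand-in for GroupCoordinator.
  final class CoordinatorStub(manager: ManagerStub) {
    val isActive = new AtomicBoolean(false) // assumed representation of the flag
    val logs = ListBuffer.empty[String]     // collected instead of logged

    def startup(enableMetadataExpiration: Boolean = true): Unit = {
      logs += "Starting up."
      manager.startup(enableMetadataExpiration) // start the manager first
      isActive.set(true)                        // only then mark the coordinator active
      logs += "Startup complete."
    }
  }
}
```

Turning the flag on only after the manager has been started means that code checking isActive never observes an active coordinator with a manager that was not asked to start.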
partitionFor
Method
partitionFor(group: String): Int
partitionFor simply requests the GroupMetadataManager to partitionFor the given group ID.
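
The delegation can be sketched as below. How the manager picks a partition is not covered here, so the hash-modulo mapping is an assumption made for the example, and ManagerStub and CoordinatorStub are hypothetical names.

```scala
object PartitionForSketch {
  // Hypothetical stand-in for GroupMetadataManager.
  // Assumption: a group ID maps to a partition by hashing modulo the partition count.
  final class ManagerStub(partitionCount: Int) {
    require(partitionCount > 0, "partitionCount must be positive")

    def partitionFor(groupId: String): Int =
      (groupId.hashCode & 0x7fffffff) % partitionCount // mask the sign bit to stay non-negative
  }

  // Hypothetical stand-in for GroupCoordinator: pure delegation.
  final class CoordinatorStub(manager: ManagerStub) {
    def partitionFor(group: String): Int = manager.partitionFor(group)
  }
}
```

Whatever the actual mapping is, the property that matters is that the same group ID always resolves to the same partition.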
handleCommitOffsets
Method
handleCommitOffsets(
groupId: String,
memberId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit
handleCommitOffsets first validates the group status using validateGroupStatus (for the given groupId and the OFFSET_COMMIT API key).
If the validation succeeds, handleCommitOffsets requests the GroupMetadataManager to get the metadata of the group (by the given groupId) and then calls doCommitOffsets.
If the GroupMetadataManager could not getGroup, handleCommitOffsets …FIXME
In case of an error while validateGroupStatus, handleCommitOffsets …FIXME
Note: handleCommitOffsets is used exclusively when KafkaApis is requested to handle an OffsetCommitRequest.
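
The branching in handleCommitOffsets can be sketched as a small decision function. The sketch only names the three branches and deliberately does not model what the two unspecified branches do. Outcome and its cases are hypothetical, and the validate and getGroup parameters stand in for validateGroupStatus and the manager lookup.

```scala
object CommitFlowSketch {
  // Hypothetical result type that only names the branch taken.
  sealed trait Outcome
  final case class ValidationFailed(error: String) extends Outcome  // follow-up not specified
  case object GroupNotFound extends Outcome                         // follow-up not specified
  final case class CommitDelegated(groupId: String) extends Outcome // the doCommitOffsets path

  def handleCommitOffsets(
      groupId: String,
      validate: String => Option[String], // stand-in for validateGroupStatus
      getGroup: String => Option[String]  // stand-in for the manager's getGroup
  ): Outcome =
    validate(groupId) match {
      case Some(error) => ValidationFailed(error) // validation error wins
      case None =>
        getGroup(groupId) match {
          case Some(group) => CommitDelegated(group) // group found: commit the offsets
          case None        => GroupNotFound
        }
    }
}
```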
doCommitOffsets
Internal Method
doCommitOffsets(
group: GroupMetadata,
memberId: String,
generationId: Int,
producerId: Long,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit
doCommitOffsets …FIXME
Note: doCommitOffsets is used when GroupCoordinator is requested to handleTxnCommitOffsets and handleCommitOffsets.
onCompleteJoin
Method
onCompleteJoin(group: GroupMetadata): Unit
onCompleteJoin …FIXME
Note: onCompleteJoin is used exclusively when DelayedJoin delayed operation is requested to onComplete.
doSyncGroup
Internal Method
doSyncGroup(
group: GroupMetadata,
generationId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit
doSyncGroup …FIXME
Note: doSyncGroup is used when…FIXME
handleDescribeGroup
Method
handleDescribeGroup(groupId: String): (Errors, GroupSummary)
handleDescribeGroup …FIXME
Note: handleDescribeGroup is used exclusively when KafkaApis is requested to handleDescribeGroupRequest.
handleGroupImmigration
Method
handleGroupImmigration(offsetTopicPartitionId: Int): Unit
handleGroupImmigration simply requests the GroupMetadataManager to scheduleLoadGroupAndOffsets (for the given offsetTopicPartitionId and with the onGroupLoaded callback).
Note: handleGroupImmigration is used exclusively when KafkaApis is requested to handle a LeaderAndIsrRequest.
handleGroupEmigration
Method
handleGroupEmigration(offsetTopicPartitionId: Int): Unit
handleGroupEmigration simply requests the GroupMetadataManager to removeGroupsForPartition (for the given offsetTopicPartitionId and with the onGroupUnloaded callback).
Note: handleGroupEmigration is used when KafkaApis is requested to handle a LeaderAndIsrRequest and a StopReplicaRequest.
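
The immigration and emigration handlers share one shape: pass the partition ID plus a callback to the manager. The sketch below illustrates that shape under simplifying assumptions. Groups are plain strings, ManagerStub and CoordinatorStub are hypothetical, and the load runs synchronously here even though the method name suggests it is scheduled.

```scala
import scala.collection.mutable

object MigrationSketch {
  // Hypothetical stand-in for GroupMetadataManager.
  // `stored` plays the role of groups persisted per offsets-topic partition.
  final class ManagerStub(stored: Map[Int, List[String]]) {
    private val loaded = mutable.Map.empty[Int, List[String]]

    // Runs synchronously for simplicity (an assumption of this sketch).
    def scheduleLoadGroupAndOffsets(partition: Int, onGroupLoaded: String => Unit): Unit = {
      val groups = stored.getOrElse(partition, Nil)
      loaded(partition) = groups
      groups.foreach(onGroupLoaded)
    }

    def removeGroupsForPartition(partition: Int, onGroupUnloaded: String => Unit): Unit =
      loaded.remove(partition).getOrElse(Nil).foreach(onGroupUnloaded)
  }

  // Hypothetical stand-in for GroupCoordinator.
  final class CoordinatorStub(manager: ManagerStub) {
    val events = mutable.ListBuffer.empty[String]

    private def onGroupLoaded(group: String): Unit = events += s"loaded $group"
    private def onGroupUnloaded(group: String): Unit = events += s"unloaded $group"

    def handleGroupImmigration(offsetTopicPartitionId: Int): Unit =
      manager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)

    def handleGroupEmigration(offsetTopicPartitionId: Int): Unit =
      manager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
  }
}
```

Passing the callbacks into the manager keeps the coordinator in charge of per-group reactions while the manager owns loading and unloading.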
Handling Deleted Partitions — handleDeletedPartitions
Method
handleDeletedPartitions(
topicPartitions: Seq[TopicPartition]): Unit
handleDeletedPartitions simply requests the GroupMetadataManager to cleanupGroupMetadata and…FIXME
Note: handleDeletedPartitions is used when…FIXME
handleFetchOffsets
Method
handleFetchOffsets(
groupId: String,
partitions: Option[Seq[TopicPartition]] = None):
(Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData])
handleFetchOffsets …FIXME
Note: handleFetchOffsets is used when…FIXME
handleListGroups
Method
handleListGroups(): (Errors, List[GroupOverview])
handleListGroups …FIXME
Note: handleListGroups is used when…FIXME
handleJoinGroup
Method
handleJoinGroup(
groupId: String,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit
handleJoinGroup starts by validating the status of the group and the coordinator itself. In case of an error, handleJoinGroup uses the given JoinCallback to report it back and returns.
handleJoinGroup validates the group configuration, namely the given sessionTimeoutMs. In case of an error, handleJoinGroup uses the given JoinCallback to report an INVALID_SESSION_TIMEOUT error back and returns.
handleJoinGroup requests the GroupMetadataManager to getGroup by the given groupId.
If the group could not be found and the given memberId is defined (i.e. not empty), handleJoinGroup uses the given JoinCallback to report an UNKNOWN_MEMBER_ID error back and returns.
If the group could not be found and the given memberId is undefined (i.e. empty), or if the group is simply available, handleJoinGroup requests the GroupMetadataManager to addGroup followed by doJoinGroup.
Note: handleJoinGroup is used exclusively when KafkaApis is requested to handle a JoinGroupRequest.
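
The order of checks in handleJoinGroup can be sketched as a pure decision function. This is not the actual implementation. Decision, Rejected and ProceedToJoin are hypothetical names, errors are plain strings, the status validation result is passed in, and the session timeout check is assumed to be a min/max range check with caller-supplied bounds.

```scala
object JoinFlowSketch {
  // Hypothetical result type: either an error for the callback or the join path.
  sealed trait Decision
  final case class Rejected(error: String) extends Decision
  final case class ProceedToJoin(groupId: String) extends Decision // addGroup, then doJoinGroup

  def decide(
      groupId: String,
      memberId: String,
      sessionTimeoutMs: Int,
      statusError: Option[String], // outcome of the group/coordinator status validation
      minSessionTimeoutMs: Int,    // assumed lower bound
      maxSessionTimeoutMs: Int,    // assumed upper bound
      knownGroups: Set[String]     // stand-in for the manager's getGroup lookup
  ): Decision =
    statusError match {
      // 1. Status validation failed: report that error.
      case Some(error) => Rejected(error)
      // 2. Session timeout outside the assumed bounds.
      case None if sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs =>
        Rejected("INVALID_SESSION_TIMEOUT")
      // 3. Unknown group but the caller claims an existing member ID.
      case None if !knownGroups.contains(groupId) && memberId.nonEmpty =>
        Rejected("UNKNOWN_MEMBER_ID")
      // 4. Either a new group with an empty member ID or an existing group.
      case None => ProceedToJoin(groupId)
    }
}
```

Each rejection corresponds to one early return through the JoinCallback, which is why the checks are ordered from the cheapest to the one that needs the group lookup.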
handleSyncGroup
Method
handleSyncGroup(
groupId: String,
generation: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit
handleSyncGroup …FIXME
Note: handleSyncGroup is used when…FIXME
handleDeleteGroups
Method
handleDeleteGroups(groupIds: Set[String]): Map[String, Errors]
handleDeleteGroups …FIXME
Note: handleDeleteGroups is used when…FIXME
handleHeartbeat
Method
handleHeartbeat(
groupId: String,
memberId: String,
generationId: Int,
responseCallback: Errors => Unit): Unit
handleHeartbeat …FIXME
Note: handleHeartbeat is used when…FIXME
handleLeaveGroup
Method
handleLeaveGroup(
groupId: String,
memberId: String,
responseCallback: Errors => Unit): Unit
handleLeaveGroup …FIXME
Note: handleLeaveGroup is used when…FIXME
scheduleHandleTxnCompletion
Method
scheduleHandleTxnCompletion(
producerId: Long,
offsetsPartitions: Iterable[TopicPartition],
transactionResult: TransactionResult): Unit
scheduleHandleTxnCompletion …FIXME
Note: scheduleHandleTxnCompletion is used exclusively when KafkaApis is requested to handleWriteTxnMarkersRequest.
handleTxnCommitOffsets
Method
handleTxnCommitOffsets(
groupId: String,
producerId: Long,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit
handleTxnCommitOffsets …FIXME
Note: handleTxnCommitOffsets is used when…FIXME
onGroupLoaded
Internal Callback
onGroupLoaded(group: GroupMetadata): Unit
onGroupLoaded …FIXME
Note: onGroupLoaded is used when…FIXME
onGroupUnloaded
Internal Callback
onGroupUnloaded(group: GroupMetadata): Unit
onGroupUnloaded …FIXME
Note: onGroupUnloaded is used when…FIXME
Validating Group Status — validateGroupStatus
Internal Method
validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors]
validateGroupStatus …FIXME
Note: validateGroupStatus is used when…FIXME
doJoinGroup
Internal Method
doJoinGroup(
group: GroupMetadata,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit
doJoinGroup …FIXME
Note: doJoinGroup is used exclusively when GroupCoordinator is requested to handleJoinGroup.
prepareRebalance
Internal Method
prepareRebalance(group: GroupMetadata, reason: String): Unit
prepareRebalance …FIXME
Note: prepareRebalance is used exclusively when GroupCoordinator is requested to maybePrepareRebalance.
maybePrepareRebalance
Internal Method
maybePrepareRebalance(group: GroupMetadata, reason: String): Unit
maybePrepareRebalance …FIXME
Note: maybePrepareRebalance is used exclusively when GroupCoordinator is requested to…FIXME
addMemberAndRebalance
Internal Method
addMemberAndRebalance(
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
clientId: String,
clientHost: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): MemberMetadata
addMemberAndRebalance …FIXME
Note: addMemberAndRebalance is used exclusively when GroupCoordinator is requested to doJoinGroup.
removeMemberAndUpdateGroup
Internal Method
removeMemberAndUpdateGroup(
group: GroupMetadata,
member: MemberMetadata,
reason: String): Unit
removeMemberAndUpdateGroup …FIXME
Note: removeMemberAndUpdateGroup is used when GroupCoordinator is requested to handleLeaveGroup and onExpireHeartbeat.