GroupCoordinator
GroupCoordinator is used (in KafkaApis) for handleGroupImmigration, handleGroupEmigration, handleDeletedPartitions, handleCommitOffsets, handleFetchOffsets, handleDescribeGroup, handleListGroups, handleJoinGroup, handleSyncGroup, handleDeleteGroups, handleLeaveGroup, handleHeartbeat, handleTxnCommitOffsets and scheduleHandleTxnCompletion (most of which simply request the GroupMetadataManager to handle them).
GroupCoordinator is also used for partitionFor.
GroupCoordinator is created and immediately started when KafkaServer is requested to start up, for the sole purpose of creating the KafkaApis.
GroupCoordinator manages the GroupMetadataManager. GroupCoordinator uses the isActive flag to record whether it has been started up (and so has requested the GroupMetadataManager to start up, too). The flag is used in handleListGroups and validateGroupStatus.
GroupCoordinator uses [GroupCoordinator [brokerId]] as the logging prefix (aka logIdent).
|
Tip
|
Enable Add the following line to
Refer to Logging. |
Creating GroupCoordinator Instance — apply Factory Method
apply(
config: KafkaConfig,
zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
time: Time): GroupCoordinator (1)
apply(
config: KafkaConfig,
zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
time: Time): GroupCoordinator
(1) Creates a DelayedOperationPurgatory[DelayedHeartbeat] and a DelayedOperationPurgatory[DelayedJoin]
apply…FIXME
|
Note
|
apply is used exclusively when KafkaServer is requested to start up.
|
Creating GroupCoordinator Instance
GroupCoordinator takes the following to be created:
GroupCoordinator initializes the internal registries and counters.
Starting Up — startup Method
startup(enableMetadataExpiration: Boolean = true): Unit
startup first prints out the following INFO message to the logs:
Starting up.
startup requests the GroupMetadataManager to startup (with the given enableMetadataExpiration flag).
startup turns the isActive internal flag on.
In the end, startup prints out the following INFO message to the logs:
Startup complete.
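The startup sequence above can be sketched as follows. This is a simplified model, not the actual Kafka code: StubGroupMetadataManager, CoordinatorStartupSketch, the in-memory logLines buffer and the exact spacing of the logging prefix are assumptions made for illustration, as is the use of an AtomicBoolean for the isActive flag.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for GroupMetadataManager: it only records the startup request.
final class StubGroupMetadataManager {
  @volatile var startedWithExpiration: Option[Boolean] = None

  def startup(enableMetadataExpiration: Boolean): Unit =
    startedWithExpiration = Some(enableMetadataExpiration)
}

// Simplified model of the coordinator's startup sequence.
final class CoordinatorStartupSketch(brokerId: Int, manager: StubGroupMetadataManager) {
  // Assumed rendering of the logging prefix described earlier.
  private val logIdent = s"[GroupCoordinator $brokerId] "
  private val active = new AtomicBoolean(false)

  // Log lines are collected in memory instead of going through a real logger.
  val logLines: ListBuffer[String] = ListBuffer.empty

  private def info(msg: String): Unit = logLines += (logIdent + msg)

  def isActive: Boolean = active.get

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    manager.startup(enableMetadataExpiration) // delegate to the manager first
    active.set(true)                          // only then mark the coordinator active
    info("Startup complete.")
  }
}
```

In this sketch, isActive stays false until startup has run, so any method that checks the flag before serving a request would reject calls made too early.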
|
Note
|
startup is used exclusively when KafkaServer is requested to start up.
|
partitionFor Method
partitionFor(group: String): Int
partitionFor simply requests the GroupMetadataManager to partitionFor the given group ID.
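A minimal sketch of this delegation follows. The text does not say how the GroupMetadataManager picks the partition, so the hashing rule below (a non-negative hash of the group ID modulo the number of offsets-topic partitions) is an assumption, and PartitioningManagerSketch and PartitionForCoordinatorSketch are hypothetical names.

```scala
// Hypothetical stand-in for GroupMetadataManager.
final class PartitioningManagerSketch(offsetsTopicPartitionCount: Int) {
  require(offsetsTopicPartitionCount > 0, "partition count must be positive")

  // Non-negative hash: Int.MinValue has no positive counterpart, so it maps to 0.
  private def nonNegative(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Assumed rule: hash of the group ID modulo the partition count.
  def partitionFor(groupId: String): Int =
    nonNegative(groupId.hashCode) % offsetsTopicPartitionCount
}

// The coordinator adds nothing of its own and simply forwards the call.
final class PartitionForCoordinatorSketch(manager: PartitioningManagerSketch) {
  def partitionFor(group: String): Int = manager.partitionFor(group)
}
```

Under this assumption the same group ID always maps to the same partition, which is what would let a single broker act as the coordinator for that group.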
handleCommitOffsets Method
handleCommitOffsets(
groupId: String,
memberId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit
handleCommitOffsets first validates the group status using validateGroupStatus (for the given groupId and the OFFSET_COMMIT API key).
handleCommitOffsets requests the GroupMetadataManager to get the metadata of the group (by the given groupId) and then doCommitOffsets.
If the GroupMetadataManager could not getGroup, handleCommitOffsets…FIXME
If validateGroupStatus reports an error, handleCommitOffsets…FIXME
|
Note
|
handleCommitOffsets is used exclusively when KafkaApis is requested to handle an OffsetCommitRequest.
|
doCommitOffsets Internal Method
doCommitOffsets(
group: GroupMetadata,
memberId: String,
generationId: Int,
producerId: Long,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit
doCommitOffsets…FIXME
|
Note
|
doCommitOffsets is used when GroupCoordinator is requested to handleTxnCommitOffsets and handleCommitOffsets.
|
onCompleteJoin Method
onCompleteJoin(group: GroupMetadata): Unit
onCompleteJoin…FIXME
|
Note
|
onCompleteJoin is used exclusively when DelayedJoin delayed operation is requested to onComplete.
|
doSyncGroup Internal Method
doSyncGroup(
group: GroupMetadata,
generationId: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit
doSyncGroup…FIXME
|
Note
|
doSyncGroup is used when…FIXME
|
handleDescribeGroup Method
handleDescribeGroup(groupId: String): (Errors, GroupSummary)
handleDescribeGroup…FIXME
|
Note
|
handleDescribeGroup is used exclusively when KafkaApis is requested to handleDescribeGroupRequest.
|
handleGroupImmigration Method
handleGroupImmigration(offsetTopicPartitionId: Int): Unit
handleGroupImmigration simply requests the GroupMetadataManager to scheduleLoadGroupAndOffsets (for the given offsetTopicPartitionId and with the onGroupLoaded callback).
|
Note
|
handleGroupImmigration is used exclusively when KafkaApis is requested to handle a LeaderAndIsrRequest.
|
handleGroupEmigration Method
handleGroupEmigration(offsetTopicPartitionId: Int): Unit
handleGroupEmigration simply requests the GroupMetadataManager to removeGroupsForPartition (for the given offsetTopicPartitionId and with the onGroupUnloaded callback).
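The callback-based delegation used by handleGroupImmigration and handleGroupEmigration can be sketched as follows. Everything here is a simplified assumption: the Manager class stands in for GroupMetadataManager, it loads groups synchronously from an in-memory map rather than scheduling a load from the offsets topic, and the callbacks only record an event (what the real onGroupLoaded and onGroupUnloaded do is not described here).

```scala
import scala.collection.mutable

object GroupMigrationSketch {
  final case class Group(groupId: String)

  // Hypothetical stand-in for GroupMetadataManager: groups are keyed by offsets-topic partition.
  final class Manager(stored: Map[Int, List[Group]]) {
    private val loaded = mutable.Map.empty[Int, List[Group]]

    // Loads the groups of a partition and notifies the callback once per group.
    def scheduleLoadGroupAndOffsets(partition: Int, onGroupLoaded: Group => Unit): Unit = {
      val groups = stored.getOrElse(partition, Nil)
      loaded(partition) = groups
      groups.foreach(onGroupLoaded)
    }

    // Drops the groups of a partition and notifies the callback once per group.
    def removeGroupsForPartition(partition: Int, onGroupUnloaded: Group => Unit): Unit =
      loaded.remove(partition).getOrElse(Nil).foreach(onGroupUnloaded)
  }

  final class Coordinator(manager: Manager) {
    val events: mutable.ListBuffer[String] = mutable.ListBuffer.empty

    def handleGroupImmigration(offsetTopicPartitionId: Int): Unit =
      manager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)

    def handleGroupEmigration(offsetTopicPartitionId: Int): Unit =
      manager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)

    // Placeholder callbacks: they only record that they were invoked.
    private def onGroupLoaded(group: Group): Unit = events += s"loaded ${group.groupId}"
    private def onGroupUnloaded(group: Group): Unit = events += s"unloaded ${group.groupId}"
  }
}
```

The point of the pattern is that the manager owns the group data for a partition, while the coordinator decides what to do with each group as it arrives or leaves.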
|
Note
|
handleGroupEmigration is used when KafkaApis is requested to handle a LeaderAndIsrRequest and a StopReplicaRequest.
|
Handling Deleted Partitions — handleDeletedPartitions Method
handleDeletedPartitions(
topicPartitions: Seq[TopicPartition]): Unit
handleDeletedPartitions simply requests the GroupMetadataManager to cleanupGroupMetadata and…FIXME
|
Note
|
handleDeletedPartitions is used when…FIXME
|
handleFetchOffsets Method
handleFetchOffsets(
groupId: String,
partitions: Option[Seq[TopicPartition]] = None):
(Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData])
handleFetchOffsets…FIXME
|
Note
|
handleFetchOffsets is used when…FIXME
|
handleListGroups Method
handleListGroups(): (Errors, List[GroupOverview])
handleListGroups…FIXME
|
Note
|
handleListGroups is used when…FIXME
|
handleJoinGroup Method
handleJoinGroup(
groupId: String,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit
handleJoinGroup starts by validating the status of the group and the coordinator itself. In case of an error, handleJoinGroup uses the given JoinCallback to report it back and returns.
handleJoinGroup validates the group configuration, namely the given sessionTimeoutMs. In case of an error, handleJoinGroup uses the given JoinCallback to report an INVALID_SESSION_TIMEOUT error back and returns.
handleJoinGroup requests the GroupMetadataManager to getGroup by the given groupId.
If the group could not be found and the given memberId is defined (i.e. not empty), handleJoinGroup uses the given JoinCallback to report an UNKNOWN_MEMBER_ID error back and returns.
If the group could not be found and the given memberId is undefined (i.e. empty), handleJoinGroup requests the GroupMetadataManager to addGroup (to register a new group) and then calls doJoinGroup. If the group already exists, handleJoinGroup calls doJoinGroup with it.
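The order of checks in handleJoinGroup can be sketched as follows. This is a model under stated assumptions, not the Kafka implementation: the session timeout is checked against hypothetical minimum and maximum bounds, validateGroupStatus is passed in as a function that returns an error name, groups are kept in a plain set, adding a group that is already known is treated as a no-op, and doJoinGroup is reduced to reporting success.

```scala
import scala.collection.mutable

object JoinGroupSketch {
  sealed trait JoinResult
  final case class Rejected(error: String) extends JoinResult
  final case class Accepted(groupId: String) extends JoinResult

  final class Coordinator(
      minSessionTimeoutMs: Int,                     // hypothetical lower bound
      maxSessionTimeoutMs: Int,                     // hypothetical upper bound
      validateGroupStatus: String => Option[String] // Some(error) when the check fails
  ) {
    // Hypothetical in-memory replacement for the groups kept by GroupMetadataManager.
    private val groups = mutable.Set.empty[String]

    def knownGroups: Set[String] = groups.toSet

    def handleJoinGroup(
        groupId: String,
        memberId: String,
        sessionTimeoutMs: Int,
        responseCallback: JoinResult => Unit): Unit =
      validateGroupStatus(groupId) match {
        case Some(error) =>
          // 1. Group or coordinator status is invalid.
          responseCallback(Rejected(error))
        case None if sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs =>
          // 2. Session timeout is outside the allowed range.
          responseCallback(Rejected("INVALID_SESSION_TIMEOUT"))
        case None if !groups.contains(groupId) && memberId.nonEmpty =>
          // 3. A member ID was given for a group the coordinator does not know.
          responseCallback(Rejected("UNKNOWN_MEMBER_ID"))
        case None =>
          // 4. Register the group if needed (no-op when it exists), then join.
          groups += groupId
          doJoinGroup(groupId, responseCallback)
      }

    // Placeholder: the real doJoinGroup is not described in this document.
    private def doJoinGroup(groupId: String, responseCallback: JoinResult => Unit): Unit =
      responseCallback(Accepted(groupId))
  }
}
```

Each rejected request ends with a single call to the callback, so the caller always gets exactly one response whichever check fails.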
|
Note
|
handleJoinGroup is used exclusively when KafkaApis is requested to handle a JoinGroupRequest.
|
handleSyncGroup Method
handleSyncGroup(
groupId: String,
generation: Int,
memberId: String,
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit
handleSyncGroup…FIXME
|
Note
|
handleSyncGroup is used when…FIXME
|
handleDeleteGroups Method
handleDeleteGroups(groupIds: Set[String]): Map[String, Errors]
handleDeleteGroups…FIXME
|
Note
|
handleDeleteGroups is used when…FIXME
|
handleHeartbeat Method
handleHeartbeat(
groupId: String,
memberId: String,
generationId: Int,
responseCallback: Errors => Unit): Unit
handleHeartbeat…FIXME
|
Note
|
handleHeartbeat is used when…FIXME
|
handleLeaveGroup Method
handleLeaveGroup(
groupId: String,
memberId: String,
responseCallback: Errors => Unit): Unit
handleLeaveGroup…FIXME
|
Note
|
handleLeaveGroup is used when…FIXME
|
scheduleHandleTxnCompletion Method
scheduleHandleTxnCompletion(
producerId: Long,
offsetsPartitions: Iterable[TopicPartition],
transactionResult: TransactionResult): Unit
scheduleHandleTxnCompletion…FIXME
|
Note
|
scheduleHandleTxnCompletion is used exclusively when KafkaApis is requested to handleWriteTxnMarkersRequest.
|
handleTxnCommitOffsets Method
handleTxnCommitOffsets(
groupId: String,
producerId: Long,
producerEpoch: Short,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit
handleTxnCommitOffsets…FIXME
|
Note
|
handleTxnCommitOffsets is used when…FIXME
|
onGroupLoaded Internal Callback
onGroupLoaded(group: GroupMetadata): Unit
onGroupLoaded…FIXME
|
Note
|
onGroupLoaded is used when…FIXME
|
onGroupUnloaded Internal Callback
onGroupUnloaded(group: GroupMetadata): Unit
onGroupUnloaded…FIXME
|
Note
|
onGroupUnloaded is used when…FIXME
|
Validating Group Status — validateGroupStatus Internal Method
validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors]
validateGroupStatus…FIXME
|
Note
|
validateGroupStatus is used when…FIXME
|
doJoinGroup Internal Method
doJoinGroup(
group: GroupMetadata,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback): Unit
doJoinGroup…FIXME
|
Note
|
doJoinGroup is used exclusively when GroupCoordinator is requested to handleJoinGroup.
|
prepareRebalance Internal Method
prepareRebalance(group: GroupMetadata, reason: String): Unit
prepareRebalance…FIXME
|
Note
|
prepareRebalance is used exclusively when GroupCoordinator is requested to maybePrepareRebalance.
|
maybePrepareRebalance Internal Method
maybePrepareRebalance(group: GroupMetadata, reason: String): Unit
maybePrepareRebalance…FIXME
|
Note
|
maybePrepareRebalance is used exclusively when GroupCoordinator is requested to…FIXME
|
addMemberAndRebalance Internal Method
addMemberAndRebalance(
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
clientId: String,
clientHost: String,
protocolType: String,
protocols: List[(String, Array[Byte])],
group: GroupMetadata,
callback: JoinCallback): MemberMetadata
addMemberAndRebalance…FIXME
|
Note
|
addMemberAndRebalance is used exclusively when GroupCoordinator is requested to doJoinGroup.
|
removeMemberAndUpdateGroup Internal Method
removeMemberAndUpdateGroup(
group: GroupMetadata,
member: MemberMetadata,
reason: String): Unit
removeMemberAndUpdateGroup…FIXME
|
Note
|
removeMemberAndUpdateGroup is used when GroupCoordinator is requested to handleLeaveGroup and onExpireHeartbeat.
|