GroupCoordinator

GroupCoordinator is used to look up the partition for a group ID (partitionFor).

GroupCoordinator is created and immediately started when KafkaServer is requested to start up, for the sole purpose of creating the KafkaApis.

Figure 1. GroupCoordinator’s Startup

GroupCoordinator manages the GroupMetadataManager. GroupCoordinator uses the isActive flag to record whether the GroupMetadataManager was requested to start up (which happens when GroupCoordinator itself is started). The flag is used in handleListGroups and validateGroupStatus.

GroupCoordinator uses [GroupCoordinator [brokerId]] as the logging prefix (aka logIdent).

Tip

Enable DEBUG logging level for kafka.coordinator.group.GroupCoordinator logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.coordinator.group.GroupCoordinator=DEBUG

Refer to Logging.

Creating GroupCoordinator Instance — apply Factory Method

apply(
  config: KafkaConfig,
  zkClient: KafkaZkClient,
  replicaManager: ReplicaManager,
  time: Time): GroupCoordinator (1)
apply(
  config: KafkaConfig,
  zkClient: KafkaZkClient,
  replicaManager: ReplicaManager,
  heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
  joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
  time: Time): GroupCoordinator
  1. Creates a DelayedOperationPurgatory[DelayedHeartbeat] and a DelayedOperationPurgatory[DelayedJoin]

apply…​FIXME

Note
apply is used exclusively when KafkaServer is requested to start up.
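
The relationship between the two apply variants can be illustrated with a minimal sketch. It only assumes what the signatures and the callout above suggest: the shorter variant creates the two purgatories and hands them to the longer one. The Purgatory and Coordinator classes, the brokerId parameter and the purgatory names are hypothetical stand-ins, not Kafka's actual types.

```scala
object ApplySketch {
  // Hypothetical stand-ins for DelayedOperationPurgatory and the delayed operations.
  final class Purgatory[T](val name: String)
  trait DelayedHeartbeat
  trait DelayedJoin

  final class Coordinator(
    val brokerId: Int,
    val heartbeatPurgatory: Purgatory[DelayedHeartbeat],
    val joinPurgatory: Purgatory[DelayedJoin])

  object Coordinator {
    // Convenience variant: creates both purgatories, then delegates.
    def apply(brokerId: Int): Coordinator = {
      val heartbeatPurgatory = new Purgatory[DelayedHeartbeat]("Heartbeat")
      val joinPurgatory = new Purgatory[DelayedJoin]("Rebalance")
      apply(brokerId, heartbeatPurgatory, joinPurgatory)
    }

    // Full variant: the caller supplies the purgatories (handy in tests).
    def apply(
      brokerId: Int,
      heartbeatPurgatory: Purgatory[DelayedHeartbeat],
      joinPurgatory: Purgatory[DelayedJoin]): Coordinator =
      new Coordinator(brokerId, heartbeatPurgatory, joinPurgatory)
  }
}
```

Splitting the factory this way keeps the common call site short while still letting a caller inject its own purgatories.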

Creating GroupCoordinator Instance

GroupCoordinator takes the following to be created:

GroupCoordinator initializes the internal registries and counters.

Starting Up — startup Method

startup(enableMetadataExpiration: Boolean = true): Unit

startup first prints out the following INFO message to the logs:

Starting up.

startup requests the GroupMetadataManager to startup (with the given enableMetadataExpiration flag).

startup turns the isActive internal flag on.

In the end, startup prints out the following INFO message to the logs:

Startup complete.

Note
startup is used exclusively when KafkaServer is requested to start up.
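
The startup sequence described above can be sketched as follows. This is an illustration only: Manager is a hypothetical stand-in for GroupMetadataManager, the log is an in-memory buffer rather than a real logger, and keeping isActive in an AtomicBoolean is an assumption.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

object StartupSketch {
  // Hypothetical stand-in for GroupMetadataManager; remembers how it was started.
  final class Manager {
    var startedWithExpiration: Option[Boolean] = None
    def startup(enableMetadataExpiration: Boolean): Unit =
      startedWithExpiration = Some(enableMetadataExpiration)
  }

  final class Coordinator(manager: Manager) {
    val log = ListBuffer.empty[String]            // stands in for INFO logging
    private val isActive = new AtomicBoolean(false)

    def active: Boolean = isActive.get

    def startup(enableMetadataExpiration: Boolean = true): Unit = {
      log += "Starting up."
      manager.startup(enableMetadataExpiration)   // start the manager first
      isActive.set(true)                          // only then mark the coordinator active
      log += "Startup complete."
    }
  }
}
```

Flipping the flag only after the manager has started means that code guarded by isActive never observes a half-started coordinator.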

partitionFor Method

partitionFor(group: String): Int

partitionFor simply requests the GroupMetadataManager to partitionFor the given group ID.
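
A minimal sketch of the delegation follows. The text above only says that the call is forwarded; hashing the group ID and taking it modulo a partition count is an assumption about what the manager does, and Manager, nonNegative and the count of 50 are hypothetical.

```scala
object PartitionForSketch {
  // math.abs(Int.MinValue) is still negative, so that one value is mapped to 0.
  private def nonNegative(n: Int): Int =
    if (n == Int.MinValue) 0 else math.abs(n)

  // Hypothetical stand-in for GroupMetadataManager.
  final class Manager(partitionCount: Int) {
    def partitionFor(groupId: String): Int =
      nonNegative(groupId.hashCode) % partitionCount
  }

  final class Coordinator(manager: Manager) {
    // The coordinator adds no logic of its own; it simply forwards the call.
    def partitionFor(group: String): Int = manager.partitionFor(group)
  }
}
```

Under this assumption the same group ID always maps to the same partition, which is what lets a single broker own all the state of a group.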

Note

partitionFor is used when:

handleCommitOffsets Method

handleCommitOffsets(
  groupId: String,
  memberId: String,
  generationId: Int,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

handleCommitOffsets first validates the group status using validateGroupStatus (for the given groupId and the OFFSET_COMMIT API key).

handleCommitOffsets then requests the GroupMetadataManager to get the metadata of the group (by the given groupId) and calls doCommitOffsets.

If the GroupMetadataManager could not getGroup, handleCommitOffsets…​FIXME

In case of an error while validateGroupStatus, handleCommitOffsets…​FIXME

Note
handleCommitOffsets is used exclusively when KafkaApis is requested to handle an OffsetCommitRequest.

doCommitOffsets Internal Method

doCommitOffsets(
  group: GroupMetadata,
  memberId: String,
  generationId: Int,
  producerId: Long,
  producerEpoch: Short,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

doCommitOffsets…​FIXME

Note
doCommitOffsets is used when GroupCoordinator is requested to handleTxnCommitOffsets and handleCommitOffsets.

onCompleteJoin Method

onCompleteJoin(group: GroupMetadata): Unit

onCompleteJoin…​FIXME

Note
onCompleteJoin is used exclusively when DelayedJoin delayed operation is requested to onComplete.

doSyncGroup Internal Method

doSyncGroup(
  group: GroupMetadata,
  generationId: Int,
  memberId: String,
  groupAssignment: Map[String, Array[Byte]],
  responseCallback: SyncCallback): Unit

doSyncGroup…​FIXME

Note
doSyncGroup is used when…​FIXME

handleDescribeGroup Method

handleDescribeGroup(groupId: String): (Errors, GroupSummary)

handleDescribeGroup…​FIXME

Note
handleDescribeGroup is used exclusively when KafkaApis is requested to handleDescribeGroupRequest.

handleGroupImmigration Method

handleGroupImmigration(offsetTopicPartitionId: Int): Unit

handleGroupImmigration simply requests the GroupMetadataManager to scheduleLoadGroupAndOffsets (for the given offsetTopicPartitionId and with the onGroupLoaded callback).

Note
handleGroupImmigration is used exclusively when KafkaApis is requested to handle a LeaderAndIsrRequest.

handleGroupEmigration Method

handleGroupEmigration(offsetTopicPartitionId: Int): Unit

handleGroupEmigration simply requests the GroupMetadataManager to removeGroupsForPartition (for the given offsetTopicPartitionId and with the onGroupUnloaded callback).

Note
handleGroupEmigration is used when KafkaApis is requested to handle a LeaderAndIsrRequest or a StopReplicaRequest.
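
The immigration and emigration handlers share one pattern: delegate to the manager and pass a coordinator callback that is invoked once per affected group. The sketch below illustrates that pattern only. Manager is a hypothetical in-memory stand-in that runs synchronously (the real loading is scheduled), and the event strings are invented for the example.

```scala
import scala.collection.mutable

object MigrationSketch {
  final case class Group(groupId: String)

  // Hypothetical stand-in for GroupMetadataManager: "stored" plays the role of
  // the offsets topic, "loaded" the role of the in-memory group cache.
  final class Manager(stored: Map[Int, List[Group]]) {
    private val loaded = mutable.Map.empty[Int, List[Group]]

    def scheduleLoadGroupAndOffsets(partition: Int, onGroupLoaded: Group => Unit): Unit = {
      val groups = stored.getOrElse(partition, Nil)
      loaded(partition) = groups
      groups.foreach(onGroupLoaded)
    }

    def removeGroupsForPartition(partition: Int, onGroupUnloaded: Group => Unit): Unit =
      loaded.remove(partition).getOrElse(Nil).foreach(onGroupUnloaded)
  }

  final class Coordinator(manager: Manager) {
    val events = mutable.ListBuffer.empty[String]

    private def onGroupLoaded(group: Group): Unit = events += s"loaded ${group.groupId}"
    private def onGroupUnloaded(group: Group): Unit = events += s"unloaded ${group.groupId}"

    def handleGroupImmigration(offsetTopicPartitionId: Int): Unit =
      manager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)

    def handleGroupEmigration(offsetTopicPartitionId: Int): Unit =
      manager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
  }
}
```

Passing the callbacks in keeps the manager unaware of coordinator concerns; it only reports which groups came and went.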

Handling Deleted Partitions — handleDeletedPartitions Method

handleDeletedPartitions(
  topicPartitions: Seq[TopicPartition]): Unit

handleDeletedPartitions simply requests the GroupMetadataManager to cleanupGroupMetadata and…​FIXME

Note
handleDeletedPartitions is used when…​FIXME

handleFetchOffsets Method

handleFetchOffsets(
  groupId: String,
  partitions: Option[Seq[TopicPartition]] = None):
(Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData])

handleFetchOffsets…​FIXME

Note
handleFetchOffsets is used when…​FIXME

handleListGroups Method

handleListGroups(): (Errors, List[GroupOverview])

handleListGroups…​FIXME

Note
handleListGroups is used when…​FIXME

handleJoinGroup Method

handleJoinGroup(
  groupId: String,
  memberId: String,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinCallback): Unit

handleJoinGroup starts by validating the status of the group and the coordinator itself. In case of an error, handleJoinGroup uses the given JoinCallback to report it back and returns.

handleJoinGroup validates the group configuration, namely the given sessionTimeoutMs. In case of an error, handleJoinGroup uses the given JoinCallback to report an INVALID_SESSION_TIMEOUT error back and returns.

handleJoinGroup requests the GroupMetadataManager to getGroup by the given groupId.

If the group could not be found and the given memberId is defined (i.e. not empty), handleJoinGroup uses the given JoinCallback to report an UNKNOWN_MEMBER_ID error back and returns.

If the group could not be found and the given memberId is undefined (i.e. empty), handleJoinGroup requests the GroupMetadataManager to addGroup and then calls doJoinGroup. If the group is available, handleJoinGroup calls doJoinGroup directly.

Note
handleJoinGroup is used exclusively when KafkaApis is requested to handle a JoinGroupRequest.
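
The decision flow of handleJoinGroup can be sketched as below. This is a simplified illustration under several assumptions: the Outcome values stand in for the errors reported through JoinCallback, the session timeout is checked against hypothetical minimum and maximum bounds, validateGroupStatus is reduced to an isActive check, an existing group is assumed to go straight to doJoinGroup without being added again, and doJoinGroup merely reports success.

```scala
import scala.collection.mutable

object JoinGroupSketch {
  sealed trait Outcome
  case object StatusError extends Outcome            // placeholder for any status error
  case object InvalidSessionTimeout extends Outcome
  case object UnknownMemberId extends Outcome
  final case class Joined(groupId: String) extends Outcome

  final class Coordinator(minSessionTimeoutMs: Int, maxSessionTimeoutMs: Int) {
    var isActive: Boolean = true
    private val groups = mutable.Set.empty[String]   // stands in for GroupMetadataManager

    private def validateGroupStatus(groupId: String): Option[Outcome] =
      if (isActive) None else Some(StatusError)

    private def doJoinGroup(groupId: String, callback: Outcome => Unit): Unit =
      callback(Joined(groupId))

    def handleJoinGroup(
      groupId: String,
      memberId: String,
      sessionTimeoutMs: Int,
      responseCallback: Outcome => Unit): Unit =
      validateGroupStatus(groupId) match {
        case Some(error) => responseCallback(error)
        case None =>
          if (sessionTimeoutMs < minSessionTimeoutMs || sessionTimeoutMs > maxSessionTimeoutMs)
            responseCallback(InvalidSessionTimeout)
          else if (groups.contains(groupId))
            doJoinGroup(groupId, responseCallback)    // known group
          else if (memberId.nonEmpty)
            responseCallback(UnknownMemberId)         // member claims an unknown group
          else {
            groups += groupId                         // first member creates the group
            doJoinGroup(groupId, responseCallback)
          }
      }
  }
}
```

Every branch ends by invoking the callback exactly once, so the caller always gets a response, whether an error or a successful join.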

handleSyncGroup Method

handleSyncGroup(
  groupId: String,
  generation: Int,
  memberId: String,
  groupAssignment: Map[String, Array[Byte]],
  responseCallback: SyncCallback): Unit

handleSyncGroup…​FIXME

Note
handleSyncGroup is used when…​FIXME

handleDeleteGroups Method

handleDeleteGroups(groupIds: Set[String]): Map[String, Errors]

handleDeleteGroups…​FIXME

Note
handleDeleteGroups is used when…​FIXME

handleHeartbeat Method

handleHeartbeat(
  groupId: String,
  memberId: String,
  generationId: Int,
  responseCallback: Errors => Unit): Unit

handleHeartbeat…​FIXME

Note
handleHeartbeat is used when…​FIXME

handleLeaveGroup Method

handleLeaveGroup(
  groupId: String,
  memberId: String,
  responseCallback: Errors => Unit): Unit

handleLeaveGroup…​FIXME

Note
handleLeaveGroup is used when…​FIXME

scheduleHandleTxnCompletion Method

scheduleHandleTxnCompletion(
  producerId: Long,
  offsetsPartitions: Iterable[TopicPartition],
  transactionResult: TransactionResult): Unit

scheduleHandleTxnCompletion…​FIXME

Note
scheduleHandleTxnCompletion is used exclusively when KafkaApis is requested to handleWriteTxnMarkersRequest.

handleTxnCommitOffsets Method

handleTxnCommitOffsets(
  groupId: String,
  producerId: Long,
  producerEpoch: Short,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

handleTxnCommitOffsets…​FIXME

Note
handleTxnCommitOffsets is used when…​FIXME

onGroupLoaded Internal Callback

onGroupLoaded(group: GroupMetadata): Unit

onGroupLoaded…​FIXME

Note
onGroupLoaded is used when…​FIXME

onGroupUnloaded Internal Callback

onGroupUnloaded(group: GroupMetadata): Unit

onGroupUnloaded…​FIXME

Note
onGroupUnloaded is used when…​FIXME

Validating Group Status — validateGroupStatus Internal Method

validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors]

validateGroupStatus…​FIXME

Note
validateGroupStatus is used when…​FIXME

doJoinGroup Internal Method

doJoinGroup(
  group: GroupMetadata,
  memberId: String,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinCallback): Unit

doJoinGroup…​FIXME

Note
doJoinGroup is used exclusively when GroupCoordinator is requested to handleJoinGroup.

prepareRebalance Internal Method

prepareRebalance(group: GroupMetadata, reason: String): Unit

prepareRebalance…​FIXME

Note
prepareRebalance is used exclusively when GroupCoordinator is requested to maybePrepareRebalance.

maybePrepareRebalance Internal Method

maybePrepareRebalance(group: GroupMetadata, reason: String): Unit

maybePrepareRebalance…​FIXME

Note
maybePrepareRebalance is used exclusively when GroupCoordinator is requested to…​FIXME

addMemberAndRebalance Internal Method

addMemberAndRebalance(
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  clientId: String,
  clientHost: String,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  group: GroupMetadata,
  callback: JoinCallback): MemberMetadata

addMemberAndRebalance…​FIXME

Note
addMemberAndRebalance is used exclusively when GroupCoordinator is requested to doJoinGroup.

removeMemberAndUpdateGroup Internal Method

removeMemberAndUpdateGroup(
  group: GroupMetadata,
  member: MemberMetadata,
  reason: String): Unit

removeMemberAndUpdateGroup…​FIXME

Note
removeMemberAndUpdateGroup is used when GroupCoordinator is requested to handleLeaveGroup and onExpireHeartbeat.

onExpireHeartbeat Method

onExpireHeartbeat(
  group: GroupMetadata,
  member: MemberMetadata,
  heartbeatDeadline: Long): Unit

onExpireHeartbeat…​FIXME

Note
onExpireHeartbeat is used exclusively when DelayedHeartbeat is requested to onExpiration.
