GroupCoordinator

GroupCoordinator is also used to find the partition for a group ID (partitionFor).

GroupCoordinator is created and immediately started when KafkaServer is requested to start up, for the sole purpose of creating the KafkaApis.

Figure 1. GroupCoordinator’s Startup

GroupCoordinator manages the GroupMetadataManager. GroupCoordinator uses the isActive flag to track whether the GroupMetadataManager has been requested to start up (which happens when GroupCoordinator itself is started). The flag is used in handleListGroups and validateGroupStatus.

GroupCoordinator uses [GroupCoordinator [brokerId]] as the logging prefix (aka logIdent).

Tip

Enable DEBUG logging level for kafka.coordinator.group.GroupCoordinator logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.coordinator.group.GroupCoordinator=DEBUG

Refer to Logging.

Creating GroupCoordinator Instance — apply Factory Method

apply(
  config: KafkaConfig,
  zkClient: KafkaZkClient,
  replicaManager: ReplicaManager,
  time: Time): GroupCoordinator (1)
apply(
  config: KafkaConfig,
  zkClient: KafkaZkClient,
  replicaManager: ReplicaManager,
  heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
  joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
  time: Time): GroupCoordinator
  1. Creates a DelayedOperationPurgatory[DelayedHeartbeat] and a DelayedOperationPurgatory[DelayedJoin]

apply…​FIXME

Note
apply is used exclusively when KafkaServer is requested to start up.
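
The relationship between the two apply overloads can be illustrated with a minimal sketch. It is not the Kafka source: Coordinator, Purgatory and the purgatory names are hypothetical stand-ins, and the assumption is only that the shorter overload builds the two purgatories and then delegates to the longer one.

```scala
// Hypothetical stand-ins for the real Kafka types; only the shape matters here.
trait DelayedHeartbeat
trait DelayedJoin
final case class Purgatory[T](name: String)

final class Coordinator(
    val brokerId: Int,
    val heartbeatPurgatory: Purgatory[DelayedHeartbeat],
    val joinPurgatory: Purgatory[DelayedJoin])

object Coordinator {
  // Convenience overload: creates both purgatories, then delegates.
  // The purgatory names are placeholders chosen for this sketch.
  def apply(brokerId: Int): Coordinator =
    apply(
      brokerId,
      Purgatory[DelayedHeartbeat]("Heartbeat"),
      Purgatory[DelayedJoin]("Rebalance"))

  // Full overload: the caller supplies the purgatories (handy in tests).
  def apply(
      brokerId: Int,
      heartbeatPurgatory: Purgatory[DelayedHeartbeat],
      joinPurgatory: Purgatory[DelayedJoin]): Coordinator =
    new Coordinator(brokerId, heartbeatPurgatory, joinPurgatory)
}
```

Keeping the full overload public lets a caller inject its own purgatories, while ordinary callers use the short form.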

Creating GroupCoordinator Instance

GroupCoordinator takes the following to be created:

GroupCoordinator initializes the internal registries and counters.

Starting Up — startup Method

startup(enableMetadataExpiration: Boolean = true): Unit

startup first prints out the following INFO message to the logs:

Starting up.

startup requests the GroupMetadataManager to startup (with the given enableMetadataExpiration flag).

startup turns the isActive internal flag on.

In the end, startup prints out the following INFO message to the logs:

Startup complete.

Note
startup is used exclusively when KafkaServer is requested to start up.
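
The startup sequence above can be sketched as follows. This is an illustration under stated assumptions, not the actual implementation: MetadataManagerStub and CoordinatorSketch are hypothetical names, log messages are collected in a buffer instead of going to a real logger, and the error name returned by listGroups is an assumption about how a handler might react to an inactive coordinator.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for GroupMetadataManager; it only records the call.
final class MetadataManagerStub {
  var startedWithExpiration: Option[Boolean] = None
  def startup(enableMetadataExpiration: Boolean): Unit = {
    startedWithExpiration = Some(enableMetadataExpiration)
  }
}

final class CoordinatorSketch(manager: MetadataManagerStub) {
  // Collected in memory rather than logged, to keep the sketch testable.
  val log: ListBuffer[String] = ListBuffer.empty
  private val active = new AtomicBoolean(false)

  def isActive: Boolean = active.get

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    log += "Starting up."
    manager.startup(enableMetadataExpiration) // start the manager first
    active.set(true)                          // then turn the flag on
    log += "Startup complete."
  }

  // A handler can consult the flag before doing any work (assumed behavior).
  def listGroups(): Either[String, List[String]] =
    if (!isActive) Left("COORDINATOR_NOT_AVAILABLE") else Right(Nil)
}
```

Before startup is called, listGroups in this sketch returns the error value; afterwards it returns an empty list.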

partitionFor Method

partitionFor(group: String): Int

partitionFor simply requests the GroupMetadataManager to partitionFor the input group ID.

Note
partitionFor is used when…​FIXME
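
How GroupMetadataManager maps a group ID to a partition is not spelled out here. A common scheme, assumed for this sketch, takes a non-negative hash of the group ID modulo the number of partitions of the offsets topic. PartitionForSketch and the partitionCount parameter are hypothetical names.

```scala
object PartitionForSketch {
  // Assumed scheme: non-negative hash of the group ID modulo the partition count.
  def partitionFor(groupId: String, partitionCount: Int): Int = {
    require(partitionCount > 0, "partitionCount must be positive")
    val h = groupId.hashCode
    // math.abs(Int.MinValue) is still negative, so handle it explicitly.
    val nonNegative = if (h == Int.MinValue) 0 else math.abs(h)
    nonNegative % partitionCount
  }
}
```

For example, PartitionForSketch.partitionFor("a", 50) gives 47, because "a".hashCode is 97. The same group ID always maps to the same partition as long as the partition count does not change.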

handleCommitOffsets Method

handleCommitOffsets(
  groupId: String,
  memberId: String,
  generationId: Int,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

handleCommitOffsets…​FIXME

Note
handleCommitOffsets is used exclusively when KafkaApis is requested to handleOffsetCommitRequest.

doCommitOffsets Internal Method

doCommitOffsets(
  group: GroupMetadata,
  memberId: String,
  generationId: Int,
  producerId: Long,
  producerEpoch: Short,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

doCommitOffsets…​FIXME

Note
doCommitOffsets is used when GroupCoordinator is requested to handleTxnCommitOffsets and handleCommitOffsets.

onCompleteJoin Method

onCompleteJoin(group: GroupMetadata): Unit

onCompleteJoin…​FIXME

Note
onCompleteJoin is used when…​FIXME

doSyncGroup Internal Method

doSyncGroup(
  group: GroupMetadata,
  generationId: Int,
  memberId: String,
  groupAssignment: Map[String, Array[Byte]],
  responseCallback: SyncCallback): Unit

doSyncGroup…​FIXME

Note
doSyncGroup is used when…​FIXME

handleDescribeGroup Method

handleDescribeGroup(groupId: String): (Errors, GroupSummary)

handleDescribeGroup…​FIXME

Note
handleDescribeGroup is used exclusively when KafkaApis is requested to handleDescribeGroupRequest.

handleGroupImmigration Method

handleGroupImmigration(offsetTopicPartitionId: Int): Unit

handleGroupImmigration simply requests the GroupMetadataManager to scheduleLoadGroupAndOffsets (for the given offsetTopicPartitionId and with the onGroupLoaded callback).

Note
handleGroupImmigration is used exclusively when KafkaApis is requested to handle a LeaderAndIsrRequest.

handleGroupEmigration Method

handleGroupEmigration(offsetTopicPartitionId: Int): Unit

handleGroupEmigration simply requests the GroupMetadataManager to removeGroupsForPartition (for the given offsetTopicPartitionId and with the onGroupUnloaded callback).

Note
handleGroupEmigration is used when KafkaApis is requested to handle a LeaderAndIsrRequest and a StopReplicaRequest.
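
The delegation performed by handleGroupImmigration and handleGroupEmigration can be sketched as below. Group, ManagerStub and MigrationSketch are hypothetical stand-ins, and the stub invokes the callbacks synchronously, which is an assumption made to keep the example small (the real manager may schedule this work).

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins; only the callback wiring is of interest.
final case class Group(id: String)

final class ManagerStub(groupsByPartition: Map[Int, List[Group]]) {
  def scheduleLoadGroupAndOffsets(partition: Int, onGroupLoaded: Group => Unit): Unit =
    groupsByPartition.getOrElse(partition, Nil).foreach(onGroupLoaded)

  def removeGroupsForPartition(partition: Int, onGroupUnloaded: Group => Unit): Unit =
    groupsByPartition.getOrElse(partition, Nil).foreach(onGroupUnloaded)
}

final class MigrationSketch(manager: ManagerStub) {
  // Records what the callbacks saw, in order.
  val events: ListBuffer[String] = ListBuffer.empty

  private def onGroupLoaded(group: Group): Unit = {
    events += s"loaded ${group.id}"
  }

  private def onGroupUnloaded(group: Group): Unit = {
    events += s"unloaded ${group.id}"
  }

  // Both handlers only forward the partition ID and the matching callback.
  def handleGroupImmigration(offsetTopicPartitionId: Int): Unit =
    manager.scheduleLoadGroupAndOffsets(offsetTopicPartitionId, onGroupLoaded)

  def handleGroupEmigration(offsetTopicPartitionId: Int): Unit =
    manager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
}
```

Passing the callbacks as plain functions keeps the manager unaware of the coordinator, which makes both sides easy to test in isolation.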

handleDeletedPartitions Method

handleDeletedPartitions(topicPartitions: Seq[TopicPartition]): Unit

handleDeletedPartitions simply requests the GroupMetadataManager to cleanupGroupMetadata and…​FIXME

Note
handleDeletedPartitions is used when…​FIXME

handleFetchOffsets Method

handleFetchOffsets(
  groupId: String,
  partitions: Option[Seq[TopicPartition]] = None
): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData])

handleFetchOffsets…​FIXME

Note
handleFetchOffsets is used when…​FIXME

handleListGroups Method

handleListGroups(): (Errors, List[GroupOverview])

handleListGroups…​FIXME

Note
handleListGroups is used when…​FIXME

handleJoinGroup Method

handleJoinGroup(
  groupId: String,
  memberId: String,
  clientId: String,
  clientHost: String,
  rebalanceTimeoutMs: Int,
  sessionTimeoutMs: Int,
  protocolType: String,
  protocols: List[(String, Array[Byte])],
  responseCallback: JoinCallback): Unit

handleJoinGroup…​FIXME

Note
handleJoinGroup is used when…​FIXME

handleSyncGroup Method

handleSyncGroup(
  groupId: String,
  generation: Int,
  memberId: String,
  groupAssignment: Map[String, Array[Byte]],
  responseCallback: SyncCallback): Unit

handleSyncGroup…​FIXME

Note
handleSyncGroup is used when…​FIXME

handleDeleteGroups Method

handleDeleteGroups(groupIds: Set[String]): Map[String, Errors]

handleDeleteGroups…​FIXME

Note
handleDeleteGroups is used when…​FIXME

handleHeartbeat Method

handleHeartbeat(
  groupId: String,
  memberId: String,
  generationId: Int,
  responseCallback: Errors => Unit): Unit

handleHeartbeat…​FIXME

Note
handleHeartbeat is used when…​FIXME

handleLeaveGroup Method

handleLeaveGroup(
  groupId: String,
  memberId: String,
  responseCallback: Errors => Unit): Unit

handleLeaveGroup…​FIXME

Note
handleLeaveGroup is used when…​FIXME

scheduleHandleTxnCompletion Method

scheduleHandleTxnCompletion(
  producerId: Long,
  offsetsPartitions: Iterable[TopicPartition],
  transactionResult: TransactionResult): Unit

scheduleHandleTxnCompletion…​FIXME

Note
scheduleHandleTxnCompletion is used when…​FIXME

handleTxnCommitOffsets Method

handleTxnCommitOffsets(
  groupId: String,
  producerId: Long,
  producerEpoch: Short,
  offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
  responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit

handleTxnCommitOffsets…​FIXME

Note
handleTxnCommitOffsets is used when…​FIXME

onGroupLoaded Internal Callback

onGroupLoaded(group: GroupMetadata): Unit

onGroupLoaded…​FIXME

Note
onGroupLoaded is used when…​FIXME

onGroupUnloaded Internal Callback

onGroupUnloaded(group: GroupMetadata): Unit

onGroupUnloaded…​FIXME

Note
onGroupUnloaded is used when…​FIXME

validateGroupStatus Internal Method

validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors]

validateGroupStatus…​FIXME

Note
validateGroupStatus is used when…​FIXME
