GroupMetadataManager
GroupMetadataManager is a KafkaMetricsGroup and registers performance metrics.
| Metric Name | Description |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
The performance metrics are registered in kafka.coordinator.group:type=GroupMetadataManager group.
| Name | Description |
|---|---|
|
|
|
GroupMetadata by group ID |
|
Number of partitions for the |
|
KafkaScheduler with 1 daemon thread with group-metadata-manager- prefix Used when:
|
enableMetadataExpiration Method
enableMetadataExpiration(): Unit
enableMetadataExpiration requests KafkaScheduler to start.
enableMetadataExpiration schedules delete-expired-group-metadata task that cleanupGroupMetadata every offsetsRetentionCheckIntervalMs milliseconds.
|
Note
|
enableMetadataExpiration is used exclusively when GroupCoordinator is started.
|
Creating GroupMetadataManager Instance
GroupMetadataManager takes the following when created:
GroupMetadataManager initializes the internal registries and counters.
cleanupGroupMetadata Internal Method
cleanupGroupMetadata(): Unit (1)
cleanupGroupMetadata(deletedTopicPartitions: Option[Seq[TopicPartition]]): Unit
-
Calls the other
cleanupGroupMetadatawith emptydeletedTopicPartitionscollection
cleanupGroupMetadata takes the current time (using time) and for every GroupMetadata in the cache does the following:
-
FIXME
In the end, cleanupGroupMetadata prints out the following INFO message to the logs:
Removed [offsetsRemoved] expired offsets in [duration] milliseconds
|
Note
|
cleanupGroupMetadata is used exclusively when GroupMetadataManager is requested to enableMetadataExpiration (as delete-expired-group-metadata task).
|
Getting Number of Partitions for __consumer_offsets Consumer Group Metadata Topic — getGroupMetadataTopicPartitionCount Internal Method
getGroupMetadataTopicPartitionCount: Int
getGroupMetadataTopicPartitionCount requests the KafkaZkClient for getTopicPartitionCount of __consumer_offsets consumer group metadata topic.
If not available, getGroupMetadataTopicPartitionCount uses the OffsetConfig for offsetsTopicNumPartitions (default: 50).
|
Note
|
getGroupMetadataTopicPartitionCount is used exclusively when GroupMetadataManager is requested for groupMetadataTopicPartitionCount.
|
startup Method
startup(enableMetadataExpiration: Boolean): Unit
startup…FIXME
|
Note
|
startup is used when…FIXME
|
scheduleLoadGroupAndOffsets Method
scheduleLoadGroupAndOffsets(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit): Unit
scheduleLoadGroupAndOffsets…FIXME
|
Note
|
scheduleLoadGroupAndOffsets is used when…FIXME
|
removeGroupsForPartition Method
removeGroupsForPartition(
offsetsPartition: Int,
onGroupUnloaded: GroupMetadata => Unit): Unit
removeGroupsForPartition…FIXME
|
Note
|
removeGroupsForPartition is used when…FIXME
|
scheduleHandleTxnCompletion Method
scheduleHandleTxnCompletion(
producerId: Long,
completedPartitions: Set[Int],
isCommit: Boolean): Unit
scheduleHandleTxnCompletion…FIXME
|
Note
|
scheduleHandleTxnCompletion is used exclusively when GroupCoordinator is requested to scheduleHandleTxnCompletion.
|
loadGroupsAndOffsets Method
loadGroupsAndOffsets(
topicPartition: TopicPartition,
onGroupLoaded: GroupMetadata => Unit): Unit
loadGroupsAndOffsets…FIXME
|
Note
|
loadGroupsAndOffsets is used when…FIXME
|
removeGroupsAndOffsets Internal Method
removeGroupsAndOffsets(): Unit
removeGroupsAndOffsets…FIXME
|
Note
|
removeGroupsAndOffsets is used when…FIXME
|
handleTxnCompletion Method
handleTxnCompletion(
producerId: Long,
completedPartitions: Set[Int],
isCommit: Boolean): Unit
handleTxnCompletion…FIXME
|
Note
|
handleTxnCompletion is used exclusively when GroupMetadataManager is requested to scheduleHandleTxnCompletion.
|
partitionFor Method
partitionFor(groupId: String): Int
partitionFor…FIXME
|
Note
|
partitionFor is used when…FIXME
|
storeOffsets Method
storeOffsets(
group: GroupMetadata,
consumerId: String,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Errors] => Unit,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH): Unit
storeOffsets…FIXME
|
Note
|
storeOffsets is used exclusively when GroupCoordinator is requested to doCommitOffsets.
|
storeGroup Method
storeGroup(
group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Unit
storeGroup…FIXME
|
Note
|
storeGroup is used exclusively when GroupCoordinator is requested to doSyncGroup and onCompleteJoin.
|
Requesting ReplicaManager to Append Records — appendForGroup Internal Method
appendForGroup(
group: GroupMetadata,
records: Map[TopicPartition, MemoryRecords],
callback: Map[TopicPartition, PartitionResponse] => Unit): Unit
appendForGroup simply requests the ReplicaManager to append records.
|
Note
|
appendForGroup is used exclusively when GroupMetadataManager is requested to storeGroup and storeOffsets.
|
addGroup Method
addGroup(group: GroupMetadata): GroupMetadata
addGroup…FIXME
|
Note
|
addGroup is used when…FIXME
|
Getting Metadata of Group by Group ID — getGroup Method
getGroup(groupId: String): Option[GroupMetadata]
getGroup finds the GroupMetadata for the group ID in the groupMetadataCache internal registry.
getGroup returns None if the metadata could not be found.
|
Note
|
|
readGroupMessageValue Method
readGroupMessageValue(
groupId: String,
buffer: ByteBuffer,
time: Time): GroupMetadata
readGroupMessageValue…FIXME
|
Note
|
|
doLoadGroupsAndOffsets Internal Method
doLoadGroupsAndOffsets(
topicPartition: TopicPartition,
onGroupLoaded: GroupMetadata => Unit): Unit
doLoadGroupsAndOffsets…FIXME
|
Note
|
doLoadGroupsAndOffsets is used exclusively when GroupMetadataManager is requested to loadGroupsAndOffsets.
|
groupNotExists Method
groupNotExists(groupId: String): Boolean
groupNotExists…FIXME
|
Note
|
groupNotExists is used exclusively when GroupCoordinator is requested to handleDeleteGroups
|