ReplicaManager
ReplicaManager manages log replicas using the LogManager.
ReplicaManager is given all the required services (e.g. Metrics, KafkaZkClient, Scheduler, LogManager, QuotaManagers, MetadataCache) from the KafkaServer.
|
Note
|
KafkaServer creates a ReplicaManager for the sole purpose of creating KafkaApis, GroupCoordinator, and TransactionCoordinator.
|
When started, ReplicaManager schedules isr-expiration and isr-change-propagation recurring tasks (every half of replica.lag.time.max.ms property and 2500 ms, respectively).
ReplicaManager manages a registry of Partitions by TopicPartition.
ReplicaManager can become the leader or a follower of partitions. That happens when KafkaApis is requested to handle a LeaderAndIsr request (mostly, if not always, from the KafkaController).
ReplicaManager uses the MetadataCache for the following:
-
…FIXME
|
Tip
|
Enable logging for ReplicaManager to see what happens inside (for example, the TRACE messages described on this page).
Refer to Logging.
|
ReplicaManager and ReplicaFetcherManager
ReplicaManager creates a ReplicaFetcherManager while being created (via createReplicaFetcherManager).
ReplicaManager uses the ReplicaFetcherManager for the following:
ReplicaFetcherManager is also used when:
- DynamicThreadPool is requested to reconfigure (resize) the thread pool
- KafkaApis is requested to handle a StopReplica request
Performance Metrics
ReplicaManager is a KafkaMetricsGroup with the following performance metrics.
| Metric Name | Description |
|---|---|
| PartitionCount | Number of allPartitions |
The performance metrics are registered in kafka.server:type=ReplicaManager group.
Registry of Partitions by TopicPartition — allPartitions Internal Property
allPartitions: Pool[TopicPartition, Partition]
allPartitions is a registry of Partitions by TopicPartition.
The number of partitions in allPartitions is available as the PartitionCount performance metric.
A new TopicPartition is added exclusively when looking up a Partition by a TopicPartition that is not registered yet.
A TopicPartition is removed when stopping a replica (and marked as the OfflinePartition).
|
Note
|
allPartitions is used by getPartition, nonOfflinePartitionsIterator, offlinePartitionsIterator, becomeLeaderOrFollower, maybeShrinkIsr, and handleLogDirFailure.
|
Creating ReplicaFetcherManager — createReplicaFetcherManager Internal Method
createReplicaFetcherManager(
metrics: Metrics,
time: Time,
threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager): ReplicaFetcherManager
createReplicaFetcherManager simply creates a ReplicaFetcherManager.
|
Note
|
createReplicaFetcherManager is used when ReplicaManager is created.
|
Creating ReplicaAlterLogDirsManager — createReplicaAlterLogDirsManager Internal Method
createReplicaAlterLogDirsManager(
quotaManager: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats): ReplicaAlterLogDirsManager
createReplicaAlterLogDirsManager simply creates a ReplicaAlterLogDirsManager.
|
Note
|
createReplicaAlterLogDirsManager is used when ReplicaManager is created.
|
shutdown Method
shutdown(
checkpointHW: Boolean = true): Unit
shutdown…FIXME
|
Note
|
shutdown is used when KafkaServer is requested to shut down.
|
alterReplicaLogDirs Method
alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors]
alterReplicaLogDirs…FIXME
|
Note
|
alterReplicaLogDirs is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.
|
Becoming Leader or Follower of Partitions — becomeLeaderOrFollower Method
becomeLeaderOrFollower(
correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse
becomeLeaderOrFollower prints out the following TRACE message to the logs:
Received LeaderAndIsr request [stateInfo] correlation id [correlationId] from controller [controllerId] epoch [controllerEpoch] for partition [topicPartition]
becomeLeaderOrFollower records the current controller epoch (of the LeaderAndIsrRequest) in the controllerEpoch internal registry.
For all valid partition states, becomeLeaderOrFollower finds the partition states with the leader being the (local) broker and that are the partitions for which the broker becomes the leader. All other partition states are for partitions for which the broker becomes a follower.
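The leader/follower split described above can be pictured with a minimal sketch. It assumes simplified, hypothetical stand-ins (`TopicPartition`, `PartitionState`, `split`) rather than the actual Kafka classes, and only shows the rule "leader equals the local broker ID".

```scala
object LeaderFollowerSplit {
  // Hypothetical, simplified stand-ins for Kafka's TopicPartition and partition state.
  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionState(leader: Int, isr: List[Int])

  type States = Map[TopicPartition, PartitionState]

  // Returns (partitions to lead, partitions to follow) for the local broker.
  def split(localBrokerId: Int, states: States): (States, States) =
    states.partition { case (_, state) => state.leader == localBrokerId }
}
```

The first element of the result corresponds to the partitions handed to makeLeaders, the second to the ones handed to makeFollowers.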
For all partitions for which the broker becomes the leader, becomeLeaderOrFollower calls makeLeaders.
For all partitions for which the broker becomes a follower, becomeLeaderOrFollower calls makeFollowers.
With the hwThreadInitialized internal flag disabled (false), becomeLeaderOrFollower calls startHighWaterMarksCheckPointThread and turns the flag on (true).
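A run-once flag like hwThreadInitialized can be sketched with an atomic boolean. The class name `HwCheckpointStarter` and the injected `start` action are hypothetical, and the use of `compareAndSet` is an assumption about how such a flag is typically flipped safely, not a statement about the actual implementation.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: runs the given start action at most once.
final class HwCheckpointStarter(start: () => Unit) {
  private val hwThreadInitialized = new AtomicBoolean(false)

  // Returns true only for the call that actually triggered the start action.
  def maybeStart(): Boolean =
    if (hwThreadInitialized.compareAndSet(false, true)) { start(); true }
    else false
}
```

Repeated LeaderAndIsr requests would then call `maybeStart()` freely, and only the first call has any effect.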
For every new partition, becomeLeaderOrFollower…FIXME
becomeLeaderOrFollower…FIXME
becomeLeaderOrFollower calls the given onLeadershipChange callback with the partitions for the broker to be the leader and a follower.
In the end, becomeLeaderOrFollower creates a new LeaderAndIsrResponse to "announce" a successful request processing.
|
Note
|
becomeLeaderOrFollower is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.
|
makeFollowers Internal Method
makeFollowers(
controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition]
makeFollowers…FIXME
|
Note
|
makeFollowers is used when ReplicaManager is requested to becomeLeaderOrFollower.
|
recordIsrChange Method
recordIsrChange(topicPartition: TopicPartition): Unit
recordIsrChange adds the input topicPartition to isrChangeSet internal registry and sets lastIsrChangeMs to the current time.
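As a rough illustration of the bookkeeping above, the sketch below keeps a set of changed partitions and a timestamp. `IsrChangeTracker`, the injected `now` clock, and the synchronization on the set are assumptions made for the example.

```scala
import scala.collection.mutable

object IsrChangeSketch {
  // Hypothetical stand-in for Kafka's TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  final class IsrChangeTracker(now: () => Long) {
    val isrChangeSet: mutable.Set[TopicPartition] = mutable.HashSet.empty[TopicPartition]
    @volatile var lastIsrChangeMs: Long = 0L

    // Remembers the partition and the time of the most recent ISR change.
    def recordIsrChange(topicPartition: TopicPartition): Unit = isrChangeSet.synchronized {
      isrChangeSet += topicPartition
      lastIsrChangeMs = now()
    }
  }
}
```

Injecting the clock keeps the example deterministic; a real caller would pass something like `() => System.currentTimeMillis()`.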
|
Note
|
recordIsrChange is used exclusively when Partition is requested to updateIsr.
|
updateFollowerLogReadResults Internal Method
updateFollowerLogReadResults(
replicaId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]
updateFollowerLogReadResults…FIXME
|
Note
|
updateFollowerLogReadResults is used exclusively when ReplicaManager is requested to fetch messages from the leader replica.
|
fetchMessages Method
fetchMessages(
timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): Unit
fetchMessages determines fetchIsolation:
- FetchLogEnd for a request from a follower or when the replicaId is Request.FutureLocalReplicaId
- FetchTxnCommitted for isolationLevel being READ_COMMITTED
- FetchHighWatermark otherwise
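The selection rules above can be written as a small decision function. This is a sketch with locally defined types. Treating "a request from a follower" as a non-negative replica ID and using `-3` for the future local replica ID are assumptions of the example.

```scala
object FetchIsolationSketch {
  sealed trait FetchIsolation
  case object FetchLogEnd extends FetchIsolation
  case object FetchTxnCommitted extends FetchIsolation
  case object FetchHighWatermark extends FetchIsolation

  sealed trait IsolationLevel
  case object READ_UNCOMMITTED extends IsolationLevel
  case object READ_COMMITTED extends IsolationLevel

  // Assumption: a reserved negative ID marks the future local replica.
  val FutureLocalReplicaId: Int = -3

  // Assumption: follower requests carry a valid (non-negative) broker ID.
  def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

  def fetchIsolation(replicaId: Int, isolationLevel: IsolationLevel): FetchIsolation =
    if (isFromFollower(replicaId) || replicaId == FutureLocalReplicaId) FetchLogEnd
    else if (isolationLevel == READ_COMMITTED) FetchTxnCommitted
    else FetchHighWatermark
}
```

Note that the order of the checks matters: a follower always reads up to the log end, regardless of the isolation level it sends.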
fetchMessages then calls readFromLocalLog (with the fetchIsolation).
fetchMessages…FIXME
maybePropagateIsrChanges Method
maybePropagateIsrChanges(): Unit
maybePropagateIsrChanges…FIXME
|
Note
|
maybePropagateIsrChanges is used exclusively when isr-change-propagation task is executed (every 2500 milliseconds).
|
Creating ReplicaManager Instance
ReplicaManager takes the following when created:
ReplicaManager initializes the internal registries and counters.
Starting ReplicaManager (and Scheduling ISR-Related Tasks) — startup Method
startup(): Unit
startup requests Scheduler to schedule the ISR-related tasks: isr-expiration (every half of replica.lag.time.max.ms) and isr-change-propagation (every 2500 ms).
startup then creates a LogDirFailureHandler and requests it to start.
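The periods of the two ISR-related recurring tasks can be sketched as follows. The example uses a plain `ScheduledExecutorService` as a stand-in for Kafka's Scheduler. `RecurringTask`, `isrTasks`, `schedule`, and the zero initial delay are hypothetical choices made for illustration.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

object IsrTaskSchedule {
  // Hypothetical description of a recurring task: its name and period.
  final case class RecurringTask(name: String, periodMs: Long)

  // isr-expiration runs every half of replica.lag.time.max.ms,
  // isr-change-propagation runs every 2500 ms.
  def isrTasks(replicaLagTimeMaxMs: Long): Seq[RecurringTask] = Seq(
    RecurringTask("isr-expiration", replicaLagTimeMaxMs / 2),
    RecurringTask("isr-change-propagation", 2500L))

  // Stand-in for a scheduler call; the initial delay of 0 is an assumption.
  def schedule(executor: ScheduledExecutorService, task: RecurringTask)(body: () => Unit): Unit = {
    val runnable = new Runnable { def run(): Unit = body() }
    executor.scheduleAtFixedRate(runnable, 0L, task.periodMs, TimeUnit.MILLISECONDS)
    ()
  }
}
```

With `replica.lag.time.max.ms` set to 10000, for example, isr-expiration would run every 5000 ms.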
|
Note
|
startup uses Scheduler that was specified when ReplicaManager was created.
|
|
Note
|
startup is used exclusively when KafkaServer starts up.
|
maybeShrinkIsr Internal Method
maybeShrinkIsr(): Unit
maybeShrinkIsr prints out the following TRACE message to the logs:
Evaluating ISR list of partitions to see which replicas can be removed from the ISR
maybeShrinkIsr requests the partitions (from allPartitions pool that are not offline partitions) to maybeShrinkIsr (with replica.lag.time.max.ms property).
|
Note
|
maybeShrinkIsr is used exclusively as the isr-expiration recurring task that is scheduled when ReplicaManager starts up.
|
makeLeaders Internal Method
makeLeaders(
controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition]
makeLeaders…FIXME
|
Note
|
makeLeaders is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.
|
describeLogDirs Method
describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo]
describeLogDirs…FIXME
|
Note
|
describeLogDirs is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
|
Finding Log For TopicPartition — getLog Method
getLog(topicPartition: TopicPartition): Option[Log]
getLog…FIXME
startHighWaterMarksCheckPointThread Method
startHighWaterMarksCheckPointThread(): Unit
startHighWaterMarksCheckPointThread…FIXME
|
Note
|
startHighWaterMarksCheckPointThread is used when…FIXME
|
checkpointHighWatermarks Method
checkpointHighWatermarks(): Unit
checkpointHighWatermarks…FIXME
|
Note
|
checkpointHighWatermarks is used when…FIXME
|
shutdownIdleReplicaAlterLogDirsThread Method
shutdownIdleReplicaAlterLogDirsThread(): Unit
shutdownIdleReplicaAlterLogDirsThread…FIXME
|
Note
|
shutdownIdleReplicaAlterLogDirsThread is used when…FIXME
|
handleLogDirFailure Method
handleLogDirFailure(
dir: String,
sendZkNotification: Boolean = true): Unit
handleLogDirFailure…FIXME
|
Note
|
handleLogDirFailure is used when LogDirFailureHandler is requested to do the work.
|
maybeUpdateMetadataCache Method
maybeUpdateMetadataCache(
correlationId: Int,
updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition]
maybeUpdateMetadataCache…FIXME
|
Note
|
maybeUpdateMetadataCache is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
|
Appending Records — appendRecords Method
appendRecords(
timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit
appendRecords…FIXME
Validating requiredAcks — isValidRequiredAcks Internal Method
isValidRequiredAcks(requiredAcks: Short): Boolean
isValidRequiredAcks is positive (true) when the given requiredAcks is one of the following:
- -1
- 1
- 0
Otherwise, isValidRequiredAcks is negative (false).
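The check above amounts to a membership test on three values. A minimal sketch (the wrapping object name is hypothetical):

```scala
object RequiredAcksSketch {
  // Accepts only -1, 1 and 0 as the requiredAcks value.
  def isValidRequiredAcks(requiredAcks: Short): Boolean =
    requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
}
```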
|
Note
|
isValidRequiredAcks is used exclusively when ReplicaManager is requested to appendRecords.
|
appendToLocalLog Internal Method
appendToLocalLog(
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short): Map[TopicPartition, LogAppendResult]
appendToLocalLog processes (maps over) the given Map[TopicPartition, MemoryRecords] (entriesPerPartition), so that the leader partition (of every TopicPartition) is requested to appendRecordsToLeader.
Internally, appendToLocalLog prints out the following TRACE message to the logs:
Append [[entriesPerPartition]] to local log
For every tuple in the given entriesPerPartition (Map[TopicPartition, MemoryRecords]), appendToLocalLog does the following steps:
- Requests the BrokerTopicStats to mark the occurrence of an event for the totalProduceRequestRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Gets the partition (or throws an exception) (with expectLeader flag enabled)
- Requests the Partition to appendRecordsToLeader (with the MemoryRecords, the isFromClient flag, and the requiredAcks)
- Requests the BrokerTopicStats to mark the sizeInBytes of the MemoryRecords for the bytesInRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Requests the BrokerTopicStats to mark the number of messages appended for the messagesInRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Prints out the following TRACE message to the logs:
[sizeInBytes] written to log [topicPartition] beginning at offset [firstOffset] and ending at offset [lastOffset]
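The per-partition steps above can be approximated with a self-contained sketch. Everything in it is a simplified, hypothetical stand-in: records are plain strings, `LeaderPartition` just assigns consecutive offsets, `Stats` is a map of counters, and capturing the exception in the result is an assumption (the actual error handling is not described here).

```scala
import scala.collection.mutable

object AppendToLocalLogSketch {
  final case class TopicPartition(topic: String, partition: Int)
  // Outcome per partition: appended offsets, or the error that was raised.
  final case class LogAppendResult(firstOffset: Long, lastOffset: Long, error: Option[Throwable])

  // Hypothetical leader partition that assigns consecutive offsets.
  final class LeaderPartition {
    private var nextOffset = 0L
    def appendRecordsToLeader(records: Seq[String]): (Long, Long) = {
      val first = nextOffset
      nextOffset += records.size
      (first, nextOffset - 1)
    }
  }

  // Hypothetical stand-in for BrokerTopicStats: per-topic and all-topics counters.
  final class Stats {
    val counters: mutable.Map[(String, String), Long] =
      mutable.Map.empty[(String, String), Long].withDefaultValue(0L)
    def mark(metric: String, topic: String, n: Long = 1L): Unit = {
      counters((metric, topic)) += n
      counters((metric, "<all topics>")) += n
    }
  }

  def appendToLocalLog(
      entriesPerPartition: Map[TopicPartition, Seq[String]],
      leaders: Map[TopicPartition, LeaderPartition],
      stats: Stats): Map[TopicPartition, LogAppendResult] =
    entriesPerPartition.map { case (tp, records) =>
      stats.mark("totalProduceRequestRate", tp.topic)
      try {
        // Get the leader partition or fail (mirrors "gets the partition or throws").
        val partition = leaders.getOrElse(tp, throw new NoSuchElementException(s"$tp has no local leader"))
        val (first, last) = partition.appendRecordsToLeader(records)
        stats.mark("bytesInRate", tp.topic, records.map(_.getBytes("UTF-8").length.toLong).sum)
        stats.mark("messagesInRate", tp.topic, records.size.toLong)
        tp -> LogAppendResult(first, last, None)
      } catch {
        // Assumption: failures are reported per partition instead of aborting the whole append.
        case e: Exception => tp -> LogAppendResult(-1L, -1L, Some(e))
      }
    }
}
```

The result has one entry per input partition, which is what lets a caller build a per-partition response.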
In case Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed, appendToLocalLog…FIXME
In case of exceptions, appendToLocalLog…FIXME
|
Note
|
appendToLocalLog is used exclusively when ReplicaManager is requested to append records.
|
Getting Partition Or Throwing Exception — getPartitionOrException Method
getPartitionOrException(
topicPartition: TopicPartition,
expectLeader: Boolean): Partition
getPartitionOrException gets the partition if available or throws one of the following exceptions:
- KafkaStorageException when the partition is offline: Partition [topicPartition] is in an offline log directory
- NotLeaderForPartitionException: Broker [localBrokerId] is not a replica of [topicPartition]
- ReplicaNotAvailableException: Partition [topicPartition] is not available
- UnknownTopicOrPartitionException: Partition [topicPartition] doesn't exist
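One plausible way the four outcomes could be arranged is sketched below. Only the offline case is stated in the text. The conditions chosen for the other three exceptions (whether the metadata cache knows the partition, and the expectLeader flag) are assumptions of this sketch, and the exception classes are local, hypothetical stand-ins named after the Kafka ones.

```scala
object GetPartitionOrExceptionSketch {
  final case class TopicPartition(topic: String, partition: Int)
  sealed trait Partition
  case object OfflinePartition extends Partition
  final case class OnlinePartition(tp: TopicPartition) extends Partition

  // Hypothetical local exception classes, named after the Kafka exceptions.
  class KafkaStorageException(msg: String) extends RuntimeException(msg)
  class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
  class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)
  class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

  def getPartitionOrException(
      registry: Map[TopicPartition, Partition],
      knownToMetadataCache: TopicPartition => Boolean,
      localBrokerId: Int,
      tp: TopicPartition,
      expectLeader: Boolean): Partition =
    registry.get(tp) match {
      case Some(OfflinePartition) =>
        throw new KafkaStorageException(s"Partition $tp is in an offline log directory")
      case Some(partition) => partition
      // Assumption: a partition known cluster-wide but not hosted locally.
      case None if knownToMetadataCache(tp) =>
        if (expectLeader) throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $tp")
        else throw new ReplicaNotAvailableException(s"Partition $tp is not available")
      case None =>
        throw new UnknownTopicOrPartitionException(s"Partition $tp doesn't exist")
    }
}
```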
|
Note
|
getPartitionOrException is used when…FIXME
|
Getting Partition by TopicPartition (If Available) — getPartition Method
getPartition(topicPartition: TopicPartition): Option[Partition]
getPartition gets the partition for the given TopicPartition.
Stopping Partition Replica — stopReplica Method
stopReplica(
topicPartition: TopicPartition,
deletePartition: Boolean): Unit
stopReplica…FIXME
|
Note
|
stopReplica is used exclusively when ReplicaManager is requested to stopReplicas.
|
underReplicatedPartitionCount Method
underReplicatedPartitionCount: Int
underReplicatedPartitionCount…FIXME
|
Note
|
underReplicatedPartitionCount is used exclusively for the UnderReplicatedPartitions performance metric.
|
leaderPartitionsIterator Internal Method
leaderPartitionsIterator: Iterator[Partition]
leaderPartitionsIterator…FIXME
|
Note
|
leaderPartitionsIterator is used exclusively for the performance metrics: LeaderCount, UnderMinIsrPartitionCount, and UnderReplicatedPartitions (indirectly using underReplicatedPartitionCount).
|
nonOfflinePartitionsIterator Internal Method
nonOfflinePartitionsIterator: Iterator[Partition]
nonOfflinePartitionsIterator…FIXME
|
Note
|
nonOfflinePartitionsIterator is used when ReplicaManager is requested to leaderPartitionsIterator, checkpointHighWatermarks, and handleLogDirFailure.
|
Looking Up Partition or Creating New One (by TopicPartition) — getOrCreatePartition Method
getOrCreatePartition(topicPartition: TopicPartition): Partition
getOrCreatePartition simply looks up a Partition by the TopicPartition (in the allPartitions internal registry). If not found, getOrCreatePartition adds a new Partition.
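The look-up-or-create behaviour can be sketched with a concurrent map. The `Registry` class and the use of `TrieMap` are assumptions standing in for the `Pool[TopicPartition, Partition]` registry. `Partition` here is a hypothetical placeholder type.

```scala
import scala.collection.concurrent.TrieMap

object PartitionRegistrySketch {
  final case class TopicPartition(topic: String, partition: Int)
  // Hypothetical placeholder for Kafka's Partition.
  final class Partition(val topicPartition: TopicPartition)

  final class Registry {
    private val allPartitions = TrieMap.empty[TopicPartition, Partition]

    // Returns the registered Partition, registering a new one on first use.
    def getOrCreatePartition(tp: TopicPartition): Partition =
      allPartitions.getOrElseUpdate(tp, new Partition(tp))

    def getPartition(tp: TopicPartition): Option[Partition] = allPartitions.get(tp)

    // Corresponds to what the PartitionCount metric would report.
    def partitionCount: Int = allPartitions.size
  }
}
```

Calling `getOrCreatePartition` twice with the same TopicPartition yields the same instance, so the registry never holds duplicates.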
|
Note
|
getOrCreatePartition is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.
|
offlinePartitionsIterator Internal Method
offlinePartitionsIterator: Iterator[Partition]
offlinePartitionsIterator…FIXME
|
Note
|
offlinePartitionsIterator is used when…FIXME
|
markPartitionOffline Method
markPartitionOffline(tp: TopicPartition): Unit
markPartitionOffline…FIXME
|
Note
|
markPartitionOffline is used when…FIXME
|
lastOffsetForLeaderEpoch Method
lastOffsetForLeaderEpoch(
requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]
): Map[TopicPartition, EpochEndOffset]
lastOffsetForLeaderEpoch…FIXME
|
Note
|
lastOffsetForLeaderEpoch is used when…FIXME
|
nonOfflinePartition Method
nonOfflinePartition(topicPartition: TopicPartition): Option[Partition]
nonOfflinePartition…FIXME
|
Note
|
nonOfflinePartition is used when…FIXME
|
deleteRecords Method
deleteRecords(
timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit): Unit
deleteRecords…FIXME
|
Note
|
deleteRecords is used when…FIXME
|
fetchOffsetForTimestamp Method
fetchOffsetForTimestamp(
topicPartition: TopicPartition,
timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): TimestampOffset
fetchOffsetForTimestamp…FIXME
|
Note
|
fetchOffsetForTimestamp is used when…FIXME
|
stopReplicas Method
stopReplicas(
stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors)
stopReplicas…FIXME
|
Note
|
stopReplicas is used exclusively when KafkaApis is requested to handle a StopReplica request.
|
getMagic Method
getMagic(topicPartition: TopicPartition): Option[Byte]
getMagic…FIXME
|
Note
|
getMagic is used when…FIXME
|
localReplicaOrException Method
localReplicaOrException(topicPartition: TopicPartition): Replica
localReplicaOrException finds the partition (or throws an exception) for the given TopicPartition (and expectLeader flag off) and requests the Partition to get the local partition replica (or throw an exception).
|
Note
|
A partition replica is local when the replica ID is exactly the local broker ID.
|
|
Note
|
localReplicaOrException is used when…FIXME
|
shouldLeaderThrottle Method
shouldLeaderThrottle(
quota: ReplicaQuota,
topicPartition: TopicPartition,
replicaId: Int): Boolean
shouldLeaderThrottle…FIXME
|
Note
|
shouldLeaderThrottle is used when…FIXME
|
readFromLocalLog Method
readFromLocalLog(
replicaId: Int,
fetchOnlyFromLeader: Boolean,
fetchIsolation: FetchIsolation,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)]
readFromLocalLog returns records (as FetchDataInfo) for every TopicPartition in the given readPartitionInfo.
Internally, readFromLocalLog reads the requested PartitionData (up to fetchMaxBytes that is decremented sequentially for every partition) for every TopicPartition (in the given readPartitionInfo collection).
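The sequentially decremented byte budget can be illustrated with a simplified sketch. Partition data is reduced to a requested byte count, `available` is a hypothetical function returning how many bytes a partition could serve, and details such as the hard limit and the "at least one message" rule are deliberately left out.

```scala
object ReadBudgetSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class ReadResult(bytesRead: Int)

  def readFromLocalLog(
      fetchMaxBytes: Int,
      readPartitionInfo: Seq[(TopicPartition, Int)], // requested max bytes per partition
      available: TopicPartition => Int): Seq[(TopicPartition, ReadResult)] = {
    // Remaining response budget, shared by all partitions in request order.
    var limitBytes = fetchMaxBytes
    readPartitionInfo.map { case (tp, partitionMaxBytes) =>
      val allowed = math.min(partitionMaxBytes, limitBytes)
      val bytesRead = math.min(available(tp), allowed)
      limitBytes = math.max(0, limitBytes - bytesRead)
      tp -> ReadResult(bytesRead)
    }
  }
}
```

Because the budget is consumed in order, partitions listed earlier in the request can starve later ones once the budget runs out.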
read Internal Method
read(
tp: TopicPartition,
fetchInfo: PartitionData,
limitBytes: Int,
minOneMessage: Boolean): LogReadResult
read increments the totalFetchRequestRate metric of the topic (of the TopicPartition) and the allTopicsStats in the BrokerTopicStats.
read prints out the following TRACE message to the logs:
Fetching log segment for partition [tp], offset [offset], partition fetch size [partitionFetchSize], remaining response limit [limitBytes]
read finds the Partition for the TopicPartition.
read…FIXME
Triggering Preferred Replica Leader Election — electPreferredLeaders Method
electPreferredLeaders(
controller: KafkaController,
partitions: Set[TopicPartition],
responseCallback: Map[TopicPartition, ApiError] => Unit,
requestTimeout: Long): Unit
electPreferredLeaders simply requests the KafkaController to electPreferredLeaders for the partitions.
|
Note
|
electPreferredLeaders is used exclusively when KafkaApis is requested to handle an ElectPreferredLeaders request.
|
tryCompleteElection Method
tryCompleteElection(key: DelayedOperationKey): Unit
tryCompleteElection…FIXME
|
Note
|
tryCompleteElection is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
|
localLog Method
localLog(
topicPartition: TopicPartition): Option[Log]
localLog…FIXME
|
Note
|
localLog is used when…FIXME
|
findPreferredReadReplica Method
findPreferredReadReplica(
tp: TopicPartition,
clientMetadata: ClientMetadata,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int]
findPreferredReadReplica finds the Partition for the TopicPartition (with expectLeader flag off).
findPreferredReadReplica…FIXME
|
Note
|
findPreferredReadReplica is used when ReplicaManager is requested to readFromLocalLog.
|
localLogOrException Method
localLogOrException(
topicPartition: TopicPartition): Log
localLogOrException finds the Partition for the given TopicPartition (and expectLeader flag off) and then requests it for the local Log.
localLogOrException throws an exception if neither the Partition nor the Log of the given TopicPartition is available locally.
|
Note
|
localLogOrException is used when ReplicaFetcherThread is requested for the latest epoch, logEndOffset, endOffsetForEpoch, and buildFetch.
|
updateFollowerFetchState Internal Method
updateFollowerFetchState(
followerId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]
updateFollowerFetchState…FIXME
|
Note
|
updateFollowerFetchState is used when ReplicaManager is requested to fetchMessages.
|
Internal Properties
| Name | Description |
|---|---|
| | Created immediately with … Used when: … |
| | StateChangeLogger with the broker ID and … |
| lastIsrChangeMs | Time when isrChangeSet has a new TopicPartition added |