ReplicaManager
ReplicaManager manages log replicas using the LogManager.
ReplicaManager is given all the required services (e.g. Metrics, KafkaZkClient, Scheduler, LogManager, QuotaManagers, MetadataCache) from the KafkaServer.
Note: KafkaServer creates a ReplicaManager for the sole purpose of creating KafkaApis, GroupCoordinator, and TransactionCoordinator.
When started, ReplicaManager schedules the isr-expiration and isr-change-propagation recurring tasks (every half of the replica.lag.time.max.ms property and every 2500 ms, respectively).
ReplicaManager manages a registry of Partitions by TopicPartition.
ReplicaManager can become the leader or a follower of partitions, which happens when KafkaApis is requested to handle a LeaderAndIsr request (mostly, if not always, from the KafkaController).
ReplicaManager uses the MetadataCache for the following:
- …FIXME
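Tip: Enable TRACE logging level for the kafka.server.ReplicaManager logger to see what happens inside. Add the following line to config/log4j.properties:
log4j.logger.kafka.server.ReplicaManager=TRACE
Refer to Logging.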
ReplicaManager and ReplicaFetcherManager
ReplicaManager creates a ReplicaFetcherManager while being created (via createReplicaFetcherManager).
ReplicaManager uses the ReplicaFetcherManager for the following:
ReplicaFetcherManager is also used when:
- DynamicThreadPool is requested to reconfigure (resize) the thread pool
- KafkaApis is requested to handle a StopReplica request
Performance Metrics
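ReplicaManager is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
LeaderCount | Number of partitions the broker is the leader of (uses leaderPartitionsIterator) |
PartitionCount | Number of allPartitions |
UnderMinIsrPartitionCount | Number of leader partitions whose ISR is below min.insync.replicas (uses leaderPartitionsIterator) |
UnderReplicatedPartitions | Number of under-replicated partitions (uses underReplicatedPartitionCount) |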
The performance metrics are registered in kafka.server:type=ReplicaManager group.
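To look these metrics up over JMX, the group is combined with the metric name. The following Scala sketch only builds and inspects javax.management.ObjectName values; it assumes the usual naming pattern kafka.server:type=ReplicaManager,name=<MetricName> and does not connect to a broker. replicaManagerMetricName is a hypothetical helper.

```scala
import javax.management.ObjectName

// Assumed naming pattern: kafka.server:type=ReplicaManager,name=<MetricName>
def replicaManagerMetricName(metric: String): ObjectName =
  new ObjectName(s"kafka.server:type=ReplicaManager,name=$metric")

// Metric names mentioned on this page
val metricNames: Seq[ObjectName] =
  Seq("LeaderCount", "PartitionCount", "UnderMinIsrPartitionCount", "UnderReplicatedPartitions")
    .map(replicaManagerMetricName)

// Print the names, e.g. to look them up in a JMX client
metricNames.foreach(name => println(name.toString))
```

With a JMX client such as jconsole attached to a broker, metrics following this pattern should appear under the kafka.server domain.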
Registry of Partitions by TopicPartition — allPartitions
Internal Property
allPartitions: Pool[TopicPartition, Partition]
allPartitions is a registry of Partitions by TopicPartition.
The number of partitions in allPartitions is available as the PartitionCount performance metric.
A new TopicPartition is added exclusively when looking up a Partition by a TopicPartition that is not registered yet.
A TopicPartition is removed when stopping a replica (and marked as the OfflinePartition).
Note: allPartitions is used when ReplicaManager is requested to getPartition, nonOfflinePartitionsIterator, offlinePartitionsIterator, becomeLeaderOrFollower, maybeShrinkIsr, and handleLogDirFailure.
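The look-up-or-register behavior of allPartitions can be sketched with a ConcurrentHashMap. This is a minimal sketch, not kafka.utils.Pool: TopicPartition, Partition and PartitionRegistry below are simplified, hypothetical stand-ins, and marking a partition as offline is left out.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-ins for Kafka's TopicPartition and Partition
final case class TopicPartition(topic: String, partition: Int)
final class Partition(val topicPartition: TopicPartition)

// A minimal registry in the spirit of allPartitions
final class PartitionRegistry {
  private val allPartitions = new ConcurrentHashMap[TopicPartition, Partition]()

  // Looks up a Partition and registers a new one only if the TopicPartition is unknown
  def getOrCreatePartition(tp: TopicPartition): Partition =
    allPartitions.computeIfAbsent(tp, (key: TopicPartition) => new Partition(key))

  def getPartition(tp: TopicPartition): Option[Partition] = Option(allPartitions.get(tp))

  // Drops the TopicPartition, e.g. when its replica is stopped
  def remove(tp: TopicPartition): Unit = { allPartitions.remove(tp); () }

  // What a PartitionCount-like gauge would report
  def partitionCount: Int = allPartitions.size
}
```

computeIfAbsent makes the check-then-insert atomic, so two concurrent lookups of the same unknown TopicPartition end up with the same Partition.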
Creating ReplicaFetcherManager — createReplicaFetcherManager
Internal Method
createReplicaFetcherManager(
metrics: Metrics,
time: Time,
threadNamePrefix: Option[String],
quotaManager: ReplicationQuotaManager): ReplicaFetcherManager
createReplicaFetcherManager simply creates a ReplicaFetcherManager.
Note: createReplicaFetcherManager is used when ReplicaManager is created.
Creating ReplicaAlterLogDirsManager — createReplicaAlterLogDirsManager
Internal Method
createReplicaAlterLogDirsManager(
quotaManager: ReplicationQuotaManager,
brokerTopicStats: BrokerTopicStats): ReplicaAlterLogDirsManager
createReplicaAlterLogDirsManager simply creates a ReplicaAlterLogDirsManager.
Note: createReplicaAlterLogDirsManager is used when ReplicaManager is created.
shutdown
Method
shutdown(
checkpointHW: Boolean = true): Unit
shutdown …FIXME
Note: shutdown is used when KafkaServer is requested to shut down.
alterReplicaLogDirs
Method
alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors]
alterReplicaLogDirs …FIXME
Note: alterReplicaLogDirs is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.
Becoming Leader or Follower of Partitions — becomeLeaderOrFollower
Method
becomeLeaderOrFollower(
correlationId: Int,
leaderAndIsrRequest: LeaderAndIsrRequest,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse
becomeLeaderOrFollower prints out the following TRACE message to the logs:
Received LeaderAndIsr request [stateInfo] correlation id [correlationId] from controller [controllerId] epoch [controllerEpoch] for partition [topicPartition]
becomeLeaderOrFollower records the current controller epoch (of the LeaderAndIsrRequest) in the controllerEpoch internal registry.
For all valid partition states, becomeLeaderOrFollower finds the partition states whose leader is the local broker; these are the partitions for which the broker becomes the leader. All other partition states are for partitions for which the broker becomes a follower.
For all partitions for which the broker becomes the leader, becomeLeaderOrFollower calls makeLeaders.
For all partitions for which the broker becomes a follower, becomeLeaderOrFollower calls makeFollowers.
If the hwThreadInitialized internal flag is off (false), becomeLeaderOrFollower calls startHighWaterMarksCheckPointThread and turns the flag on (true).
For every new partition, becomeLeaderOrFollower …FIXME
becomeLeaderOrFollower …FIXME
becomeLeaderOrFollower calls the given onLeadershipChange callback with the partitions for which the broker becomes the leader and those for which it becomes a follower.
In the end, becomeLeaderOrFollower creates a new LeaderAndIsrResponse to "announce" successful request processing.
Note: becomeLeaderOrFollower is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.
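The split of partition states into leader and follower partitions in becomeLeaderOrFollower can be sketched as follows. PartitionState here is a hypothetical, simplified case class that keeps only the leader's broker ID; the real LeaderAndIsrRequest.PartitionState carries much more (ISR, epochs, replicas).

```scala
// Hypothetical, simplified partition state: only what the split needs
final case class PartitionState(topic: String, partition: Int, leader: Int)

// States led by the local broker go one way (makeLeaders),
// all other states go the other way (makeFollowers)
def splitByLeadership(
    states: Seq[PartitionState],
    localBrokerId: Int): (Seq[PartitionState], Seq[PartitionState]) =
  states.partition(_.leader == localBrokerId)

val states = Seq(
  PartitionState("orders", 0, leader = 1),
  PartitionState("orders", 1, leader = 2),
  PartitionState("payments", 0, leader = 1))

val (toBeLeader, toBeFollower) = splitByLeadership(states, localBrokerId = 1)
```

The two resulting collections correspond to what the onLeadershipChange callback receives as the partitions to lead and to follow.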
makeFollowers
Internal Method
makeFollowers(
controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition]
makeFollowers …FIXME
Note: makeFollowers is used when ReplicaManager is requested to becomeLeaderOrFollower.
recordIsrChange
Method
recordIsrChange(topicPartition: TopicPartition): Unit
recordIsrChange adds the input topicPartition to the isrChangeSet internal registry and sets lastIsrChangeMs to the current time.
Note: recordIsrChange is used exclusively when Partition is requested to updateIsr.
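A minimal sketch of the bookkeeping recordIsrChange does. IsrChangeTracker is a hypothetical name; guarding both registries by synchronizing on isrChangeSet and injecting the clock as a function are assumptions made to keep the example self-contained and deterministic.

```scala
import scala.collection.mutable

final case class TopicPartition(topic: String, partition: Int)

// Hypothetical holder of the two registries recordIsrChange updates
final class IsrChangeTracker(nowMs: () => Long) {
  val isrChangeSet: mutable.Set[TopicPartition] = mutable.Set.empty
  @volatile var lastIsrChangeMs: Long = nowMs()

  // Adds the TopicPartition and records when the change happened,
  // both under the same lock
  def recordIsrChange(topicPartition: TopicPartition): Unit = isrChangeSet.synchronized {
    isrChangeSet += topicPartition
    lastIsrChangeMs = nowMs()
  }
}
```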
updateFollowerLogReadResults
Internal Method
updateFollowerLogReadResults(
replicaId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]
updateFollowerLogReadResults …FIXME
Note: updateFollowerLogReadResults is used exclusively when ReplicaManager is requested to fetch messages from the leader replica.
fetchMessages
Method
fetchMessages(
timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
isolationLevel: IsolationLevel,
clientMetadata: Option[ClientMetadata]): Unit
fetchMessages determines the fetchIsolation:
- FetchLogEnd for a request from a follower or when the replicaId is Request.FutureLocalReplicaId
- FetchTxnCommitted for the isolationLevel being READ_COMMITTED
- FetchHighWatermark otherwise
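The fetchIsolation decision above can be written as a small function. The types are simplified stand-ins (ReadCommitted and ReadUncommitted play the role of READ_COMMITTED and READ_UNCOMMITTED), and two details are assumptions rather than statements from this page: a follower is recognized by a non-negative replicaId (its broker ID), and Request.FutureLocalReplicaId is -3.

```scala
// Simplified stand-ins for Kafka's FetchIsolation and IsolationLevel types
sealed trait FetchIsolation
case object FetchLogEnd extends FetchIsolation
case object FetchTxnCommitted extends FetchIsolation
case object FetchHighWatermark extends FetchIsolation

sealed trait IsolationLevel
case object ReadUncommitted extends IsolationLevel
case object ReadCommitted extends IsolationLevel

// Assumption: mirrors Request.FutureLocalReplicaId
val FutureLocalReplicaId: Int = -3

def fetchIsolation(replicaId: Int, isolationLevel: IsolationLevel): FetchIsolation =
  if (replicaId >= 0 || replicaId == FutureLocalReplicaId) FetchLogEnd // follower or future replica
  else if (isolationLevel == ReadCommitted) FetchTxnCommitted
  else FetchHighWatermark
```

Followers read up to the log end, whereas consumers are limited to committed transactional data or the high watermark.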
fetchMessages then calls readFromLocalLog (with the fetchIsolation).
fetchMessages
…FIXME
maybePropagateIsrChanges
Method
maybePropagateIsrChanges(): Unit
maybePropagateIsrChanges …FIXME
Note: maybePropagateIsrChanges is used exclusively when the isr-change-propagation task is executed (every 2500 milliseconds).
Creating ReplicaManager Instance
ReplicaManager takes the following when created:
ReplicaManager initializes the internal registries and counters.
Starting ReplicaManager (and Scheduling ISR-Related Tasks) — startup
Method
startup(): Unit
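startup requests the Scheduler to schedule the ISR-related tasks:
- isr-expiration to maybeShrinkIsr every half of the replica.lag.time.max.ms property
- isr-change-propagation to maybePropagateIsrChanges every 2500 milliseconds
startup then creates a LogDirFailureHandler and requests it to start.
Note: startup uses the Scheduler that was specified when ReplicaManager was created.
Note: startup is used exclusively when KafkaServer starts up.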
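The two recurring tasks can be approximated with a plain JDK ScheduledExecutorService. This swaps in java.util.concurrent for Kafka's own Scheduler; scheduleIsrTasks is hypothetical, its function parameters are stand-in hooks for maybeShrinkIsr and maybePropagateIsrChanges, and the zero initial delay is an assumption.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// isr-expiration runs every half of replica.lag.time.max.ms
def isrExpirationIntervalMs(replicaLagTimeMaxMs: Long): Long = replicaLagTimeMaxMs / 2

// isr-change-propagation runs every 2500 ms
val IsrChangePropagationIntervalMs: Long = 2500L

def scheduleIsrTasks(
    scheduler: ScheduledExecutorService,
    replicaLagTimeMaxMs: Long,
    maybeShrinkIsr: () => Unit,
    maybePropagateIsrChanges: () => Unit): Unit = {
  scheduler.scheduleAtFixedRate(
    () => maybeShrinkIsr(), 0L, isrExpirationIntervalMs(replicaLagTimeMaxMs), TimeUnit.MILLISECONDS)
  scheduler.scheduleAtFixedRate(
    () => maybePropagateIsrChanges(), 0L, IsrChangePropagationIntervalMs, TimeUnit.MILLISECONDS)
  ()
}
```

For example, with replica.lag.time.max.ms set to 10000, isr-expiration would run every 5000 ms in this sketch.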
maybeShrinkIsr
Internal Method
maybeShrinkIsr(): Unit
maybeShrinkIsr prints out the following TRACE message to the logs:
Evaluating ISR list of partitions to see which replicas can be removed from the ISR
maybeShrinkIsr requests every partition in the allPartitions pool that is not offline to maybeShrinkIsr (with the replica.lag.time.max.ms property).
Note: maybeShrinkIsr is used exclusively to schedule the isr-expiration recurring task when ReplicaManager starts up.
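The actual shrinking happens in Partition, which this page does not cover. The sketch below assumes the commonly described rule that a follower falls out of sync once it has not caught up with the leader for longer than replica.lag.time.max.ms; FollowerReplica and both functions are hypothetical and model followers only.

```scala
// Hypothetical follower view: broker ID plus the last time it was caught up
final case class FollowerReplica(brokerId: Int, lastCaughtUpTimeMs: Long)

// Assumption: out of sync = not caught up for longer than replica.lag.time.max.ms
def outOfSyncReplicas(
    isr: Seq[FollowerReplica],
    nowMs: Long,
    replicaLagTimeMaxMs: Long): Seq[FollowerReplica] =
  isr.filter(r => nowMs - r.lastCaughtUpTimeMs > replicaLagTimeMaxMs)

// The shrunk ISR keeps only the followers that are still in sync
def shrunkIsr(isr: Seq[FollowerReplica], nowMs: Long, replicaLagTimeMaxMs: Long): Seq[FollowerReplica] = {
  val outOfSync = outOfSyncReplicas(isr, nowMs, replicaLagTimeMaxMs).toSet
  isr.filterNot(outOfSync)
}
```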
makeLeaders
Internal Method
makeLeaders(
controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition]
makeLeaders …FIXME
Note: makeLeaders is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.
describeLogDirs
Method
describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo]
describeLogDirs …FIXME
Note: describeLogDirs is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
Finding Log For TopicPartition — getLog
Method
getLog(topicPartition: TopicPartition): Option[Log]
getLog
…FIXME
startHighWaterMarksCheckPointThread
Method
startHighWaterMarksCheckPointThread(): Unit
startHighWaterMarksCheckPointThread …FIXME
Note: startHighWaterMarksCheckPointThread is used when…FIXME
checkpointHighWatermarks
Method
checkpointHighWatermarks(): Unit
checkpointHighWatermarks …FIXME
Note: checkpointHighWatermarks is used when…FIXME
shutdownIdleReplicaAlterLogDirsThread
Method
shutdownIdleReplicaAlterLogDirsThread(): Unit
shutdownIdleReplicaAlterLogDirsThread …FIXME
Note: shutdownIdleReplicaAlterLogDirsThread is used when…FIXME
handleLogDirFailure
Method
handleLogDirFailure(
dir: String,
sendZkNotification: Boolean = true): Unit
handleLogDirFailure …FIXME
Note: handleLogDirFailure is used when LogDirFailureHandler is requested to do the work.
maybeUpdateMetadataCache
Method
maybeUpdateMetadataCache(
correlationId: Int,
updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition]
maybeUpdateMetadataCache …FIXME
Note: maybeUpdateMetadataCache is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
Appending Records — appendRecords
Method
appendRecords(
timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit
appendRecords
…FIXME
Validating requiredAcks — isValidRequiredAcks
Internal Method
isValidRequiredAcks(requiredAcks: Short): Boolean
isValidRequiredAcks returns true when the given requiredAcks is one of the following:
- -1
- 1
- 0
Otherwise, isValidRequiredAcks returns false.
Note: isValidRequiredAcks is used exclusively when ReplicaManager is requested to appendRecords.
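isValidRequiredAcks boils down to a membership check. A sketch; the comment on the values reflects the producer acks setting (-1 being acks=all).

```scala
// Only -1 (all in-sync replicas), 1 (leader only) and 0 (no acknowledgement) are valid
def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
```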
appendToLocalLog
Internal Method
appendToLocalLog(
internalTopicsAllowed: Boolean,
isFromClient: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short): Map[TopicPartition, LogAppendResult]
appendToLocalLog processes (maps over) the given Map[TopicPartition, MemoryRecords] (entriesPerPartition), so that the leader partition (of every TopicPartition) is requested to appendRecordsToLeader.
Internally, appendToLocalLog prints out the following TRACE message to the logs:
Append [[entriesPerPartition]] to local log
For every tuple in the given entriesPerPartition (Map[TopicPartition, MemoryRecords]), appendToLocalLog does the following steps:
- Requests the BrokerTopicStats to mark the occurrence of an event for the totalProduceRequestRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Gets the partition or throws an exception (getPartitionOrException with the expectLeader flag enabled)
- Requests the Partition to appendRecordsToLeader (with the MemoryRecords, the isFromClient flag, and the requiredAcks value)
- Requests the BrokerTopicStats to mark the sizeInBytes of the MemoryRecords for the bytesInRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Requests the BrokerTopicStats to mark the number of messages appended for the messagesInRate for the topic (of the TopicPartition) in the topicStats and for all topics
- Prints out the following TRACE message to the logs:
[sizeInBytes] written to log [topicPartition] beginning at offset [firstOffset] and ending at offset [lastOffset]
When the topic is internal and internal topics are not allowed (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed), appendToLocalLog …FIXME
In case of exceptions, appendToLocalLog …FIXME
Note: appendToLocalLog is used exclusively when ReplicaManager is requested to append records.
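The per-partition steps of appendToLocalLog can be sketched as follows. Everything here is a hypothetical stand-in: a toy Meter instead of Kafka's meters, a LeaderLog that just hands out consecutive offsets, and a map lookup instead of getPartitionOrException. The internal-topic check and exception handling are left out since this page does not describe them yet.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; none of these are Kafka's real classes
final case class TopicPartition(topic: String, partition: Int)
final case class Records(count: Int, sizeInBytes: Int)
final case class AppendInfo(firstOffset: Long, lastOffset: Long)

final class Meter { var total: Long = 0L; def mark(n: Long = 1L): Unit = total += n }
final class TopicStats {
  val totalProduceRequestRate = new Meter
  val bytesInRate = new Meter
  val messagesInRate = new Meter
}
final class BrokerTopicStats {
  val allTopicsStats = new TopicStats
  private val perTopic = mutable.Map.empty[String, TopicStats]
  def topicStats(topic: String): TopicStats = perTopic.getOrElseUpdate(topic, new TopicStats)
}

// A leader-side log that assigns consecutive offsets
final class LeaderLog {
  private var nextOffset = 0L
  def appendRecordsToLeader(records: Records): AppendInfo = {
    val first = nextOffset
    nextOffset += records.count
    AppendInfo(first, nextOffset - 1)
  }
}

def appendToLocalLog(
    entriesPerPartition: Map[TopicPartition, Records],
    leaders: Map[TopicPartition, LeaderLog],
    stats: BrokerTopicStats): Map[TopicPartition, AppendInfo] =
  entriesPerPartition.map { case (tp, records) =>
    val topicStats = stats.topicStats(tp.topic)
    topicStats.totalProduceRequestRate.mark()
    stats.allTopicsStats.totalProduceRequestRate.mark()
    // Stand-in for getPartitionOrException with expectLeader enabled
    val leader = leaders.getOrElse(tp, throw new IllegalStateException(s"Not the leader for $tp"))
    val info = leader.appendRecordsToLeader(records)
    topicStats.bytesInRate.mark(records.sizeInBytes)
    stats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
    topicStats.messagesInRate.mark(records.count)
    stats.allTopicsStats.messagesInRate.mark(records.count)
    // Stand-in for the TRACE message
    println(s"${records.sizeInBytes} written to log $tp beginning at offset ${info.firstOffset} and ending at offset ${info.lastOffset}")
    tp -> info
  }
```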
Getting Partition Or Throwing Exception — getPartitionOrException
Method
getPartitionOrException(
topicPartition: TopicPartition,
expectLeader: Boolean): Partition
getPartitionOrException gets the partition if available or throws one of the following exceptions:
- KafkaStorageException when the partition is offline: Partition [topicPartition] is in an offline log directory
- NotLeaderForPartitionException: Broker [localBrokerId] is not a replica of [topicPartition]
- ReplicaNotAvailableException: Partition [topicPartition] is not available
- UnknownTopicOrPartitionException: Partition [topicPartition] doesn't exist
Note: getPartitionOrException is used when…FIXME
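The exceptions of getPartitionOrException can be mapped to lookup outcomes as in the following sketch. Which condition triggers which exception is an assumption (an offline marker in the registry, whether the metadata cache knows the partition, and the expectLeader flag); the exception classes and HostedPartition are local, hypothetical stand-ins that only share names with Kafka's.

```scala
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical registry entries: a partition is either online or marked offline
sealed trait HostedPartition
case object Offline extends HostedPartition
final case class Online(tp: TopicPartition) extends HostedPartition

// Local stand-ins with the same names as the exceptions listed above
final class KafkaStorageException(msg: String) extends RuntimeException(msg)
final class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
final class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

def getPartitionOrException(
    tp: TopicPartition,
    expectLeader: Boolean,
    localBrokerId: Int,
    allPartitions: Map[TopicPartition, HostedPartition],
    knownToMetadataCache: TopicPartition => Boolean): Online =
  allPartitions.get(tp) match {
    case Some(Offline) =>
      throw new KafkaStorageException(s"Partition $tp is in an offline log directory")
    case Some(online: Online) => online
    // Assumed: the topic exists cluster-wide, but this broker has no local partition
    case None if knownToMetadataCache(tp) =>
      if (expectLeader) throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $tp")
      else throw new ReplicaNotAvailableException(s"Partition $tp is not available")
    case None =>
      throw new UnknownTopicOrPartitionException(s"Partition $tp doesn't exist")
  }
```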
Getting Partition by TopicPartition (If Available) — getPartition
Method
getPartition(topicPartition: TopicPartition): Option[Partition]
getPartition gets the partition for the given TopicPartition.
Stopping Partition Replica — stopReplica
Method
stopReplica(
topicPartition: TopicPartition,
deletePartition: Boolean): Unit
stopReplica …FIXME
Note: stopReplica is used exclusively when ReplicaManager is requested to stopReplicas.
underReplicatedPartitionCount
Method
underReplicatedPartitionCount: Int
underReplicatedPartitionCount …FIXME
Note: underReplicatedPartitionCount is used exclusively for the UnderReplicatedPartitions performance metric.
leaderPartitionsIterator
Internal Method
leaderPartitionsIterator: Iterator[Partition]
leaderPartitionsIterator …FIXME
Note: leaderPartitionsIterator is used exclusively for the performance metrics: LeaderCount, UnderMinIsrPartitionCount, and UnderReplicatedPartitions (indirectly using underReplicatedPartitionCount).
nonOfflinePartitionsIterator
Internal Method
nonOfflinePartitionsIterator: Iterator[Partition]
nonOfflinePartitionsIterator …FIXME
Note: nonOfflinePartitionsIterator is used when ReplicaManager is requested to leaderPartitionsIterator, checkpointHighWatermarks, and handleLogDirFailure.
Looking Up Partition or Creating New One (by TopicPartition) — getOrCreatePartition
Method
getOrCreatePartition(topicPartition: TopicPartition): Partition
getOrCreatePartition simply looks up a Partition by the TopicPartition (in the allPartitions internal registry). If not found, getOrCreatePartition adds a new Partition.
Note: getOrCreatePartition is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.
offlinePartitionsIterator
Internal Method
offlinePartitionsIterator: Iterator[Partition]
offlinePartitionsIterator …FIXME
Note: offlinePartitionsIterator is used when…FIXME
markPartitionOffline
Method
markPartitionOffline(tp: TopicPartition): Unit
markPartitionOffline …FIXME
Note: markPartitionOffline is used when…FIXME
lastOffsetForLeaderEpoch
Method
lastOffsetForLeaderEpoch(
requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]
): Map[TopicPartition, EpochEndOffset]
lastOffsetForLeaderEpoch …FIXME
Note: lastOffsetForLeaderEpoch is used when…FIXME
nonOfflinePartition
Method
nonOfflinePartition(topicPartition: TopicPartition): Option[Partition]
nonOfflinePartition …FIXME
Note: nonOfflinePartition is used when…FIXME
deleteRecords
Method
deleteRecords(
timeout: Long,
offsetPerPartition: Map[TopicPartition, Long],
responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit): Unit
deleteRecords …FIXME
Note: deleteRecords is used when…FIXME
fetchOffsetForTimestamp
Method
fetchOffsetForTimestamp(
topicPartition: TopicPartition,
timestamp: Long,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): TimestampOffset
fetchOffsetForTimestamp …FIXME
Note: fetchOffsetForTimestamp is used when…FIXME
stopReplicas
Method
stopReplicas(
stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors)
stopReplicas …FIXME
Note: stopReplicas is used exclusively when KafkaApis is requested to handle a StopReplica request.
getMagic
Method
getMagic(topicPartition: TopicPartition): Option[Byte]
getMagic …FIXME
Note: getMagic is used when…FIXME
localReplicaOrException
Method
localReplicaOrException(topicPartition: TopicPartition): Replica
localReplicaOrException finds the partition (or throws an exception) for the given TopicPartition (with the expectLeader flag off) and requests the Partition to get the local partition replica (or throw an exception).
Note: A partition replica is local when the replica ID is exactly the local broker ID.
Note: localReplicaOrException is used when…FIXME
shouldLeaderThrottle
Method
shouldLeaderThrottle(
quota: ReplicaQuota,
topicPartition: TopicPartition,
replicaId: Int): Boolean
shouldLeaderThrottle …FIXME
Note: shouldLeaderThrottle is used when…FIXME
readFromLocalLog
Method
readFromLocalLog(
replicaId: Int,
fetchOnlyFromLeader: Boolean,
fetchIsolation: FetchIsolation,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota,
clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)]
readFromLocalLog returns records (as FetchDataInfo) for every TopicPartition in the given readPartitionInfo.
Internally, readFromLocalLog reads the requested PartitionData for every TopicPartition (in the given readPartitionInfo collection) in turn, up to fetchMaxBytes in total: the remaining limit is decremented by the size of the records read from each partition before the next one is read.
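The shared byte budget of readFromLocalLog can be sketched as follows. readPartition is a hypothetical hook standing in for read, ReadResult is a simplified stand-in for LogReadResult, and the handling of hardMaxBytesLimit (letting the first non-empty partition exceed the limit so that at least one message is returned) is an assumption based on the minOneMessage parameter of read.

```scala
import scala.collection.mutable.ArrayBuffer

final case class TopicPartition(topic: String, partition: Int)
// Hypothetical read result: only how many bytes were returned
final case class ReadResult(bytesRead: Int)

def readFromLocalLog(
    partitions: Seq[TopicPartition],
    fetchMaxBytes: Int,
    hardMaxBytesLimit: Boolean,
    readPartition: (TopicPartition, Int, Boolean) => ReadResult): Seq[(TopicPartition, ReadResult)] = {
  var limitBytes = fetchMaxBytes
  // Assumption: without a hard limit, the first non-empty read may exceed limitBytes
  var minOneMessage = !hardMaxBytesLimit
  val results = ArrayBuffer.empty[(TopicPartition, ReadResult)]
  partitions.foreach { tp =>
    val result = readPartition(tp, limitBytes, minOneMessage)
    if (result.bytesRead > 0) minOneMessage = false
    // The remaining budget shrinks after every partition and never goes negative
    limitBytes = math.max(0, limitBytes - result.bytesRead)
    results += (tp -> result)
  }
  results.toSeq
}
```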
read
Internal Method
read(
tp: TopicPartition,
fetchInfo: PartitionData,
limitBytes: Int,
minOneMessage: Boolean): LogReadResult
read increments the totalFetchRequestRate metric of the topic (of the TopicPartition) and of the allTopicsStats in the BrokerTopicStats.
read prints out the following TRACE message to the logs:
Fetching log segment for partition [tp], offset [offset], partition fetch size [partitionFetchSize], remaining response limit [limitBytes]
read finds the Partition for the TopicPartition.
read
…FIXME
Triggering Preferred Replica Leader Election — electPreferredLeaders
Method
electPreferredLeaders(
controller: KafkaController,
partitions: Set[TopicPartition],
responseCallback: Map[TopicPartition, ApiError] => Unit,
requestTimeout: Long): Unit
electPreferredLeaders simply requests the KafkaController to electPreferredLeaders for the partitions.
Note: electPreferredLeaders is used exclusively when KafkaApis is requested to handle an ElectPreferredLeaders request.
tryCompleteElection
Method
tryCompleteElection(key: DelayedOperationKey): Unit
tryCompleteElection …FIXME
Note: tryCompleteElection is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
localLog
Method
localLog(
topicPartition: TopicPartition): Option[Log]
localLog …FIXME
Note: localLog is used when…FIXME
findPreferredReadReplica
Method
findPreferredReadReplica(
tp: TopicPartition,
clientMetadata: ClientMetadata,
replicaId: Int,
fetchOffset: Long,
currentTimeMs: Long): Option[Int]
findPreferredReadReplica finds the Partition for the TopicPartition (with the expectLeader flag off).
findPreferredReadReplica …FIXME
Note: findPreferredReadReplica is used when ReplicaManager is requested to readFromLocalLog.
localLogOrException
Method
localLogOrException(
topicPartition: TopicPartition): Log
localLogOrException finds the Partition for the given TopicPartition (with the expectLeader flag off) and then requests it for the local Log.
localLogOrException throws an exception if either the Partition or the Log of the given TopicPartition is not available locally.
Note: localLogOrException is used when ReplicaFetcherThread is requested for the latest epoch, logEndOffset, endOffsetForEpoch, and buildFetch.
updateFollowerFetchState
Internal Method
updateFollowerFetchState(
followerId: Int,
readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]
updateFollowerFetchState …FIXME
Note: updateFollowerFetchState is used when ReplicaManager is requested to fetchMessages.
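Internal Properties
Name | Description |
---|---|
controllerEpoch | Current controller epoch (of the LeaderAndIsrRequest), recorded in becomeLeaderOrFollower |
hwThreadInitialized | Flag that controls whether becomeLeaderOrFollower starts the high watermark checkpoint thread (startHighWaterMarksCheckPointThread) |
isrChangeSet | TopicPartitions with ISR changes, added in recordIsrChange |
lastIsrChangeMs | Time when isrChangeSet has a new TopicPartition added (set in recordIsrChange) |
stateChangeLogger | StateChangeLogger with the broker ID and … |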