ReplicaManager

ReplicaManager manages log replicas using the LogManager.

ReplicaManager is created and immediately started when KafkaServer is requested to start up.

Figure 1. ReplicaManager and KafkaServer

ReplicaManager is given all the required services (e.g. Metrics, KafkaZkClient, Scheduler, LogManager, QuotaManagers, MetadataCache) from the KafkaServer.

Note
KafkaServer creates the ReplicaManager solely to pass it on when creating KafkaApis, GroupCoordinator, and TransactionCoordinator.

When started, ReplicaManager schedules the isr-expiration and isr-change-propagation recurring tasks (with periods of half the replica.lag.time.max.ms property and 2500 ms, respectively).

ReplicaManager manages a registry of Partitions by TopicPartition.

ReplicaManager becomes the leader or a follower of partitions when KafkaApis is requested to handle a LeaderAndIsr request (mostly, if not always, from the KafkaController).

ReplicaManager uses the MetadataCache for the following:

  • …​FIXME

Tip

Enable the ALL logging level for the kafka.server.ReplicaManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.server.ReplicaManager=ALL

Refer to Logging.


Note that ReplicaManager also uses the state.change.logger logger that is preconfigured in config/log4j.properties:

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

That means that the state-change messages of ReplicaManager go to the logs/state-change.log file at the TRACE logging level and are not added to the main logs (since log4j.additivity is off).

ReplicaManager and ReplicaFetcherManager

ReplicaManager creates a ReplicaFetcherManager while being created (via createReplicaFetcherManager).

Figure 2. ReplicaManager and ReplicaFetcherManager

ReplicaManager uses the ReplicaFetcherManager for the following:

ReplicaFetcherManager is also used when:

Performance Metrics

ReplicaManager is a KafkaMetricsGroup with the following performance metrics.

Table 1. ReplicaManager’s Performance Metrics

Metric Name                  Description
FailedIsrUpdatesPerSec
IsrExpandsPerSec
IsrShrinksPerSec
LeaderCount
OfflineReplicaCount
PartitionCount               Number of partitions in allPartitions
UnderMinIsrPartitionCount
UnderReplicatedPartitions    Value of underReplicatedPartitionCount

The performance metrics are registered in kafka.server:type=ReplicaManager group.
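Since the metrics are registered under the kafka.server:type=ReplicaManager group, a monitoring tool addresses each one by a JMX ObjectName. The following Scala sketch only builds and inspects such names with the JDK's javax.management.ObjectName. It assumes the per-metric key is name=<Metric Name>; the helper replicaManagerMetric is hypothetical, and nothing here connects to a broker.

```scala
import javax.management.ObjectName

// Hypothetical helper: builds the JMX ObjectName of a ReplicaManager metric,
// ASSUMING the "kafka.server:type=ReplicaManager,name=<MetricName>" convention.
def replicaManagerMetric(metricName: String): ObjectName =
  new ObjectName(s"kafka.server:type=ReplicaManager,name=$metricName")

// Metric names taken from Table 1.
val metricNames = Seq(
  "FailedIsrUpdatesPerSec", "IsrExpandsPerSec", "IsrShrinksPerSec",
  "LeaderCount", "OfflineReplicaCount", "PartitionCount",
  "UnderMinIsrPartitionCount", "UnderReplicatedPartitions")

val objectNames = metricNames.map(replicaManagerMetric)

// Print each name in JMX canonical form (keys sorted alphabetically).
objectNames.foreach(name => println(name.getCanonicalName))
```

Reading an actual value additionally requires a JMX connection to the broker, which is what jconsole does interactively.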

Figure 3. ReplicaManager in jconsole

Registry of Partitions by TopicPartition — allPartitions Internal Property

allPartitions: Pool[TopicPartition, Partition]

allPartitions is a registry of Partitions by TopicPartition.

The number of partitions in allPartitions is available as the PartitionCount performance metric.

A new TopicPartition is added exclusively when getOrCreatePartition looks up a Partition by a TopicPartition that is not registered yet.

A TopicPartition is removed when stopping a replica (and marked as the OfflinePartition).

Creating ReplicaFetcherManager — createReplicaFetcherManager Internal Method

createReplicaFetcherManager(
  metrics: Metrics,
  time: Time,
  threadNamePrefix: Option[String],
  quotaManager: ReplicationQuotaManager): ReplicaFetcherManager

createReplicaFetcherManager simply creates a ReplicaFetcherManager.

Note
createReplicaFetcherManager is used when ReplicaManager is created.

Creating ReplicaAlterLogDirsManager — createReplicaAlterLogDirsManager Internal Method

createReplicaAlterLogDirsManager(
  quotaManager: ReplicationQuotaManager,
  brokerTopicStats: BrokerTopicStats): ReplicaAlterLogDirsManager

createReplicaAlterLogDirsManager simply creates a ReplicaAlterLogDirsManager.

Note
createReplicaAlterLogDirsManager is used when ReplicaManager is created.

shutdown Method

shutdown(
  checkpointHW: Boolean = true): Unit

shutdown…​FIXME

Note
shutdown is used when KafkaServer is requested to shut down.

alterReplicaLogDirs Method

alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors]

alterReplicaLogDirs…​FIXME

Note
alterReplicaLogDirs is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.

Becoming Leader or Follower of Partitions — becomeLeaderOrFollower Method

becomeLeaderOrFollower(
  correlationId: Int,
  leaderAndIsrRequest: LeaderAndIsrRequest,
  onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse

becomeLeaderOrFollower prints out the following TRACE message to the logs:

Received LeaderAndIsr request [stateInfo] correlation id [correlationId] from controller [controllerId] epoch [controllerEpoch] for partition [topicPartition]

becomeLeaderOrFollower records the current controller epoch (of the LeaderAndIsrRequest) in the controllerEpoch internal registry.

For all valid partition states, becomeLeaderOrFollower selects the ones whose leader is the local broker: these are the partitions for which the broker becomes the leader. All other partition states are for partitions for which the broker becomes a follower.
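The leader/follower split can be pictured with plain Scala collections. This is a minimal sketch: PartitionState and splitByLeadership are hypothetical simplifications that keep only the leader's broker ID, not Kafka's LeaderAndIsrRequest.PartitionState.

```scala
// Hypothetical minimal model (not Kafka's classes): a partition state carries
// only the topic-partition and the broker ID of its leader.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionState(tp: TopicPartition, leader: Int)

// Splits partition states into those led by the local broker (become leader)
// and all others (become follower).
def splitByLeadership(
    localBrokerId: Int,
    states: Seq[PartitionState]): (Seq[PartitionState], Seq[PartitionState]) =
  states.partition(_.leader == localBrokerId)

val states = Seq(
  PartitionState(TopicPartition("orders", 0), leader = 1),
  PartitionState(TopicPartition("orders", 1), leader = 2),
  PartitionState(TopicPartition("payments", 0), leader = 1))

val (toLead, toFollow) = splitByLeadership(1, states)
println(s"leader for: ${toLead.map(_.tp)}")
println(s"follower for: ${toFollow.map(_.tp)}")
```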

For all partitions for which the broker becomes the leader, becomeLeaderOrFollower calls makeLeaders.

For all partitions for which the broker becomes a follower, becomeLeaderOrFollower calls makeFollowers.

With the hwThreadInitialized internal flag off (false), becomeLeaderOrFollower calls startHighWaterMarksCheckPointThread and turns the flag on (true).
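The hwThreadInitialized flag makes the checkpoint thread start at most once, however many LeaderAndIsr requests arrive. A minimal sketch, assuming a single lock serializes the check-and-set; the class HighWatermarkCheckpointing and its startThread function are hypothetical.

```scala
// Hypothetical model of the one-time start guarded by hwThreadInitialized.
// ASSUMPTION: one lock object serializes the handling of LeaderAndIsr requests.
class HighWatermarkCheckpointing(startThread: () => Unit) {
  private val lock = new Object
  private var hwThreadInitialized = false

  // Called for every LeaderAndIsr request; starts the thread only the first time.
  def onLeaderAndIsr(): Unit = lock.synchronized {
    if (!hwThreadInitialized) {
      startThread()
      hwThreadInitialized = true
    }
  }
}

var starts = 0
val hw = new HighWatermarkCheckpointing(() => starts += 1)
hw.onLeaderAndIsr()
hw.onLeaderAndIsr()
println(s"checkpoint thread started $starts time(s)")
```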

For every new partition, becomeLeaderOrFollower…​FIXME

becomeLeaderOrFollower…​FIXME

becomeLeaderOrFollower calls the given onLeadershipChange callback with the partitions for which the broker becomes the leader and those for which it becomes a follower.

In the end, becomeLeaderOrFollower creates a new LeaderAndIsrResponse to "announce" a successful request processing.

Note
becomeLeaderOrFollower is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.

makeFollowers Internal Method

makeFollowers(
  controllerId: Int,
  epoch: Int,
  partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
  correlationId: Int,
  responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition]

makeFollowers…​FIXME

Note
makeFollowers is used when ReplicaManager is requested to becomeLeaderOrFollower.

recordIsrChange Method

recordIsrChange(topicPartition: TopicPartition): Unit

recordIsrChange adds the input topicPartition to isrChangeSet internal registry and sets lastIsrChangeMs to the current time.

Note
recordIsrChange is used exclusively when Partition is requested to updateIsr.
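A sketch of recordIsrChange, assuming a synchronized mutable set and an injectable clock. The IsrChangeRegistry class and its nowMs parameter are hypothetical; only isrChangeSet and lastIsrChangeMs come from the text.

```scala
import scala.collection.mutable

final case class TopicPartition(topic: String, partition: Int)

// Hypothetical holder of the two ISR-change registries; the clock is injected
// so that the timestamp can be controlled.
class IsrChangeRegistry(nowMs: () => Long) {
  val isrChangeSet: mutable.Set[TopicPartition] = mutable.Set.empty
  @volatile var lastIsrChangeMs: Long = nowMs()

  // Remembers the partition whose ISR changed and when the change happened.
  def recordIsrChange(topicPartition: TopicPartition): Unit =
    isrChangeSet.synchronized {
      isrChangeSet += topicPartition
      lastIsrChangeMs = nowMs()
    }
}

var clock = 1000L
val registry = new IsrChangeRegistry(() => clock)
clock = 5000L
registry.recordIsrChange(TopicPartition("orders", 0))
registry.recordIsrChange(TopicPartition("orders", 0)) // a set keeps a single entry
println(s"${registry.isrChangeSet} changed at ${registry.lastIsrChangeMs}")
```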

updateFollowerLogReadResults Internal Method

updateFollowerLogReadResults(
  replicaId: Int,
  readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]

updateFollowerLogReadResults…​FIXME

Note
updateFollowerLogReadResults is used exclusively when ReplicaManager is requested to fetch messages from the leader replica.

fetchMessages Method

fetchMessages(
  timeout: Long,
  replicaId: Int,
  fetchMinBytes: Int,
  fetchMaxBytes: Int,
  hardMaxBytesLimit: Boolean,
  fetchInfos: Seq[(TopicPartition, PartitionData)],
  quota: ReplicaQuota,
  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
  isolationLevel: IsolationLevel,
  clientMetadata: Option[ClientMetadata]): Unit

fetchMessages determines fetchIsolation:

  • FetchLogEnd for a request from a follower or when the replicaId is Request.FutureLocalReplicaId

  • FetchTxnCommitted for isolationLevel being READ_COMMITTED

  • FetchHighWatermark otherwise
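The three-way choice of fetchIsolation can be written as a simple conditional. The sketch uses local stand-ins for FetchIsolation and IsolationLevel, and assumes (as in Kafka's Request object) that followers send a non-negative replicaId, i.e. their broker ID, and that Request.FutureLocalReplicaId is -3. Treat both constants as assumptions.

```scala
// Local stand-ins (hypothetical) for Kafka's FetchIsolation and IsolationLevel.
sealed trait FetchIsolation
case object FetchLogEnd extends FetchIsolation
case object FetchTxnCommitted extends FetchIsolation
case object FetchHighWatermark extends FetchIsolation

sealed trait IsolationLevel
case object ReadUncommitted extends IsolationLevel
case object ReadCommitted extends IsolationLevel

// ASSUMPTIONS mirroring Kafka's Request object: followers use their
// (non-negative) broker ID as replicaId; -3 marks the future local replica.
val FutureLocalReplicaId = -3
def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

// Picks the fetch isolation in the order described above.
def fetchIsolation(replicaId: Int, isolationLevel: IsolationLevel): FetchIsolation =
  if (isFromFollower(replicaId) || replicaId == FutureLocalReplicaId) FetchLogEnd
  else if (isolationLevel == ReadCommitted) FetchTxnCommitted
  else FetchHighWatermark
```

The order of the branches matters: a follower fetch gets FetchLogEnd even if it asks for READ_COMMITTED.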

fetchMessages calls readFromLocalLog (with the fetchIsolation).

fetchMessages…​FIXME

Note

fetchMessages is used when:

maybePropagateIsrChanges Method

maybePropagateIsrChanges(): Unit

maybePropagateIsrChanges…​FIXME

Note
maybePropagateIsrChanges is used exclusively when isr-change-propagation task is executed (every 2500 milliseconds).

Creating ReplicaManager Instance

ReplicaManager takes the following when created:

ReplicaManager initializes the internal registries and counters.

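startup Method

startup(): Unit

startup requests the Scheduler to schedule the isr-expiration and isr-change-propagation recurring tasks.

startup then creates a LogDirFailureHandler and requests it to start.

Note
startup uses the Scheduler that was specified when ReplicaManager was created.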
Note
startup is used exclusively when KafkaServer starts up.
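The two recurring tasks could be reproduced with a plain java.util.concurrent.ScheduledExecutorService standing in for Kafka's Scheduler. A minimal sketch, assuming no initial delay; scheduleIsrTasks and the two task functions are hypothetical parameters, while the periods are the ones stated for isr-expiration and isr-change-propagation.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Periods from the text: half of replica.lag.time.max.ms for isr-expiration,
// a fixed 2500 ms for isr-change-propagation.
def isrExpirationPeriodMs(replicaLagTimeMaxMs: Long): Long = replicaLagTimeMaxMs / 2
val IsrChangePropagationPeriodMs = 2500L

// Sketch: a ScheduledExecutorService stands in for Kafka's Scheduler.
// ASSUMPTION: both tasks start without an initial delay.
def scheduleIsrTasks(
    executor: ScheduledExecutorService,
    replicaLagTimeMaxMs: Long,
    maybeShrinkIsr: () => Unit,
    maybePropagateIsrChanges: () => Unit): Seq[ScheduledFuture[_]] = {
  val isrExpiration = executor.scheduleAtFixedRate(
    () => maybeShrinkIsr(), 0L, isrExpirationPeriodMs(replicaLagTimeMaxMs), TimeUnit.MILLISECONDS)
  val isrChangePropagation = executor.scheduleAtFixedRate(
    () => maybePropagateIsrChanges(), 0L, IsrChangePropagationPeriodMs, TimeUnit.MILLISECONDS)
  Seq(isrExpiration, isrChangePropagation)
}
```

With the default replica.lag.time.max.ms of 30000 ms (an assumption about the configured value), isr-expiration would run every 15000 ms.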

maybeShrinkIsr Internal Method

maybeShrinkIsr(): Unit

maybeShrinkIsr prints out the following TRACE message to the logs:

Evaluating ISR list of partitions to see which replicas can be removed from the ISR

maybeShrinkIsr requests every partition in the allPartitions pool that is not offline to maybeShrinkIsr (with the replica.lag.time.max.ms property).
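maybeShrinkIsr boils down to iterating over the non-offline partitions. The sketch below models OfflinePartition as a sentinel Partition compared by reference and uses a TrieMap instead of Kafka's Pool; the Partition class is a hypothetical stub that only records the lag threshold it received.

```scala
import scala.collection.concurrent.TrieMap

final case class TopicPartition(topic: String, partition: Int)

// Hypothetical stub: remembers the max lag it was last asked to check against.
class Partition(val tp: TopicPartition) {
  var lastShrinkCheckMaxLagMs: Option[Long] = None
  def maybeShrinkIsr(replicaMaxLagTimeMs: Long): Unit =
    lastShrinkCheckMaxLagMs = Some(replicaMaxLagTimeMs)
}

// Sentinel for partitions in an offline log directory (mirrors OfflinePartition).
val OfflinePartition = new Partition(TopicPartition("", -1))

val allPartitions = TrieMap.empty[TopicPartition, Partition]

// All registered partitions except the offline sentinel (compared by reference).
def nonOfflinePartitionsIterator: Iterator[Partition] =
  allPartitions.valuesIterator.filterNot(_ eq OfflinePartition)

def maybeShrinkIsr(replicaLagTimeMaxMs: Long): Unit =
  nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(replicaLagTimeMaxMs))
```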

Note
maybeShrinkIsr is used exclusively to schedule isr-expiration recurring task when ReplicaManager starts up.

makeLeaders Internal Method

makeLeaders(
  controllerId: Int,
  epoch: Int,
  partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
  correlationId: Int,
  responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition]

makeLeaders…​FIXME

Note
makeLeaders is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.

describeLogDirs Method

describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo]

describeLogDirs…​FIXME

Note
describeLogDirs is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.

Finding Log For TopicPartition — getLog Method

getLog(topicPartition: TopicPartition): Option[Log]

getLog…​FIXME

Note

getLog is used when:

startHighWaterMarksCheckPointThread Method

startHighWaterMarksCheckPointThread(): Unit

startHighWaterMarksCheckPointThread…​FIXME

Note
startHighWaterMarksCheckPointThread is used when…​FIXME

checkpointHighWatermarks Method

checkpointHighWatermarks(): Unit

checkpointHighWatermarks…​FIXME

Note
checkpointHighWatermarks is used when…​FIXME

shutdownIdleReplicaAlterLogDirsThread Method

shutdownIdleReplicaAlterLogDirsThread(): Unit

shutdownIdleReplicaAlterLogDirsThread…​FIXME

Note
shutdownIdleReplicaAlterLogDirsThread is used when…​FIXME

handleLogDirFailure Method

handleLogDirFailure(
  dir: String,
  sendZkNotification: Boolean = true): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used when LogDirFailureHandler is requested to do the work.

maybeUpdateMetadataCache Method

maybeUpdateMetadataCache(
  correlationId: Int,
  updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition]

maybeUpdateMetadataCache…​FIXME

Note
maybeUpdateMetadataCache is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.

Appending Records — appendRecords Method

appendRecords(
  timeout: Long,
  requiredAcks: Short,
  internalTopicsAllowed: Boolean,
  isFromClient: Boolean,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
  delayedProduceLock: Option[Lock] = None,
  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit

appendRecords…​FIXME

Note

appendRecords is used when:

Validating requiredAcks — isValidRequiredAcks Internal Method

isValidRequiredAcks(requiredAcks: Short): Boolean

isValidRequiredAcks returns true when the given requiredAcks is one of the following:

  • -1 (all in-sync replicas have to acknowledge)

  • 1 (only the leader has to acknowledge)

  • 0 (no acknowledgement)

Otherwise, isValidRequiredAcks returns false.
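isValidRequiredAcks is a plain membership check. A minimal Scala sketch of the rule above:

```scala
// Sketch of the acks validation: only -1 (all in-sync replicas),
// 1 (leader only) and 0 (no acknowledgement) are accepted.
def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0
```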

Note
isValidRequiredAcks is used exclusively when ReplicaManager is requested to appendRecords.

appendToLocalLog Internal Method

appendToLocalLog(
  internalTopicsAllowed: Boolean,
  isFromClient: Boolean,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  requiredAcks: Short): Map[TopicPartition, LogAppendResult]

appendToLocalLog processes (maps over) the given Map[TopicPartition, MemoryRecords] (entriesPerPartition), so that the leader partition (of every TopicPartition) is requested to appendRecordsToLeader.

Internally, appendToLocalLog prints out the following TRACE message to the logs:

Append [[entriesPerPartition]] to local log

For every tuple in the given entriesPerPartition (Map[TopicPartition, MemoryRecords]), appendToLocalLog does the following steps:

  1. Requests the BrokerTopicStats to mark the occurrence of an event for the totalProduceRequestRate for the topic (of the TopicPartition) in the topicStats and for all topics

  2. Gets the partition or throws an exception (getPartitionOrException with the expectLeader flag on)

  3. Requests the Partition to appendRecordsToLeader (with the MemoryRecords, the isFromClient flag, and the requiredAcks)

  4. Requests the BrokerTopicStats to mark the sizeInBytes of the MemoryRecords for the bytesInRate for the topic (of the TopicPartition) in the topicStats and for all topics

  5. Requests the BrokerTopicStats to mark the number of messages appended for the messagesInRate for the topic (of the TopicPartition) in the topicStats and for all topics

  6. Prints out the following TRACE message to the logs:

    [sizeInBytes] written to log [topicPartition] beginning at offset [firstOffset] and ending at offset [lastOffset]
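Steps 1, 4 and 5 are bookkeeping around the actual append. The sketch below replaces the BrokerTopicStats meters with plain running totals, and the partition lookup and append with a hypothetical appendRecordsToLeader function that returns the number of appended messages. It illustrates the order of the steps, not Kafka's metrics classes.

```scala
import scala.collection.mutable

final case class TopicPartition(topic: String, partition: Int)

// Hypothetical stand-ins: a "meter" here is just a running total.
class TopicStats {
  var totalProduceRequestRate = 0L
  var bytesInRate = 0L
  var messagesInRate = 0L
}

class BrokerTopicStats {
  val allTopicsStats = new TopicStats
  private val perTopic = mutable.Map.empty[String, TopicStats]
  def topicStats(topic: String): TopicStats = perTopic.getOrElseUpdate(topic, new TopicStats)
}

// Simplified records: only size and message count matter for the metrics.
final case class Records(sizeInBytes: Int, messageCount: Int)

def appendToLocalLog(
    stats: BrokerTopicStats,
    entriesPerPartition: Map[TopicPartition, Records],
    appendRecordsToLeader: (TopicPartition, Records) => Int): Map[TopicPartition, Int] =
  entriesPerPartition.map { case (tp, records) =>
    val topic = stats.topicStats(tp.topic)
    // Step 1: count the produce request for the topic and for all topics
    topic.totalProduceRequestRate += 1
    stats.allTopicsStats.totalProduceRequestRate += 1
    // Steps 2-3: append to the leader (lookup elided); returns appended messages
    val appended = appendRecordsToLeader(tp, records)
    // Step 4: bytes in, per topic and for all topics
    topic.bytesInRate += records.sizeInBytes
    stats.allTopicsStats.bytesInRate += records.sizeInBytes
    // Step 5: messages in, per topic and for all topics
    topic.messagesInRate += appended
    stats.allTopicsStats.messagesInRate += appended
    tp -> appended
  }
```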

When the topic is internal (Topic.isInternal(topicPartition.topic)) and internalTopicsAllowed is off, appendToLocalLog…​FIXME

In case of exceptions, appendToLocalLog…​FIXME

Note
appendToLocalLog is used exclusively when ReplicaManager is requested to append records.

Getting Partition Or Throwing Exception — getPartitionOrException Method

getPartitionOrException(
  topicPartition: TopicPartition,
  expectLeader: Boolean): Partition

getPartitionOrException gets the partition if available or throws one of the following exceptions:

  • KafkaStorageException when the partition is offline

    Partition [topicPartition] is in an offline log directory
  • NotLeaderForPartitionException

    Broker [localBrokerId] is not a replica of [topicPartition]
  • ReplicaNotAvailableException

    Partition [topicPartition] is not available
  • UnknownTopicOrPartitionException

    Partition [topicPartition] doesn't exist
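The choice between the four exceptions can be sketched as a pattern match. The decision rules are assumptions not spelled out above: a partition known to the metadata cache but not hosted locally yields NotLeaderForPartitionException when expectLeader is on and ReplicaNotAvailableException otherwise. The exception classes, the knownToMetadataCache function, and Partition are local, hypothetical stand-ins.

```scala
final case class TopicPartition(topic: String, partition: Int)
final class Partition(val tp: TopicPartition)

// Local stand-ins (hypothetical) for the Kafka exceptions listed above.
class KafkaStorageException(msg: String) extends RuntimeException(msg)
class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)
class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

// Sentinel for partitions in an offline log directory.
val OfflinePartition = new Partition(TopicPartition("", -1))

// ASSUMED rules: the lookup result, whether the metadata cache knows the
// partition, and the expectLeader flag decide which exception is thrown.
def getPartitionOrException(
    localBrokerId: Int,
    allPartitions: Map[TopicPartition, Partition],
    knownToMetadataCache: TopicPartition => Boolean,
    topicPartition: TopicPartition,
    expectLeader: Boolean): Partition =
  allPartitions.get(topicPartition) match {
    case Some(p) if p eq OfflinePartition =>
      throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
    case Some(p) => p
    case None if knownToMetadataCache(topicPartition) =>
      if (expectLeader)
        throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition")
      else
        throw new ReplicaNotAvailableException(s"Partition $topicPartition is not available")
    case None =>
      throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist")
  }
```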
Note
getPartitionOrException is used when…​FIXME

Getting Partition by TopicPartition (If Available) — getPartition Method

getPartition(topicPartition: TopicPartition): Option[Partition]

getPartition gets the partition for the given TopicPartition.

Note

getPartition is used when:

Stopping Partition Replica — stopReplica Method

stopReplica(
  topicPartition: TopicPartition,
  deletePartition: Boolean): Unit

stopReplica…​FIXME

Note
stopReplica is used exclusively when ReplicaManager is requested to stopReplicas.

underReplicatedPartitionCount Method

underReplicatedPartitionCount: Int

underReplicatedPartitionCount…​FIXME

Note
underReplicatedPartitionCount is used exclusively for the UnderReplicatedPartitions performance metric.

leaderPartitionsIterator Internal Method

leaderPartitionsIterator: Iterator[Partition]

leaderPartitionsIterator…​FIXME

Note
leaderPartitionsIterator is used exclusively for the performance metrics: LeaderCount, UnderMinIsrPartitionCount, and UnderReplicatedPartitions (indirectly using underReplicatedPartitionCount).

nonOfflinePartitionsIterator Internal Method

nonOfflinePartitionsIterator: Iterator[Partition]

nonOfflinePartitionsIterator…​FIXME

Note
nonOfflinePartitionsIterator is used when ReplicaManager is requested to leaderPartitionsIterator, checkpointHighWatermarks, and handleLogDirFailure.

Looking Up Partition or Creating New One (by TopicPartition) — getOrCreatePartition Method

getOrCreatePartition(topicPartition: TopicPartition): Partition

getOrCreatePartition simply looks up a Partition by the TopicPartition (in the allPartitions internal registry). If not found, getOrCreatePartition adds a new Partition.
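getOrCreatePartition is an atomic get-or-insert on the allPartitions registry. The sketch swaps Kafka's Pool for a java.util.concurrent.ConcurrentHashMap and its computeIfAbsent; Partition is a hypothetical stub.

```scala
import java.util.concurrent.ConcurrentHashMap

final case class TopicPartition(topic: String, partition: Int)
final class Partition(val tp: TopicPartition)

// Sketch of the allPartitions registry (ConcurrentHashMap instead of Kafka's Pool).
val allPartitions = new ConcurrentHashMap[TopicPartition, Partition]()

// Returns the registered Partition, creating and registering one if missing.
def getOrCreatePartition(topicPartition: TopicPartition): Partition =
  allPartitions.computeIfAbsent(topicPartition, (tp: TopicPartition) => new Partition(tp))

// What the PartitionCount metric reports: the number of registered partitions.
def partitionCount: Int = allPartitions.size
```

computeIfAbsent runs atomically, so concurrent callers asking for the same TopicPartition get the same Partition instance.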

Note
getOrCreatePartition is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.

offlinePartitionsIterator Internal Method

offlinePartitionsIterator: Iterator[Partition]

offlinePartitionsIterator…​FIXME

Note
offlinePartitionsIterator is used when…​FIXME

markPartitionOffline Method

markPartitionOffline(tp: TopicPartition): Unit

markPartitionOffline…​FIXME

Note
markPartitionOffline is used when…​FIXME

lastOffsetForLeaderEpoch Method

lastOffsetForLeaderEpoch(
  requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]
): Map[TopicPartition, EpochEndOffset]

lastOffsetForLeaderEpoch…​FIXME

Note
lastOffsetForLeaderEpoch is used when…​FIXME

nonOfflinePartition Method

nonOfflinePartition(topicPartition: TopicPartition): Option[Partition]

nonOfflinePartition…​FIXME

Note
nonOfflinePartition is used when…​FIXME

deleteRecords Method

deleteRecords(
  timeout: Long,
  offsetPerPartition: Map[TopicPartition, Long],
  responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit): Unit

deleteRecords…​FIXME

Note
deleteRecords is used when…​FIXME

fetchOffsetForTimestamp Method

fetchOffsetForTimestamp(
  topicPartition: TopicPartition,
  timestamp: Long,
  isolationLevel: Option[IsolationLevel],
  currentLeaderEpoch: Optional[Integer],
  fetchOnlyFromLeader: Boolean): TimestampOffset

fetchOffsetForTimestamp…​FIXME

Note
fetchOffsetForTimestamp is used when…​FIXME

stopReplicas Method

stopReplicas(
  stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors)

stopReplicas…​FIXME

Note
stopReplicas is used exclusively when KafkaApis is requested to handle a StopReplica request.

getMagic Method

getMagic(topicPartition: TopicPartition): Option[Byte]

getMagic…​FIXME

Note
getMagic is used when…​FIXME

localReplicaOrException Method

localReplicaOrException(topicPartition: TopicPartition): Replica

localReplicaOrException finds the partition (or throws an exception) for the given TopicPartition (with the expectLeader flag off) and requests the Partition for the local partition replica (or throws an exception).

Note
A partition replica is local when the replica ID is exactly the local broker ID.
Note
localReplicaOrException is used when…​FIXME

shouldLeaderThrottle Method

shouldLeaderThrottle(
  quota: ReplicaQuota,
  topicPartition: TopicPartition,
  replicaId: Int): Boolean

shouldLeaderThrottle…​FIXME

Note
shouldLeaderThrottle is used when…​FIXME

readFromLocalLog Method

readFromLocalLog(
  replicaId: Int,
  fetchOnlyFromLeader: Boolean,
  fetchIsolation: FetchIsolation,
  fetchMaxBytes: Int,
  hardMaxBytesLimit: Boolean,
  readPartitionInfo: Seq[(TopicPartition, PartitionData)],
  quota: ReplicaQuota,
  clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)]

readFromLocalLog returns records (as FetchDataInfo) for every TopicPartition in the given readPartitionInfo.

Internally, readFromLocalLog reads the requested PartitionData (up to fetchMaxBytes that is decremented sequentially for every partition) for every TopicPartition (in the given readPartitionInfo collection).
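The shrinking fetchMaxBytes budget can be sketched as a left-to-right pass over the partitions. The sketch assumes each read returns the minimum of the available bytes, the per-partition maxBytes, and the remaining budget, and it ignores hardMaxBytesLimit. PartitionData and availableBytes are hypothetical simplifications.

```scala
final case class TopicPartition(topic: String, partition: Int)
// Hypothetical simplification of PartitionData: only the per-partition fetch size.
final case class PartitionData(maxBytes: Int)

// Each read is capped by the partition's own maxBytes and by what is left of
// fetchMaxBytes; the budget then shrinks by the bytes actually read.
// `availableBytes` stands in for the local log (an assumption of this sketch).
def readFromLocalLog(
    fetchMaxBytes: Int,
    readPartitionInfo: Seq[(TopicPartition, PartitionData)],
    availableBytes: TopicPartition => Int): Seq[(TopicPartition, Int)] = {
  var limitBytes = fetchMaxBytes
  readPartitionInfo.map { case (tp, fetchInfo) =>
    val readBytes = math.min(availableBytes(tp), math.min(fetchInfo.maxBytes, limitBytes))
    limitBytes = math.max(0, limitBytes - readBytes)
    tp -> readBytes
  }
}
```

For example, with fetchMaxBytes of 1000 and partitions holding 400, 500 and 300 bytes, the third partition only gets the remaining 100 bytes.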

Note

readFromLocalLog is used when:

read Internal Method

read(
  tp: TopicPartition,
  fetchInfo: PartitionData,
  limitBytes: Int,
  minOneMessage: Boolean): LogReadResult

read increments the totalFetchRequestRate metric of the topic (of the TopicPartition) and the allTopicsStats in the BrokerTopicStats.

read prints out the following TRACE message to the logs:

Fetching log segment for partition [tp], offset [offset], partition fetch size [partitionFetchSize], remaining response limit [limitBytes]

read finds the Partition for the TopicPartition.

read…​FIXME

Triggering Preferred Replica Leader Election — electPreferredLeaders Method

electPreferredLeaders(
  controller: KafkaController,
  partitions: Set[TopicPartition],
  responseCallback: Map[TopicPartition, ApiError] => Unit,
  requestTimeout: Long): Unit

electPreferredLeaders simply requests the KafkaController to electPreferredLeaders for the partitions.

Note
electPreferredLeaders is used exclusively when KafkaApis is requested to handle an ElectPreferredLeaders request.

tryCompleteElection Method

tryCompleteElection(key: DelayedOperationKey): Unit

tryCompleteElection…​FIXME

Note
tryCompleteElection is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.

localLog Method

localLog(
  topicPartition: TopicPartition): Option[Log]

localLog…​FIXME

Note
localLog is used when…​FIXME

findPreferredReadReplica Method

findPreferredReadReplica(
  tp: TopicPartition,
  clientMetadata: ClientMetadata,
  replicaId: Int,
  fetchOffset: Long,
  currentTimeMs: Long): Option[Int]

findPreferredReadReplica finds the Partition for the TopicPartition (with expectLeader flag off).

findPreferredReadReplica…​FIXME

Note
findPreferredReadReplica is used when ReplicaManager is requested to readFromLocalLog.

localLogOrException Method

localLogOrException(
  topicPartition: TopicPartition): Log

localLogOrException finds the Partition for the given TopicPartition (and expectLeader flag off) and then requests it for the local Log.

localLogOrException throws an exception if either the Partition or the Log of the given TopicPartition is not available locally.

Note
localLogOrException is used when ReplicaFetcherThread is requested for the latest epoch, logEndOffset, endOffsetForEpoch, and buildFetch.

updateFollowerFetchState Internal Method

updateFollowerFetchState(
  followerId: Int,
  readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]

updateFollowerFetchState…​FIXME

Note
updateFollowerFetchState is used when ReplicaManager is requested to fetchMessages.

Internal Properties

Name                         Description
controllerEpoch
replicaAlterLogDirsManager   Created immediately with ReplicaManager. Used when:
highWatermarkCheckpoints     OffsetCheckpointFiles per live log data directory
stateChangeLogger            StateChangeLogger with the broker ID and the inControllerContext flag off (which sets the Broker log prefix)
hwThreadInitialized
isrChangeSet                 Collection of TopicPartition that…​FIXME
lastIsrChangeMs              Time when a new TopicPartition was last added to isrChangeSet
logDirFailureHandler
OfflinePartition
