ReplicaManager

ReplicaManager manages log replicas using the LogManager.

ReplicaManager is created and immediately started when KafkaServer is requested to start up.

Figure 1. ReplicaManager and KafkaServer

ReplicaManager is given all the required services (e.g. Metrics, KafkaZkClient, Scheduler, LogManager, QuotaManagers, MetadataCache) from the KafkaServer.

Note
KafkaServer creates a ReplicaManager only to pass it to KafkaApis, GroupCoordinator, and TransactionCoordinator when creating them.

When started, ReplicaManager schedules two recurring tasks: isr-expiration, which runs every half of the replica.lag.time.max.ms property (replica.lag.time.max.ms / 2 milliseconds), and isr-change-propagation, which runs every 2500 ms.
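The two recurring tasks can be sketched as follows. This is a minimal sketch under stated assumptions, not Kafka's code: it swaps in a plain JDK ScheduledExecutorService for Kafka's own Scheduler, and the IsrTasks class, its constructor parameters, and the zero initial delay are hypothetical. Only the task names and periods come from the text above.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Hypothetical model of the recurring ISR tasks that ReplicaManager schedules on startup.
// A JDK ScheduledExecutorService stands in for Kafka's Scheduler (an assumption).
final class IsrTasks(replicaLagTimeMaxMs: Long,
                     maybeShrinkIsr: () => Unit,
                     maybePropagateIsrChanges: () => Unit) {

  // isr-expiration runs every half of replica.lag.time.max.ms
  val isrExpirationPeriodMs: Long = replicaLagTimeMaxMs / 2
  // isr-change-propagation runs every 2500 ms
  val isrChangePropagationPeriodMs: Long = 2500L

  private val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

  def startup(): Unit = {
    // First run immediately (initial delay 0), then repeat at the fixed periods above
    executor.scheduleAtFixedRate(() => maybeShrinkIsr(), 0L, isrExpirationPeriodMs, TimeUnit.MILLISECONDS)
    executor.scheduleAtFixedRate(() => maybePropagateIsrChanges(), 0L, isrChangePropagationPeriodMs, TimeUnit.MILLISECONDS)
  }

  // Stop both tasks and release the scheduler thread
  def shutdown(): Unit = {
    executor.shutdownNow()
  }
}
```

For example, with replica.lag.time.max.ms set to 10000, isr-expiration would run every 5000 ms.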

ReplicaManager is a KafkaMetricsGroup and registers performance metrics.

Table 1. ReplicaManager’s Performance Metrics

Metric Name                   Description
FailedIsrUpdatesPerSec
IsrExpandsPerSec
IsrShrinksPerSec
LeaderCount
OfflineReplicaCount
PartitionCount                Number of partitions in allPartitions
UnderMinIsrPartitionCount
UnderReplicatedPartitions     underReplicatedPartitionCount

The performance metrics are registered in kafka.server:type=ReplicaManager group.

Figure 2. ReplicaManager in jconsole
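Because the metrics live in the kafka.server:type=ReplicaManager group, they can be located through JMX. The sketch below builds the MBean names with the standard javax.management API. The name=<MetricName> key is an assumption about how each metric's MBean is named, and the ReplicaManagerMBeans object is hypothetical. Querying the platform MBean server only finds these MBeans inside the broker JVM; tools such as jconsole connect to the broker remotely instead.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Hypothetical helper for locating ReplicaManager metrics over JMX
object ReplicaManagerMBeans {
  // Domain and type key from the text above
  val Group = "kafka.server:type=ReplicaManager"

  // Assumed naming: one MBean per metric, identified by a name=<MetricName> key
  def metricName(metric: String): ObjectName = new ObjectName(s"$Group,name=$metric")

  // Property-list pattern matching every MBean in the ReplicaManager group
  val Pattern: ObjectName = new ObjectName(s"$Group,*")

  // MBeans of the group registered in the current JVM (empty outside a broker)
  def registered(): java.util.Set[ObjectName] =
    ManagementFactory.getPlatformMBeanServer.queryNames(Pattern, null)
}
```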

ReplicaManager uses the MetadataCache for the following:

  • …​FIXME

Table 2. ReplicaManager’s Internal Properties (e.g. Registries and Counters)

Name                        Description
replicaFetcherManager       ReplicaFetcherManager created immediately with ReplicaManager (using createReplicaFetcherManager)
                            Used when:
allPartitions               Partitions by TopicPartition
                            Available as the PartitionCount performance metric
highWatermarkCheckpoints    OffsetCheckpointFiles per live log data directory
stateChangeLogger           StateChangeLogger with the broker ID and the inControllerContext flag off (which sets the Broker log prefix)
isrChangeSet                Collection of TopicPartition that…FIXME
lastIsrChangeMs             Time when a TopicPartition was last added to isrChangeSet
logDirFailureHandler
OfflinePartition

Tip

Enable TRACE logging level for kafka.server.ReplicaManager logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.server.ReplicaManager=TRACE

Refer to Logging.


Please note that ReplicaManager also uses a preconfigured state.change.logger logger in config/log4j.properties:

log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.state.change.logger=TRACE, stateChangeAppender
log4j.additivity.state.change.logger=false

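That means that the state change messages ReplicaManager logs through the state.change.logger logger (using stateChangeLogger) go to the logs/state-change.log file at TRACE logging level and, because log4j.additivity.state.change.logger is false, are not also added to the main logs.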

Creating ReplicaFetcherManager — createReplicaFetcherManager Internal Factory Method

createReplicaFetcherManager(
  metrics: Metrics,
  time: Time,
  threadNamePrefix: Option[String],
  quotaManager: ReplicationQuotaManager): ReplicaFetcherManager

createReplicaFetcherManager simply creates a ReplicaFetcherManager.

Note
createReplicaFetcherManager is used exclusively when ReplicaManager is created.

shutdown Method

shutdown(checkpointHW: Boolean = true): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

alterReplicaLogDirs Method

alterReplicaLogDirs(partitionDirs: Map[TopicPartition, String]): Map[TopicPartition, Errors]

alterReplicaLogDirs…​FIXME

Note
alterReplicaLogDirs is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.

becomeLeaderOrFollower Method

becomeLeaderOrFollower(
  correlationId: Int,
  leaderAndIsrRequest: LeaderAndIsrRequest,
  onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse

becomeLeaderOrFollower…​FIXME

Note
becomeLeaderOrFollower is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.

makeFollowers Internal Method

makeFollowers(
  controllerId: Int,
  epoch: Int,
  partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
  correlationId: Int,
  responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition]

makeFollowers…​FIXME

Note
makeFollowers is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.

recordIsrChange Method

recordIsrChange(topicPartition: TopicPartition): Unit

recordIsrChange adds the input topicPartition to isrChangeSet internal registry and sets lastIsrChangeMs to the current time.

Note
recordIsrChange is used exclusively when Partition is requested to updateIsr.
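recordIsrChange can be sketched as below: a minimal model, assuming the set and the timestamp are updated together under a single lock. IsrChangeRegistry, pendingChanges, the injectable clock, and the TopicPartition case class are hypothetical stand-ins, not Kafka's classes.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-in for Kafka's TopicPartition (topic name and partition number)
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}

// Hypothetical model of the isrChangeSet registry and the lastIsrChangeMs timestamp.
// The clock is a parameter so the behaviour is deterministic in tests.
final class IsrChangeRegistry(clock: () => Long = () => System.currentTimeMillis()) {
  private val isrChangeSet = mutable.Set.empty[TopicPartition]
  val lastIsrChangeMs = new AtomicLong(clock())

  // Add the partition and stamp the current time under one lock so both stay consistent
  def recordIsrChange(topicPartition: TopicPartition): Unit =
    isrChangeSet.synchronized {
      isrChangeSet += topicPartition
      lastIsrChangeMs.set(clock())
    }

  // Snapshot of the partitions recorded so far
  def pendingChanges: Set[TopicPartition] = isrChangeSet.synchronized(isrChangeSet.toSet)
}
```

Since isrChangeSet is a set, recording the same partition twice keeps a single entry but still moves lastIsrChangeMs forward.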

updateFollowerLogReadResults Internal Method

updateFollowerLogReadResults(
  replicaId: Int,
  readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]

updateFollowerLogReadResults…​FIXME

Note
updateFollowerLogReadResults is used exclusively when ReplicaManager is requested to fetch messages from the leader replica.

fetchMessages Method

fetchMessages(
  timeout: Long,
  replicaId: Int,
  fetchMinBytes: Int,
  fetchMaxBytes: Int,
  hardMaxBytesLimit: Boolean,
  fetchInfos: Seq[(TopicPartition, FetchRequest.PartitionData)],
  quota: ReplicaQuota = UnboundedQuota,
  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
  isolationLevel: IsolationLevel): Unit

fetchMessages…​FIXME

Note
fetchMessages is used exclusively when KafkaApis is requested to handle a Fetch request.

maybePropagateIsrChanges Method

maybePropagateIsrChanges(): Unit

maybePropagateIsrChanges…​FIXME

Note
maybePropagateIsrChanges is used exclusively when isr-change-propagation task is executed (every 2500 milliseconds).

Creating ReplicaManager Instance

ReplicaManager takes the following when created:

ReplicaManager initializes the internal registries and counters.

startup Method

startup(): Unit

After scheduling the isr-expiration and isr-change-propagation recurring tasks, startup creates a LogDirFailureHandler and requests it to start.

Note
startup uses the Scheduler that was specified when ReplicaManager was created.

Note
startup is used exclusively when KafkaServer starts up.

maybeShrinkIsr Internal Method

maybeShrinkIsr(): Unit

maybeShrinkIsr prints out the following TRACE message to the logs:

TRACE Evaluating ISR list of partitions to see which replicas can be removed from the ISR

maybeShrinkIsr requests every partition in the allPartitions pool that is not offline to maybeShrinkIsr, passing in the replicaLagTimeMaxMs property (replica.lag.time.max.ms).

Note
maybeShrinkIsr is used exclusively to schedule isr-expiration recurring task when ReplicaManager starts up.
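The isr-expiration task body described above can be sketched as follows. This is a simplified model: PartitionView, its isOffline flag, and IsrExpiration are hypothetical, and the actual ISR-shrinking decision stays inside each partition, as in the text.

```scala
// Hypothetical, minimal view of a partition: only what the isr-expiration loop needs
trait PartitionView {
  def isOffline: Boolean
  def maybeShrinkIsr(replicaMaxLagTimeMs: Long): Unit
}

// Sketch of ReplicaManager.maybeShrinkIsr as described in the text
final class IsrExpiration(allPartitions: Map[String, PartitionView],
                          replicaLagTimeMaxMs: Long,
                          trace: String => Unit) {

  def maybeShrinkIsr(): Unit = {
    trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
    // Skip offline partitions; every other partition decides for itself whether to shrink its ISR
    allPartitions.values.filterNot(_.isOffline).foreach(_.maybeShrinkIsr(replicaLagTimeMaxMs))
  }
}
```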

makeLeaders Internal Method

makeLeaders(
  controllerId: Int,
  epoch: Int,
  partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
  correlationId: Int,
  responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition]

makeLeaders…​FIXME

Note
makeLeaders is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.

describeLogDirs Method

describeLogDirs(partitions: Set[TopicPartition]): Map[String, LogDirInfo]

describeLogDirs…​FIXME

Note
describeLogDirs is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.

Finding Log For TopicPartition — getLog Method

getLog(topicPartition: TopicPartition): Option[Log]

getLog…​FIXME

Note

getLog is used when:

startHighWaterMarksCheckPointThread Method

startHighWaterMarksCheckPointThread(): Unit

startHighWaterMarksCheckPointThread…​FIXME

Note
startHighWaterMarksCheckPointThread is used when…​FIXME

checkpointHighWatermarks Method

checkpointHighWatermarks(): Unit

checkpointHighWatermarks…​FIXME

Note
checkpointHighWatermarks is used when…​FIXME

shutdownIdleReplicaAlterLogDirsThread Method

shutdownIdleReplicaAlterLogDirsThread(): Unit

shutdownIdleReplicaAlterLogDirsThread…​FIXME

Note
shutdownIdleReplicaAlterLogDirsThread is used when…​FIXME

handleLogDirFailure Method

handleLogDirFailure(
  dir: String,
  sendZkNotification: Boolean = true): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used exclusively when LogDirFailureHandler is requested to do the work.

maybeUpdateMetadataCache Method

maybeUpdateMetadataCache(
  correlationId: Int,
  updateMetadataRequest: UpdateMetadataRequest) : Seq[TopicPartition]

maybeUpdateMetadataCache…​FIXME

Note
maybeUpdateMetadataCache is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.

Appending Records — appendRecords Method

appendRecords(
  timeout: Long,
  requiredAcks: Short,
  internalTopicsAllowed: Boolean,
  isFromClient: Boolean,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
  delayedProduceLock: Option[Lock] = None,
  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit

appendRecords…​FIXME

Note

appendRecords is used when:

appendToLocalLog Internal Method

appendToLocalLog(
  internalTopicsAllowed: Boolean,
  isFromClient: Boolean,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  requiredAcks: Short): Map[TopicPartition, LogAppendResult]

appendToLocalLog prints out the following TRACE message to the logs:

Append [[entriesPerPartition]] to local log

For every partition in the given entriesPerPartition, appendToLocalLog gets the partition using getPartitionOrException (with the expectLeader flag enabled) and then requests the Partition to appendRecordsToLeader.

appendToLocalLog…​FIXME

In the end, appendToLocalLog prints out the following TRACE message to the logs:

[sizeInBytes] written to log [topicPartition] beginning at offset [firstOffset] and ending at offset [lastOffset]

In case of exceptions, appendToLocalLog…​FIXME

Note
appendToLocalLog is used exclusively when ReplicaManager is requested to append records.
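The happy path of appendToLocalLog can be sketched as below. It covers only the steps stated above: the first TRACE message, the per-partition lookup with expectLeader enabled, appendRecordsToLeader, and the final TRACE message. Exception handling, internalTopicsAllowed, and requiredAcks are deliberately left out, and all types (Records, AppendInfo, LeaderPartition, LocalLogAppender) are hypothetical stand-ins.

```scala
// Hypothetical stand-ins for Kafka types; only the fields the loop needs
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}
final case class Records(sizeInBytes: Int)                       // stands in for MemoryRecords
final case class AppendInfo(firstOffset: Long, lastOffset: Long)

trait LeaderPartition {
  def appendRecordsToLeader(records: Records): AppendInfo
}

// Sketch of the happy path of appendToLocalLog (no exception handling)
final class LocalLogAppender(getPartitionOrException: (TopicPartition, Boolean) => LeaderPartition,
                             trace: String => Unit) {

  def appendToLocalLog(entriesPerPartition: Map[TopicPartition, Records]): Map[TopicPartition, AppendInfo] = {
    trace(s"Append [$entriesPerPartition] to local log")
    entriesPerPartition.map { case (topicPartition, records) =>
      // expectLeader flag enabled, as described above
      val partition = getPartitionOrException(topicPartition, true)
      val info = partition.appendRecordsToLeader(records)
      trace(s"${records.sizeInBytes} written to log $topicPartition " +
        s"beginning at offset ${info.firstOffset} and ending at offset ${info.lastOffset}")
      topicPartition -> info
    }
  }
}
```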

Getting Partition Or Throwing Exception — getPartitionOrException Method

getPartitionOrException(
  topicPartition: TopicPartition,
  expectLeader: Boolean): Partition

getPartitionOrException gets the partition if available or throws one of the following exceptions:

  • KafkaStorageException when the partition is offline

    Partition [topicPartition] is in an offline log directory
  • NotLeaderForPartitionException

    Broker [localBrokerId] is not a replica of [topicPartition]
  • ReplicaNotAvailableException

    Partition [topicPartition] is not available
  • UnknownTopicOrPartitionException

    Partition [topicPartition] doesn't exist
Note
getPartitionOrException is used when…​FIXME
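The list above names the exceptions and their messages but states the condition only for the offline case. The sketch below fills in the other conditions as an assumption modeled on Kafka 2.x: a partition missing locally but known to the metadata cache leads to NotLeaderForPartitionException when expectLeader is on and to ReplicaNotAvailableException otherwise, and any other missing partition leads to UnknownTopicOrPartitionException. The exception classes, Partition, PartitionLookup, and knownToMetadataCache are local, hypothetical stand-ins, and getPartition is modeled as a plain lookup in allPartitions.

```scala
// Hypothetical stand-in for Kafka's TopicPartition
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}

// Local stand-ins named after the Kafka exceptions listed above
final class KafkaStorageException(msg: String) extends RuntimeException(msg)
final class NotLeaderForPartitionException(msg: String) extends RuntimeException(msg)
final class ReplicaNotAvailableException(msg: String) extends RuntimeException(msg)
final class UnknownTopicOrPartitionException(msg: String) extends RuntimeException(msg)

// Hypothetical partition handle: `offline` marks a partition in an offline log directory
final case class Partition(tp: TopicPartition, offline: Boolean = false)

final class PartitionLookup(localBrokerId: Int,
                            allPartitions: Map[TopicPartition, Partition],
                            knownToMetadataCache: TopicPartition => Boolean) {

  def getPartition(tp: TopicPartition): Option[Partition] = allPartitions.get(tp)

  def getPartitionOrException(tp: TopicPartition, expectLeader: Boolean): Partition =
    getPartition(tp) match {
      case Some(p) if p.offline =>
        throw new KafkaStorageException(s"Partition $tp is in an offline log directory")
      case Some(p) => p
      // Assumed: the topic exists but this broker holds no replica of the partition
      case None if knownToMetadataCache(tp) =>
        if (expectLeader) throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $tp")
        else throw new ReplicaNotAvailableException(s"Partition $tp is not available")
      case None =>
        throw new UnknownTopicOrPartitionException(s"Partition $tp doesn't exist")
    }
}
```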

Getting Partition by TopicPartition (If Available) — getPartition Method

getPartition(topicPartition: TopicPartition): Option[Partition]

getPartition looks up the partition for the given TopicPartition in the allPartitions registry.

Note

getPartition is used when:

stopReplica Method

stopReplica(
  topicPartition: TopicPartition,
  deletePartition: Boolean): Unit

stopReplica…​FIXME

Note
stopReplica is used exclusively when ReplicaManager is requested to stopReplicas.

underReplicatedPartitionCount Method

underReplicatedPartitionCount: Int

underReplicatedPartitionCount…​FIXME

Note
underReplicatedPartitionCount is used exclusively for the UnderReplicatedPartitions performance metric.

leaderPartitionsIterator Internal Method

leaderPartitionsIterator: Iterator[Partition]

leaderPartitionsIterator…​FIXME

Note
leaderPartitionsIterator is used exclusively for the performance metrics: LeaderCount, UnderMinIsrPartitionCount, and UnderReplicatedPartitions (indirectly using underReplicatedPartitionCount).

nonOfflinePartitionsIterator Internal Method

nonOfflinePartitionsIterator: Iterator[Partition]

nonOfflinePartitionsIterator…​FIXME

Note
nonOfflinePartitionsIterator is used when ReplicaManager is requested to leaderPartitionsIterator, checkpointHighWatermarks, and handleLogDirFailure.

getOrCreatePartition Method

getOrCreatePartition(topicPartition: TopicPartition): Partition

getOrCreatePartition…​FIXME

Note
getOrCreatePartition is used exclusively when ReplicaManager is requested to becomeLeaderOrFollower.

offlinePartitionsIterator Internal Method

offlinePartitionsIterator: Iterator[Partition]

offlinePartitionsIterator…​FIXME

Note
offlinePartitionsIterator is used when…​FIXME

markPartitionOffline Method

markPartitionOffline(tp: TopicPartition): Unit

markPartitionOffline…​FIXME

Note
markPartitionOffline is used when…​FIXME

lastOffsetForLeaderEpoch Method

lastOffsetForLeaderEpoch(
  requestedEpochInfo: Map[TopicPartition, OffsetsForLeaderEpochRequest.PartitionData]
): Map[TopicPartition, EpochEndOffset]

lastOffsetForLeaderEpoch…​FIXME

Note
lastOffsetForLeaderEpoch is used when…​FIXME

nonOfflinePartition Method

nonOfflinePartition(topicPartition: TopicPartition): Option[Partition]

nonOfflinePartition…​FIXME

Note
nonOfflinePartition is used when…​FIXME

deleteRecords Method

deleteRecords(
  timeout: Long,
  offsetPerPartition: Map[TopicPartition, Long],
  responseCallback: Map[TopicPartition, DeleteRecordsResponse.PartitionResponse] => Unit): Unit

deleteRecords…​FIXME

Note
deleteRecords is used when…​FIXME

fetchOffsetForTimestamp Method

fetchOffsetForTimestamp(
  topicPartition: TopicPartition,
  timestamp: Long,
  isolationLevel: Option[IsolationLevel],
  currentLeaderEpoch: Optional[Integer],
  fetchOnlyFromLeader: Boolean): TimestampOffset

fetchOffsetForTimestamp…​FIXME

Note
fetchOffsetForTimestamp is used when…​FIXME

stopReplicas Method

stopReplicas(
  stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Errors], Errors)

stopReplicas…​FIXME

Note
stopReplicas is used exclusively when KafkaApis is requested to handle a StopReplica request.

getMagic Method

getMagic(topicPartition: TopicPartition): Option[Byte]

getMagic…​FIXME

Note
getMagic is used when…​FIXME

localReplicaOrException Method

localReplicaOrException(topicPartition: TopicPartition): Replica

localReplicaOrException gets the partition for the given TopicPartition using getPartitionOrException (with the expectLeader flag off) and then requests the Partition for its local replica. Either step throws an exception when the partition or the local replica is not available.

Note
A partition replica is local when its replica ID is the local broker ID.

Note
localReplicaOrException is used when…FIXME
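The "local replica" rule from the note above can be sketched as a simple lookup by broker ID. Replica, PartitionReplicas, and LocalReplicas are hypothetical stand-ins, and IllegalStateException with its message is a placeholder for the exception Kafka actually throws.

```scala
// Hypothetical stand-ins; Kafka's Replica and Partition carry much more state
final case class Replica(brokerId: Int)
final case class PartitionReplicas(assignedReplicas: Seq[Replica])

object LocalReplicas {
  // A replica is local when its replica ID is exactly the local broker ID
  def localReplica(partition: PartitionReplicas, localBrokerId: Int): Option[Replica] =
    partition.assignedReplicas.find(_.brokerId == localBrokerId)

  // IllegalStateException and its message are placeholders for Kafka's own exception
  def localReplicaOrException(partition: PartitionReplicas, localBrokerId: Int): Replica =
    localReplica(partition, localBrokerId).getOrElse(
      throw new IllegalStateException(s"Replica $localBrokerId is not available"))
}
```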

shouldLeaderThrottle Method

shouldLeaderThrottle(
  quota: ReplicaQuota,
  topicPartition: TopicPartition,
  replicaId: Int): Boolean

shouldLeaderThrottle…​FIXME

Note
shouldLeaderThrottle is used when…​FIXME
