Partition

Partition is an internal representation of a TopicPartition for ReplicaManager (to manage all partitions).

Partition knows the broker that hosts it (by the local broker ID). The local broker holds one of the partition replicas (which are replicated across the brokers of a Kafka cluster).

Partition can be the leader of the TopicPartition or a follower.

Partition can be elected the partition leader (when ReplicaManager is requested to makeLeaders).

Partition is under-replicated if it is the leader and the number of in-sync replicas is below the number of all replicas. Use the UnderReplicated metric to check it (or the UnderReplicatedPartitions metric of the ReplicaManager).
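The under-replicated condition can be sketched as a simple predicate. This is a minimal illustration only: PartitionSnapshot and its fields are hypothetical stand-ins for the state that Partition keeps, not Kafka's own classes.

```scala
// Hypothetical, simplified snapshot of a partition's replication state.
final case class PartitionSnapshot(
    isLeader: Boolean,
    allReplicaIds: Set[Int],
    inSyncReplicaIds: Set[Int]) {

  // Only a leader reports under-replication, and only when
  // the ISR is smaller than the full replica assignment.
  def isUnderReplicated: Boolean =
    isLeader && inSyncReplicaIds.size < allReplicaIds.size
}
```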

Partition uses the remoteReplicasMap internal registry to track remote replicas (and is requested to update partition replicas and the in-sync replicas when…​FIXME).

A Kafka topic is spread across a Kafka cluster as a logical group of one or more partitions.

Kafka producers publish messages to partition leaders, and Kafka consumers consume messages from them.

In-Sync Replicas (ISR) are brokers that…​FIXME

Offline Replicas are…​FIXME

Partition is created when ReplicaManager is requested to add a TopicPartition to allPartitions (indirectly using apply) and for the OfflinePartition.

Partition uses [Partition [topicPartition] broker=[localBrokerId]] as the logging prefix (aka logIdent).

Tip

Enable the ALL logging level for the kafka.cluster.Partition logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.cluster.Partition=ALL

Refer to Logging.

Performance Metrics

Partition is a KafkaMetricsGroup with the following performance metrics.

Table 1. Partition’s Performance Metrics
Metric Name Description

InSyncReplicasCount

One of the two possible values:

LastStableOffsetLag

ReplicasCount

One of the two possible values:

UnderMinIsr

One of the two possible values:

UnderReplicated

One of the two possible values:

The performance metrics are registered in kafka.cluster:type=Partition group (only when the partition is not offline).

Figure 1. Partition Metrics in JConsole JMX Tool

Creating Partition Instance

Partition takes the following to be created:

Partition initializes the internal properties.

Partition and Partition Log — log Registry

log: Option[Log] = None

log is a partition log (of the partition).

log is assigned a Log when Partition is requested to createLogIfNotExists and maybeReplaceCurrentWithFutureReplica.

log is available until Partition is requested to delete.

log is used when:

Partition and PartitionStateStore

When created, a Partition is given a PartitionStateStore (that is a ZkPartitionStateStore by default).

PartitionStateStore is used when Partition is requested for the following:

Creating Partition — apply Utility

apply(
  topicPartition: TopicPartition,
  time: Time,
  replicaManager: ReplicaManager): Partition

apply creates a new Partition for the given TopicPartition, Time, and ReplicaManager with the following:

Note
apply is used when ReplicaManager is requested to add a partition to allPartitions Registry.

Read and Write Locks

leaderIsrUpdateLock: ReentrantReadWriteLock

leaderIsrUpdateLock is a Java ReentrantReadWriteLock that maintains a pair of locks, one for read-only operations and one for writing. (The read lock may be held simultaneously by multiple reader threads, as long as there are no writers. The write lock is exclusive.)
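The read and write lock pattern can be sketched with plain java.util.concurrent classes. The helpers below only borrow the inReadLock and inWriteLock names for illustration; LockSketch, inLock, and demo are hypothetical and are not claimed to match Kafka's own utilities.

```scala
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

object LockSketch {
  // Runs `body` while holding `lock` and always releases the lock afterwards.
  private def inLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  // Many readers may run concurrently as long as no writer holds the lock.
  def inReadLock[T](rw: ReentrantReadWriteLock)(body: => T): T =
    inLock(rw.readLock())(body)

  // A writer runs exclusively.
  def inWriteLock[T](rw: ReentrantReadWriteLock)(body: => T): T =
    inLock(rw.writeLock())(body)

  // Usage: mutate shared state under the write lock, read it under the read lock.
  def demo(): Int = {
    val leaderIsrUpdateLock = new ReentrantReadWriteLock()
    var isr = Set(1, 2)
    inWriteLock(leaderIsrUpdateLock) { isr = isr + 3 }
    inReadLock(leaderIsrUpdateLock) { isr.size }
  }
}
```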

A read lock (inReadLock) is used for the following:

A write lock (inWriteLock) is used for the following:

maybeExpandIsr Method

maybeExpandIsr(
  replicaId: Int,
  logReadResult: LogReadResult): Boolean

maybeExpandIsr…​FIXME

Note
maybeExpandIsr is used when Partition is requested to updateFollowerFetchState.

updateFollowerFetchState Method

updateFollowerFetchState(
  followerId: Int,
  followerFetchOffsetMetadata: LogOffsetMetadata,
  followerStartOffset: Long,
  followerFetchTimeMs: Long,
  leaderEndOffset: Long,
  lastSentHighwatermark: Long): Boolean

updateFollowerFetchState…​FIXME

Note
updateFollowerFetchState is used when ReplicaManager is requested to updateFollowerFetchState.

maybeShrinkIsr Method

maybeShrinkIsr(
  replicaMaxLagTimeMs: Long): Unit

maybeShrinkIsr…​

Note
maybeShrinkIsr is used when ReplicaManager is requested to maybeShrinkIsr.

updateReplicaLogReadResult Method

updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean

updateReplicaLogReadResult…​FIXME

Note
updateReplicaLogReadResult is used exclusively when ReplicaManager is requested to updateFollowerLogReadResults.

updateIsr Internal Method

updateIsr(newIsr: Set[Replica]): Unit

updateIsr…​FIXME

Note
updateIsr is used when Partition is requested to expand or shrink the ISR.

makeFollower Method

makeFollower(
  controllerId: Int,
  partitionStateInfo: LeaderAndIsrRequest.PartitionState,
  correlationId: Int): Boolean

makeFollower…​FIXME

Note
makeFollower is used exclusively when ReplicaManager is requested to makeFollowers.

leaderReplicaIfLocal Method

leaderReplicaIfLocal: Option[Replica]

leaderReplicaIfLocal returns a Replica when the leaderReplicaIdOpt is the localBrokerId. Otherwise, leaderReplicaIfLocal returns None (i.e. undefined).
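A minimal sketch of this lookup, assuming the leader ID and the local replica are passed in explicitly. LeaderLookupSketch and its Replica case class are hypothetical stand-ins used only for illustration.

```scala
object LeaderLookupSketch {
  // Hypothetical stand-in for a partition replica.
  final case class Replica(brokerId: Int)

  // Gives the local replica only when the leader ID is the local broker ID.
  def leaderReplicaIfLocal(
      leaderReplicaIdOpt: Option[Int],
      localBrokerId: Int,
      localReplica: Option[Replica]): Option[Replica] =
    if (leaderReplicaIdOpt.contains(localBrokerId)) localReplica else None
}
```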

Note
leaderReplicaIfLocal is used…​FIXME

isUnderMinIsr Predicate

isUnderMinIsr: Boolean

isUnderMinIsr is true only if the partition isLeaderReplicaLocal and the number of in-sync replicas is below the min.insync.replicas configuration property (as configured for the Log of the leader replica).
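The predicate can be sketched as follows. This is an illustration under stated assumptions: UnderMinIsrSketch is hypothetical, and minInSyncReplicas stands for the value of the min.insync.replicas property, passed in directly rather than read from a Log configuration.

```scala
object UnderMinIsrSketch {
  // True only for a local leader whose ISR is smaller than min.insync.replicas.
  def isUnderMinIsr(
      isLeaderReplicaLocal: Boolean,
      inSyncReplicaIds: Set[Int],
      minInSyncReplicas: Int): Boolean =
    isLeaderReplicaLocal && inSyncReplicaIds.size < minInSyncReplicas
}
```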

Note
isUnderMinIsr is used when…​FIXME

checkEnoughReplicasReachOffset Method

checkEnoughReplicasReachOffset(
  requiredOffset: Long): (Boolean, Errors)

checkEnoughReplicasReachOffset…​FIXME

Note
checkEnoughReplicasReachOffset is used when…​FIXME

Electing Local Partition Replica as Partition Leader — makeLeader Method

makeLeader(
  controllerId: Int,
  partitionState: LeaderAndIsrPartitionState,
  correlationId: Int,
  highWatermarkCheckpoints: OffsetCheckpoints): Boolean

makeLeader returns true if this broker has just been elected as the leader for the partition.

Note
makeLeader could be executed for a broker that is the partition leader already.

Internally, makeLeader starts by acquiring the write lock (which makes it a single-thread-exclusive operation).

makeLeader changes the internal controller epoch based on the given LeaderAndIsrPartitionState.

makeLeader requests the given LeaderAndIsrPartitionState for the partition replicas (their broker IDs) and the ISR (as a list of broker IDs) and updates internal registries for replica assignments and in-sync replicas.

makeLeader creates a Log (unless available already) for the local broker ID and the given OffsetCheckpoints (with isFutureReplica flag off).

makeLeader requests the leader log for the logEndOffset.

makeLeader prints out the following INFO message to the logs:

[topicPartition] starts at Leader Epoch [leaderEpoch] from offset [leaderEpochStartOffset]. Previous Leader Epoch was: [leaderEpoch]

makeLeader updates the internal registries: leaderEpoch, leaderEpochStartOffsetOpt and zkVersion.

makeLeader requests the leader log to maybeAssignEpochStartOffset with the current leaderEpoch and the log end offset (that is now considered the leader epoch’s start offset).

makeLeader requests the remoteReplicas to resetLastCaughtUpTime. For replicas in inSyncReplicaIds, the last caught-up time is the current time while for the others it is 0.

When the partition has just been elected a new leader, makeLeader updates the leaderReplicaIdOpt internal registry and requests the remoteReplicas to updateFetchState.

makeLeader attempts to increment the high watermark (of the leader log) and, if it was incremented, executes tryCompleteDelayedRequests.
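The step that resets the last caught-up time of the remote replicas can be sketched as below. It assumes the replicas are identified by broker ID only. CaughtUpTimeSketch and resetLastCaughtUpTimes are hypothetical names, not part of Kafka.

```scala
object CaughtUpTimeSketch {
  // In-sync replicas get the current time, all the others get 0.
  def resetLastCaughtUpTimes(
      remoteReplicaIds: Set[Int],
      inSyncReplicaIds: Set[Int],
      currentTimeMs: Long): Map[Int, Long] =
    remoteReplicaIds.map { id =>
      id -> (if (inSyncReplicaIds.contains(id)) currentTimeMs else 0L)
    }.toMap
}
```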

Note
makeLeader is used when ReplicaManager is requested to makeLeaders.

Updating Internal Registries for Replica Assignments and In-Sync Replicas — updateAssignmentAndIsr Method

updateAssignmentAndIsr(
  assignment: Seq[Int],
  isr: Set[Int]): Unit

updateAssignmentAndIsr uses the remoteReplicasMap internal registry and the given assignment to find the broker IDs that are no longer partition replicas (of the partition).

updateAssignmentAndIsr creates new partition replicas for the broker IDs that have not been registered in the remoteReplicasMap internal registry before (based on the given assignment).

updateAssignmentAndIsr updates the allReplicaIds and the inSyncReplicaIds internal registries with the given assignment and isr, respectively.
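The three steps above can be sketched with a mutable map in place of the Pool-based registry. This is a simplified model: AssignmentSketch and RemoteReplica are hypothetical, and the sketch assumes (as the Internal Properties section states) that remoteReplicasMap never contains the local broker ID.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a remote partition replica.
final case class RemoteReplica(brokerId: Int)

final class AssignmentSketch(localBrokerId: Int) {
  val remoteReplicasMap = mutable.Map.empty[Int, RemoteReplica]
  var allReplicaIds: Seq[Int] = Seq.empty
  var inSyncReplicaIds: Set[Int] = Set.empty

  def updateAssignmentAndIsr(assignment: Seq[Int], isr: Set[Int]): Unit = {
    // Broker IDs that are registered but are no longer partition replicas.
    val removed = remoteReplicasMap.keySet.toSet -- assignment
    // Register replicas for remote brokers that have not been seen before.
    assignment.filter(_ != localBrokerId).foreach { id =>
      remoteReplicasMap.getOrElseUpdate(id, RemoteReplica(id))
    }
    removed.foreach(id => remoteReplicasMap.remove(id))
    // Record the new assignment and the in-sync replicas.
    allReplicaIds = assignment
    inSyncReplicaIds = isr
  }
}
```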

Note
updateAssignmentAndIsr is used when Partition is requested to make the local replica the leader or a follower.

Looking Up or Creating Replica — getOrCreateReplica Method

getOrCreateReplica(
  replicaId: Int,
  isNew: Boolean = false): Replica

getOrCreateReplica simply looks up the Replica in the allReplicasMap internal registry (by the given replicaId).

If not found, getOrCreateReplica…​FIXME

Note

getOrCreateReplica is used when:

maybeCreateFutureReplica Method

maybeCreateFutureReplica(logDir: String): Boolean

maybeCreateFutureReplica…​FIXME

Note
maybeCreateFutureReplica is used exclusively when ReplicaManager is requested to alterReplicaLogDirs.

appendRecordsToLeader Method

appendRecordsToLeader(
  records: MemoryRecords,
  isFromClient: Boolean,
  requiredAcks: Int = 0): LogAppendInfo

appendRecordsToLeader basically requests the Log (of the leader Replica) to appendAsLeader.

Internally, appendRecordsToLeader…​FIXME

Note

appendRecordsToLeader is used when:

doAppendRecordsToFollowerOrFutureReplica Internal Method

doAppendRecordsToFollowerOrFutureReplica(
  records: MemoryRecords,
  isFuture: Boolean): Option[LogAppendInfo]

doAppendRecordsToFollowerOrFutureReplica…​FIXME

Note
doAppendRecordsToFollowerOrFutureReplica is used exclusively when Partition is requested to appendRecordsToFollowerOrFutureReplica.

appendRecordsToFollowerOrFutureReplica Method

appendRecordsToFollowerOrFutureReplica(
  records: MemoryRecords,
  isFuture: Boolean): Option[LogAppendInfo]

appendRecordsToFollowerOrFutureReplica…​FIXME

Note

appendRecordsToFollowerOrFutureReplica is used when:

truncateTo Method

truncateTo(
  offset: Long,
  isFuture: Boolean): Unit

truncateTo…​FIXME

Note

truncateTo is used when:

  • ReplicaAlterLogDirsThread is requested to truncate

  • ReplicaFetcherThread is requested to truncate

truncateFullyAndStartAt Method

truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit

truncateFullyAndStartAt…​FIXME

Note

truncateFullyAndStartAt is used when:

maybeReplaceCurrentWithFutureReplica Method

maybeReplaceCurrentWithFutureReplica(): Boolean

maybeReplaceCurrentWithFutureReplica…​FIXME

Note
maybeReplaceCurrentWithFutureReplica is used exclusively when ReplicaAlterLogDirsThread is requested to processPartitionData.

delete Method

delete(): Unit

delete…​FIXME

Note
delete is used exclusively when ReplicaManager is requested to stopReplica.

removeFutureLocalReplica Method

removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit

removeFutureLocalReplica…​FIXME

Note
removeFutureLocalReplica is used when ReplicaManager is requested to alterReplicaLogDirs and handleLogDirFailure.

isLeaderReplicaLocal Internal Method

isLeaderReplicaLocal: Boolean

isLeaderReplicaLocal is positive (true) when the optional Replica (as given by leaderReplicaIfLocal) is defined. Otherwise, isLeaderReplicaLocal is false.

Note
isLeaderReplicaLocal is used when Partition is requested for the performance metrics (InSyncReplicasCount and ReplicasCount), isUnderReplicated, and lowWatermarkIfLeader.

localReplicaOrException Method

localReplicaOrException: Replica

localReplicaOrException requests localReplica and returns the local replica if available. Otherwise, localReplicaOrException throws a ReplicaNotAvailableException:

Replica for partition [topicPartition] is not available on broker [localBrokerId]
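The lookup-or-throw behaviour can be sketched with Option.getOrElse. LocalReplicaSketch, its Replica case class, and the locally defined ReplicaNotAvailableException are stand-ins so that the example compiles without Kafka on the classpath.

```scala
object LocalReplicaSketch {
  // Hypothetical stand-in for a partition replica.
  final case class Replica(brokerId: Int)

  // Local stand-in for the exception type of the same name.
  final class ReplicaNotAvailableException(message: String)
      extends RuntimeException(message)

  // Returns the local replica or fails with the message quoted above.
  def localReplicaOrException(
      localReplica: Option[Replica],
      topicPartition: String,
      localBrokerId: Int): Replica =
    localReplica.getOrElse {
      throw new ReplicaNotAvailableException(
        s"Replica for partition $topicPartition is not available on broker $localBrokerId")
    }
}
```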

localReplica Method

localReplica: Option[Replica]

localReplica simply gets the partition replica for the local broker ID.

Note

localReplica is used when:

getReplica Method

getReplica(replicaId: Int): Option[Replica]

getReplica returns the replica by the given replicaId (in the allReplicasMap registry) or None.

Note

getReplica is used when:

addReplicaIfNotExists Method

addReplicaIfNotExists(replica: Replica): Replica

addReplicaIfNotExists…​FIXME

Note
addReplicaIfNotExists is used when…​FIXME

assignedReplicas Method

assignedReplicas: Set[Replica]

assignedReplicas…​FIXME

Note
assignedReplicas is used when…​FIXME

allReplicas Method

allReplicas: Set[Replica]

allReplicas…​FIXME

Note
allReplicas is used when…​FIXME

removeReplica Internal Method

removeReplica(replicaId: Int): Unit

removeReplica…​FIXME

Note
removeReplica is used when…​FIXME

String (Textual) Representation — toString Method

toString(): String
Note
toString is part of the java.lang.Object Contract for a string representation of the object.

toString…​FIXME

readRecords Method

readRecords(
  fetchOffset: Long,
  currentLeaderEpoch: Optional[Integer],
  maxBytes: Int,
  fetchIsolation: FetchIsolation,
  fetchOnlyFromLeader: Boolean,
  minOneMessage: Boolean): LogReadInfo

readRecords…​FIXME

Note
readRecords is used when…​FIXME

deleteRecordsOnLeader Method

deleteRecordsOnLeader(
  offset: Long): LogDeleteRecordsResult

deleteRecordsOnLeader…​FIXME

Note
deleteRecordsOnLeader is used when…​FIXME

Creating Log or Future Log (Unless Available Already) — createLogIfNotExists Method

createLogIfNotExists(
  replicaId: Int,
  isNew: Boolean,
  isFutureReplica: Boolean,
  offsetCheckpoints: OffsetCheckpoints): Unit

createLogIfNotExists branches off per the given isFutureReplica flag:

  • When enabled (true) and the futureLog internal registry is undefined, createLogIfNotExists executes createLog and saves the result in the futureLog internal registry

  • When disabled (false) and the log internal registry is undefined, createLogIfNotExists executes createLog and saves the result in the log internal registry

For all other cases, createLogIfNotExists simply prints out the following TRACE message (with the Future prefix for isFutureReplica flag enabled):

[Future] Log already exists.
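The branching can be sketched as follows. It is a simplified model: CreateLogSketch, LogHolder, and the one-argument createLog function are hypothetical, and the method returns the would-be TRACE message instead of logging it.

```scala
object CreateLogSketch {
  // Hypothetical stand-in for a partition log.
  final case class Log(name: String)

  final class LogHolder(createLog: Boolean => Log) {
    var log: Option[Log] = None
    var futureLog: Option[Log] = None

    // Returns None when a log was created, or the message to trace otherwise.
    def createLogIfNotExists(isFutureReplica: Boolean): Option[String] =
      if (isFutureReplica && futureLog.isEmpty) {
        futureLog = Some(createLog(true))
        None
      } else if (!isFutureReplica && log.isEmpty) {
        log = Some(createLog(false))
        None
      } else {
        val prefix = if (isFutureReplica) "Future " else ""
        Some(prefix + "Log already exists.")
      }
  }
}
```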
Note

createLogIfNotExists is used when:

getOutOfSyncReplicas Method

getOutOfSyncReplicas(
  maxLagMs: Long): Set[Int]

getOutOfSyncReplicas requests the Log for the logEndOffset.

getOutOfSyncReplicas uses isFollowerOutOfSync for every replica in inSyncReplicaIds (excluding the local broker ID).

Note
getOutOfSyncReplicas is used when Partition is requested to maybeShrinkIsr.

localLogOrException Method

localLogOrException: Log

localLogOrException gives the Log if defined or throws a ReplicaNotAvailableException:

Log for partition [topicPartition] is not available on broker [localBrokerId]
Note
localLogOrException is used when…​FIXME

leaderLogIfLocal Method

leaderLogIfLocal: Option[Log]

leaderLogIfLocal gives the Log only if the local broker is the leader.

Note
leaderLogIfLocal is used when…​FIXME

Checking If Broker is Partition Leader — isLeader Method

isLeader: Boolean

isLeader is positive (true) when the leaderReplicaIdOpt is the local broker ID.

Note

isLeader is used when:

lastOffsetForLeaderEpoch Method

lastOffsetForLeaderEpoch(
  currentLeaderEpoch: Optional[Integer],
  leaderEpoch: Int,
  fetchOnlyFromLeader: Boolean): EpochEndOffset

lastOffsetForLeaderEpoch…​FIXME

Note
lastOffsetForLeaderEpoch is used when…​FIXME

tryCompleteDelayedRequests Internal Method

tryCompleteDelayedRequests(): Unit

tryCompleteDelayedRequests…​FIXME

Note
tryCompleteDelayedRequests is used when Partition is requested to makeLeader, updateReplicaLogReadResult, maybeShrinkIsr, and appendRecordsToLeader (when leaderHWIncremented).

createLog Internal Method

createLog(
  replicaId: Int,
  isNew: Boolean,
  isFutureReplica: Boolean,
  offsetCheckpoints: OffsetCheckpoints): Log

createLog requests the LogManager to initializingLog for the TopicPartition.

createLog requests the LogManager to look up or create a new partition log for the TopicPartition. The LogConfig passed in is created by requesting the PartitionStateStore for the fetchTopicConfig to override the currentDefaultConfig of the LogManager.

createLog requests the given OffsetCheckpoints to fetch the checkpointed high watermark for the TopicPartition. If not found, createLog prints out the following INFO message to the logs and assumes 0:

No checkpointed highwatermark is found for partition [topicPartition]

createLog requests the Log to update the high watermark.

createLog prints out the following INFO message to the logs:

Log loaded for partition [topicPartition] with initial high watermark [initialHighWatermark]

In the end, createLog requests the LogManager to finishedInitializingLog for the TopicPartition.
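The fallback to 0 for a missing high watermark checkpoint can be sketched as below. It assumes a plain Map in place of OffsetCheckpoints and an info callback in place of the logger. HighWatermarkSketch and initialHighWatermark are hypothetical names.

```scala
object HighWatermarkSketch {
  // Looks up the checkpointed high watermark, or reports and assumes 0.
  def initialHighWatermark(
      checkpoints: Map[String, Long],
      topicPartition: String,
      info: String => Unit): Long =
    checkpoints.getOrElse(topicPartition, {
      info(s"No checkpointed highwatermark is found for partition $topicPartition")
      0L
    })
}
```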

Note
createLog is used when Partition is requested to create a log or future log (unless available already).

expandIsr Internal Method

expandIsr(
  newIsr: Set[Int]): Unit

expandIsr…​FIXME

Note
expandIsr is used when Partition is requested to maybeExpandIsr.

shrinkIsr Internal Method

shrinkIsr(
  newIsr: Set[Int]): Unit

shrinkIsr…​FIXME

Note
shrinkIsr is used when Partition is requested to maybeShrinkIsr.

isFollowerOutOfSync Internal Method

isFollowerOutOfSync(
  replicaId: Int,
  leaderEndOffset: Long,
  currentTimeMs: Long,
  maxLagMs: Long): Boolean

isFollowerOutOfSync gets the follower replica for the given replicaId (if available or throws an exception).

isFollowerOutOfSync is positive (true) when both of the following hold:

  1. logEndOffset of the follower replica differs from the given leaderEndOffset

  2. Time since the lastCaughtUpTimeMs of the follower replica is greater than the given maxLagMs (i.e. replica.lag.time.max.ms configuration property).
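The two conditions, together with the way getOutOfSyncReplicas applies them to the in-sync replicas, can be sketched as follows. OutOfSyncSketch and Follower are hypothetical stand-ins, and the followers are passed in as a plain Map rather than looked up in a registry.

```scala
object OutOfSyncSketch {
  // Hypothetical stand-in for the follower state the leader tracks.
  final case class Follower(logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // Out of sync: behind the leader AND not caught up for longer than maxLagMs.
  def isFollowerOutOfSync(
      follower: Follower,
      leaderEndOffset: Long,
      currentTimeMs: Long,
      maxLagMs: Long): Boolean =
    follower.logEndOffset != leaderEndOffset &&
      (currentTimeMs - follower.lastCaughtUpTimeMs) > maxLagMs

  // Checks every in-sync replica except the local broker.
  // followers(id) throws NoSuchElementException for an unknown replica ID.
  def getOutOfSyncReplicas(
      followers: Map[Int, Follower],
      inSyncReplicaIds: Set[Int],
      localBrokerId: Int,
      leaderEndOffset: Long,
      currentTimeMs: Long,
      maxLagMs: Long): Set[Int] =
    (inSyncReplicaIds - localBrokerId).filter { id =>
      isFollowerOutOfSync(followers(id), leaderEndOffset, currentTimeMs, maxLagMs)
    }
}
```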

Note
isFollowerOutOfSync is used when Partition is requested to getOutOfSyncReplicas.

getReplicaOrException Internal Method

getReplicaOrException(
  replicaId: Int): Replica

getReplicaOrException finds the replica for the given replica ID or throws a ReplicaNotAvailableException:

Replica with id [replicaId] is not available on broker [localBrokerId]
Note
getReplicaOrException is used when Partition is requested to checkEnoughReplicasReachOffset, maybeShrinkIsr, and isFollowerOutOfSync.

getLocalLog Internal Method

getLocalLog(
  currentLeaderEpoch: Optional[Integer],
  requireLeader: Boolean): Either[Log, Errors]

getLocalLog executes checkCurrentLeaderEpoch for the given currentLeaderEpoch and branches off per the result:

  • For no errors (Errors.NONE), if the requireLeader flag is on (true), but the broker is not the leader, getLocalLog returns Errors.NOT_LEADER_FOR_PARTITION.

  • For no errors (Errors.NONE), if the requireLeader flag is off (false) or the broker is the leader, getLocalLog branches off per the optional log. getLocalLog gives the log if defined or the errors:

    • Errors.NOT_LEADER_FOR_PARTITION when the requireLeader flag is on (true)

    • Errors.REPLICA_NOT_AVAILABLE when the requireLeader flag is off (false)

  • For any error, getLocalLog simply returns it
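The branching above can be sketched with a small sealed hierarchy in place of the Errors enum. GetLocalLogSketch, its Log case class, and the explicit epochCheck and isLeader parameters are assumptions made for the sketch. FENCED_LEADER_EPOCH is included only as an example of a non-NONE error.

```scala
object GetLocalLogSketch {
  // Stand-ins for the error codes used by the branching logic.
  sealed trait Errors
  case object NONE extends Errors
  case object NOT_LEADER_FOR_PARTITION extends Errors
  case object REPLICA_NOT_AVAILABLE extends Errors
  case object FENCED_LEADER_EPOCH extends Errors

  // Hypothetical stand-in for a partition log.
  final case class Log(name: String)

  // epochCheck is the (assumed) result of checkCurrentLeaderEpoch.
  def getLocalLog(
      epochCheck: Errors,
      requireLeader: Boolean,
      isLeader: Boolean,
      log: Option[Log]): Either[Log, Errors] =
    epochCheck match {
      case NONE =>
        if (requireLeader && !isLeader) {
          Right(NOT_LEADER_FOR_PARTITION)
        } else {
          log match {
            case Some(localLog) => Left(localLog)
            case None =>
              Right(if (requireLeader) NOT_LEADER_FOR_PARTITION else REPLICA_NOT_AVAILABLE)
          }
        }
      case error => Right(error) // any other error is returned as-is
    }
}
```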

Note
getLocalLog is used when Partition is requested to localLogWithEpochOrException and lastOffsetForLeaderEpoch.

checkCurrentLeaderEpoch Internal Method

checkCurrentLeaderEpoch(
  remoteLeaderEpochOpt: Optional[Integer]): Errors

checkCurrentLeaderEpoch…​FIXME

Note
checkCurrentLeaderEpoch is used when Partition is requested to getLocalLog.

localLogWithEpochOrException Internal Method

localLogWithEpochOrException(
  currentLeaderEpoch: Optional[Integer],
  requireLeader: Boolean): Log

localLogWithEpochOrException…​FIXME

Note
localLogWithEpochOrException is used when Partition is requested to…​FIXME

isFollowerInSync Internal Method

isFollowerInSync(
  followerReplica: Replica,
  highWatermark: Long): Boolean

isFollowerInSync…​FIXME

Note
isFollowerInSync is used when Partition is requested to maybeExpandIsr.

maybeUpdateIsrAndVersion Internal Method

maybeUpdateIsrAndVersion(
  isr: Set[Int],
  zkVersionOpt: Option[Int]): Unit

maybeUpdateIsrAndVersion…​FIXME

Note
maybeUpdateIsrAndVersion is used when Partition is requested to expandIsr and shrinkIsr.

Internal Properties

Name Description

controllerEpoch

Controller epoch

Starts as 0 and changes when Partition is requested to make the local replica the leader or a follower.

Used for expandIsr and shrinkIsr

inSyncReplicaIds

In-sync replicas of the topic partition, i.e. broker IDs that are known to be in-sync with the leader

Note
inSyncReplicaIds contains the in-sync replicas only after updateAssignmentAndIsr is executed while the local partition replica is the leader. It is empty when the local replica is a follower or the partition has been deleted.

leaderReplicaIdOpt

Broker ID of the broker that manages the leader replica

Default: None (undefined)

allReplicaIds

All replica broker IDs that were assigned to this topic partition

allReplicasMap

futureLog

remoteReplicasMap

Remote replicas by broker ID (Pool[Int, Replica]). Updated when Partition is requested to updateAssignmentAndIsr.

remoteReplicasMap contains the broker IDs of the partition replicas, excluding the local broker ID.

All the entries are removed when Partition is requested to delete.
