Partition
Partition is an internal representation of a TopicPartition that ReplicaManager uses to manage all partitions.
Partition knows the host broker (by the local broker ID), which is one of the partition replicas (a partition is replicated across brokers of a Kafka cluster).
Partition can be the leader of the TopicPartition or a follower.
Partition can be elected the partition leader (when ReplicaManager is requested to makeLeaders).
Partition is under-replicated when it is the leader and the number of in-sync replicas is below the number of all replicas. Use the UnderReplicated metric (or the UnderReplicatedPartitions metric of the ReplicaManager) to monitor it.
Partition uses the remoteReplicasMap internal registry to track remote replicas (and is requested to update partition replicas and the in-sync replicas when…FIXME).
A Kafka topic is spread across a Kafka cluster as a logical group of one or more partitions.
Kafka producers publish messages to partition leaders, and Kafka consumers consume messages from them.
In-Sync Replicas (ISR) are brokers that…FIXME
Offline Replicas are…FIXME
Partition is created when ReplicaManager is requested to add a TopicPartition to allPartitions (indirectly using apply) and for the OfflinePartition.
Partition uses [Partition [topicPartition] broker=[localBrokerId]] as the logging prefix (aka logIdent).
Tip: Enable ALL logging level for the kafka.cluster.Partition logger to see what happens inside.
Add the following line to the log4j properties file:
log4j.logger.kafka.cluster.Partition=ALL
Refer to Logging.
Performance Metrics
Partition is a KafkaMetricsGroup with the following performance metrics.
| Metric Name | Description |
|---|---|
| … | One of the two possible values: … |
| … | One of the two possible values: … |
| … | One of the two possible values: … |
| … | One of the two possible values: … |
The performance metrics are registered in kafka.cluster:type=Partition group (only when the partition is not offline).
Creating Partition Instance
Partition takes the following to be created:
Partition initializes the internal properties.
Partition and Partition Log — log Registry
log: Option[Log] = None
log is the partition log (the Log of the partition).
log is assigned a Log when Partition is requested to createLogIfNotExists or maybeReplaceCurrentWithFutureReplica.
log is available until Partition is requested to delete.
log is used when:
- Partition is requested for the LastStableOffsetLag metric, getLocalLog, leaderLogIfLocal, localLogOrException and maybeReplaceCurrentWithFutureReplica
- ReplicaManager is requested to localLog, becomeLeaderOrFollower, checkpointHighWatermarks and handleLogDirFailure
Partition and PartitionStateStore
When created, a Partition is given a PartitionStateStore (that is a ZkPartitionStateStore by default).
PartitionStateStore is used when Partition is requested for the following:
Creating Partition — apply Utility
apply(
topicPartition: TopicPartition,
time: Time,
replicaManager: ReplicaManager): Partition
apply creates a new Partition for the given TopicPartition, Time, and ReplicaManager with the following:
- A new ZkPartitionStateStore for the given TopicPartition and ReplicaManager
- A new DelayedOperations for the given TopicPartition and ReplicaManager
- The replica.lag.time.max.ms and inter.broker.protocol.version configuration properties
Note: apply is used when ReplicaManager is requested to add a partition to the allPartitions registry.
Read and Write Locks
leaderIsrUpdateLock: ReentrantReadWriteLock
leaderIsrUpdateLock is a Java ReentrantReadWriteLock, i.e. a pair of locks: one for read-only operations and one for writing. The read lock may be held simultaneously by multiple reader threads as long as there are no writers, while the write lock is exclusive.
A read lock (inReadLock) is used for the following:
A write lock (inWriteLock) is used for the following:
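The read/write locking pattern described above can be sketched in Scala as follows. This is a minimal illustration, not Kafka's code: the LockHelpers object and the LeaderIsrState class are hypothetical, and only java.util.concurrent.locks.ReentrantReadWriteLock is a real API.

```scala
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

// Hypothetical helpers in the spirit of inReadLock/inWriteLock (not Kafka's implementation).
object LockHelpers {
  // Runs `body` while holding `lock` and always releases the lock afterwards.
  private def withLock[T](lock: Lock)(body: => T): T = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  def inReadLock[T](rw: ReentrantReadWriteLock)(body: => T): T =
    withLock(rw.readLock())(body)

  def inWriteLock[T](rw: ReentrantReadWriteLock)(body: => T): T =
    withLock(rw.writeLock())(body)
}

// Hypothetical holder of ISR state guarded by a single read/write lock.
class LeaderIsrState {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock()
  private var isr: Set[Int] = Set.empty

  // Many readers may run concurrently as long as no writer holds the lock.
  def inSyncReplicaIds: Set[Int] =
    LockHelpers.inReadLock(leaderIsrUpdateLock)(isr)

  // A writer gets exclusive access: no readers and no other writers.
  def updateIsr(newIsr: Set[Int]): Unit =
    LockHelpers.inWriteLock(leaderIsrUpdateLock) { isr = newIsr }
}
```

Funneling both helpers through a single try/finally guarantees that the lock is released even when the guarded block throws.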
maybeExpandIsr Method
maybeExpandIsr(
replicaId: Int,
logReadResult: LogReadResult): Boolean
maybeExpandIsr…FIXME
Note: maybeExpandIsr is used when Partition is requested to updateFollowerFetchState.
updateFollowerFetchState Method
updateFollowerFetchState(
followerId: Int,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
lastSentHighwatermark: Long): Boolean
updateFollowerFetchState…FIXME
Note: updateFollowerFetchState is used when ReplicaManager is requested to updateFollowerFetchState.
maybeShrinkIsr Method
maybeShrinkIsr(
replicaMaxLagTimeMs: Long): Unit
maybeShrinkIsr…
Note: maybeShrinkIsr is used when ReplicaManager is requested to maybeShrinkIsr.
updateReplicaLogReadResult Method
updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean
updateReplicaLogReadResult…FIXME
Note: updateReplicaLogReadResult is used exclusively when ReplicaManager is requested to updateFollowerLogReadResults.
makeFollower Method
makeFollower(
controllerId: Int,
partitionStateInfo: LeaderAndIsrRequest.PartitionState,
correlationId: Int): Boolean
makeFollower…FIXME
Note: makeFollower is used exclusively when ReplicaManager is requested to makeFollowers.
leaderReplicaIfLocal Method
leaderReplicaIfLocal: Option[Replica]
leaderReplicaIfLocal returns a Replica when the leaderReplicaIdOpt is the localBrokerId. Otherwise, leaderReplicaIfLocal returns None (i.e. undefined).
Note: leaderReplicaIfLocal is used…FIXME
isUnderMinIsr Predicate
isUnderMinIsr: Boolean
isUnderMinIsr is true only if the partition isLeaderReplicaLocal and the number of in-sync replicas is below the min.insync.replicas configuration property (as configured for the Log of the leader replica).
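A minimal sketch of the two leader-side predicates (under-replicated and under min ISR) over a hypothetical, simplified PartitionView snapshot. As a simplification, it treats isLeaderReplicaLocal and isLeader as the same check (the leader replica ID being the local broker ID), and minInSyncReplicas stands in for the min.insync.replicas setting of the leader's Log.

```scala
// Hypothetical, simplified snapshot of a partition's replica state (not Kafka's Partition).
final case class PartitionView(
    localBrokerId: Int,
    leaderReplicaIdOpt: Option[Int],
    allReplicaIds: Seq[Int],
    inSyncReplicaIds: Set[Int],
    minInSyncReplicas: Int // stands in for min.insync.replicas of the leader's log
) {
  // This broker is the leader when the leader replica ID is the local broker ID.
  def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)

  // Under-replicated: this broker leads and the ISR is smaller than the full replica set.
  def isUnderReplicated: Boolean =
    isLeader && inSyncReplicaIds.size < allReplicaIds.size

  // Under min ISR: this broker leads and the ISR is below min.insync.replicas.
  def isUnderMinIsr: Boolean =
    isLeader && inSyncReplicaIds.size < minInSyncReplicas
}
```

For example, a three-replica partition led by the local broker with two in-sync replicas and min.insync.replicas of 2 is under-replicated but not under min ISR.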
Note: isUnderMinIsr is used when…FIXME
checkEnoughReplicasReachOffset Method
checkEnoughReplicasReachOffset(
requiredOffset: Long): (Boolean, Errors)
checkEnoughReplicasReachOffset…FIXME
Note: checkEnoughReplicasReachOffset is used when…FIXME
Electing Local Partition Replica as Partition Leader — makeLeader Method
makeLeader(
controllerId: Int,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean
makeLeader returns true if this broker has just been elected as the leader for the partition.
Note: makeLeader can be executed for a broker that is already the partition leader.
Internally, makeLeader starts by acquiring the write lock, so no other thread can hold the read or the write lock while makeLeader runs.
makeLeader updates the internal controller epoch from the given LeaderAndIsrPartitionState.
makeLeader requests the given LeaderAndIsrPartitionState for the partition replicas (their broker IDs) and the ISR (as a list of broker IDs) and updates internal registries for replica assignments and in-sync replicas.
makeLeader creates a Log (unless available already) for the local broker ID and the given OffsetCheckpoints (with isFutureReplica flag off).
makeLeader requests the leader log for the logEndOffset.
makeLeader prints out the following INFO message to the logs:
[topicPartition] starts at Leader Epoch [leaderEpoch] from offset [leaderEpochStartOffset]. Previous Leader Epoch was: [leaderEpoch]
makeLeader updates the internal registries: leaderEpoch, leaderEpochStartOffsetOpt and zkVersion.
makeLeader requests the leader log to maybeAssignEpochStartOffset with the current leaderEpoch and the log end offset (that is now considered the leader epoch’s start offset).
makeLeader requests the remoteReplicas to resetLastCaughtUpTime. For replicas in inSyncReplicaIds, the last caught-up time is the current time while for the others it is 0.
When the partition has just been elected a new leader, makeLeader updates the leaderReplicaIdOpt internal registry and requests the remoteReplicas to updateFetchState.
makeLeader then tries to increment the high watermark (using the leader log) and, if it was incremented, calls tryCompleteDelayedRequests.
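The resetLastCaughtUpTime step of makeLeader (in-sync replicas get the current time, all other remote replicas get 0) can be sketched as follows. RemoteReplicaSketch and MakeLeaderSketch are hypothetical names for illustration only, not Kafka classes.

```scala
// Hypothetical stand-in for a remote (follower) replica's fetch state.
final class RemoteReplicaSketch(val brokerId: Int) {
  // -1 marks "not reset yet" in this sketch.
  var lastCaughtUpTimeMs: Long = -1L
  def resetLastCaughtUpTime(timeMs: Long): Unit = lastCaughtUpTimeMs = timeMs
}

object MakeLeaderSketch {
  // ISR members are treated as caught up "now"; every other remote replica gets 0,
  // so the time since its last catch-up is effectively unbounded.
  def resetRemoteReplicas(
      remoteReplicas: Iterable[RemoteReplicaSketch],
      inSyncReplicaIds: Set[Int],
      curTimeMs: Long): Unit =
    remoteReplicas.foreach { r =>
      val lastCaughtUpTime = if (inSyncReplicaIds.contains(r.brokerId)) curTimeMs else 0L
      r.resetLastCaughtUpTime(lastCaughtUpTime)
    }
}
```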
Note: makeLeader is used when ReplicaManager is requested to makeLeaders.
Updating Internal Registries for Replica Assignments and In-Sync Replicas — updateAssignmentAndIsr Method
updateAssignmentAndIsr(
assignment: Seq[Int],
isr: Set[Int]): Unit
updateAssignmentAndIsr uses the remoteReplicasMap internal registry and the given assignment to find the broker IDs that are no longer partition replicas (of the partition).
updateAssignmentAndIsr creates new partition replicas for the broker IDs that have not been registered in the remoteReplicasMap internal registry before (based on the given assignment).
updateAssignmentAndIsr updates the allReplicaIds and the inSyncReplicaIds internal registries with the given assignment and isr, respectively.
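The bookkeeping of updateAssignmentAndIsr can be sketched as follows. This is a minimal illustration with hypothetical ReplicaStub and AssignmentState types. It also assumes, as the registry name suggests, that the local broker is never registered as a remote replica.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a remote replica (the real class tracks fetch state).
final case class ReplicaStub(brokerId: Int)

// Hypothetical holder of the registries that updateAssignmentAndIsr maintains.
final class AssignmentState(localBrokerId: Int) {
  val remoteReplicasMap: mutable.Map[Int, ReplicaStub] = mutable.Map.empty
  var allReplicaIds: Seq[Int] = Seq.empty
  var inSyncReplicaIds: Set[Int] = Set.empty

  def updateAssignmentAndIsr(assignment: Seq[Int], isr: Set[Int]): Unit = {
    // Broker IDs registered before but no longer part of the assignment.
    val removedReplicas = remoteReplicasMap.keySet.toSet -- assignment.toSet
    // Register replicas for newly assigned remote brokers (assumption: skip the local broker).
    assignment
      .filter(_ != localBrokerId)
      .foreach(id => remoteReplicasMap.getOrElseUpdate(id, ReplicaStub(id)))
    removedReplicas.foreach(remoteReplicasMap.remove)
    allReplicaIds = assignment
    inSyncReplicaIds = isr
  }
}
```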
Looking Up or Creating Replica — getOrCreateReplica Method
getOrCreateReplica(
replicaId: Int,
isNew: Boolean = false): Replica
getOrCreateReplica simply looks up the Replica in the allReplicasMap internal registry (by the given replicaId).
If not found, getOrCreateReplica…FIXME
maybeCreateFutureReplica Method
maybeCreateFutureReplica(logDir: String): Boolean
maybeCreateFutureReplica…FIXME
Note: maybeCreateFutureReplica is used exclusively when ReplicaManager is requested to alterReplicaLogDirs.
appendRecordsToLeader Method
appendRecordsToLeader(
records: MemoryRecords,
isFromClient: Boolean,
requiredAcks: Int = 0): LogAppendInfo
appendRecordsToLeader requests the Log (of the leader Replica) to appendAsLeader.
Internally, appendRecordsToLeader…FIXME
doAppendRecordsToFollowerOrFutureReplica Internal Method
doAppendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo]
doAppendRecordsToFollowerOrFutureReplica…FIXME
Note: doAppendRecordsToFollowerOrFutureReplica is used exclusively when Partition is requested to appendRecordsToFollowerOrFutureReplica.
appendRecordsToFollowerOrFutureReplica Method
appendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo]
appendRecordsToFollowerOrFutureReplica…FIXME
truncateFullyAndStartAt Method
truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit
truncateFullyAndStartAt…FIXME
maybeReplaceCurrentWithFutureReplica Method
maybeReplaceCurrentWithFutureReplica(): Boolean
maybeReplaceCurrentWithFutureReplica…FIXME
Note: maybeReplaceCurrentWithFutureReplica is used exclusively when ReplicaAlterLogDirsThread is requested to processPartitionData.
delete Method
delete(): Unit
delete…FIXME
Note: delete is used exclusively when ReplicaManager is requested to stopReplica.
removeFutureLocalReplica Method
removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit
removeFutureLocalReplica…FIXME
Note: removeFutureLocalReplica is used when ReplicaManager is requested to alterReplicaLogDirs and handleLogDirFailure.
isLeaderReplicaLocal Internal Method
isLeaderReplicaLocal: Boolean
isLeaderReplicaLocal is positive (true) when the optional Replica is defined. Otherwise, false.
Note: isLeaderReplicaLocal is used when ReplicaManager is requested for the performance metrics (InSyncReplicasCount and ReplicasCount), isUnderReplicated, and lowWatermarkIfLeader.
localReplicaOrException Method
localReplicaOrException: Replica
localReplicaOrException uses localReplica to look up the local replica and returns it if available. Otherwise, localReplicaOrException throws a ReplicaNotAvailableException:
Replica for partition [topicPartition] is not available on broker [localBrokerId]
localReplica Method
localReplica: Option[Replica]
localReplica simply gets the partition replica for the local broker ID.
getReplica Method
getReplica(replicaId: Int): Option[Replica]
getReplica returns the replica by the given replicaId (in the allReplicasMap registry) or None.
addReplicaIfNotExists Method
addReplicaIfNotExists(replica: Replica): Replica
addReplicaIfNotExists…FIXME
Note: addReplicaIfNotExists is used when…FIXME
assignedReplicas Method
assignedReplicas: Set[Replica]
assignedReplicas…FIXME
Note: assignedReplicas is used when…FIXME
allReplicas Method
allReplicas: Set[Replica]
allReplicas…FIXME
Note: allReplicas is used when…FIXME
removeReplica Internal Method
removeReplica(replicaId: Int): Unit
removeReplica…FIXME
Note: removeReplica is used when…FIXME
String (Textual) Representation — toString Method
toString(): String
Note: toString is part of the java.lang.Object contract to give a string representation of the object.
toString…FIXME
readRecords Method
readRecords(
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
fetchOnlyFromLeader: Boolean,
minOneMessage: Boolean): LogReadInfo
readRecords…FIXME
Note: readRecords is used when…FIXME
deleteRecordsOnLeader Method
deleteRecordsOnLeader(
offset: Long): LogDeleteRecordsResult
deleteRecordsOnLeader…FIXME
Note: deleteRecordsOnLeader is used when…FIXME
Creating Log or Future Log (Unless Available Already) — createLogIfNotExists Method
createLogIfNotExists(
replicaId: Int,
isNew: Boolean,
isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints): Unit
createLogIfNotExists branches off per the given isFutureReplica flag:
For all other cases, createLogIfNotExists simply prints out the following TRACE message (with the Future prefix when the isFutureReplica flag is enabled):
[Future] Log already exists.
getOutOfSyncReplicas Method
getOutOfSyncReplicas(
maxLagMs: Long): Set[Int]
getOutOfSyncReplicas requests the Log for the logEndOffset.
getOutOfSyncReplicas then uses isFollowerOutOfSync to check every in-sync replica (inSyncReplicaIds without the local broker ID) and gives the IDs of the replicas that are out of sync.
Note: getOutOfSyncReplicas is used when Partition is requested to maybeShrinkIsr.
localLogOrException Method
localLogOrException: Log
localLogOrException gives the Log if defined or throws a ReplicaNotAvailableException:
Log for partition [topicPartition] is not available on broker [localBrokerId]
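The "give the log or throw" behavior of localLogOrException amounts to an Option lookup with a throwing fallback. A minimal sketch, assuming hypothetical stand-ins (ReplicaNotAvailableSketchException for Kafka's ReplicaNotAvailableException, LogStub for Log, and LocalLogHolder for Partition):

```scala
// Hypothetical stand-ins; Kafka has its own Log and ReplicaNotAvailableException classes.
final class ReplicaNotAvailableSketchException(message: String) extends RuntimeException(message)
final case class LogStub(topicPartition: String)

final class LocalLogHolder(topicPartition: String, localBrokerId: Int, var log: Option[LogStub]) {
  // Gives the log when defined; otherwise fails with the message described above.
  def localLogOrException: LogStub = log.getOrElse {
    throw new ReplicaNotAvailableSketchException(
      s"Log for partition $topicPartition is not available on broker $localBrokerId")
  }
}
```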
Note: localLogOrException is used when…FIXME
Checking If Broker is Partition Leader — isLeader Method
isLeader: Boolean
isLeader is positive (true) when the leaderReplicaIdOpt is the local broker ID.
lastOffsetForLeaderEpoch Method
lastOffsetForLeaderEpoch(
currentLeaderEpoch: Optional[Integer],
leaderEpoch: Int,
fetchOnlyFromLeader: Boolean): EpochEndOffset
lastOffsetForLeaderEpoch…FIXME
Note: lastOffsetForLeaderEpoch is used when…FIXME
tryCompleteDelayedRequests Internal Method
tryCompleteDelayedRequests(): Unit
tryCompleteDelayedRequests…FIXME
Note: tryCompleteDelayedRequests is used when Partition is requested to makeLeader, updateReplicaLogReadResult, maybeShrinkIsr, and appendRecordsToLeader (when leaderHWIncremented).
createLog Internal Method
createLog(
replicaId: Int,
isNew: Boolean,
isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints): Log
createLog requests the LogManager to initializingLog for the TopicPartition.
createLog requests the LogManager to look up or create a new partition log for the TopicPartition. The LogConfig passed in is created by requesting the PartitionStateStore for the fetchTopicConfig to override the currentDefaultConfig of the LogManager.
createLog requests the given OffsetCheckpoints to fetch the checkpointed high watermark for the TopicPartition. Unless found, createLog prints out the following INFO message to the logs and assumes 0:
No checkpointed highwatermark is found for partition [topicPartition]
createLog requests the Log to update the high watermark.
createLog prints out the following INFO message to the logs:
Log loaded for partition [topicPartition] with initial high watermark [initialHighWatermark]
In the end, createLog requests the LogManager to finishedInitializingLog for the TopicPartition.
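The "checkpointed high watermark or 0" fallback of createLog can be sketched as follows. This is an illustration only: InitialHighWatermarkSketch is a hypothetical name, a Map[String, Long] stands in for OffsetCheckpoints, and a String => Unit function stands in for the INFO logger.

```scala
// Hypothetical sketch of the initial high watermark lookup in createLog.
object InitialHighWatermarkSketch {
  // Returns the checkpointed high watermark of the partition, or logs the INFO-style
  // message and falls back to 0 when no checkpoint exists.
  def initialHighWatermark(
      topicPartition: String,
      checkpoints: Map[String, Long],
      info: String => Unit): Long =
    checkpoints.get(topicPartition).getOrElse {
      info(s"No checkpointed highwatermark is found for partition $topicPartition")
      0L
    }
}
```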
Note: createLog is used when Partition is requested to create a log or future log (unless available already).
expandIsr Internal Method
expandIsr(
newIsr: Set[Int]): Unit
expandIsr…FIXME
Note: expandIsr is used when Partition is requested to maybeExpandIsr.
shrinkIsr Internal Method
shrinkIsr(
newIsr: Set[Int]): Unit
shrinkIsr…FIXME
Note: shrinkIsr is used when Partition is requested to maybeShrinkIsr.
isFollowerOutOfSync Internal Method
isFollowerOutOfSync(
replicaId: Int,
leaderEndOffset: Long,
currentTimeMs: Long,
maxLagMs: Long): Boolean
isFollowerOutOfSync gets the follower replica for the given replicaId (or throws an exception if it is not available).
isFollowerOutOfSync is positive (true) when both of the following hold:
- The logEndOffset of the follower replica is different from the given leaderEndOffset
- The time since the lastCaughtUpTimeMs of the follower replica is greater than the given maxLagMs (i.e. the replica.lag.time.max.ms configuration property)
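Both conditions, together with the candidate selection of getOutOfSyncReplicas, can be sketched as follows. This is a minimal illustration over hypothetical types (FollowerState, OutOfSyncSketch). Here the leader end offset and current time are passed in rather than read from the Log and a clock, and followers(id) throws NoSuchElementException for an unknown ID, loosely mirroring the exception mentioned above.

```scala
// Hypothetical follower state; the real Replica class tracks more than this.
final case class FollowerState(logEndOffset: Long, lastCaughtUpTimeMs: Long)

object OutOfSyncSketch {
  // Out of sync: the follower has not reached the leader's log end offset AND
  // it last caught up more than maxLagMs ago (replica.lag.time.max.ms).
  def isFollowerOutOfSync(
      follower: FollowerState,
      leaderEndOffset: Long,
      currentTimeMs: Long,
      maxLagMs: Long): Boolean =
    follower.logEndOffset != leaderEndOffset &&
      (currentTimeMs - follower.lastCaughtUpTimeMs) > maxLagMs

  // Checks every in-sync replica except the local (leader) broker.
  def getOutOfSyncReplicas(
      localBrokerId: Int,
      inSyncReplicaIds: Set[Int],
      followers: Map[Int, FollowerState],
      leaderEndOffset: Long,
      currentTimeMs: Long,
      maxLagMs: Long): Set[Int] =
    (inSyncReplicaIds - localBrokerId).filter { id =>
      isFollowerOutOfSync(followers(id), leaderEndOffset, currentTimeMs, maxLagMs)
    }
}
```

A follower that is behind the leader's log end offset but caught up recently enough stays in sync, which gives slow but live followers time to catch up.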
Note: isFollowerOutOfSync is used when Partition is requested to getOutOfSyncReplicas.
getReplicaOrException Internal Method
getReplicaOrException(
replicaId: Int): Replica
getReplicaOrException finds the replica for the given replica ID or throws a ReplicaNotAvailableException:
Replica with id [replicaId] is not available on broker [localBrokerId]
Note: getReplicaOrException is used when Partition is requested to checkEnoughReplicasReachOffset, maybeShrinkIsr, and isFollowerOutOfSync.
getLocalLog Internal Method
getLocalLog(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Either[Log, Errors]
getLocalLog uses checkCurrentLeaderEpoch to check the given currentLeaderEpoch and branches off per the result:
- For no errors (Errors.NONE), if the requireLeader flag is on (true) but the broker is not the leader, getLocalLog returns Errors.NOT_LEADER_FOR_PARTITION.
- For no errors (Errors.NONE), if the requireLeader flag is off (false) or the broker is the leader, getLocalLog branches off per the optional log. getLocalLog gives the log if defined or one of the following errors:
  - Errors.NOT_LEADER_FOR_PARTITION when the requireLeader flag is on (true)
  - Errors.REPLICA_NOT_AVAILABLE when the requireLeader flag is off (false)
- For any other error, getLocalLog simply returns it.
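The branching of getLocalLog can be sketched as a single pattern match. This is an illustration, not Kafka's code: Err, LogRef and GetLocalLogSketch are hypothetical stand-ins (Err models a few Errors constants, with FENCED_LEADER_EPOCH as an example of another error a leader epoch check can report). The epoch check result, the leadership flag and the optional log are passed in as parameters. As in the Either[Log, Errors] return type, Left carries the log and Right carries an error.

```scala
// Hypothetical stand-ins for a few org.apache.kafka.common.protocol.Errors constants and Log.
sealed trait Err
object Err {
  case object NONE extends Err
  case object NOT_LEADER_FOR_PARTITION extends Err
  case object REPLICA_NOT_AVAILABLE extends Err
  case object FENCED_LEADER_EPOCH extends Err
}
final case class LogRef(name: String)

object GetLocalLogSketch {
  // `epochCheck` plays the role of the checkCurrentLeaderEpoch result.
  def getLocalLog(
      epochCheck: Err,
      requireLeader: Boolean,
      isLeader: Boolean,
      log: Option[LogRef]): Either[LogRef, Err] =
    epochCheck match {
      case Err.NONE if requireLeader && !isLeader =>
        Right(Err.NOT_LEADER_FOR_PARTITION)
      case Err.NONE =>
        log match {
          case Some(l) => Left(l)
          case None if requireLeader => Right(Err.NOT_LEADER_FOR_PARTITION)
          case None => Right(Err.REPLICA_NOT_AVAILABLE)
        }
      // Any other error from the epoch check is returned as-is.
      case error => Right(error)
    }
}
```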
Note: getLocalLog is used when Partition is requested to localLogWithEpochOrException and lastOffsetForLeaderEpoch.
checkCurrentLeaderEpoch Internal Method
checkCurrentLeaderEpoch(
remoteLeaderEpochOpt: Optional[Integer]): Errors
checkCurrentLeaderEpoch…FIXME
Note: checkCurrentLeaderEpoch is used when Partition is requested to getLocalLog.
localLogWithEpochOrException Internal Method
localLogWithEpochOrException(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Log
localLogWithEpochOrException…FIXME
Note: localLogWithEpochOrException is used when Partition is requested to…FIXME
isFollowerInSync Internal Method
isFollowerInSync(
followerReplica: Replica,
highWatermark: Long): Boolean
isFollowerInSync…FIXME
Note: isFollowerInSync is used when Partition is requested to maybeExpandIsr.
maybeUpdateIsrAndVersion Internal Method
maybeUpdateIsrAndVersion(
isr: Set[Int],
zkVersionOpt: Option[Int]): Unit
maybeUpdateIsrAndVersion…FIXME
Internal Properties
| Name | Description |
|---|---|
| … | In-sync replicas of the topic partition, i.e. broker IDs that are known to be in sync with the leader. Updated when maybeUpdateIsrAndVersion, updateAssignmentAndIsr, and delete |
| … | Default: … |
| … | All replica broker IDs that were assigned to this topic partition |
| … | Replicas by ID. Used in getReplica, assignedReplicas, allReplicas, toString |
| … | Remote replicas by broker ID (…). Used for getReplica, remoteReplicas, maybeIncrementLeaderHW. All of the pairs are removed when delete |