Partition
Partition is an internal representation of a TopicPartition for ReplicaManager (which manages all partitions).
Partition knows the host broker (by the local broker ID), which is one of the partition replicas (replicated across the brokers of a Kafka cluster).
Partition can be the leader of the TopicPartition or a follower.
Partition can be elected the partition leader (when ReplicaManager is requested to makeLeaders).
Partition is under-replicated if it is the leader and the number of in-sync replicas is below the number of all replicas. Use the UnderReplicated metric (or the UnderReplicatedPartitions metric of the ReplicaManager) to monitor it.
Partition uses the remoteReplicasMap internal registry to track remote replicas (and is requested to update partition replicas and the in-sync replicas when…FIXME).
A Kafka topic is spread across a Kafka cluster as a logical group of one or more partitions.
Kafka producers publish messages to partition leaders, and Kafka consumers consume messages from them.
In-Sync Replicas (ISR) are brokers that…FIXME
Offline Replicas are…FIXME
Partition is created when ReplicaManager is requested to add a TopicPartition to allPartitions (indirectly using apply) and for the OfflinePartition.
Partition uses [Partition [topicPartition] broker=[localBrokerId]] as the logging prefix (aka logIdent).
Tip
|
Enable ALL logging level for the kafka.cluster.Partition logger to see what happens inside. Add the following line to the log4j configuration:
log4j.logger.kafka.cluster.Partition=ALL
Refer to Logging.
|
Performance Metrics
Partition is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
|
One of the two possible values:
|
|
|
|
One of the two possible values:
|
|
One of the two possible values:
|
|
One of the two possible values:
|
The performance metrics are registered in kafka.cluster:type=Partition group (only when the partition is not offline).
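The under-replicated condition from the introduction (the partition is the leader and has fewer in-sync replicas than assigned replicas) can be sketched as a plain predicate plus a gauge value. This is a minimal model under stated assumptions: PartitionView and underReplicatedGauge are hypothetical names, not Kafka's Partition API, and reporting the condition as 1 (true) or 0 (false) is an assumed reading of the "one of the two possible values" descriptions.

```scala
// Hypothetical, simplified view of a partition's replica state (not Kafka's Partition class)
final case class PartitionView(
    localBrokerId: Int,
    leaderId: Option[Int],
    allReplicaIds: Seq[Int],
    inSyncReplicaIds: Set[Int]
) {
  // True when the local broker hosts the leader replica
  def isLeader: Boolean = leaderId.contains(localBrokerId)

  // Leader with fewer in-sync replicas than assigned replicas
  def isUnderReplicated: Boolean =
    isLeader && inSyncReplicaIds.size < allReplicaIds.size
}

// Assumed gauge encoding: 1 when under-replicated, 0 otherwise
def underReplicatedGauge(p: PartitionView): Int =
  if (p.isUnderReplicated) 1 else 0

// Usage: broker 1 leads a 3-replica partition with only 2 replicas in sync
val leader = PartitionView(localBrokerId = 1, leaderId = Some(1), allReplicaIds = Seq(1, 2, 3), inSyncReplicaIds = Set(1, 2))
println(underReplicatedGauge(leader))
```

A follower never reports itself as under-replicated, because only the leader knows the authoritative ISR.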
Creating Partition Instance
Partition takes the following to be created:
Partition initializes the internal properties.
Partition and Partition Log — log Registry
log: Option[Log] = None
log is the partition log (the Log of this partition).
log is assigned a Log when Partition is requested to createLogIfNotExists and maybeReplaceCurrentWithFutureReplica.
log is available until Partition is requested to delete.
log is used when:
- Partition is requested for the LastStableOffsetLag metric, getLocalLog, leaderLogIfLocal, localLogOrException, and maybeReplaceCurrentWithFutureReplica
- ReplicaManager is requested to localLog, becomeLeaderOrFollower, checkpointHighWatermarks, and handleLogDirFailure
Partition and PartitionStateStore
When created, a Partition is given a PartitionStateStore (a ZkPartitionStateStore by default).
PartitionStateStore is used when Partition is requested for the following:
Creating Partition — apply Utility
apply(
topicPartition: TopicPartition,
time: Time,
replicaManager: ReplicaManager): Partition
apply creates a new Partition for the given TopicPartition, Time, and ReplicaManager with the following:
- Creates a new ZkPartitionStateStore for the given TopicPartition and ReplicaManager
- Creates a new DelayedOperations for the given TopicPartition and ReplicaManager
- Uses the replica.lag.time.max.ms and inter.broker.protocol.version configuration properties
Note
|
apply is used when ReplicaManager is requested to add a partition to allPartitions Registry.
|
Read and Write Locks
leaderIsrUpdateLock: ReentrantReadWriteLock
leaderIsrUpdateLock is a Java ReentrantReadWriteLock, i.e. a pair of locks: one for read-only operations and one for writing. (The read lock may be held simultaneously by multiple reader threads as long as there are no writers. The write lock is exclusive.)
A read lock (inReadLock) is used for the following:
A write lock (inWriteLock) is used for the following:
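The read/write locking pattern can be sketched with the JDK's ReentrantReadWriteLock. The inReadLock and inWriteLock helpers below only mirror the names used above; their bodies, and the isr/updateIsr functions, are hypothetical illustrations rather than Kafka's own code.

```scala
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

// Run body while holding the given lock, always releasing it afterwards
def inLock[T](lock: Lock)(body: => T): T = {
  lock.lock()
  try body
  finally lock.unlock()
}

// Hypothetical helpers named after inReadLock / inWriteLock
def inReadLock[T](rw: ReentrantReadWriteLock)(body: => T): T = inLock(rw.readLock())(body)
def inWriteLock[T](rw: ReentrantReadWriteLock)(body: => T): T = inLock(rw.writeLock())(body)

val leaderIsrUpdateLock = new ReentrantReadWriteLock()
var inSyncReplicaIds: Set[Int] = Set(1, 2, 3)

// Many threads may read the ISR at the same time...
def isr: Set[Int] = inReadLock(leaderIsrUpdateLock) { inSyncReplicaIds }

// ...but an update excludes all readers and other writers
def updateIsr(newIsr: Set[Int]): Unit = inWriteLock(leaderIsrUpdateLock) { inSyncReplicaIds = newIsr }

updateIsr(Set(1, 2))
println(isr)
```

The try/finally in inLock is what guarantees a lock is released even when the guarded block throws.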
maybeExpandIsr Method
maybeExpandIsr(
replicaId: Int,
logReadResult: LogReadResult): Boolean
maybeExpandIsr …FIXME
Note
|
maybeExpandIsr is used when Partition is requested to updateFollowerFetchState.
|
updateFollowerFetchState Method
updateFollowerFetchState(
followerId: Int,
followerFetchOffsetMetadata: LogOffsetMetadata,
followerStartOffset: Long,
followerFetchTimeMs: Long,
leaderEndOffset: Long,
lastSentHighwatermark: Long): Boolean
updateFollowerFetchState …FIXME
Note
|
updateFollowerFetchState is used when ReplicaManager is requested to updateFollowerFetchState.
|
maybeShrinkIsr Method
maybeShrinkIsr(
replicaMaxLagTimeMs: Long): Unit
maybeShrinkIsr …
Note
|
maybeShrinkIsr is used when ReplicaManager is requested to maybeShrinkIsr.
|
updateReplicaLogReadResult Method
updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean
updateReplicaLogReadResult …FIXME
Note
|
updateReplicaLogReadResult is used exclusively when ReplicaManager is requested to updateFollowerLogReadResults.
|
makeFollower Method
makeFollower(
controllerId: Int,
partitionStateInfo: LeaderAndIsrRequest.PartitionState,
correlationId: Int): Boolean
makeFollower …FIXME
Note
|
makeFollower is used exclusively when ReplicaManager is requested to makeFollowers.
|
leaderReplicaIfLocal Method
leaderReplicaIfLocal: Option[Replica]
leaderReplicaIfLocal returns the leader Replica when leaderReplicaIdOpt is the localBrokerId. Otherwise, leaderReplicaIfLocal returns None (i.e. undefined).
Note
|
leaderReplicaIfLocal is used…FIXME
|
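The "leader only if local" lookup described above maps naturally onto Option combinators. A minimal sketch, assuming a hypothetical Replica case class and a plain Map of replicas by broker ID in place of Kafka's own registries:

```scala
// Hypothetical stand-in for Kafka's Replica
final case class Replica(brokerId: Int)

// Hypothetical holder of the two pieces of state the lookup needs
final class LeaderLookup(localBrokerId: Int, replicas: Map[Int, Replica]) {
  var leaderReplicaIdOpt: Option[Int] = None

  // Some(replica) only when the current leader is the local broker, None otherwise
  def leaderReplicaIfLocal: Option[Replica] =
    leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(replicas.get)
}

val lookup = new LeaderLookup(localBrokerId = 1, replicas = Map(1 -> Replica(1), 2 -> Replica(2)))
lookup.leaderReplicaIdOpt = Some(2) // another broker leads
println(lookup.leaderReplicaIfLocal) // None
```

Using filter before flatMap keeps the "is it local?" test separate from the registry lookup.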
isUnderMinIsr Predicate
isUnderMinIsr: Boolean
isUnderMinIsr is true only if the leader replica is local (isLeaderReplicaLocal) and the number of in-sync replicas is below the min.insync.replicas configuration property (as configured for the Log of the leader replica).
Note
|
isUnderMinIsr is used when…FIXME
|
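The isUnderMinIsr predicate reduces to a size comparison guarded by the leadership check. A minimal sketch, assuming the leadership flag, the ISR and the min.insync.replicas value are passed in directly (a hypothetical signature, not Kafka's):

```scala
// Hypothetical parameters: in Kafka these come from Partition state and the leader's LogConfig
def isUnderMinIsr(isLeaderReplicaLocal: Boolean, inSyncReplicaIds: Set[Int], minInSyncReplicas: Int): Boolean =
  isLeaderReplicaLocal && inSyncReplicaIds.size < minInSyncReplicas

// With min.insync.replicas = 2, a local leader whose ISR has shrunk to itself is under min ISR
println(isUnderMinIsr(isLeaderReplicaLocal = true, inSyncReplicaIds = Set(1), minInSyncReplicas = 2))
```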
checkEnoughReplicasReachOffset Method
checkEnoughReplicasReachOffset(
requiredOffset: Long): (Boolean, Errors)
checkEnoughReplicasReachOffset …FIXME
Note
|
checkEnoughReplicasReachOffset is used when…FIXME
|
Electing Local Partition Replica as Partition Leader — makeLeader Method
makeLeader(
controllerId: Int,
partitionState: LeaderAndIsrPartitionState,
correlationId: Int,
highWatermarkCheckpoints: OffsetCheckpoints): Boolean
makeLeader returns true if this broker has just been elected as the leader for the partition.
Note
|
makeLeader could be executed for a broker that is the partition leader already.
|
Internally, makeLeader starts by acquiring the write lock (which makes it a single-thread-exclusive operation).
makeLeader changes the internal controller epoch based on the given LeaderAndIsrPartitionState.
makeLeader requests the given LeaderAndIsrPartitionState for the partition replicas (their broker IDs) and the ISR (as a list of broker IDs) and updates the internal registries for replica assignments and in-sync replicas.
makeLeader creates a Log (unless available already) for the local broker ID and the given OffsetCheckpoints (with the isFutureReplica flag off).
makeLeader requests the leader log for the logEndOffset.
makeLeader prints out the following INFO message to the logs:
[topicPartition] starts at Leader Epoch [leaderEpoch] from offset [leaderEpochStartOffset]. Previous Leader Epoch was: [leaderEpoch]
makeLeader updates the internal registries: leaderEpoch, leaderEpochStartOffsetOpt, and zkVersion.
makeLeader requests the leader log to maybeAssignEpochStartOffset with the current leaderEpoch and the log end offset (which is now considered the leader epoch's start offset).
makeLeader requests the remoteReplicas to resetLastCaughtUpTime. For replicas in inSyncReplicaIds, the last caught-up time is the current time, while for the others it is 0.
When the local broker has just become the new leader of the partition, makeLeader updates the leaderReplicaIdOpt internal registry and requests the remoteReplicas to updateFetchState.
makeLeader tries to increment the high watermark of the leader log and, if it was incremented, calls tryCompleteDelayedRequests.
Note
|
makeLeader is used when ReplicaManager is requested to makeLeaders.
|
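Two of the makeLeader steps above carry small rules worth spelling out: which last caught-up time each remote replica is reset to, and when makeLeader reports a newly elected leader. A minimal sketch under stated assumptions: RemoteReplica, resetRemoteReplicas and isNewLeader are hypothetical names, the one-argument resetLastCaughtUpTime is a simplification of Kafka's method, and treating "new leader" as "the leader was not the local broker before" is an assumed reading of the text.

```scala
// Hypothetical, simplified remote replica keeping only the field used here
final class RemoteReplica(val brokerId: Int) {
  var lastCaughtUpTimeMs: Long = -1L
  def resetLastCaughtUpTime(timeMs: Long): Unit = lastCaughtUpTimeMs = timeMs
}

// ISR members are treated as caught up "now"; every other remote replica gets 0
def resetRemoteReplicas(remoteReplicas: Seq[RemoteReplica], inSyncReplicaIds: Set[Int], curTimeMs: Long): Unit =
  remoteReplicas.foreach { r =>
    val lastCaughtUpTime = if (inSyncReplicaIds.contains(r.brokerId)) curTimeMs else 0L
    r.resetLastCaughtUpTime(lastCaughtUpTime)
  }

// Assumed rule: a leader is "new" unless the local broker was already the leader
def isNewLeader(leaderReplicaIdOpt: Option[Int], localBrokerId: Int): Boolean =
  !leaderReplicaIdOpt.contains(localBrokerId)

val remotes = Seq(new RemoteReplica(2), new RemoteReplica(3))
resetRemoteReplicas(remotes, inSyncReplicaIds = Set(1, 2), curTimeMs = 1000L)
println(remotes.map(r => r.brokerId -> r.lastCaughtUpTimeMs)) // List((2,1000), (3,0))
```

Resetting out-of-sync replicas to 0 means they must genuinely catch up with the new leader before they can rejoin the ISR.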
Updating Internal Registries for Replica Assignments and In-Sync Replicas — updateAssignmentAndIsr Method
updateAssignmentAndIsr(
assignment: Seq[Int],
isr: Set[Int]): Unit
updateAssignmentAndIsr uses the remoteReplicasMap internal registry and the given assignment to find the broker IDs that are no longer partition replicas (of the partition).
updateAssignmentAndIsr creates new partition replicas for the broker IDs (in the given assignment) that have not been registered in the remoteReplicasMap internal registry before.
updateAssignmentAndIsr updates the allReplicaIds and the inSyncReplicaIds internal registries with the given assignment and isr, respectively.
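The three steps of updateAssignmentAndIsr boil down to a set difference plus a get-or-create over a mutable map. A minimal sketch, assuming a hypothetical Replica case class and a PartitionRegistries holder class; dropping the removed broker IDs from remoteReplicasMap is an assumption about what happens to the replicas found in the first step.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's Replica
final case class Replica(brokerId: Int)

// Hypothetical holder for the registries updateAssignmentAndIsr touches
final class PartitionRegistries(localBrokerId: Int) {
  val remoteReplicasMap = mutable.Map.empty[Int, Replica]
  var allReplicaIds: Seq[Int] = Seq.empty
  var inSyncReplicaIds: Set[Int] = Set.empty

  def updateAssignmentAndIsr(assignment: Seq[Int], isr: Set[Int]): Unit = {
    // Broker IDs tracked so far that are not part of the new assignment
    val removedReplicas = remoteReplicasMap.keySet.toSet -- assignment.toSet
    // Register a replica for every remote broker ID not seen before
    assignment.filter(_ != localBrokerId).foreach { id =>
      remoteReplicasMap.getOrElseUpdate(id, Replica(id))
    }
    // Assumption: replicas that left the assignment are dropped from the registry
    removedReplicas.foreach(id => remoteReplicasMap.remove(id))
    allReplicaIds = assignment
    inSyncReplicaIds = isr
  }
}

val registries = new PartitionRegistries(localBrokerId = 1)
registries.updateAssignmentAndIsr(Seq(1, 2, 3), Set(1, 2))
registries.updateAssignmentAndIsr(Seq(1, 3, 4), Set(1, 3)) // broker 2 reassigned away, broker 4 added
println(registries.remoteReplicasMap.keySet)
```

getOrElseUpdate keeps an existing Replica (and its fetch state) for brokers that stay in the assignment.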
Looking Up or Creating Replica — getOrCreateReplica Method
getOrCreateReplica(
replicaId: Int,
isNew: Boolean = false): Replica
getOrCreateReplica simply looks up the Replica in the allReplicasMap internal registry (by the given replicaId).
If not found, getOrCreateReplica …FIXME
maybeCreateFutureReplica Method
maybeCreateFutureReplica(logDir: String): Boolean
maybeCreateFutureReplica …FIXME
Note
|
maybeCreateFutureReplica is used exclusively when ReplicaManager is requested to alterReplicaLogDirs.
|
appendRecordsToLeader Method
appendRecordsToLeader(
records: MemoryRecords,
isFromClient: Boolean,
requiredAcks: Int = 0): LogAppendInfo
appendRecordsToLeader basically requests the Log (of the leader Replica) to appendAsLeader.
Internally, appendRecordsToLeader …FIXME
doAppendRecordsToFollowerOrFutureReplica Internal Method
doAppendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo]
doAppendRecordsToFollowerOrFutureReplica …FIXME
Note
|
doAppendRecordsToFollowerOrFutureReplica is used exclusively when Partition is requested to appendRecordsToFollowerOrFutureReplica.
|
appendRecordsToFollowerOrFutureReplica Method
appendRecordsToFollowerOrFutureReplica(
records: MemoryRecords,
isFuture: Boolean): Option[LogAppendInfo]
appendRecordsToFollowerOrFutureReplica …FIXME
truncateFullyAndStartAt Method
truncateFullyAndStartAt(newOffset: Long, isFuture: Boolean): Unit
truncateFullyAndStartAt …FIXME
maybeReplaceCurrentWithFutureReplica Method
maybeReplaceCurrentWithFutureReplica(): Boolean
maybeReplaceCurrentWithFutureReplica …FIXME
Note
|
maybeReplaceCurrentWithFutureReplica is used exclusively when ReplicaAlterLogDirsThread is requested to processPartitionData.
|
delete Method
delete(): Unit
delete …FIXME
Note
|
delete is used exclusively when ReplicaManager is requested to stopReplica.
|
removeFutureLocalReplica Method
removeFutureLocalReplica(deleteFromLogDir: Boolean = true): Unit
removeFutureLocalReplica …FIXME
Note
|
removeFutureLocalReplica is used when ReplicaManager is requested to alterReplicaLogDirs and handleLogDirFailure.
|
isLeaderReplicaLocal Internal Method
isLeaderReplicaLocal: Boolean
isLeaderReplicaLocal is positive (true) when the optional leader Replica (leaderReplicaIfLocal) is defined. Otherwise, it is false.
Note
|
isLeaderReplicaLocal is used when Partition is requested for the performance metrics (InSyncReplicasCount and ReplicasCount), isUnderReplicated, and lowWatermarkIfLeader.
|
localReplicaOrException Method
localReplicaOrException: Replica
localReplicaOrException looks up the localReplica and returns it if available. Otherwise, localReplicaOrException throws a ReplicaNotAvailableException:
Replica for partition [topicPartition] is not available on broker [localBrokerId]
localReplica Method
localReplica: Option[Replica]
localReplica simply gets the partition replica for the local broker ID.
getReplica Method
getReplica(replicaId: Int): Option[Replica]
getReplica returns the replica by the given replicaId (in the allReplicasMap registry) or None.
addReplicaIfNotExists Method
addReplicaIfNotExists(replica: Replica): Replica
addReplicaIfNotExists …FIXME
Note
|
addReplicaIfNotExists is used when…FIXME
|
assignedReplicas Method
assignedReplicas: Set[Replica]
assignedReplicas …FIXME
Note
|
assignedReplicas is used when…FIXME
|
allReplicas Method
allReplicas: Set[Replica]
allReplicas …FIXME
Note
|
allReplicas is used when…FIXME
|
removeReplica Internal Method
removeReplica(replicaId: Int): Unit
removeReplica …FIXME
Note
|
removeReplica is used when…FIXME
|
String (Textual) Representation — toString Method
toString(): String
Note
|
toString is part of the java.lang.Object contract for a string representation of the object.
|
toString …FIXME
readRecords Method
readRecords(
fetchOffset: Long,
currentLeaderEpoch: Optional[Integer],
maxBytes: Int,
fetchIsolation: FetchIsolation,
fetchOnlyFromLeader: Boolean,
minOneMessage: Boolean): LogReadInfo
readRecords …FIXME
Note
|
readRecords is used when…FIXME
|
deleteRecordsOnLeader Method
deleteRecordsOnLeader(
offset: Long): LogDeleteRecordsResult
deleteRecordsOnLeader …FIXME
Note
|
deleteRecordsOnLeader is used when…FIXME
|
Creating Log or Future Log (Unless Available Already) — createLogIfNotExists Method
createLogIfNotExists(
replicaId: Int,
isNew: Boolean,
isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints): Unit
createLogIfNotExists branches off per the given isFutureReplica flag:
For all other cases, createLogIfNotExists simply prints out the following TRACE message (with the Future prefix when the isFutureReplica flag is enabled):
[Future] Log already exists.
getOutOfSyncReplicas Method
getOutOfSyncReplicas(
maxLagMs: Long): Set[Int]
getOutOfSyncReplicas requests the Log for the logEndOffset.
getOutOfSyncReplicas then returns the inSyncReplicaIds (without the local broker ID) for which isFollowerOutOfSync holds.
Note
|
getOutOfSyncReplicas is used when Partition is requested to maybeShrinkIsr.
|
localLogOrException Method
localLogOrException: Log
localLogOrException gives the Log if defined or throws a ReplicaNotAvailableException:
Log for partition [topicPartition] is not available on broker [localBrokerId]
Note
|
localLogOrException is used when…FIXME
|
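The "value or exception" behaviour of localLogOrException is a getOrElse on the optional log. A minimal sketch, assuming hypothetical Log and ReplicaNotAvailableException stand-ins (not Kafka's classes) and a LocalLogHolder class invented for the example; the exception message follows the format shown above.

```scala
// Hypothetical stand-ins for Kafka's Log and ReplicaNotAvailableException
final case class Log(dir: String)
final class ReplicaNotAvailableException(message: String) extends RuntimeException(message)

final class LocalLogHolder(topicPartition: String, localBrokerId: Int) {
  var log: Option[Log] = None

  // The log when defined; otherwise fail with the documented message
  def localLogOrException: Log = log.getOrElse {
    throw new ReplicaNotAvailableException(
      s"Log for partition $topicPartition is not available on broker $localBrokerId")
  }
}

val holder = new LocalLogHolder("topic-0", localBrokerId = 1)
val failure =
  try { holder.localLogOrException; None }
  catch { case e: ReplicaNotAvailableException => Some(e.getMessage) }
println(failure)
```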
Checking If Broker is Partition Leader — isLeader Method
isLeader: Boolean
isLeader is positive (true) when the leaderReplicaIdOpt is the local broker ID.
lastOffsetForLeaderEpoch Method
lastOffsetForLeaderEpoch(
currentLeaderEpoch: Optional[Integer],
leaderEpoch: Int,
fetchOnlyFromLeader: Boolean): EpochEndOffset
lastOffsetForLeaderEpoch …FIXME
Note
|
lastOffsetForLeaderEpoch is used when…FIXME
|
tryCompleteDelayedRequests Internal Method
tryCompleteDelayedRequests(): Unit
tryCompleteDelayedRequests …FIXME
Note
|
tryCompleteDelayedRequests is used when Partition is requested to makeLeader, updateReplicaLogReadResult, maybeShrinkIsr, and appendRecordsToLeader (when leaderHWIncremented).
|
createLog Internal Method
createLog(
replicaId: Int,
isNew: Boolean,
isFutureReplica: Boolean,
offsetCheckpoints: OffsetCheckpoints): Log
createLog requests the LogManager to initializingLog for the TopicPartition.
createLog requests the LogManager to look up or create a new partition log for the TopicPartition. The LogConfig passed in is the currentDefaultConfig of the LogManager overridden with the topic configuration (fetched by requesting the PartitionStateStore to fetchTopicConfig).
createLog requests the given OffsetCheckpoints to fetch the checkpointed high watermark for the TopicPartition. If none is found, createLog prints out the following INFO message to the logs and assumes 0:
No checkpointed highwatermark is found for partition [topicPartition]
createLog requests the Log to update the high watermark.
createLog prints out the following INFO message to the logs:
Log loaded for partition [topicPartition] with initial high watermark [initialHighWatermark]
In the end, createLog requests the LogManager to finishedInitializingLog for the TopicPartition.
Note
|
createLog is used when Partition is requested to create a log or future log (unless available already).
|
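The high-watermark fallback in createLog (use the checkpointed value, else log a message and start from 0) can be sketched as follows. This is an illustration only: initialHighWatermark is a hypothetical function, and a Map keyed by partition name stands in for OffsetCheckpoints.

```scala
// Hypothetical: checkpointed high watermarks keyed by partition name (stand-in for OffsetCheckpoints)
def initialHighWatermark(checkpoints: Map[String, Long], topicPartition: String): Long =
  checkpoints.get(topicPartition) match {
    case Some(hw) => hw
    case None =>
      // Mirrors the INFO message above, then falls back to offset 0
      println(s"No checkpointed highwatermark is found for partition $topicPartition")
      0L
  }

println(initialHighWatermark(Map("topic-0" -> 42L), "topic-0"))
println(initialHighWatermark(Map.empty, "topic-1"))
```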
expandIsr Internal Method
expandIsr(
newIsr: Set[Int]): Unit
expandIsr …FIXME
Note
|
expandIsr is used when Partition is requested to maybeExpandIsr.
|
shrinkIsr Internal Method
shrinkIsr(
newIsr: Set[Int]): Unit
shrinkIsr …FIXME
Note
|
shrinkIsr is used when Partition is requested to maybeShrinkIsr.
|
isFollowerOutOfSync Internal Method
isFollowerOutOfSync(
replicaId: Int,
leaderEndOffset: Long,
currentTimeMs: Long,
maxLagMs: Long): Boolean
isFollowerOutOfSync gets the follower replica for the given replicaId (using getReplicaOrException, which throws a ReplicaNotAvailableException when the replica is not available).
isFollowerOutOfSync is positive (true) when both of the following hold:
- The logEndOffset of the follower replica is different from the given leaderEndOffset
- The time since the lastCaughtUpTimeMs of the follower replica is greater than the given maxLagMs (i.e. the replica.lag.time.max.ms configuration property)
Note
|
isFollowerOutOfSync is used when Partition is requested to getOutOfSyncReplicas.
|
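The two conditions of isFollowerOutOfSync, and the way getOutOfSyncReplicas applies them to the in-sync replicas other than the local broker, can be sketched as pure functions. A minimal sketch, assuming a hypothetical FollowerState case class and a Map of follower states in place of Kafka's replica registry; a missing follower makes the Map lookup throw, loosely mirroring getReplicaOrException.

```scala
// Hypothetical follower state: only the two fields the check reads
final case class FollowerState(logEndOffset: Long, lastCaughtUpTimeMs: Long)

// Out of sync: behind the leader AND not caught up for longer than maxLagMs
def isFollowerOutOfSync(follower: FollowerState, leaderEndOffset: Long, currentTimeMs: Long, maxLagMs: Long): Boolean =
  follower.logEndOffset != leaderEndOffset &&
    (currentTimeMs - follower.lastCaughtUpTimeMs) > maxLagMs

// Candidates are the in-sync replicas without the local (leader) broker
def getOutOfSyncReplicas(
    inSyncReplicaIds: Set[Int],
    localBrokerId: Int,
    followers: Map[Int, FollowerState], // followers(id) throws if the replica is unknown
    leaderEndOffset: Long,
    currentTimeMs: Long,
    maxLagMs: Long): Set[Int] =
  (inSyncReplicaIds - localBrokerId).filter { id =>
    isFollowerOutOfSync(followers(id), leaderEndOffset, currentTimeMs, maxLagMs)
  }

val followers = Map(
  2 -> FollowerState(logEndOffset = 100L, lastCaughtUpTimeMs = 0L),     // at the leader's end offset
  3 -> FollowerState(logEndOffset = 90L, lastCaughtUpTimeMs = 5000L),   // behind for 35 s
  4 -> FollowerState(logEndOffset = 90L, lastCaughtUpTimeMs = 20000L))  // behind for 20 s
val outOfSync = getOutOfSyncReplicas(Set(1, 2, 3, 4), localBrokerId = 1, followers,
  leaderEndOffset = 100L, currentTimeMs = 40000L, maxLagMs = 30000L)
println(outOfSync)
```

Requiring both conditions means a follower that is briefly behind but recently caught up stays in the ISR.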
getReplicaOrException Internal Method
getReplicaOrException(
replicaId: Int): Replica
getReplicaOrException finds the replica for the given replica ID or throws a ReplicaNotAvailableException:
Replica with id [replicaId] is not available on broker [localBrokerId]
Note
|
getReplicaOrException is used when Partition is requested to checkEnoughReplicasReachOffset, maybeShrinkIsr, and isFollowerOutOfSync.
|
getLocalLog Internal Method
getLocalLog(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Either[Log, Errors]
getLocalLog uses checkCurrentLeaderEpoch for the given currentLeaderEpoch and branches off per the result:
- For no errors (Errors.NONE), if the requireLeader flag is on (true) but the broker is not the leader, getLocalLog returns Errors.NOT_LEADER_FOR_PARTITION.
- For no errors (Errors.NONE), if the requireLeader flag is off (false) or the broker is the leader, getLocalLog branches off per the optional log. getLocalLog gives the log if defined or one of the following errors:
  - Errors.NOT_LEADER_FOR_PARTITION when the requireLeader flag is on (true)
  - Errors.REPLICA_NOT_AVAILABLE when the requireLeader flag is off (false)
- For any other error, getLocalLog simply returns it.
Note
|
getLocalLog is used when Partition is requested to localLogWithEpochOrException and lastOffsetForLeaderEpoch.
|
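The branching in getLocalLog can be written as one pattern match returning Either[Log, Errors] (log on the Left, error on the Right, as in the signature above). A minimal sketch, assuming hypothetical Errors and Log stand-ins rather than Kafka's classes, and taking the result of checkCurrentLeaderEpoch, the leadership flag and the optional log as plain parameters:

```scala
// Hypothetical stand-ins for Kafka's Errors and Log
sealed trait Errors
object Errors {
  case object NONE extends Errors
  case object NOT_LEADER_FOR_PARTITION extends Errors
  case object REPLICA_NOT_AVAILABLE extends Errors
  case object FENCED_LEADER_EPOCH extends Errors // example of "any other error"
}
final case class Log(name: String)

// epochCheck plays the role of the checkCurrentLeaderEpoch result
def getLocalLog(epochCheck: Errors, requireLeader: Boolean, isLeader: Boolean, log: Option[Log]): Either[Log, Errors] =
  epochCheck match {
    case Errors.NONE if requireLeader && !isLeader => Right(Errors.NOT_LEADER_FOR_PARTITION)
    case Errors.NONE =>
      log match {
        case Some(l) => Left(l)
        case None    => Right(if (requireLeader) Errors.NOT_LEADER_FOR_PARTITION else Errors.REPLICA_NOT_AVAILABLE)
      }
    case error => Right(error) // any other error is passed through as-is
  }

val partitionLog = Log("topic-0")
println(getLocalLog(Errors.NONE, requireLeader = true, isLeader = true, log = Some(partitionLog)))
```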
checkCurrentLeaderEpoch Internal Method
checkCurrentLeaderEpoch(
remoteLeaderEpochOpt: Optional[Integer]): Errors
checkCurrentLeaderEpoch …FIXME
Note
|
checkCurrentLeaderEpoch is used when Partition is requested to getLocalLog.
|
localLogWithEpochOrException Internal Method
localLogWithEpochOrException(
currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Log
localLogWithEpochOrException …FIXME
Note
|
localLogWithEpochOrException is used when Partition is requested to…FIXME
|
isFollowerInSync Internal Method
isFollowerInSync(
followerReplica: Replica,
highWatermark: Long): Boolean
isFollowerInSync …FIXME
Note
|
isFollowerInSync is used when Partition is requested to maybeExpandIsr.
|
maybeUpdateIsrAndVersion Internal Method
maybeUpdateIsrAndVersion(
isr: Set[Int],
zkVersionOpt: Option[Int]): Unit
maybeUpdateIsrAndVersion …FIXME
Internal Properties
Name | Description |
---|---|
inSyncReplicaIds | In-sync replicas of the topic partition, i.e. broker IDs that are known to be in-sync with the leader. Updated when maybeUpdateIsrAndVersion, updateAssignmentAndIsr, and delete are executed |
allReplicaIds | All replica broker IDs that were assigned to this topic partition |
allReplicasMap | Replicas by ID. Used in getReplica, assignedReplicas, allReplicas, and toString |
remoteReplicasMap | Remote replicas by broker ID. Used for getReplica, remoteReplicas, and maybeIncrementLeaderHW. All of the pairs are removed when delete is executed |