ZkPartitionStateMachine
ZkPartitionStateMachine is a PartitionStateMachine (with a given ControllerContext) used by KafkaController (on every KafkaServer).
When requested to handle partition state changes, ZkPartitionStateMachine uses the ControllerBrokerRequestBatch to propagate them to all brokers in the cluster.
ZkPartitionStateMachine uses [PartitionStateMachine controllerId=[brokerId]] as the logging prefix (aka logIdent).
Tip: Enable … Add the following line to … Refer to Logging.
Creating ZkPartitionStateMachine Instance
ZkPartitionStateMachine takes the following to be created:
Handling State Changes of Partitions — handleStateChanges Method
handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Throwable]
Note: handleStateChanges is part of the PartitionStateMachine Contract to handle state changes of partitions (partition state changes).
handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.
handleStateChanges then calls doHandleStateChanges (that may give errors for some partitions, which are returned in the end).
In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers and returns the errors.
In case of a ControllerMovedException, handleStateChanges prints out the following ERROR message to the logs:
Controller moved to another broker when moving some partitions to [targetState] state
In case of any other error (Throwable), handleStateChanges prints out the following ERROR message to the logs:
Error while moving some partitions to [targetState] state
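The flow above (new batch, state changes, send) can be sketched in Scala as follows. This is a minimal sketch, not Kafka's code: TopicPartition, the PartitionState objects, ControllerMovedException and RequestBatch are simplified stand-ins defined here, the doHandleStateChanges function parameter is hypothetical, and the partition leader election strategy is left out. The section only describes the ERROR messages, so what happens after logging (rethrowing ControllerMovedException, reporting any other error for every partition) is an assumption.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Kafka's types (not the real Kafka API)
final case class TopicPartition(topic: String, partition: Int)

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

final class ControllerMovedException(message: String) extends RuntimeException(message)

// Hypothetical batch that only records the calls it receives, in order
final class RequestBatch {
  val calls: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]
  def newBatch(): Unit = calls.append("newBatch")
  def sendRequestsToBrokers(controllerEpoch: Int): Unit = calls.append(s"send($controllerEpoch)")
}

object HandleStateChangesSketch {
  def handleStateChanges(
      partitions: Seq[TopicPartition],
      targetState: PartitionState,
      batch: RequestBatch,
      controllerEpoch: Int,
      // stands in for doHandleStateChanges (election strategy omitted)
      doHandleStateChanges: (Seq[TopicPartition], PartitionState) => Map[TopicPartition, Throwable]
  ): Map[TopicPartition, Throwable] =
    try {
      batch.newBatch()
      val errors = doHandleStateChanges(partitions, targetState)
      batch.sendRequestsToBrokers(controllerEpoch)
      errors
    } catch {
      case e: ControllerMovedException =>
        Console.err.println(s"Controller moved to another broker when moving some partitions to $targetState state")
        throw e // assumption: the controller move is propagated to the caller
      case e: Throwable =>
        Console.err.println(s"Error while moving some partitions to $targetState state")
        partitions.map(_ -> e).toMap // assumption: the error is reported for every partition
    }
}
```

Passing doHandleStateChanges as a function keeps the sketch focused on the batch lifecycle: the batch is always opened first and only sent when the state changes complete without an exception.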
doHandleStateChanges Internal Method
doHandleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Throwable]
For every partition (in partitions), doHandleStateChanges requests the ControllerContext to putPartitionStateIfNotExists with NonExistentPartition state (so that every partition has a current state).
doHandleStateChanges requests the ControllerContext to checkValidPartitionStateChange (that gives valid and invalid partitions).
For every invalid partition, doHandleStateChanges calls logInvalidTransition.
doHandleStateChanges branches off per the target state: NewPartition, OnlinePartition, OfflinePartition, and NonExistentPartition.
In the end, doHandleStateChanges returns the partitions whose leader election failed. Such failures are only possible for the OnlinePartition target state (for the valid partitions in OfflinePartition or OnlinePartition states); for the other target states, no errors are returned.
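The split into valid and invalid partitions can be sketched as a check of the current state against the states a target state may be reached from. This sketch assumes the valid previous states below (they are meant to mirror Kafka's PartitionState definitions but are not stated in this section), uses a plain Map instead of the ControllerContext, and checkValidTransitions is a hypothetical name.

```scala
// Hypothetical stand-ins for Kafka's types (not the real Kafka API)
final case class TopicPartition(topic: String, partition: Int)

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

object ValidTransitionsSketch {
  // Assumed valid previous states per target state
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map[PartitionState, Set[PartitionState]](
    NewPartition -> Set[PartitionState](NonExistentPartition),
    OnlinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
    NonExistentPartition -> Set[PartitionState](OfflinePartition)
  )

  // Returns (valid, invalid) partitions for the target state. A partition without a
  // recorded state counts as NonExistentPartition (cf. putPartitionStateIfNotExists).
  def checkValidTransitions(
      states: Map[TopicPartition, PartitionState],
      partitions: Seq[TopicPartition],
      targetState: PartitionState
  ): (Seq[TopicPartition], Seq[TopicPartition]) =
    partitions.partition { partition =>
      val current = states.getOrElse(partition, NonExistentPartition)
      validPreviousStates(targetState).contains(current)
    }
}
```

The invalid half is what would be passed on to logInvalidTransition; only the valid half reaches the per-target-state branches.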
Note: doHandleStateChanges is used exclusively when ZkPartitionStateMachine is requested to handle partition state changes.
NewPartition
For the NewPartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NewPartition state.
Changed partition [partition] state from [state] to NewPartition with assigned replicas [partitionReplicaAssignment]
OnlinePartition (and Partition Leader Election)
For the OnlinePartition target state, doHandleStateChanges splits the valid partitions into two collections: partitions in NewPartition state (uninitializedPartitions) and partitions in OfflinePartition or OnlinePartition states (partitionsToElectLeader).
For the partitions in NewPartition state (uninitialized valid partitions), doHandleStateChanges calls initializeLeaderAndIsrForPartitions (that gives the partitions successfully initialized, aka successfulInitializations). For every partition successfully initialized, doHandleStateChanges prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
For the partitions to elect a leader for (valid partitions in OfflinePartition or OnlinePartition states), doHandleStateChanges calls electLeaderForPartitions (with the partitions and the PartitionLeaderElectionStrategy). That gives two sets of partitions: with election successful and with election failed. For every partition with election successful, doHandleStateChanges prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
In the end, doHandleStateChanges returns the partitions with election failed.
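Splitting the valid partitions for the OnlinePartition target state comes down to two filters on the current partition state. A minimal sketch with simplified stand-in types; split and currentState are hypothetical names, with currentState playing the role of the ControllerContext lookup.

```scala
// Hypothetical stand-ins for Kafka's types (not the real Kafka API)
final case class TopicPartition(topic: String, partition: Int)

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

object OnlinePartitionSplitSketch {
  // Returns (uninitializedPartitions, partitionsToElectLeader) out of the valid partitions
  def split(
      validPartitions: Seq[TopicPartition],
      currentState: TopicPartition => PartitionState
  ): (Seq[TopicPartition], Seq[TopicPartition]) = {
    // NewPartition: no leader yet, so leader and ISR have to be initialized
    val uninitializedPartitions = validPartitions.filter(p => currentState(p) == NewPartition)
    // OfflinePartition or OnlinePartition: a leader has to be (re-)elected
    val partitionsToElectLeader = validPartitions.filter { p =>
      currentState(p) == OfflinePartition || currentState(p) == OnlinePartition
    }
    (uninitializedPartitions, partitionsToElectLeader)
  }
}
```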
OfflinePartition
For the OfflinePartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OfflinePartition state.
Changed partition [partition] state from [state] to OfflinePartition
NonExistentPartition
For the NonExistentPartition target state, doHandleStateChanges goes over the valid partitions and, for every partition, prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NonExistentPartition state.
Changed partition [partition] state from [state] to NonExistentPartition
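The OfflinePartition and NonExistentPartition branches follow the same pattern: build the TRACE message, then record the target state. A minimal sketch, assuming a mutable map in place of the ControllerContext and returning the TRACE messages instead of logging them; transition is a hypothetical name, and TopicPartition.toString uses the topic-partition form.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Kafka's types (not the real Kafka API)
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

object SimpleTransitionSketch {
  // For every valid partition: build the TRACE message from the current state,
  // then record the target state (stands in for putPartitionState)
  def transition(
      states: mutable.Map[TopicPartition, PartitionState],
      validPartitions: Seq[TopicPartition],
      targetState: PartitionState
  ): Seq[String] =
    validPartitions.map { partition =>
      val message = s"Changed partition $partition state from ${states(partition)} to $targetState"
      states(partition) = targetState
      message
    }
}
```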
initializeLeaderAndIsrForPartitions Internal Method
initializeLeaderAndIsrForPartitions(
  partitions: Seq[TopicPartition]): Seq[TopicPartition]
initializeLeaderAndIsrForPartitions starts by requesting the ControllerContext for the partition replica assignment of every partition (in the given partitions).
From every partition replica assignment, initializeLeaderAndIsrForPartitions keeps the online replicas only (per the ControllerContext) and filters out all the other replicas.
initializeLeaderAndIsrForPartitions splits the partitions into two sets: with and without live replicas (partitionsWithLiveReplicas and partitionsWithoutLiveReplicas, respectively).
For every partition without live (online) replicas, initializeLeaderAndIsrForPartitions prints out the following ERROR message with a StateChangeFailedException to the logs:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from NewPartition to OnlinePartition
The StateChangeFailedException carries the following message:
Controller [controllerId] epoch [epoch] encountered error during state change of partition [partition] from New to Online, assigned replicas are [[replicas]], live brokers are [[liveBrokerIds]]. No assigned replica is alive.
initializeLeaderAndIsrForPartitions converts the partitions with live (online) replicas into leaderIsrAndControllerEpochs (a LeaderIsrAndControllerEpoch with a LeaderAndIsr per partition) and requests the KafkaZkClient to create the partition state znodes for them.
For every successful response (from creating partition state znodes), initializeLeaderAndIsrForPartitions requests the following:
- The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in the partitionLeadershipInfo registry)
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers (with the isNew flag on)
In the end, initializeLeaderAndIsrForPartitions returns the partitions that were successfully initialized.
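The replica filtering and the split into partitions with and without live replicas can be sketched as below. As a simplification, the sketch assumes that the first live replica becomes the leader and that all live replicas form the initial ISR. LeaderAndIsr here is a reduced stand-in, initialLeaderAndIsr is a hypothetical name, and the two function parameters replace the ControllerContext lookups; the ZooKeeper writes and the request batch are left out.

```scala
// Hypothetical stand-ins for Kafka's types (Kafka's LeaderAndIsr has more fields)
final case class TopicPartition(topic: String, partition: Int)
final case class LeaderAndIsr(leader: Int, isr: List[Int])

object InitialLeaderAndIsrSketch {
  // Returns (partitions with their initial LeaderAndIsr, partitions without live replicas)
  def initialLeaderAndIsr(
      partitions: Seq[TopicPartition],
      replicaAssignment: TopicPartition => Seq[Int],
      isReplicaOnline: (Int, TopicPartition) => Boolean
  ): (Seq[(TopicPartition, LeaderAndIsr)], Seq[TopicPartition]) = {
    // Keep only the online replicas of every partition
    val liveReplicasPerPartition = partitions.map { partition =>
      partition -> replicaAssignment(partition).filter(replica => isReplicaOnline(replica, partition))
    }
    // partitionsWithoutLiveReplicas vs partitionsWithLiveReplicas
    val (withoutLiveReplicas, withLiveReplicas) =
      liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
    // Assumption: first live replica is the leader, all live replicas form the ISR
    val initial = withLiveReplicas.map { case (partition, liveReplicas) =>
      partition -> LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
    }
    (initial, withoutLiveReplicas.map(_._1))
  }
}
```

The partitions in the second collection are the ones that would end up with the StateChangeFailedException described above.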
In case of ControllerMovedException (while…FIXME), initializeLeaderAndIsrForPartitions …FIXME
In case of any other error (Exception) (while…FIXME), initializeLeaderAndIsrForPartitions …FIXME
Note: initializeLeaderAndIsrForPartitions is used exclusively when ZkPartitionStateMachine is requested to handle partition state changes (for partitions in NewPartition state that are transitioned to OnlinePartition target state).
electLeaderForPartitions Internal Method
electLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Seq[TopicPartition], Map[TopicPartition, Throwable])
electLeaderForPartitions calls doElectLeaderForPartitions repeatedly, as long as there are partitions to retry, i.e. until the leader election of every given partition has either succeeded or failed.
For every failed election, electLeaderForPartitions prints out the following ERROR message (with the exception) to the logs:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from [state] to OnlinePartition
In the end, electLeaderForPartitions returns the partitions with leader election successful and failed.
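The retry loop can be sketched generically over the partition type. This is a minimal sketch, not Kafka's code: doElect is a hypothetical stand-in for doElectLeaderForPartitions that returns (successful, to retry, failed), and the ERROR logging of failed elections is only marked with a comment.

```scala
import scala.collection.mutable

object ElectLeaderRetrySketch {
  // One round of doElectLeaderForPartitions: (successful, to retry, failed)
  type Round[P] = (Seq[P], Seq[P], Map[P, Exception])

  // Calls doElect until no partitions are left to retry and
  // accumulates the successful and failed elections of all rounds
  def electLeaderForPartitions[P](
      partitions: Seq[P],
      doElect: Seq[P] => Round[P]
  ): (Seq[P], Map[P, Exception]) = {
    val successful = mutable.ArrayBuffer.empty[P]
    val failed = mutable.Map.empty[P, Exception]
    var remaining = partitions
    while (remaining.nonEmpty) {
      val (succeeded, toRetry, failures) = doElect(remaining)
      successful ++= succeeded
      failed ++= failures // the ERROR message per failed election would be logged here
      remaining = toRetry
    }
    (successful.toList, failed.toMap)
  }
}
```

The loop only terminates because every round eventually moves each partition into the successful or failed collection; a doElect that kept returning the same partitions to retry would spin forever.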
Note: electLeaderForPartitions is used when ZkPartitionStateMachine is requested to handle partition state changes (when the partitions are expected in OnlinePartition target state).
doElectLeaderForPartitions Internal Method
doElectLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception])
doElectLeaderForPartitions requests the KafkaZkClient for the partition states and converts them to a LeaderIsrAndControllerEpoch per partition.
doElectLeaderForPartitions collects the partitions that are eligible for leader election, i.e. the partitions whose controller epoch (in the partition state) is not newer than the current controller epoch (per the ControllerContext).
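The eligibility check compares the controller epoch stored with every partition state against the current controller epoch. A sketch with simplified stand-ins for LeaderAndIsr and LeaderIsrAndControllerEpoch; splitByEpoch is a hypothetical name.

```scala
// Hypothetical stand-ins for Kafka's types (Kafka's LeaderAndIsr has more fields)
final case class TopicPartition(topic: String, partition: Int)
final case class LeaderAndIsr(leader: Int, isr: List[Int])
final case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)

object ElectionEligibilitySketch {
  type PartitionStates = Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]

  // Returns (eligible, not eligible): a partition state written under a newer
  // controller epoch than the current one is not eligible for leader election
  def splitByEpoch(states: PartitionStates, currentEpoch: Int): (PartitionStates, PartitionStates) =
    states.partition { case (_, state) => state.controllerEpoch <= currentEpoch }
}
```

A partition state with a newer epoch means another controller has already written it, so the current controller should not overwrite it.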
doElectLeaderForPartitions branches off per the PartitionLeaderElectionStrategy and ends up with partitions with and without leaders:
- For OfflinePartitionLeaderElectionStrategy, doElectLeaderForPartitions first calls collectUncleanLeaderElectionState with the partitions valid for election, followed by leaderForOffline
- For ReassignPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForReassign
- For PreferredReplicaPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForPreferredReplica
- For ControlledShutdownPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForControlledShutdown
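The per-strategy branching maps onto a pattern match over a sealed trait. In this sketch the strategies are modelled as case objects (an assumption), and electionSteps, a hypothetical helper, only returns the names of the methods listed above for every strategy, in call order.

```scala
// Hypothetical stand-ins for the election strategies (modelled as case objects)
sealed trait PartitionLeaderElectionStrategy extends Product with Serializable
case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

object ElectionStrategySketch {
  // Names of the helper methods used per strategy, in call order
  def electionSteps(strategy: PartitionLeaderElectionStrategy): List[String] = strategy match {
    case OfflinePartitionLeaderElectionStrategy =>
      List("collectUncleanLeaderElectionState", "leaderForOffline")
    case ReassignPartitionLeaderElectionStrategy => List("leaderForReassign")
    case PreferredReplicaPartitionLeaderElectionStrategy => List("leaderForPreferredReplica")
    case ControlledShutdownPartitionLeaderElectionStrategy => List("leaderForControlledShutdown")
  }
}
```

Because the trait is sealed, the compiler warns if a strategy is added without a matching case.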
doElectLeaderForPartitions requests the KafkaZkClient to updateLeaderAndIsr (with the adjusted leaders and ISRs).
For every partition successfully updated (in ZooKeeper), doElectLeaderForPartitions requests the following:
- The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in the partitionLeadershipInfo registry)
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers to every live replica broker (with the isNew flag off)
In the end, doElectLeaderForPartitions returns three collections: the partitions that were successfully updated, the partitions to be retried, and the partitions that failed (during election or update).
Note: doElectLeaderForPartitions is used when ZkPartitionStateMachine is requested to electLeaderForPartitions.
collectUncleanLeaderElectionState Internal Method
collectUncleanLeaderElectionState(
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
collectUncleanLeaderElectionState …FIXME
Note: collectUncleanLeaderElectionState is used when ZkPartitionStateMachine is requested to doElectLeaderForPartitions (for OfflinePartitionLeaderElectionStrategy).
logInvalidTransition Internal Method
logInvalidTransition(
  partition: TopicPartition,
  targetState: PartitionState): Unit
logInvalidTransition …FIXME
Note: logInvalidTransition is used exclusively when ZkPartitionStateMachine is requested to doHandleStateChanges (for invalid partitions).
Printing Out ERROR Message to Logs — logFailedStateChange Internal Method
logFailedStateChange(
  partition: TopicPartition,
  currState: PartitionState,
  targetState: PartitionState,
  code: Code): Unit // (1)
logFailedStateChange(
  partition: TopicPartition,
  currState: PartitionState,
  targetState: PartitionState,
  t: Throwable): Unit
(1) Converts the code to a KeeperException and passes it on to the Throwable variant
logFailedStateChange simply prints out the following ERROR message to the logs:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from [currState] to [targetState]
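The two logFailedStateChange variants can be sketched as an overload where the code variant converts the code to an exception and delegates. Code and CodeException are hypothetical stand-ins for ZooKeeper's KeeperException.Code and KeeperException, the partition states are plain strings, and messages are collected in a buffer rather than logged.

```scala
import scala.collection.mutable

// Hypothetical stand-ins (not the real Kafka or ZooKeeper API)
final case class TopicPartition(topic: String, partition: Int) {
  override def toString: String = s"$topic-$partition"
}
final case class Code(name: String)
final class CodeException(val code: Code) extends RuntimeException(code.name)

final class FailedStateChangeLog(controllerId: Int, epoch: Int) {
  // Collected (message, cause) pairs; stands in for ERROR-level logging
  val entries: mutable.ArrayBuffer[(String, Throwable)] = mutable.ArrayBuffer.empty

  // (1) Converts the code to an exception and delegates to the Throwable variant
  def logFailedStateChange(partition: TopicPartition, currState: String, targetState: String, code: Code): Unit =
    logFailedStateChange(partition, currState, targetState, new CodeException(code))

  def logFailedStateChange(partition: TopicPartition, currState: String, targetState: String, t: Throwable): Unit = {
    val message = s"Controller $controllerId epoch $epoch failed to change state for partition $partition from $currState to $targetState"
    entries.append((message, t))
  }
}
```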
Note: logFailedStateChange is used when ZkPartitionStateMachine is requested to initializeLeaderAndIsrForPartitions, electLeaderForPartitions, collectUncleanLeaderElectionState, and logInvalidTransition.
partitionState Internal Method
partitionState(
  partition: TopicPartition): PartitionState
partitionState …FIXME
Note: partitionState is used when…FIXME
leaderForOffline Method
leaderForOffline(
  controllerContext: ControllerContext,
  partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
): Seq[ElectionResult]
// Private API
leaderForOffline(
  partition: TopicPartition,
  leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch],
  uncleanLeaderElectionEnabled: Boolean,
  controllerContext: ControllerContext
): ElectionResult
leaderForOffline …FIXME
Note: leaderForOffline is used when…FIXME
leaderForReassign Method
leaderForReassign(
  controllerContext: ControllerContext,
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForReassign(
  partition: TopicPartition,
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  controllerContext: ControllerContext
): ElectionResult
leaderForReassign …FIXME
Note: leaderForReassign is used when…FIXME
leaderForPreferredReplica Method
leaderForPreferredReplica(
  controllerContext: ControllerContext,
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForPreferredReplica(
  partition: TopicPartition,
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  controllerContext: ControllerContext
): ElectionResult
leaderForPreferredReplica …FIXME
Note: leaderForPreferredReplica is used when…FIXME
leaderForControlledShutdown Method
leaderForControlledShutdown(
  controllerContext: ControllerContext,
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForControlledShutdown(
  partition: TopicPartition,
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  shuttingDownBrokerIds: Set[Int],
  controllerContext: ControllerContext
): ElectionResult
leaderForControlledShutdown …FIXME
Note: leaderForControlledShutdown is used when…FIXME