ZkPartitionStateMachine
ZkPartitionStateMachine is a PartitionStateMachine (with a given ControllerContext) for KafkaController (for every KafkaServer).
When requested to handle partition state changes, ZkPartitionStateMachine uses the ControllerBrokerRequestBatch to propagate them to all brokers in a cluster.
ZkPartitionStateMachine uses [PartitionStateMachine controllerId=[brokerId]] as the logging prefix (aka logIdent).
|
Tip
|
Enable logging for ZkPartitionStateMachine.
Refer to Logging. |
Creating ZkPartitionStateMachine Instance
ZkPartitionStateMachine takes the following to be created:
Handling State Changes of Partitions — handleStateChanges Method
handleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Throwable]
|
Note
|
handleStateChanges is part of the PartitionStateMachine Contract to handle state changes of partitions (partition state changes).
|
handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.
handleStateChanges then calls doHandleStateChanges (which may produce errors that are returned in the end).
In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers and returns the errors.
In case of ControllerMovedException, handleStateChanges prints out the following ERROR message to the logs:
Controller moved to another broker when moving some partitions to [targetState] state
In case of any other error (Throwable), handleStateChanges prints out the following ERROR message to the logs:
Error while moving some partitions to [targetState] state
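The control flow above can be sketched in plain Scala without Kafka on the classpath. The types below (TopicPartition, ControllerMovedException, RequestBatch, RecordingBatch) are simplified stand-ins rather than the real Kafka classes. What happens after each ERROR message is logged (rethrowing for ControllerMovedException, mapping every partition to the error otherwise) is an assumption of this sketch and is not stated above.

```scala
object HandleStateChangesSketch {
  // Stand-ins for the Kafka types named in the text; NOT the real classes.
  final case class TopicPartition(topic: String, partition: Int)
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  // Hypothetical minimal view of ControllerBrokerRequestBatch.
  trait RequestBatch {
    def newBatch(): Unit
    def sendRequestsToBrokers(): Unit
  }

  // Records the calls it receives so the order of steps can be inspected.
  final class RecordingBatch extends RequestBatch {
    val calls = scala.collection.mutable.ListBuffer.empty[String]
    def newBatch(): Unit = { calls += "newBatch"; () }
    def sendRequestsToBrokers(): Unit = { calls += "send"; () }
  }

  def handleStateChanges(
      partitions: Seq[TopicPartition],
      targetState: String,
      batch: RequestBatch,
      log: String => Unit)(
      doHandle: Seq[TopicPartition] => Map[TopicPartition, Throwable]
  ): Map[TopicPartition, Throwable] =
    try {
      batch.newBatch()                 // 1. prepare a new batch
      val errors = doHandle(partitions) // 2. do the state changes, collect errors
      batch.sendRequestsToBrokers()    // 3. send controller requests to brokers
      errors                           // 4. return the errors
    } catch {
      case e: ControllerMovedException =>
        log(s"Controller moved to another broker when moving some partitions to $targetState state")
        throw e // assumption of this sketch: the exception is rethrown
      case e: Throwable =>
        log(s"Error while moving some partitions to $targetState state")
        partitions.map(_ -> e).toMap // assumption: every partition is reported as failed
    }
}
```

Passing doHandle as a function keeps the sketch independent of how the individual state transitions are carried out.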
doHandleStateChanges Internal Method
doHandleStateChanges(
partitions: Seq[TopicPartition],
targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Throwable]
For every partition (in partitions), doHandleStateChanges requests the ControllerContext to putPartitionStateIfNotExists to NonExistentPartition.
doHandleStateChanges requests the ControllerContext to checkValidPartitionStateChange (that gives valid and invalid partitions).
For every invalid partition, doHandleStateChanges calls logInvalidTransition.
doHandleStateChanges branches off per the target state: NewPartition, OnlinePartition, OfflinePartition, and NonExistentPartition.
In the end, doHandleStateChanges returns the partitions for which leader election failed. Such failures apply to the OnlinePartition target state only, and only to the valid partitions that were in OfflinePartition or OnlinePartition state.
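The first steps (registering unknown partitions as NonExistentPartition, then separating valid from invalid transitions) can be illustrated with a small self-contained sketch. The States class is a hypothetical stand-in for the partition-state bookkeeping of ControllerContext, and the validPreviousStates table is recalled from Kafka's PartitionState definitions, so it should be treated as an assumption and checked against the Kafka version in use. The sketch covers the simple target states only; the OnlinePartition target state additionally involves leader election.

```scala
object DoHandleStateChangesSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState
  case object NonExistentPartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)

  // Assumption: allowed previous states per target state (verify against Kafka).
  def validPreviousStates(target: PartitionState): Set[PartitionState] = target match {
    case NewPartition         => Set(NonExistentPartition)
    case OnlinePartition      => Set(NewPartition, OnlinePartition, OfflinePartition)
    case OfflinePartition     => Set(NewPartition, OnlinePartition, OfflinePartition)
    case NonExistentPartition => Set(OfflinePartition)
  }

  // Hypothetical stand-in for the partition states kept by ControllerContext.
  final class States {
    private val states = scala.collection.mutable.Map.empty[TopicPartition, PartitionState]
    def putIfNotExists(tp: TopicPartition, s: PartitionState): Unit =
      if (!states.contains(tp)) states.update(tp, s)
    def put(tp: TopicPartition, s: PartitionState): Unit = states.update(tp, s)
    def apply(tp: TopicPartition): PartitionState = states(tp)
    // Returns (valid, invalid) partitions for the requested transition.
    def checkValid(
        tps: Seq[TopicPartition],
        target: PartitionState): (Seq[TopicPartition], Seq[TopicPartition]) =
      tps.partition(tp => validPreviousStates(target).contains(states(tp)))
  }

  // Moves the valid partitions to a simple target state and returns the invalid ones
  // (which the real code would log rather than return).
  def moveTo(ctx: States, tps: Seq[TopicPartition], target: PartitionState): Seq[TopicPartition] = {
    tps.foreach(ctx.putIfNotExists(_, NonExistentPartition))
    val (valid, invalid) = ctx.checkValid(tps, target)
    valid.foreach(ctx.put(_, target))
    invalid
  }
}
```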
|
Note
|
doHandleStateChanges is used exclusively when ZkPartitionStateMachine is requested to handle partition state changes.
|
NewPartition
For NewPartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NewPartition state.
Changed partition [partition] state from [state] to NewPartition with assigned replicas [partitionReplicaAssignment]
OnlinePartition (and Partition Leader Election)
For OnlinePartition target state, doHandleStateChanges splits the valid partitions into two collections: the partitions in NewPartition state (uninitializedPartitions) and the partitions in OfflinePartition or OnlinePartition state (partitionsToElectLeader).
For the partitions in NewPartition state (uninitialized valid partitions), doHandleStateChanges calls initializeLeaderAndIsrForPartitions (that gives the partitions successfully initialized, aka successfulInitializations). For every partition successfully initialized, doHandleStateChanges prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
For partitions to elect leader (valid partitions in OfflinePartition or OnlinePartition states), doHandleStateChanges calls electLeaderForPartitions (with the partitions and the PartitionLeaderElectionStrategy), which gives two collections: the partitions with successful and with failed elections. For every partition with a successful election, doHandleStateChanges prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.
Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]
In the end, doHandleStateChanges returns the partitions with election failed.
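The split of the valid partitions for the OnlinePartition target state can be sketched as two filters over the current partition states. The stateOf function is a hypothetical stand-in for looking up the current state of a partition, and the types are simplified stand-ins for the Kafka ones.

```scala
object OnlineTargetSplitSketch {
  sealed trait PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class TopicPartition(topic: String, partition: Int)

  // Returns (uninitializedPartitions, partitionsToElectLeader).
  def split(
      validPartitions: Seq[TopicPartition],
      stateOf: TopicPartition => PartitionState
  ): (Seq[TopicPartition], Seq[TopicPartition]) = {
    // Partitions that still need their leader and ISR initialized.
    val uninitializedPartitions =
      validPartitions.filter(tp => stateOf(tp) == NewPartition)
    // Partitions that already have state and need a (new) leader elected.
    val partitionsToElectLeader = validPartitions.filter { tp =>
      val s = stateOf(tp)
      s == OfflinePartition || s == OnlinePartition
    }
    (uninitializedPartitions, partitionsToElectLeader)
  }
}
```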
OfflinePartition
For OfflinePartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OfflinePartition state.
Changed partition [partition] state from [state] to OfflinePartition
NonExistentPartition
For NonExistentPartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NonExistentPartition state.
Changed partition [partition] state from [state] to NonExistentPartition
initializeLeaderAndIsrForPartitions Internal Method
initializeLeaderAndIsrForPartitions(
partitions: Seq[TopicPartition]): Seq[TopicPartition]
initializeLeaderAndIsrForPartitions starts by requesting the ControllerContext for the partition replica assignment for every partition (in the given partitions).
From the partition replica assignments, initializeLeaderAndIsrForPartitions keeps only the replicas that are online (per the ControllerContext), so all other replicas are filtered out (excluded).
initializeLeaderAndIsrForPartitions splits the partitions (with online replicas only) into two sets, with and without live replicas (partitionsWithLiveReplicas and partitionsWithoutLiveReplicas, respectively).
For every partition without live (online) replicas, initializeLeaderAndIsrForPartitions prints out the following ERROR message and the StateChangeFailedException to the logs:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from NewPartition to OnlinePartition
Controller [controllerId] epoch [epoch] encountered error during state change of partition [partition] from New to Online, assigned replicas are [[replicas]], live brokers are [[liveBrokerIds]]. No assigned replica is alive.
initializeLeaderAndIsrForPartitions converts the partitions with live (online) replicas into leaderIsrAndControllerEpochs (LeaderIsrAndControllerEpoch with LeaderAndIsr) and for every pair initializeLeaderAndIsrForPartitions requests the KafkaZkClient to create state znodes for the partitions.
For every successful response (from creating partition state znodes), initializeLeaderAndIsrForPartitions requests the following:
- The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in the partitionLeadershipInfo registry)
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers (with isNew flag on)
In the end, initializeLeaderAndIsrForPartitions returns the partitions that were successfully initialized.
In case of ControllerMovedException (while…FIXME), initializeLeaderAndIsrForPartitions…FIXME
In case of any other error (Exception) (while…FIXME), initializeLeaderAndIsrForPartitions…FIXME
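The replica filtering and the split into partitions with and without live replicas can be sketched as follows. The liveBrokerIds set is a simplified stand-in for the online-replica check of ControllerContext. Picking the first live replica as the leader and all live replicas as the ISR is an assumption of this sketch about how the initial LeaderAndIsr is built, not something stated above.

```scala
object LiveReplicasSketch {
  final case class TopicPartition(topic: String, partition: Int)
  // Simplified stand-in for Kafka's LeaderAndIsr.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Returns (live replicas per partition that has any, partitions without live replicas).
  def splitByLiveReplicas(
      assignment: Map[TopicPartition, Seq[Int]],
      liveBrokerIds: Set[Int]
  ): (Map[TopicPartition, Seq[Int]], Set[TopicPartition]) = {
    // Keep only the replicas hosted on live brokers.
    val livePerPartition = assignment.map { case (tp, replicas) =>
      tp -> replicas.filter(liveBrokerIds.contains)
    }
    val (withLive, withoutLive) =
      livePerPartition.partition { case (_, live) => live.nonEmpty }
    (withLive, withoutLive.keySet)
  }

  // Assumption: first live replica leads, all live replicas form the ISR.
  // Callers must pass a non-empty sequence.
  def initialLeaderAndIsr(liveReplicas: Seq[Int]): LeaderAndIsr =
    LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
}
```

Partitions in the second collection are the ones for which the "No assigned replica is alive" error would be reported.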
|
Note
|
initializeLeaderAndIsrForPartitions is used exclusively when ZkPartitionStateMachine is requested to handle partition state changes (for partitions in NewPartition state that are transitioned to OnlinePartition target state).
|
electLeaderForPartitions Internal Method
electLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Seq[TopicPartition], Map[TopicPartition, Throwable])
electLeaderForPartitions repeatedly calls doElectLeaderForPartitions until every given partition has either had a leader elected successfully or failed.
For every failed election, electLeaderForPartitions prints out the following ERROR message (with an exception) to the logs:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from [state] to OnlinePartition
In the end, electLeaderForPartitions returns the partitions with leader election successful and failed.
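The retry loop can be sketched with a tail-recursive helper. The Attempt type is a hypothetical stand-in for doElectLeaderForPartitions with the election strategy already applied; it returns the partitions that succeeded, the partitions to retry, and the failures, mirroring the three-part result described for doElectLeaderForPartitions.

```scala
object ElectionRetrySketch {
  final case class TopicPartition(topic: String, partition: Int)

  // (succeeded, to retry, failed) for one election attempt.
  type Attempt =
    Seq[TopicPartition] => (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception])

  def electAll(partitions: Seq[TopicPartition])(
      attempt: Attempt): (Seq[TopicPartition], Map[TopicPartition, Throwable]) = {
    @scala.annotation.tailrec
    def loop(
        remaining: Seq[TopicPartition],
        ok: Seq[TopicPartition],
        failed: Map[TopicPartition, Throwable]
    ): (Seq[TopicPartition], Map[TopicPartition, Throwable]) =
      if (remaining.isEmpty) (ok, failed)
      else {
        val (succeeded, toRetry, errors) = attempt(remaining)
        // Only the partitions that asked for a retry go into the next round.
        loop(toRetry, ok ++ succeeded, failed ++ errors)
      }
    loop(partitions, Seq.empty, Map.empty)
  }
}
```

The loop terminates only if every attempt eventually stops asking for retries, which is a property of the attempt function and not of the loop itself.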
|
Note
|
electLeaderForPartitions is used when ZkPartitionStateMachine is requested to handle partition state changes (when the partitions are expected in OnlinePartition target state).
|
doElectLeaderForPartitions Internal Method
doElectLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception])
doElectLeaderForPartitions requests the KafkaZkClient for the partition states and converts them to LeaderIsrAndControllerEpoch per partition.
doElectLeaderForPartitions collects the partitions that are eligible for partition election (those whose controller epoch is not newer than the current controller epoch per the ControllerContext).
doElectLeaderForPartitions branches off per the PartitionLeaderElectionStrategy and ends up with partitions with and without leaders:
- For OfflinePartitionLeaderElectionStrategy, doElectLeaderForPartitions first calls collectUncleanLeaderElectionState with the valid partitions for election, followed by leaderForOffline.
- For ReassignPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForReassign.
- For PreferredReplicaPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForPreferredReplica.
- For ControlledShutdownPartitionLeaderElectionStrategy, doElectLeaderForPartitions calls leaderForControlledShutdown.
doElectLeaderForPartitions requests the KafkaZkClient to updateLeaderAndIsr (with the adjusted leader and ISRs).
For every successfully-updated partition (in Zookeeper), doElectLeaderForPartitions requests the following:
- The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in the partitionLeadershipInfo registry)
- The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers to every live replica broker (with isNew flag off)
In the end, doElectLeaderForPartitions returns three collections: the partitions that were successfully updated, the partitions to be updated again (retried), and the partitions that failed (during election or update).
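Two of the steps above, the eligibility check by controller epoch and the dispatch on the election strategy, can be sketched without Kafka. All types are simplified stand-ins (the strategies are modeled as plain case objects), and treating a controller epoch equal to the current one as eligible is an assumption of this sketch. electionSteps only names the routines that would be called, in order; it does not perform an election.

```scala
object ElectionDispatchSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderIsrAndControllerEpoch(leader: Int, isr: List[Int], controllerEpoch: Int)

  sealed trait PartitionLeaderElectionStrategy
  case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
  case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
  case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
  case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

  type State = (TopicPartition, LeaderIsrAndControllerEpoch)

  // Returns (eligible, ineligible) partition states for the current controller epoch.
  def splitByControllerEpoch(states: Seq[State], currentEpoch: Int): (Seq[State], Seq[State]) =
    states.partition { case (_, s) => s.controllerEpoch <= currentEpoch }

  // Names of the routines used per strategy, in call order.
  def electionSteps(strategy: PartitionLeaderElectionStrategy): List[String] = strategy match {
    case OfflinePartitionLeaderElectionStrategy =>
      List("collectUncleanLeaderElectionState", "leaderForOffline")
    case ReassignPartitionLeaderElectionStrategy =>
      List("leaderForReassign")
    case PreferredReplicaPartitionLeaderElectionStrategy =>
      List("leaderForPreferredReplica")
    case ControlledShutdownPartitionLeaderElectionStrategy =>
      List("leaderForControlledShutdown")
  }
}
```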
|
Note
|
doElectLeaderForPartitions is used when ZkPartitionStateMachine is requested to electLeaderForPartitions.
|
collectUncleanLeaderElectionState Internal Method
collectUncleanLeaderElectionState(
leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
collectUncleanLeaderElectionState…FIXME
|
Note
|
collectUncleanLeaderElectionState is used when ZkPartitionStateMachine is requested to doElectLeaderForPartitions (for OfflinePartitionLeaderElectionStrategy).
|
logInvalidTransition Internal Method
logInvalidTransition(
partition: TopicPartition,
targetState: PartitionState): Unit
logInvalidTransition…FIXME
|
Note
|
logInvalidTransition is used exclusively when ZkPartitionStateMachine is requested to doHandleStateChanges (for invalid partitions).
|
Printing Out ERROR Message to Logs — logFailedStateChange Internal Method
logFailedStateChange(
partition: TopicPartition,
currState: PartitionState,
targetState: PartitionState,
code: Code): Unit (1)
logFailedStateChange(
partition: TopicPartition,
currState: PartitionState,
targetState: PartitionState,
t: Throwable): Unit
(1) Converts the code to a KeeperException
logFailedStateChange simply prints out the following ERROR message to the logs:
Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from [currState] to [targetState]
|
Note
|
logFailedStateChange is used when ZkPartitionStateMachine is requested to initializeLeaderAndIsrForPartitions, electLeaderForPartitions, collectUncleanLeaderElectionState, and logInvalidTransition.
|
partitionState Internal Method
partitionState(
partition: TopicPartition): PartitionState
partitionState…FIXME
|
Note
|
partitionState is used when…FIXME
|
leaderForOffline Method
leaderForOffline(
controllerContext: ControllerContext,
partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
): Seq[ElectionResult]
// Private API
leaderForOffline(
partition: TopicPartition,
leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch],
uncleanLeaderElectionEnabled: Boolean,
controllerContext: ControllerContext
): ElectionResult
leaderForOffline…FIXME
|
Note
|
leaderForOffline is used when…FIXME
|
leaderForReassign Method
leaderForReassign(
controllerContext: ControllerContext,
leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForReassign(
partition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
controllerContext: ControllerContext
): ElectionResult
leaderForReassign…FIXME
|
Note
|
leaderForReassign is used when…FIXME
|
leaderForPreferredReplica Method
leaderForPreferredReplica(
controllerContext: ControllerContext,
leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForPreferredReplica(
partition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
controllerContext: ControllerContext
): ElectionResult
leaderForPreferredReplica…FIXME
|
Note
|
leaderForPreferredReplica is used when…FIXME
|
leaderForControlledShutdown Method
leaderForControlledShutdown(
controllerContext: ControllerContext,
leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForControlledShutdown(
partition: TopicPartition,
leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
shuttingDownBrokerIds: Set[Int],
controllerContext: ControllerContext
): ElectionResult
leaderForControlledShutdown…FIXME
|
Note
|
leaderForControlledShutdown is used when…FIXME
|