ZkPartitionStateMachine

ZkPartitionStateMachine is a PartitionStateMachine (with a given ControllerContext) for KafkaController (for every KafkaServer).

Figure 1. ZkPartitionStateMachine and KafkaController

ZkPartitionStateMachine uses [PartitionStateMachine controllerId=[brokerId]] as the logging prefix (aka logIdent).

Tip

Enable ALL logging levels for the kafka.controller.ZkPartitionStateMachine logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.ZkPartitionStateMachine=ALL

Refer to Logging.

Creating ZkPartitionStateMachine Instance

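ZkPartitionStateMachine takes the following to be created:

  1. KafkaConfig

  2. StateChangeLogger

  3. ControllerContext

  4. KafkaZkClient

  5. ControllerBrokerRequestBatch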

Handling State Changes of Partitions — handleStateChanges Method

handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Throwable]
Note
handleStateChanges is part of the PartitionStateMachine Contract to handle state changes of partitions (partition state changes).

handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.

handleStateChanges then calls doHandleStateChanges, which may give per-partition errors that are returned in the end.

In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers and returns the errors.

In case of ControllerMovedException, handleStateChanges prints out the following ERROR message to the logs:

Controller moved to another broker when moving some partitions to [targetState] state

In case of any other error (Throwable), handleStateChanges prints out the following ERROR message to the logs:

Error while moving some partitions to [targetState] state
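The flow above can be sketched in Scala as a simplified model. It is not Kafka's code: partitions are plain strings, and RequestBatch, the local ControllerMovedException class and the doHandle and logError function parameters are hypothetical stand-ins for ControllerBrokerRequestBatch, Kafka's exception, doHandleStateChanges and the logger. The text above only describes the ERROR messages, so rethrowing the ControllerMovedException and reporting every partition as failed for any other error are assumptions.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Kafka's ControllerMovedException.
class ControllerMovedException(msg: String) extends RuntimeException(msg)

// Hypothetical stand-in for ControllerBrokerRequestBatch: records the calls made on it.
class RequestBatch {
  val calls = mutable.ArrayBuffer.empty[String]
  def newBatch(): Unit = calls += "newBatch"
  def sendRequestsToBrokers(): Unit = calls += "send"
}

def handleStateChanges(
    partitions: Seq[String],
    targetState: String,
    batch: RequestBatch,
    doHandle: (Seq[String], String) => Map[String, Throwable], // stands in for doHandleStateChanges
    logError: String => Unit
): Map[String, Throwable] =
  try {
    batch.newBatch()                               // 1. prepare a new batch
    val errors = doHandle(partitions, targetState) // 2. per-partition work (may give errors)
    batch.sendRequestsToBrokers()                  // 3. send controller requests to brokers
    errors
  } catch {
    case e: ControllerMovedException =>
      logError(s"Controller moved to another broker when moving some partitions to $targetState state")
      throw e // assumption: the exception is rethrown
    case e: Throwable =>
      logError(s"Error while moving some partitions to $targetState state")
      partitions.map(p => p -> e).toMap // assumption: every partition is reported as failed
  }

val logs = mutable.ArrayBuffer.empty[String]
val logError: String => Unit = msg => logs += msg

val batch = new RequestBatch
val ok = handleStateChanges(Seq("t-0"), "OnlinePartition", batch, (_, _) => Map.empty, logError)

val boom = new IllegalStateException("boom")
val failed = handleStateChanges(Seq("t-0", "t-1"), "OnlinePartition", new RequestBatch, (_, _) => throw boom, logError)
```

Passing the per-partition work in as a function keeps the sketch focused on the batch life cycle: the batch is prepared before and sent after the per-partition work, and it is not sent when that work throws.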

doHandleStateChanges Internal Method

doHandleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
): Map[TopicPartition, Throwable]

For every partition (in partitions), doHandleStateChanges requests the ControllerContext to putPartitionStateIfNotExists with NonExistentPartition state (so partitions without a recorded state start as NonExistentPartition).

doHandleStateChanges requests the ControllerContext to checkValidPartitionStateChange (that gives valid and invalid partitions).

For every invalid partition, doHandleStateChanges calls logInvalidTransition.

doHandleStateChanges branches off per the target state: NewPartition, OnlinePartition, OfflinePartition, and NonExistentPartition.

In the end, doHandleStateChanges returns the partitions whose leader election failed. Only the OnlinePartition target state can report such failures (for the valid partitions in OfflinePartition or OnlinePartition state).
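A minimal sketch of the part of doHandleStateChanges shared by all target states, assuming simplified types: partitions are strings, a mutable map stands in for the ControllerContext's partition states, and an in-memory buffer stands in for logInvalidTransition. The validPreviousStates table is an assumption modelled on Kafka's PartitionState definitions, not something this page states. moveTo is a hypothetical name and only covers target states that need no leader election.

```scala
import scala.collection.mutable

sealed trait PartitionState extends Product with Serializable
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Assumption: valid previous states per target state.
val validPreviousStates = Map[PartitionState, Set[PartitionState]](
  NewPartition -> Set[PartitionState](NonExistentPartition),
  OnlinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  OfflinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  NonExistentPartition -> Set[PartitionState](OfflinePartition)
)

// Hypothetical stand-ins for the ControllerContext registry and logInvalidTransition.
val partitionStates = mutable.Map.empty[String, PartitionState]
val invalidTransitions = mutable.ArrayBuffer.empty[(String, PartitionState)]

def moveTo(partitions: Seq[String], target: PartitionState): Unit = {
  // putPartitionStateIfNotExists(NonExistentPartition)
  partitions.foreach(p => partitionStates.getOrElseUpdate(p, NonExistentPartition))
  // checkValidPartitionStateChange: split into valid and invalid partitions
  val (valid, invalid) = partitions.partition(p => validPreviousStates(target).contains(partitionStates(p)))
  invalid.foreach(p => invalidTransitions += (p -> target))
  // putPartitionState for every valid partition (no leader election in this sketch)
  valid.foreach(p => partitionStates.update(p, target))
}

moveTo(Seq("t-0", "t-1"), NewPartition)
moveTo(Seq("t-0"), OfflinePartition)
moveTo(Seq("t-1"), NonExistentPartition) // NewPartition -> NonExistentPartition is not valid here
```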

Note
doHandleStateChanges is used exclusively when ZkPartitionStateMachine is requested to handle partition state changes.

NewPartition

For NewPartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NewPartition state.

Changed partition [partition] state from [state] to NewPartition with assigned replicas [partitionReplicaAssignment]

OnlinePartition (and Partition Leader Election)

For OnlinePartition target state, doHandleStateChanges splits the valid partitions into two collections: uninitializedPartitions (the partitions in NewPartition state) and partitionsToElectLeader (the partitions in OfflinePartition or OnlinePartition state).

For the partitions in NewPartition state (uninitialized valid partitions), doHandleStateChanges calls initializeLeaderAndIsrForPartitions, which gives the partitions that were successfully initialized (successfulInitializations). For every partition successfully initialized, doHandleStateChanges prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.

Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]

For the partitions to elect a leader for (valid partitions in OfflinePartition or OnlinePartition state), doHandleStateChanges calls electLeaderForPartitions (with the partitions and the PartitionLeaderElectionStrategy). That gives two sets of partitions: with election successful and with election failed. For every partition with election successful, doHandleStateChanges prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OnlinePartition state.

Changed partition [partition] from [state] to OnlinePartition with state [leaderAndIsr]

In the end, doHandleStateChanges returns the partitions with election failed.
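The OnlinePartition branch can be sketched as follows, under similar simplifications: initialize and elect are hypothetical function parameters standing in for initializeLeaderAndIsrForPartitions and electLeaderForPartitions, states stands in for the ControllerContext, and the TRACE messages are left out.

```scala
import scala.collection.mutable

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

def toOnline(
    states: mutable.Map[String, PartitionState],
    validPartitions: Seq[String],
    initialize: Seq[String] => Seq[String],                     // initializeLeaderAndIsrForPartitions
    elect: Seq[String] => (Seq[String], Map[String, Throwable]) // electLeaderForPartitions
): Map[String, Throwable] = {
  val uninitializedPartitions = validPartitions.filter(p => states(p) == NewPartition)
  val partitionsToElectLeader =
    validPartitions.filter(p => states(p) == OfflinePartition || states(p) == OnlinePartition)
  // NewPartition -> OnlinePartition for every successfully initialized partition
  if (uninitializedPartitions.nonEmpty)
    initialize(uninitializedPartitions).foreach(p => states.update(p, OnlinePartition))
  // OfflinePartition/OnlinePartition -> OnlinePartition for every successful election
  if (partitionsToElectLeader.isEmpty) Map.empty
  else {
    val (successful, failed) = elect(partitionsToElectLeader)
    successful.foreach(p => states.update(p, OnlinePartition))
    failed // only the failed elections are returned
  }
}

val states = mutable.Map[String, PartitionState](
  "a-0" -> NewPartition, "a-1" -> OfflinePartition, "a-2" -> OfflinePartition)
val noLeader = new IllegalStateException("no eligible leader")
val failed = toOnline(
  states,
  Seq("a-0", "a-1", "a-2"),
  initialize = ps => ps,
  elect = ps => (ps.filter(_ != "a-2"), Map("a-2" -> noLeader)))
```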

OfflinePartition

For OfflinePartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to OfflinePartition state.

Changed partition [partition] state from [state] to OfflinePartition

NonExistentPartition

For NonExistentPartition target state, doHandleStateChanges goes over the valid partitions and for every partition prints out the following TRACE message to the logs and requests the ControllerContext to putPartitionState to NonExistentPartition state.

Changed partition [partition] state from [state] to NonExistentPartition

initializeLeaderAndIsrForPartitions Internal Method

initializeLeaderAndIsrForPartitions(
  partitions: Seq[TopicPartition]): Seq[TopicPartition]

initializeLeaderAndIsrForPartitions starts by requesting the ControllerContext for the partition replica assignment for every partition (in the given partitions).

From the partition replica assignments, initializeLeaderAndIsrForPartitions keeps only the replicas that are online (per the ControllerContext) and filters out the other replicas.

initializeLeaderAndIsrForPartitions splits the partitions into two sets: with and without live (online) replicas (partitionsWithLiveReplicas and partitionsWithoutLiveReplicas, respectively).
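A sketch of the live-replica filtering and the split, assuming partitions are strings, replicas are broker ids, and isReplicaOnline is a hypothetical function standing in for asking the ControllerContext whether a replica is online:

```scala
// Hypothetical model: partitions are strings, replicas are broker ids.
def splitByLiveReplicas(
    assignment: Map[String, Seq[Int]],
    isReplicaOnline: (Int, String) => Boolean
): (Map[String, Seq[Int]], Map[String, Seq[Int]]) = {
  // Keep only the online replicas of every partition
  val liveReplicasPerPartition = assignment.map { case (partition, replicas) =>
    partition -> replicas.filter(replica => isReplicaOnline(replica, partition))
  }
  // (partitionsWithLiveReplicas, partitionsWithoutLiveReplicas)
  liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.nonEmpty }
}

val liveBrokerIds = Set(1, 3)
val (partitionsWithLiveReplicas, partitionsWithoutLiveReplicas) = splitByLiveReplicas(
  Map("t-0" -> Seq(1, 2, 3), "t-1" -> Seq(2)),
  (broker, _) => liveBrokerIds.contains(broker)
)
```

Here t-1 ends up without live replicas, which is the case reported with the ERROR message and StateChangeFailedException below.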

For every partition without live (online) replicas, initializeLeaderAndIsrForPartitions prints out the following ERROR message and the StateChangeFailedException to the logs:

Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from NewPartition to OnlinePartition
Controller [controllerId] epoch [epoch] encountered error during state change of partition [partition] from New to Online, assigned replicas are [[replicas]], live brokers are [[liveBrokerIds]]. No assigned replica is alive.

initializeLeaderAndIsrForPartitions converts the partitions with live (online) replicas into leaderIsrAndControllerEpochs (one LeaderIsrAndControllerEpoch with a LeaderAndIsr per partition) and requests the KafkaZkClient to create the partition state znodes for them.
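A sketch of that conversion, with LeaderAndIsr and LeaderIsrAndControllerEpoch reduced to hypothetical case classes (Kafka's own classes carry more fields). Choosing the first live replica as the leader and all live replicas as the ISR is an assumption, not something this page states.

```scala
// Hypothetical, reduced versions of Kafka's LeaderAndIsr and LeaderIsrAndControllerEpoch.
case class LeaderAndIsr(leader: Int, isr: List[Int])
case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)

def toLeaderIsrAndControllerEpochs(
    partitionsWithLiveReplicas: Map[String, Seq[Int]],
    controllerEpoch: Int
): Map[String, LeaderIsrAndControllerEpoch] =
  partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
    // Assumption: the first live replica leads and all live replicas form the ISR
    val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
    partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch)
  }

val leaderIsrAndControllerEpochs =
  toLeaderIsrAndControllerEpochs(Map("t-0" -> Seq(3, 1)), controllerEpoch = 7)
```

Calling head is safe here only because the input has already been restricted to partitions with at least one live replica.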

For every successful response (from creating partition state znodes), initializeLeaderAndIsrForPartitions requests the following:

  1. The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in the partitionLeadershipInfo registry)

  2. The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers (with isNew flag on)

In the end, initializeLeaderAndIsrForPartitions returns the partitions that were successfully initialized.

In case of ControllerMovedException (while…FIXME), initializeLeaderAndIsrForPartitions…FIXME

In case of any other error (Exception) (while…FIXME), initializeLeaderAndIsrForPartitions…FIXME

Note
initializeLeaderAndIsrForPartitions is used exclusively when ZkPartitionStateMachine is requested to handle partition state changes (for partitions in NewPartition state that are transitioned to OnlinePartition target state).

electLeaderForPartitions Internal Method

electLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Seq[TopicPartition], Map[TopicPartition, Throwable])

electLeaderForPartitions calls doElectLeaderForPartitions repeatedly (each time with the partitions to be retried) until every given partition has had its leader election either succeed or fail.

For every failed election, electLeaderForPartitions prints out the following ERROR message (with an exception) to the logs:

Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from [state] to OnlinePartition

In the end, electLeaderForPartitions returns the partitions with leader election successful and failed.
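The retry loop can be sketched as follows, assuming doElect is a hypothetical function standing in for doElectLeaderForPartitions that returns the successful, to-be-retried and failed partitions of one round (the shape of its return type above):

```scala
import scala.collection.mutable

def electLeaderForPartitions(
    partitions: Seq[String],
    doElect: Seq[String] => (Seq[String], Seq[String], Map[String, Exception]) // doElectLeaderForPartitions
): (Seq[String], Map[String, Throwable]) = {
  val successful = mutable.ArrayBuffer.empty[String]
  val failed = mutable.Map.empty[String, Throwable]
  var remaining = partitions
  while (remaining.nonEmpty) {
    val (ok, toRetry, errors) = doElect(remaining)
    successful ++= ok
    failed ++= errors // the ERROR message would be printed here for every failed election
    remaining = toRetry
  }
  (successful.toList, failed.toMap)
}

// A fake election round: "t-1" needs one retry and "t-2" always fails.
var rounds = 0
val fakeRound: Seq[String] => (Seq[String], Seq[String], Map[String, Exception]) = { ps =>
  rounds += 1
  val retry = if (rounds == 1) ps.filter(_ == "t-1") else Seq.empty
  val failed: Map[String, Exception] =
    ps.filter(_ == "t-2").map(p => p -> new Exception(s"no leader for $p")).toMap
  val successful = ps.filterNot(p => retry.contains(p) || failed.contains(p))
  (successful, retry, failed)
}

val (elected, failedElections) = electLeaderForPartitions(Seq("t-0", "t-1", "t-2"), fakeRound)
```

The loop only terminates if doElect eventually stops returning partitions to retry, so every round has to put each partition into one of the three outcomes.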

Note
electLeaderForPartitions is used when ZkPartitionStateMachine is requested to handle partition state changes (for the OnlinePartition target state).

doElectLeaderForPartitions Internal Method

doElectLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception])

doElectLeaderForPartitions requests the KafkaZkClient for the partition states and converts them to LeaderIsrAndControllerEpoch per partition.

doElectLeaderForPartitions collects the partitions that are eligible for leader election, i.e. the partitions whose controller epoch (recorded in ZooKeeper) is not newer than the current controller epoch (per the ControllerContext).
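A sketch of the eligibility check, with LeaderIsrAndControllerEpoch reduced to a hypothetical case class. It treats a partition as eligible unless the controller epoch stored with its state is newer than the current controller epoch, so a state written by the current controller (same epoch) stays eligible; the intuition is that a newer epoch means a newer controller has already written the partition state.

```scala
// Hypothetical, reduced version of the partition state read from ZooKeeper.
case class LeaderIsrAndControllerEpoch(leader: Int, isr: List[Int], controllerEpoch: Int)

// Returns (eligible, ineligible) partitions for leader election.
def splitByControllerEpoch(
    states: Map[String, LeaderIsrAndControllerEpoch],
    currentControllerEpoch: Int
): (Map[String, LeaderIsrAndControllerEpoch], Map[String, LeaderIsrAndControllerEpoch]) =
  states.partition { case (_, state) => state.controllerEpoch <= currentControllerEpoch }

val (eligible, ineligible) = splitByControllerEpoch(
  Map(
    "t-0" -> LeaderIsrAndControllerEpoch(1, List(1, 2), 4),
    "t-1" -> LeaderIsrAndControllerEpoch(2, List(2), 5),
    "t-2" -> LeaderIsrAndControllerEpoch(2, List(2), 6)
  ),
  currentControllerEpoch = 5
)
```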

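doElectLeaderForPartitions branches off per the PartitionLeaderElectionStrategy (to one of leaderForOffline, leaderForReassign, leaderForPreferredReplica or leaderForControlledShutdown) and ends up with partitions with and without leaders.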

doElectLeaderForPartitions requests the KafkaZkClient to updateLeaderAndIsr (with the adjusted leader and ISRs).

For every partition successfully updated (in ZooKeeper), doElectLeaderForPartitions requests the following:

  1. The ControllerContext to record the leaderIsrAndControllerEpoch for the partition (in the partitionLeadershipInfo registry)

  2. The ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers to every live replica broker (with isNew flag off)
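The two steps for every successfully updated partition can be sketched as follows. partitionLeadershipInfo and queuedRequests are hypothetical in-memory stand-ins for the ControllerContext registry and the LeaderAndIsr requests queued in the ControllerBrokerRequestBatch, while replicas and liveBrokers stand in for the replica assignment and the live brokers.

```scala
import scala.collection.mutable

// Hypothetical, reduced leadership record.
case class Leadership(leader: Int, isr: List[Int], controllerEpoch: Int)

// Hypothetical stand-ins: the leadership registry and the queued
// LeaderAndIsr requests as (brokerId, partition, isNew).
val partitionLeadershipInfo = mutable.Map.empty[String, Leadership]
val queuedRequests = mutable.ArrayBuffer.empty[(Int, String, Boolean)]

def recordSuccessfulUpdates(
    updated: Map[String, Leadership],
    replicas: Map[String, Seq[Int]],
    liveBrokers: Set[Int]
): Unit =
  updated.foreach { case (partition, leadership) =>
    // 1. Record the new leadership for the partition
    partitionLeadershipInfo.update(partition, leadership)
    // 2. Queue a LeaderAndIsr request (isNew = false) for every live replica broker
    replicas(partition).filter(liveBrokers.contains).foreach { broker =>
      queuedRequests += ((broker, partition, false))
    }
  }

recordSuccessfulUpdates(
  Map("t-0" -> Leadership(2, List(2, 3), 5)),
  Map("t-0" -> Seq(1, 2, 3)),
  liveBrokers = Set(2, 3)
)
```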

In the end, doElectLeaderForPartitions returns three collections: the partitions that were successfully updated, the partitions to be updated again (retried), and the partitions that failed (during election or update) with their exceptions.

Note
doElectLeaderForPartitions is used when ZkPartitionStateMachine is requested to electLeaderForPartitions.

collectUncleanLeaderElectionState Internal Method

collectUncleanLeaderElectionState(
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]

collectUncleanLeaderElectionState…FIXME

Note
collectUncleanLeaderElectionState is used when ZkPartitionStateMachine is requested to doElectLeaderForPartitions (for OfflinePartitionLeaderElectionStrategy).

logInvalidTransition Internal Method

logInvalidTransition(
  partition: TopicPartition,
  targetState: PartitionState): Unit

logInvalidTransition…FIXME

Note
logInvalidTransition is used exclusively when ZkPartitionStateMachine is requested to doHandleStateChanges (for invalid partitions).

Printing Out ERROR Message to Logs — logFailedStateChange Internal Method

logFailedStateChange(
  partition: TopicPartition,
  currState: PartitionState,
  targetState: PartitionState,
  code: Code): Unit
logFailedStateChange(
  partition: TopicPartition,
  currState: PartitionState,
  targetState: PartitionState,
  t: Throwable): Unit

The variant with a Code converts the code to a KeeperException first.

logFailedStateChange simply prints out the following ERROR message to the logs:

Controller [controllerId] epoch [epoch] failed to change state for partition [partition] from [currState] to [targetState]
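A sketch of the two logFailedStateChange variants, where Code and KeeperLikeException are hypothetical stand-ins for ZooKeeper's KeeperException.Code and KeeperException, and log is a hypothetical function standing in for the ERROR logger:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for ZooKeeper's KeeperException.Code and KeeperException.
case class Code(name: String)
class KeeperLikeException(code: Code) extends RuntimeException(code.name)

class FailedStateChangeLogger(controllerId: Int, epoch: Int, log: (String, Throwable) => Unit) {
  // Variant with a code: converts it to an exception and delegates
  def logFailedStateChange(partition: String, currState: String, targetState: String, code: Code): Unit =
    logFailedStateChange(partition, currState, targetState, new KeeperLikeException(code))

  // Variant with a Throwable: prints out the ERROR message
  def logFailedStateChange(partition: String, currState: String, targetState: String, t: Throwable): Unit =
    log(s"Controller $controllerId epoch $epoch failed to change state for partition $partition " +
      s"from $currState to $targetState", t)
}

val logged = mutable.ArrayBuffer.empty[(String, Throwable)]
val logger = new FailedStateChangeLogger(1, 5, (msg, t) => { logged += ((msg, t)); () })
logger.logFailedStateChange("t-0", "NewPartition", "OnlinePartition", Code("NODEEXISTS"))
```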
Note
logFailedStateChange is used when ZkPartitionStateMachine is requested to initializeLeaderAndIsrForPartitions, electLeaderForPartitions, collectUncleanLeaderElectionState, and logInvalidTransition.

partitionState Internal Method

partitionState(
  partition: TopicPartition): PartitionState

partitionState…FIXME

Note
partitionState is used when…FIXME

leaderForOffline Method

leaderForOffline(
  controllerContext: ControllerContext,
  partitionsWithUncleanLeaderElectionState: Seq[(TopicPartition, Option[LeaderIsrAndControllerEpoch], Boolean)]
): Seq[ElectionResult]
// Private API
leaderForOffline(
  partition: TopicPartition,
  leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch],
  uncleanLeaderElectionEnabled: Boolean,
  controllerContext: ControllerContext
): ElectionResult

leaderForOffline…FIXME

Note
leaderForOffline is used when…FIXME

leaderForReassign Method

leaderForReassign(
  controllerContext: ControllerContext,
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForReassign(
  partition: TopicPartition,
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  controllerContext: ControllerContext
): ElectionResult

leaderForReassign…FIXME

Note
leaderForReassign is used when…FIXME

leaderForPreferredReplica Method

leaderForPreferredReplica(
  controllerContext: ControllerContext,
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForPreferredReplica(
  partition: TopicPartition,
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  controllerContext: ControllerContext
): ElectionResult

leaderForPreferredReplica…FIXME

Note
leaderForPreferredReplica is used when…FIXME

leaderForControlledShutdown Method

leaderForControlledShutdown(
  controllerContext: ControllerContext,
  leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]
): Seq[ElectionResult]
// Private API
leaderForControlledShutdown(
  partition: TopicPartition,
  leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
  shuttingDownBrokerIds: Set[Int],
  controllerContext: ControllerContext
): ElectionResult

leaderForControlledShutdown…FIXME

Note
leaderForControlledShutdown is used when…FIXME
