PartitionStateMachine

PartitionStateMachine is created exclusively for KafkaController.

PartitionStateMachine is started when KafkaController is requested to onControllerFailover (that is, after a controller election in which the broker is successfully elected as the active controller).

PartitionStateMachine is shut down when KafkaController is requested to resign as the active controller.

PartitionStateMachine uses one of the following PartitionLeaderElectionStrategy values when requested to handleStateChanges (in doHandleStateChanges) and to electLeaderForPartitions (in doElectLeaderForPartitions):

  • ControlledShutdownPartitionLeaderElectionStrategy

  • OfflinePartitionLeaderElectionStrategy

  • PreferredReplicaPartitionLeaderElectionStrategy

  • ReassignPartitionLeaderElectionStrategy
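
One way to picture the four strategies is as a closed set of values that leader election code can pattern-match on. The sketch below is illustrative only: the strategy names come from the list above, while the `describe` helper and its return strings are hypothetical and not part of Kafka.

```scala
// Sketch: the four strategies modeled as a closed (sealed) set of values.
sealed trait PartitionLeaderElectionStrategy
case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
case object OfflinePartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy
case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

// Hypothetical helper: with a sealed hierarchy the compiler can warn
// when a match does not handle every strategy.
def describe(strategy: PartitionLeaderElectionStrategy): String = strategy match {
  case ControlledShutdownPartitionLeaderElectionStrategy => "controlled shutdown"
  case OfflinePartitionLeaderElectionStrategy            => "offline partition"
  case PreferredReplicaPartitionLeaderElectionStrategy   => "preferred replica"
  case ReassignPartitionLeaderElectionStrategy           => "partition reassignment"
}
```

Modeling the strategies as values (rather than strings or flags) keeps the choice of election behavior explicit at every call site.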

Table 1. PartitionStateMachine’s Internal Properties (e.g. Registries, Counters and Flags)

Name | Description
controllerId |
topicDeletionManager | TopicDeletionManager
offlinePartitionCount |

PartitionStateMachine uses [PartitionStateMachine controllerId=[controllerId]] as the logging prefix (aka logIdent).

Tip

Enable the INFO or ERROR logging level for the kafka.controller.PartitionStateMachine logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.PartitionStateMachine=INFO

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

That means that the logs of PartitionStateMachine go to the logs/controller.log file at the TRACE logging level and are not added to the main logs (because log4j.additivity.kafka.controller is false).

triggerOnlinePartitionStateChange Method

triggerOnlinePartitionStateChange(): Unit  (1)
triggerOnlinePartitionStateChange(topic: String): Unit  (2)
triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]): Unit

  1. Uses the partitionState internal registry as the input partitionState

  2. Uses the entries of the partitionState internal registry whose TopicPartition matches the input topic as the input partitionState

triggerOnlinePartitionStateChange collects the TopicPartitions (from the given map of TopicPartition to PartitionState) that meet both of the following requirements:

  1. TopicDeletionManager is not going to delete their topics

  2. PartitionState is either OfflinePartition or NewPartition

triggerOnlinePartitionStateChange then requests handleStateChanges for the collected partitions with OnlinePartition as the target state and OfflinePartitionLeaderElectionStrategy as the leader election strategy.
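
The selection step can be sketched with simplified stand-in types. This is a minimal illustration, assuming that partitions in OfflinePartition or NewPartition state are the ones selected and that topics queued for deletion are skipped. The `partitionsToTrigger` helper, the `topicsQueuedForDeletion` parameter and the case classes are hypothetical and are not Kafka's own definitions.

```scala
// Simplified stand-ins for Kafka's types (assumptions for this sketch).
final case class TopicPartition(topic: String, partition: Int)

sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Hypothetical helper: picks the partitions that should be moved to OnlinePartition.
def partitionsToTrigger(
    partitionState: Map[TopicPartition, PartitionState],
    topicsQueuedForDeletion: Set[String]): Seq[TopicPartition] =
  partitionState.filter { case (tp, state) =>
    // Skip topics that are about to be deleted and keep only offline or new partitions.
    !topicsQueuedForDeletion.contains(tp.topic) &&
      (state == OfflinePartition || state == NewPartition)
  }.keys.toSeq

// Usage: t1-0 is offline and kept, t1-1 is already online, t2-0 belongs to a topic being deleted.
val states = Map[TopicPartition, PartitionState](
  TopicPartition("t1", 0) -> OfflinePartition,
  TopicPartition("t1", 1) -> OnlinePartition,
  TopicPartition("t2", 0) -> NewPartition)
val selected = partitionsToTrigger(states, topicsQueuedForDeletion = Set("t2"))
```

In this sketch only `TopicPartition("t1", 0)` is selected for the transition to OnlinePartition.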

Note

triggerOnlinePartitionStateChange is used when:

handleStateChanges Method

handleStateChanges(
  partitions: Set[TopicAndPartition],
  targetState: PartitionState,
  leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderS...,
  callbacks: Callbacks): Unit

handleStateChanges…​FIXME

Note

handleStateChanges is used when:

shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

Starting Up (On Active Controller) — startup Method

startup(): Unit

startup prints out the following INFO message to the logs:

Initializing partition state

startup then initializes the partition state (using initializePartitionState) and prints out the following INFO message to the logs:

Triggering online partition state changes

startup then requests triggerOnlinePartitionStateChange.

In the end, startup prints out the following INFO message to the logs:

Started partition state machine with initial state -> [partitionState]

Note
startup is used exclusively when KafkaController is requested to onControllerFailover (when a broker is successfully elected as the active controller).
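
The order of the startup steps can be sketched as follows. This is not Kafka's code: `StartupSketch` is a hypothetical class that records messages in a buffer instead of using Kafka's logging, both helper methods are placeholders, and the placement of the two calls between the messages is an assumption based on the message texts and the notes on this page.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in that records INFO messages instead of logging them.
class StartupSketch {
  val messages: ArrayBuffer[String] = ArrayBuffer.empty[String]
  private var partitionState = Map.empty[String, String]

  private def info(msg: String): Unit = { messages += msg; () }

  // Placeholder: the real method derives the states from the ControllerContext.
  private def initializePartitionState(): Unit =
    partitionState = Map("t1-0" -> "OnlinePartition")

  // Placeholder for moving eligible partitions to OnlinePartition.
  private def triggerOnlinePartitionStateChange(): Unit = ()

  def startup(): Unit = {
    info("Initializing partition state")
    initializePartitionState()
    info("Triggering online partition state changes")
    triggerOnlinePartitionStateChange()
    info(s"Started partition state machine with initial state -> $partitionState")
  }
}

// Usage: run the sequence once and inspect the recorded messages.
val sketch = new StartupSketch
sketch.startup()
```

The final message is built after both steps have run, so it reflects the partition states produced by them.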

doElectLeaderForPartitions Internal Method

doElectLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy)
: (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception])

doElectLeaderForPartitions…​FIXME

Note
doElectLeaderForPartitions is used when…​FIXME

Creating PartitionStateMachine Instance

PartitionStateMachine takes the following when created:

PartitionStateMachine initializes the internal registries and counters.

electLeaderForPartitions Internal Method

electLeaderForPartitions(
  partitions: Seq[TopicPartition],
  partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition]

electLeaderForPartitions…​FIXME

Note
electLeaderForPartitions is used exclusively when PartitionStateMachine is requested to doHandleStateChanges.

doHandleStateChanges Internal Method

doHandleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState,
  partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit

doHandleStateChanges…​FIXME

Note
doHandleStateChanges is used exclusively when PartitionStateMachine is requested to handleStateChanges.

initializePartitionState Internal Method

initializePartitionState(): Unit

initializePartitionState requests the ControllerContext for all partitions (across all the brokers in the Kafka cluster).

For every TopicPartition, initializePartitionState requests the ControllerContext for the LeaderIsrAndControllerEpoch metadata (using the partitionLeadershipInfo internal registry).

initializePartitionState then uses changeStateTo to set the initial state of every TopicPartition.
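
How an initial state could be derived from the leadership metadata is sketched below. The rule used here is an assumption for illustration: a partition without leadership metadata becomes NewPartition, a partition whose leader is alive becomes OnlinePartition, and any other partition becomes OfflinePartition. `ContextSketch`, `leaderOf`, `liveBrokerIds` and `initialStates` are hypothetical names, not Kafka's API.

```scala
// Simplified stand-ins for Kafka's types (assumptions for this sketch).
final case class TopicPartition(topic: String, partition: Int)

sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Hypothetical, reduced view of what the ControllerContext knows.
final case class ContextSketch(
    allPartitions: Set[TopicPartition],
    leaderOf: Map[TopicPartition, Int], // stand-in for partitionLeadershipInfo
    liveBrokerIds: Set[Int])

// Assumed rule: no metadata -> NewPartition, live leader -> OnlinePartition,
// otherwise OfflinePartition.
def initialStates(ctx: ContextSketch): Map[TopicPartition, PartitionState] =
  ctx.allPartitions.map { tp =>
    val state: PartitionState = ctx.leaderOf.get(tp) match {
      case Some(leader) if ctx.liveBrokerIds.contains(leader) => OnlinePartition
      case Some(_)                                            => OfflinePartition
      case None                                               => NewPartition
    }
    tp -> state
  }.toMap

// Usage: broker 1 is alive, broker 2 is not, and partition 2 has no metadata yet.
val ctx = ContextSketch(
  allPartitions = Set(TopicPartition("t", 0), TopicPartition("t", 1), TopicPartition("t", 2)),
  leaderOf = Map(TopicPartition("t", 0) -> 1, TopicPartition("t", 1) -> 2),
  liveBrokerIds = Set(1))
val initial = initialStates(ctx)
```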

Note
initializePartitionState is used exclusively when PartitionStateMachine is requested to start up on the active Controller.

changeStateTo Internal Method

changeStateTo(
  partition: TopicPartition,
  currentState: PartitionState,
  targetState: PartitionState): Unit

changeStateTo…​FIXME

Note
changeStateTo is used when…​FIXME
