ReplicaStateMachine

ReplicaStateMachine is created exclusively for a KafkaController that uses it for the following:

  • FIXME

ReplicaStateMachine is started when KafkaController is requested to onControllerFailover (when KafkaController is requested for controller election and a broker is successfully elected as the controller).

ReplicaStateMachine is shut down when KafkaController is requested to resign as the active controller.

ReplicaStateMachine uses [ReplicaStateMachine controllerId=[brokerID]] as the logging prefix (aka logIdent).

Tip

Enable INFO or ERROR logging levels for kafka.controller.ReplicaStateMachine logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.controller.ReplicaStateMachine=INFO

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

That means that the logs of ReplicaStateMachine go to logs/controller.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).

handleStateChanges Method

handleStateChanges(
  replicas: Seq[PartitionAndReplica],
  targetState: ReplicaState,
  callbacks: Callbacks = new Callbacks()): Unit

handleStateChanges…​FIXME

shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

startup Method

startup(): Unit

startup prints out the following INFO message to the logs:

Initializing replica state

startup prints out the following INFO message to the logs:

Triggering online replica state changes

startup handleStateChanges with the live replicas (from the ControllerContext) and OnlineReplica target state.

In the end, startup prints out the following INFO message to the logs:

Started replica state machine with initial state -> [replicaState]
Note
startup is used exclusively when KafkaController is requested to onControllerFailover (when a KafkaController is requested to elect and a broker is successfully elected as the controller).

getTopicPartitionStatesFromZk Internal Method

getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition])
: (Map[TopicPartition, LeaderAndIsr], Seq[TopicPartition], Map[TopicPartition, Exception])

getTopicPartitionStatesFromZk…​FIXME

Note
getTopicPartitionStatesFromZk is used when…​FIXME

initializeReplicaState Internal Method

initializeReplicaState(): Unit

initializeReplicaState…​FIXME

Note
initializeReplicaState is used exclusively when ReplicaStateMachine is requested to start up.

doHandleStateChanges Internal Method

doHandleStateChanges(
  replicaId: Int,
  partitions: Seq[TopicPartition],
  targetState: ReplicaState,
  callbacks: Callbacks): Unit

doHandleStateChanges…​FIXME

Note
doHandleStateChanges is used exclusively when ReplicaStateMachine is requested to handleStateChanges.

Creating ReplicaStateMachine Instance

ReplicaStateMachine takes the following to be created:

results matching ""

    No results matching ""