ZkReplicaStateMachine

ZkReplicaStateMachine is a concrete ReplicaStateMachine to handle changes of the state of partition replicas (replica state changes).

ZkReplicaStateMachine is created exclusively for KafkaController.

ZkReplicaStateMachine uses [ReplicaStateMachine controllerId=[brokerId]] as the logging prefix (aka logIdent).

Tip

Enable ALL logging levels for kafka.controller.ZkReplicaStateMachine logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.ZkReplicaStateMachine=ALL

Refer to Logging.

Creating ZkReplicaStateMachine Instance

ZkReplicaStateMachine takes the following to be created:

Handling State Changes of Replicas — handleStateChanges Method

handleStateChanges(
  replicas: Seq[PartitionAndReplica],
  targetState: ReplicaState): Unit
Note
handleStateChanges is part of the ReplicaStateMachine Contract to handle state changes of replicas (replica state changes).

handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.

handleStateChanges groups the replicas (PartitionAndReplicas) by ID and for every group doHandleStateChanges.

In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers.

In case of a ControllerMovedException, handleStateChanges prints out the following ERROR message to the logs and re-throws the exception.

Controller moved to another broker when moving some replicas to [targetState] state

In case of any other error (Throwable), handleStateChanges simply prints out the following ERROR message to the logs:

Error while moving some replicas to [targetState] state

handleStateChanges does nothing and returns when there are no PartitionAndReplica to work on (i.e. the given replicas is empty).

doHandleStateChanges Internal Method

doHandleStateChanges(
  replicaId: Int,
  replicas: Seq[PartitionAndReplica],
  targetState: ReplicaState): Unit

For every replica (in the replicas), doHandleStateChanges requests the ControllerContext to putReplicaStateIfNotExists.

doHandleStateChanges requests the ControllerContext to checkValidReplicaStateChange (that gives valid and invalid replicas).

For every invalid replica, doHandleStateChanges logInvalidTransition.

doHandleStateChanges branches off per the target state (ReplicaState): OfflineReplica,…​FIXME

Note
doHandleStateChanges is used exclusively when ZkReplicaStateMachine is requested to handle replica state changes.

OfflineReplica

For OfflineReplica target state, doHandleStateChanges…​FIXME

doRemoveReplicasFromIsr Internal Method

doRemoveReplicasFromIsr(
  replicaId: Int,
  partitions: Seq[TopicPartition]):
(Map[TopicPartition, LeaderIsrAndControllerEpoch],
 Seq[TopicPartition],
 Map[TopicPartition, Exception])

doRemoveReplicasFromIsr…​FIXME

Note
doRemoveReplicasFromIsr is used exclusively when ZkReplicaStateMachine is requested to removeReplicasFromIsr.

getTopicPartitionStatesFromZk Internal Method

getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition]):
  (Map[TopicPartition, LeaderAndIsr],
   Seq[TopicPartition],
   Map[TopicPartition, Exception])

getTopicPartitionStatesFromZk…​FIXME

Note
getTopicPartitionStatesFromZk is used exclusively when ZkReplicaStateMachine is requested to doRemoveReplicasFromIsr.

logFailedStateChange Internal Method

logFailedStateChange(
  replica: PartitionAndReplica,
  currState: ReplicaState,
  targetState: ReplicaState,
  t: Throwable): Unit

logFailedStateChange…​FIXME

Note
logFailedStateChange is used when ZkReplicaStateMachine is requested to…​FIXME

logInvalidTransition Internal Method

logInvalidTransition(
  replica: PartitionAndReplica,
  targetState: ReplicaState): Unit

logInvalidTransition…​FIXME

Note
logInvalidTransition is used exclusively when ZkReplicaStateMachine is requested to doHandleStateChanges.

logSuccessfulTransition Internal Method

logSuccessfulTransition(
  replicaId: Int,
  partition: TopicPartition,
  currState: ReplicaState,
  targetState: ReplicaState): Unit

logSuccessfulTransition…​FIXME

Note
logSuccessfulTransition is used when ZkReplicaStateMachine is requested to…​FIXME

removeReplicasFromIsr Internal Method

removeReplicasFromIsr(
  replicaId: Int,
  partitions: Seq[TopicPartition]):
Map[TopicPartition, LeaderIsrAndControllerEpoch]

removeReplicasFromIsr…​FIXME

Note
removeReplicasFromIsr is used exclusively when ZkReplicaStateMachine is requested to doHandleStateChanges.

results matching ""

    No results matching ""