ZkReplicaStateMachine
ZkReplicaStateMachine is a concrete ReplicaStateMachine that handles state changes of partition replicas (replica state changes).
ZkReplicaStateMachine uses ControllerBrokerRequestBatch to propagate the changes to all brokers in a cluster.
ZkReplicaStateMachine is created exclusively for KafkaController.
ZkReplicaStateMachine uses [ReplicaStateMachine controllerId=[brokerId]] as the logging prefix (aka logIdent).
Tip: Enable ALL logging level for the kafka.controller.ZkReplicaStateMachine logger. Add the following line to the log4j configuration:
log4j.logger.kafka.controller.ZkReplicaStateMachine=ALL
Refer to Logging.
Handling State Changes of Replicas — handleStateChanges Method
handleStateChanges(
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
Note: handleStateChanges is part of the ReplicaStateMachine Contract to handle state changes of replicas (replica state changes).
handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.
handleStateChanges groups the replicas (PartitionAndReplicas) by replica ID and, for every group, calls doHandleStateChanges.
In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers.
In case of a ControllerMovedException, handleStateChanges prints out the following ERROR message to the logs and re-throws the exception.
Controller moved to another broker when moving some replicas to [targetState] state
In case of any other error (Throwable), handleStateChanges simply prints out the following ERROR message to the logs:
Error while moving some replicas to [targetState] state
handleStateChanges does nothing and returns when there are no PartitionAndReplica to work on (i.e. the given replicas is empty).
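The flow described above can be sketched in a few lines of Scala. This is a minimal illustration, not Kafka's implementation: PartitionAndReplica, ReplicaState and ControllerMovedException are simplified stand-ins, RecordingBatch is a hypothetical replacement for ControllerBrokerRequestBatch that only records calls, and error messages are collected in a buffer rather than written to a logger.

```scala
import scala.collection.mutable.ListBuffer

object HandleStateChangesSketch {
  // Hypothetical, simplified stand-ins for Kafka's internal types.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  final class ControllerMovedException(message: String) extends RuntimeException(message)

  // Stand-in for ControllerBrokerRequestBatch: records calls instead of contacting brokers.
  final class RecordingBatch {
    val calls: ListBuffer[String] = ListBuffer.empty
    def newBatch(): Unit = { calls += "newBatch"; () }
    def sendRequestsToBrokers(): Unit = { calls += "sendRequestsToBrokers"; () }
  }

  final class StateMachineSketch(
      batch: RecordingBatch,
      doHandleStateChanges: (Int, Seq[PartitionAndReplica], ReplicaState) => Unit) {

    // Messages are collected here instead of being sent to a real logger.
    val errorLog: ListBuffer[String] = ListBuffer.empty

    def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit = {
      // Nothing to do when there are no replicas to work on.
      if (replicas.nonEmpty) {
        try {
          batch.newBatch()
          // One doHandleStateChanges call per replica ID.
          replicas.groupBy(_.replica).foreach { case (replicaId, group) =>
            doHandleStateChanges(replicaId, group, targetState)
          }
          batch.sendRequestsToBrokers()
        } catch {
          case e: ControllerMovedException =>
            errorLog += s"Controller moved to another broker when moving some replicas to $targetState state"
            throw e // re-thrown to the caller
          case _: Throwable =>
            errorLog += s"Error while moving some replicas to $targetState state"
            ()
        }
      }
    }
  }
}
```

Passing doHandleStateChanges as a function keeps the sketch self-contained and makes the grouping step easy to observe in isolation.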
doHandleStateChanges Internal Method
doHandleStateChanges(
replicaId: Int,
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
For every replica (in the replicas), doHandleStateChanges requests the ControllerContext to putReplicaStateIfNotExists.
doHandleStateChanges requests the ControllerContext to checkValidReplicaStateChange (which splits the replicas into valid and invalid ones).
For every invalid replica, doHandleStateChanges calls logInvalidTransition.
doHandleStateChanges branches off per the target state (ReplicaState): OfflineReplica,…FIXME
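The validation steps that precede the per-state branching can be sketched as follows. Everything here is an illustrative assumption: ControllerContextSketch is a hypothetical in-memory stand-in for ControllerContext, the transition table in validPreviousStates is deliberately simplified and is not Kafka's full table, NonExistentReplica as the default state for unknown replicas is assumed, and the returned messages stand in for what logInvalidTransition would log.

```scala
import scala.collection.mutable

object ReplicaStateValidationSketch {
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState

  // Simplified, illustrative transition table (an assumption, not Kafka's full table).
  def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NonExistentReplica => Set.empty
    case NewReplica         => Set(NonExistentReplica)
    case OnlineReplica      => Set(NewReplica, OnlineReplica, OfflineReplica)
    case OfflineReplica     => Set(NewReplica, OnlineReplica, OfflineReplica)
  }

  // Hypothetical in-memory stand-in for ControllerContext.
  final class ControllerContextSketch {
    private val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]

    def putReplicaState(replica: PartitionAndReplica, state: ReplicaState): Unit =
      replicaStates.update(replica, state)

    // Registers a state only when the replica has none yet.
    def putReplicaStateIfNotExists(replica: PartitionAndReplica, state: ReplicaState): Unit = {
      replicaStates.getOrElseUpdate(replica, state)
      ()
    }

    // Splits replicas into (valid, invalid) for a move to the target state.
    def checkValidReplicaStateChange(
        replicas: Seq[PartitionAndReplica],
        target: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) = {
      val allowed = validPreviousStates(target)
      replicas.partition(r => replicaStates.get(r).exists(current => allowed.contains(current)))
    }
  }

  // Returns the valid replicas and the messages that would be logged for invalid ones.
  def validate(
      ctx: ControllerContextSketch,
      replicas: Seq[PartitionAndReplica],
      target: ReplicaState): (Seq[PartitionAndReplica], Seq[String]) = {
    // Assumed default: replicas without a known state start as NonExistentReplica.
    replicas.foreach(r => ctx.putReplicaStateIfNotExists(r, NonExistentReplica))
    val (valid, invalid) = ctx.checkValidReplicaStateChange(replicas, target)
    (valid, invalid.map(r => s"Invalid transition of $r to $target"))
  }
}
```

With this table, a replica already tracked as OnlineReplica may move to OfflineReplica, while a replica with no prior state is reported as invalid for that target.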
Note: doHandleStateChanges is used exclusively when ZkReplicaStateMachine is requested to handle replica state changes.
doRemoveReplicasFromIsr Internal Method
doRemoveReplicasFromIsr(
replicaId: Int,
partitions: Seq[TopicPartition]):
(Map[TopicPartition, LeaderIsrAndControllerEpoch],
Seq[TopicPartition],
Map[TopicPartition, Exception])
doRemoveReplicasFromIsr…FIXME
Note: doRemoveReplicasFromIsr is used exclusively when ZkReplicaStateMachine is requested to removeReplicasFromIsr.
getTopicPartitionStatesFromZk Internal Method
getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition]):
(Map[TopicPartition, LeaderAndIsr],
Seq[TopicPartition],
Map[TopicPartition, Exception])
getTopicPartitionStatesFromZk…FIXME
Note: getTopicPartitionStatesFromZk is used exclusively when ZkReplicaStateMachine is requested to doRemoveReplicasFromIsr.
logFailedStateChange Internal Method
logFailedStateChange(
replica: PartitionAndReplica,
currState: ReplicaState,
targetState: ReplicaState,
t: Throwable): Unit
logFailedStateChange…FIXME
Note: logFailedStateChange is used when ZkReplicaStateMachine is requested to…FIXME
logInvalidTransition Internal Method
logInvalidTransition(
replica: PartitionAndReplica,
targetState: ReplicaState): Unit
logInvalidTransition…FIXME
Note: logInvalidTransition is used exclusively when ZkReplicaStateMachine is requested to doHandleStateChanges.
logSuccessfulTransition Internal Method
logSuccessfulTransition(
replicaId: Int,
partition: TopicPartition,
currState: ReplicaState,
targetState: ReplicaState): Unit
logSuccessfulTransition…FIXME
Note: logSuccessfulTransition is used when ZkReplicaStateMachine is requested to…FIXME
removeReplicasFromIsr Internal Method
removeReplicasFromIsr(
replicaId: Int,
partitions: Seq[TopicPartition]):
Map[TopicPartition, LeaderIsrAndControllerEpoch]
removeReplicasFromIsr…FIXME
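The signatures of removeReplicasFromIsr and doRemoveReplicasFromIsr suggest a retry-style relationship between the two, which the sketch below illustrates. It rests on assumptions the text above does not confirm: the three parts of the doRemoveReplicasFromIsr result are read as successful removals, partitions to retry, and failures that should not be retried. TopicPartition and LeaderIsrAndControllerEpoch are simplified stand-ins, and the attempt and onFailure parameters are hypothetical hooks added so the example runs on its own.

```scala
object RemoveReplicasFromIsrSketch {
  // Hypothetical, simplified stand-ins for Kafka's types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderIsrAndControllerEpoch(leader: Int, isr: List[Int], controllerEpoch: Int)

  // Assumed reading of the triple: (succeeded, to retry, failed for good).
  type AttemptResult = (
      Map[TopicPartition, LeaderIsrAndControllerEpoch],
      Seq[TopicPartition],
      Map[TopicPartition, Exception])

  def removeReplicasFromIsr(
      replicaId: Int,
      partitions: Seq[TopicPartition],
      attempt: (Int, Seq[TopicPartition]) => AttemptResult, // stands in for doRemoveReplicasFromIsr
      onFailure: (TopicPartition, Exception) => Unit        // stands in for failure logging
  ): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
    var results = Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
    var remaining = partitions
    // Keep attempting until no partition is left to retry.
    while (remaining.nonEmpty) {
      val (succeeded, toRetry, failed) = attempt(replicaId, remaining)
      results ++= succeeded
      remaining = toRetry
      failed.foreach { case (partition, error) => onFailure(partition, error) }
    }
    results
  }
}
```

The loop terminates only when an attempt reports nothing left to retry, so an attempt function that always asks for a retry would spin forever; a production version would need a bound or backoff.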
Note: removeReplicasFromIsr is used exclusively when ZkReplicaStateMachine is requested to doHandleStateChanges.