ZkReplicaStateMachine
ZkReplicaStateMachine is a concrete ReplicaStateMachine that handles changes of the state of partition replicas (replica state changes).
ZkReplicaStateMachine uses ControllerBrokerRequestBatch to propagate the changes to all brokers in a cluster.
ZkReplicaStateMachine is created exclusively for KafkaController.
ZkReplicaStateMachine uses [ReplicaStateMachine controllerId=[brokerId]] as the logging prefix (aka logIdent).
Tip
Enable ALL logging level for the kafka.controller.ZkReplicaStateMachine logger.
Add the following line to the log4j configuration:
log4j.logger.kafka.controller.ZkReplicaStateMachine=ALL
Refer to Logging.
Handling State Changes of Replicas: handleStateChanges Method
handleStateChanges(
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
Note
handleStateChanges is part of the ReplicaStateMachine Contract to handle state changes of replicas (replica state changes).
handleStateChanges requests the ControllerBrokerRequestBatch to prepare a new batch.
handleStateChanges groups the replicas (PartitionAndReplicas) by replica ID and, for every group, executes doHandleStateChanges.
In the end, handleStateChanges requests the ControllerBrokerRequestBatch to send controller requests to brokers.
In case of a ControllerMovedException, handleStateChanges prints out the following ERROR message to the logs and re-throws the exception:
Controller moved to another broker when moving some replicas to [targetState] state
In case of any other error (Throwable), handleStateChanges simply prints out the following ERROR message to the logs:
Error while moving some replicas to [targetState] state
handleStateChanges does nothing and returns when there are no PartitionAndReplicas to work on (i.e. the given replicas is empty).
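The control flow described above can be sketched in a few lines of Scala. This is only an illustration, not the Kafka source: PartitionAndReplica, ReplicaState, ControllerMovedException and RecordingBatch are simplified, hypothetical stand-ins for the real controller types, the doHandle parameter stands in for doHandleStateChanges, and log messages are recorded in a list instead of being written to a logger.

```scala
object HandleStateChangesSketch {
  // Hypothetical, simplified stand-ins for Kafka's internal types (not the real classes).
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  // Stand-in for the request batch: it only records what it was asked to do.
  final class RecordingBatch {
    val events = scala.collection.mutable.ListBuffer.empty[String]
    def newBatch(): Unit = { events += "newBatch"; () }
    def sendRequestsToBrokers(): Unit = { events += "send"; () }
  }

  def handleStateChanges(
      replicas: Seq[PartitionAndReplica],
      targetState: ReplicaState,
      batch: RecordingBatch)(
      doHandle: (Int, Seq[PartitionAndReplica], ReplicaState) => Unit): Unit = {
    // Nothing to do for an empty input.
    if (replicas.nonEmpty) {
      try {
        batch.newBatch()
        // One doHandle call per replica ID.
        replicas.groupBy(_.replica).foreach { case (replicaId, group) =>
          doHandle(replicaId, group, targetState)
        }
        batch.sendRequestsToBrokers()
      } catch {
        case e: ControllerMovedException =>
          // Logged and re-thrown.
          batch.events += s"ERROR Controller moved to another broker when moving some replicas to $targetState state"
          throw e
        case _: Throwable =>
          // Logged and swallowed.
          batch.events += s"ERROR Error while moving some replicas to $targetState state"
      }
    }
  }
}
```

Passing doHandle as a parameter keeps the sketch self-contained and makes the two error paths easy to exercise in isolation.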
doHandleStateChanges Internal Method
doHandleStateChanges(
replicaId: Int,
replicas: Seq[PartitionAndReplica],
targetState: ReplicaState): Unit
For every replica (in the replicas), doHandleStateChanges requests the ControllerContext to putReplicaStateIfNotExists.
doHandleStateChanges requests the ControllerContext to checkValidReplicaStateChange (that gives valid and invalid replicas).
For every invalid replica, doHandleStateChanges executes logInvalidTransition.
doHandleStateChanges branches off per the target state (ReplicaState): OfflineReplica,…FIXME
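The validation steps that precede the per-state branching can be sketched as follows. This is a minimal illustration under stated assumptions, not the Kafka implementation: SketchContext is a hypothetical, map-backed stand-in for ControllerContext, the transition rule is supplied by the caller as a predicate, and the use of NonExistentReplica as the default state for unknown replicas is an assumption of the sketch. The per-state branching itself is not covered.

```scala
object DoHandleStateChangesSketch {
  // Hypothetical, simplified stand-ins for Kafka's internal types (not the real classes).
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState

  // Map-backed stand-in for ControllerContext.
  // isValid(current, target) is a caller-supplied, illustrative transition rule.
  final class SketchContext(isValid: (ReplicaState, ReplicaState) => Boolean) {
    val states = scala.collection.mutable.Map.empty[PartitionAndReplica, ReplicaState]

    // Registers a state only when the replica has none yet.
    def putReplicaStateIfNotExists(r: PartitionAndReplica, s: ReplicaState): Unit = {
      states.getOrElseUpdate(r, s)
      ()
    }

    // Splits the replicas into (valid, invalid) for the given target state.
    def checkValidReplicaStateChange(
        replicas: Seq[PartitionAndReplica],
        target: ReplicaState): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) =
      replicas.partition(r => isValid(states(r), target))
  }

  // Returns the replicas that may proceed; reports every invalid one.
  def validateReplicas(
      ctx: SketchContext,
      replicas: Seq[PartitionAndReplica],
      targetState: ReplicaState)(
      logInvalidTransition: (PartitionAndReplica, ReplicaState) => Unit): Seq[PartitionAndReplica] = {
    // Assumption: replicas with no known state start as NonExistentReplica.
    replicas.foreach(r => ctx.putReplicaStateIfNotExists(r, NonExistentReplica))
    val (valid, invalid) = ctx.checkValidReplicaStateChange(replicas, targetState)
    invalid.foreach(r => logInvalidTransition(r, targetState))
    valid
  }
}
```

Only the replicas returned by validateReplicas would go on to the branch for the target state.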
Note
doHandleStateChanges is used exclusively when ZkReplicaStateMachine is requested to handle replica state changes.
doRemoveReplicasFromIsr Internal Method
doRemoveReplicasFromIsr(
replicaId: Int,
partitions: Seq[TopicPartition]):
(Map[TopicPartition, LeaderIsrAndControllerEpoch],
Seq[TopicPartition],
Map[TopicPartition, Exception])
doRemoveReplicasFromIsr…FIXME
Note
doRemoveReplicasFromIsr is used exclusively when ZkReplicaStateMachine is requested to removeReplicasFromIsr.
getTopicPartitionStatesFromZk Internal Method
getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition]):
(Map[TopicPartition, LeaderAndIsr],
Seq[TopicPartition],
Map[TopicPartition, Exception])
getTopicPartitionStatesFromZk…FIXME
Note
getTopicPartitionStatesFromZk is used exclusively when ZkReplicaStateMachine is requested to doRemoveReplicasFromIsr.
logFailedStateChange Internal Method
logFailedStateChange(
replica: PartitionAndReplica,
currState: ReplicaState,
targetState: ReplicaState,
t: Throwable): Unit
logFailedStateChange…FIXME
Note
logFailedStateChange is used when ZkReplicaStateMachine is requested to…FIXME
logInvalidTransition Internal Method
logInvalidTransition(
replica: PartitionAndReplica,
targetState: ReplicaState): Unit
logInvalidTransition…FIXME
Note
logInvalidTransition is used exclusively when ZkReplicaStateMachine is requested to doHandleStateChanges.
logSuccessfulTransition Internal Method
logSuccessfulTransition(
replicaId: Int,
partition: TopicPartition,
currState: ReplicaState,
targetState: ReplicaState): Unit
logSuccessfulTransition…FIXME
Note
logSuccessfulTransition is used when ZkReplicaStateMachine is requested to…FIXME
removeReplicasFromIsr Internal Method
removeReplicasFromIsr(
replicaId: Int,
partitions: Seq[TopicPartition]):
Map[TopicPartition, LeaderIsrAndControllerEpoch]
removeReplicasFromIsr…FIXME
Note
removeReplicasFromIsr is used exclusively when ZkReplicaStateMachine is requested to doHandleStateChanges.