KafkaController
KafkaController is the default and only known ControllerEventProcessor to process and preempt controller events.
KafkaController is in one of the ControllerStates (that is the state of the ControllerEventManager).
KafkaController uses the KafkaZkClient to be notified about changes in the state of a Kafka cluster (that are reflected in changes in znodes of Apache Zookeeper) and to propagate the state changes to other brokers.
KafkaController uses listeners as a notification system to monitor znodes in Zookeeper and react accordingly.
KafkaController emulates a state machine using controller events.
| Event | ControllerState | process Handler |
|---|---|---|
| ApiPartitionReassignment | | |
| AutoPreferredReplicaLeaderElection | | |
| BrokerChange | | |
| BrokerModifications | | |
| ControlledShutdown | | |
| ControllerChange | | |
| Expire | | |
| ListPartitionReassignments | | |
| LogDirEventNotification | | |
| PartitionModifications | | |
| PartitionReassignmentIsrChange | | |
| Reelect | | |
| RegisterBrokerAndReelect | | |
| ReplicaLeaderElection | | |
| ShutdownEventThread | | |
| Startup | | |
| TopicChange | | |
| TopicDeletion | | |
| TopicDeletionStopReplicaResponseReceived | | |
| TopicUncleanLeaderElectionEnable | | |
| UncleanLeaderElectionEnable | | |
| ZkPartitionReassignment | | |
| Listener | Description |
|---|---|
| IsrChangeNotificationListener | Registered in registerIsrChangeNotificationListener. De-registered in deregisterIsrChangeNotificationListener. |
| TopicDeletionListener | Registered in registerTopicDeletionListener. De-registered in deregisterTopicDeletionListener. |
KafkaController uses [Controller id=[brokerId]] as the logging prefix (aka logIdent).
Tip: Enable logging for KafkaController to see what happens inside. Refer to Logging. Please note that Kafka comes with a preconfigured logger for the controller, which determines where the logs of KafkaController are written.
Creating KafkaController Instance
KafkaController takes the following to be created:
KafkaController initializes the internal properties.
KafkaController and ControllerContext
controllerContext: ControllerContext
When created, KafkaController creates a new ControllerContext.
KafkaController and ControllerChannelManager
controllerChannelManager: ControllerChannelManager
When created, KafkaController creates a new ControllerChannelManager.
ControllerChannelManager is used to create separate ControllerBrokerRequestBatches for the KafkaController itself, the ZkReplicaStateMachine and the ZkPartitionStateMachine.
ControllerChannelManager is requested to start up when KafkaController is requested to start controller election (and a broker is successfully elected as the active controller).
KafkaController uses the ControllerChannelManager to add or remove brokers when processing broker changes in Zookeeper (a new or updated znode under /brokers/ids path).
ControllerChannelManager is requested to shut down when KafkaController is requested to resign as the active controller.
KafkaController and ReplicaStateMachine (ZkReplicaStateMachine)
replicaStateMachine: ReplicaStateMachine
When created, KafkaController creates a new ZkReplicaStateMachine.
ZkReplicaStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.
ZkReplicaStateMachine is requested to handle state changes of partition replicas at the following events:
- onBrokerLogDirFailure to transition replicas to OnlineReplica state
- onBrokerStartup to transition replicas to OnlineReplica state
- onReplicasBecomeOffline to transition replicas to OfflineReplica state
- onNewPartitionCreation to transition replicas to NewReplica state first and then to OnlineReplica
- onPartitionReassignment to transition replicas to OnlineReplica state
- stopOldReplicasOfReassignedPartition to transition replicas to OfflineReplica state first and then to ReplicaDeletionStarted, ReplicaDeletionSuccessful, and NonExistentReplica in the end
- startNewReplicasForReassignedPartition to transition replicas to NewReplica state
- doControlledShutdown to transition replicas to OfflineReplica state
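The list above can be read as a lookup from a KafkaController method to the sequence of target states it asks the replica state machine for. The following is a minimal sketch of that lookup only. The object name ReplicaTransitionsSketch, the targetStates map and the finalState helper are hypothetical and are not part of Kafka; the state and method names are taken from the list.

```scala
object ReplicaTransitionsSketch {
  // Replica states named in the list above (not the full set Kafka defines).
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Hypothetical lookup: target states requested by each method, in order.
  val targetStates: Map[String, Seq[ReplicaState]] = Map(
    "onBrokerLogDirFailure" -> Seq(OnlineReplica),
    "onBrokerStartup" -> Seq(OnlineReplica),
    "onReplicasBecomeOffline" -> Seq(OfflineReplica),
    "onNewPartitionCreation" -> Seq(NewReplica, OnlineReplica),
    "onPartitionReassignment" -> Seq(OnlineReplica),
    "stopOldReplicasOfReassignedPartition" ->
      Seq(OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica),
    "startNewReplicasForReassignedPartition" -> Seq(NewReplica),
    "doControlledShutdown" -> Seq(OfflineReplica)
  )

  // The last state a method drives replicas to, if the method is known.
  def finalState(method: String): Option[ReplicaState] =
    targetStates.get(method).flatMap(_.lastOption)
}
```

The sketch records only what the list states. It does not model which previous states are valid for a transition.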
KafkaController uses the ZkReplicaStateMachine to create the TopicDeletionManager.
KafkaController and PartitionStateMachine (ZkPartitionStateMachine)
partitionStateMachine: PartitionStateMachine
When created, KafkaController creates a new ZkPartitionStateMachine.
ZkPartitionStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.
ZkPartitionStateMachine is requested to triggerOnlinePartitionStateChange at the following events:
ZkPartitionStateMachine is requested to handleStateChanges at the following events:
- onNewPartitionCreation to transition partitions to NewPartition state first and then to OnlinePartition
- onPreferredReplicaElection to transition partitions to OnlinePartition
- moveReassignedPartitionLeaderIfRequired to transition partitions to OnlinePartition
- doControlledShutdown to transition partitions to OnlinePartition state
KafkaController uses the ZkPartitionStateMachine to create the TopicDeletionManager.
Preempting Controller Events — preempt Method
preempt(event: ControllerEvent): Unit
Note: preempt is part of the ControllerEventProcessor Contract to preempt controller events.
preempt…FIXME
Processing Controller Events — process Method
process(event: ControllerEvent): Unit
Note: process is part of the ControllerEventProcessor Contract to process controller events.
process handles the ControllerEvent using ControllerEvent handlers.
In the end, process updateMetrics.
In case of a ControllerMovedException, process prints out the following INFO message to the logs and maybeResign.
Controller moved to another broker when processing [event].
In case of any error (Throwable), process simply prints out the following ERROR message to the logs:
Error processing event [event]
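The error handling described above can be sketched as a try/catch/finally around the event handler. This is a simplified illustration, not the Kafka source. The ProcessSketch object, the Controller class with its counters and in-memory logs, and the local ControllerMovedException class are hypothetical stand-ins, and the use of finally for the "in the end" step is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

object ProcessSketch {
  // A few of the controller events, enough to exercise each branch.
  sealed trait ControllerEvent
  case object Startup extends ControllerEvent
  case object BrokerChange extends ControllerEvent
  case object Reelect extends ControllerEvent

  // Hypothetical stand-in for Kafka's ControllerMovedException.
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  // handler stands in for the per-event handlers that process dispatches to.
  final class Controller(handler: ControllerEvent => Unit) {
    val logs = ArrayBuffer.empty[String]
    var metricsUpdates = 0
    var resignAttempts = 0

    private def maybeResign(): Unit = resignAttempts += 1
    private def updateMetrics(): Unit = metricsUpdates += 1

    def process(event: ControllerEvent): Unit = {
      try {
        handler(event)
      } catch {
        case _: ControllerMovedException =>
          logs += s"INFO Controller moved to another broker when processing $event."
          maybeResign()
        case _: Throwable =>
          logs += s"ERROR Error processing event $event"
      } finally {
        // Assumption: metrics are updated whether or not the handler failed.
        updateMetrics()
      }
    }
  }
}
```

In this sketch a failing handler never escapes process, so one bad event cannot stop later events from being processed.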
initiateReassignReplicasForTopicPartition Method
initiateReassignReplicasForTopicPartition
initiateReassignReplicasForTopicPartition…FIXME
Note: initiateReassignReplicasForTopicPartition is used when…FIXME
deregisterPartitionReassignmentIsrChangeListeners Method
deregisterPartitionReassignmentIsrChangeListeners
deregisterPartitionReassignmentIsrChangeListeners…FIXME
Note: deregisterPartitionReassignmentIsrChangeListeners is used when…FIXME
resetControllerContext Method
resetControllerContext
resetControllerContext…FIXME
Note: resetControllerContext is used when…FIXME
deregisterBrokerChangeListener Method
deregisterBrokerChangeListener
deregisterBrokerChangeListener…FIXME
Note: deregisterBrokerChangeListener is used when…FIXME
deregisterTopicChangeListener Method
deregisterTopicChangeListener
deregisterTopicChangeListener…FIXME
Note: deregisterTopicChangeListener is used when…FIXME
Resigning As Active Controller — onControllerResignation Method
onControllerResignation(): Unit
onControllerResignation starts by printing out the following DEBUG message to the logs:
Resigning
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:
onControllerResignation requests TopicDeletionManager to reset.
onControllerResignation requests KafkaScheduler to shutdown.
onControllerResignation resets the following internal counters:
onControllerResignation deregisterPartitionReassignmentIsrChangeListeners.
onControllerResignation requests PartitionStateMachine to shutdown.
onControllerResignation deregisterTopicChangeListener.
onControllerResignation deregisterPartitionModificationsListener for every listener in partitionModificationsListeners.
onControllerResignation deregisterTopicDeletionListener.
onControllerResignation requests ReplicaStateMachine to shutdown.
onControllerResignation deregisterBrokerChangeListener.
onControllerResignation resetControllerContext.
In the end, onControllerResignation prints out the following DEBUG message to the logs:
Resigned
Unsubscribing from Child Changes to /isr_change_notification ZNode — deregisterIsrChangeNotificationListener Internal Method
deregisterIsrChangeNotificationListener(): Unit
deregisterIsrChangeNotificationListener prints out the following DEBUG message to the logs:
De-registering IsrChangeNotificationListener
deregisterIsrChangeNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /isr_change_notification znode with IsrChangeNotificationListener.
Note: deregisterIsrChangeNotificationListener is used exclusively when KafkaController resigns as the active controller.
Unsubscribing from Child Changes to /log_dir_event_notification ZNode — deregisterLogDirEventNotificationListener Internal Method
deregisterLogDirEventNotificationListener(): Unit
deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:
De-registering logDirEventNotificationListener
deregisterLogDirEventNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /log_dir_event_notification znode with LogDirEventNotificationListener.
Note: deregisterLogDirEventNotificationListener is used exclusively when KafkaController resigns as the active controller.
Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode — deregisterPreferredReplicaElectionListener Method
deregisterPreferredReplicaElectionListener(): Unit
deregisterPreferredReplicaElectionListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/preferred_replica_election znode with PreferredReplicaElectionListener.
Note: deregisterPreferredReplicaElectionListener is used exclusively when KafkaController resigns as the active controller.
Unsubscribing from Data Changes to /admin/reassign_partitions ZNode — deregisterPartitionReassignmentListener Method
deregisterPartitionReassignmentListener(): Unit
deregisterPartitionReassignmentListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/reassign_partitions znode with PartitionReassignmentListener.
Note: deregisterPartitionReassignmentListener is used exclusively when KafkaController resigns as the active controller.
triggerControllerMove Internal Method
triggerControllerMove(): Unit
triggerControllerMove…FIXME
Note: triggerControllerMove is used when KafkaController is requested to handleIllegalState and elect an active controller (and failed).
handleIllegalState Internal Method
handleIllegalState(e: IllegalStateException): Nothing
handleIllegalState…FIXME
Note: handleIllegalState is used when KafkaController catches an IllegalStateException in updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest and when processing a ControlledShutdown event.
sendUpdateMetadataRequest Method
sendUpdateMetadataRequest(): Unit
sendUpdateMetadataRequest requests the ControllerBrokerRequestBatch to newBatch and addUpdateMetadataRequestForBrokers.
In the end, sendUpdateMetadataRequest requests the ControllerBrokerRequestBatch to sendRequestsToBrokers with the current epoch.
In case of IllegalStateException, sendUpdateMetadataRequest handleIllegalState (that triggers controller movement).
updateLeaderEpochAndSendRequest Internal Method
updateLeaderEpochAndSendRequest(
partition: TopicPartition,
replicasToReceiveRequest: Seq[Int],
newAssignedReplicas: Seq[Int]): Unit
updateLeaderEpochAndSendRequest updates leader epoch for the partition and branches off per result: a LeaderIsrAndControllerEpoch or none at all.
updateLeaderEpochAndSendRequest and LeaderIsrAndControllerEpoch
When updating leader epoch for the partition returns a LeaderIsrAndControllerEpoch, updateLeaderEpochAndSendRequest requests the ControllerBrokerRequestBatch to prepare a new batch. updateLeaderEpochAndSendRequest requests the ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers followed by sendRequestsToBrokers.
In the end, updateLeaderEpochAndSendRequest prints out the following TRACE message to the logs:
Sent LeaderAndIsr request [updatedLeaderIsrAndControllerEpoch] with new assigned replica list [newAssignedReplicas] to leader [leader] for partition being reassigned [partition]
updateLeaderEpochAndSendRequest and No LeaderIsrAndControllerEpoch
When updating leader epoch for the partition returns None, updateLeaderEpochAndSendRequest prints out the following ERROR message to the logs:
Failed to send LeaderAndIsr request with new assigned replica list [newAssignedReplicas] to leader for partition being reassigned [partition]
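The two branches above amount to a pattern match on an Option. The following is a minimal sketch under stated assumptions: TopicPartition and LeaderIsrAndControllerEpoch are simplified local case classes rather than the Kafka types, the updateLeaderEpoch and send parameters are hypothetical hooks standing in for the leader epoch update and the ControllerBrokerRequestBatch calls, and the method returns the log line instead of writing it to a logger.

```scala
object LeaderEpochSketch {
  // Hypothetical, simplified stand-ins for the Kafka types named above.
  final case class TopicPartition(topic: String, partition: Int)
  final case class LeaderIsrAndControllerEpoch(leader: Int, leaderEpoch: Int, controllerEpoch: Int)

  // Returns the message that would be logged, so both branches are observable.
  def updateLeaderEpochAndSendRequest(
      partition: TopicPartition,
      newAssignedReplicas: Seq[Int],
      updateLeaderEpoch: TopicPartition => Option[LeaderIsrAndControllerEpoch],
      send: LeaderIsrAndControllerEpoch => Unit): String = {
    val replicas = newAssignedReplicas.mkString(",")
    updateLeaderEpoch(partition) match {
      case Some(updated) =>
        // Stands in for newBatch, addLeaderAndIsrRequestForBrokers and sendRequestsToBrokers.
        send(updated)
        s"TRACE Sent LeaderAndIsr request $updated with new assigned replica list $replicas " +
          s"to leader ${updated.leader} for partition being reassigned $partition"
      case None =>
        s"ERROR Failed to send LeaderAndIsr request with new assigned replica list $replicas " +
          s"to leader for partition being reassigned $partition"
    }
  }
}
```

Nothing is sent in the None branch, which matches the description that only an ERROR message is printed.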
Note: updateLeaderEpochAndSendRequest is used when KafkaController is requested to onPartitionReassignment and moveReassignedPartitionLeaderIfRequired.
Shutting Down — shutdown Method
shutdown(): Unit
shutdown requests the ControllerEventManager to close followed by onControllerResignation.
Note: shutdown is used exclusively when KafkaServer is requested to shutdown.
updateMetrics Internal Method
updateMetrics(): Unit
updateMetrics…FIXME
Note: updateMetrics is used exclusively when KafkaController is created (and creates the ControllerEventManager).
onBrokerStartup Method
onBrokerStartup(
newBrokers: Seq[Int]): Unit
onBrokerStartup prints out the following INFO message to the logs:
New broker startup callback for [newBrokers]
onBrokerStartup requests the ControllerContext for the replicasOnOfflineDirs and removes the given broker IDs (in newBrokers).
onBrokerStartup sendUpdateMetadataRequest to the liveOrShuttingDownBrokerIds (of the ControllerContext).
onBrokerStartup requests the ControllerContext for the replicas on the given newBrokers.
onBrokerStartup requests the ReplicaStateMachine to handleStateChanges for the replicas on the new brokers and OnlineReplica target state.
onBrokerStartup requests the PartitionStateMachine to triggerOnlinePartitionStateChange.
onBrokerStartup requests the ControllerContext for the partitionsBeingReassigned and collects the partitions that have replicas on the new brokers. For every partition with a replica on the new brokers, onBrokerStartup onPartitionReassignment.
onBrokerStartup collects replicas (on the new brokers) that are scheduled to be deleted by requesting the TopicDeletionManager to see whether isTopicQueuedUpForDeletion. If there are any, onBrokerStartup prints out the following INFO message to the logs and requests the TopicDeletionManager to resumeDeletionForTopics.
Some replicas [replicasForTopicsToBeDeleted] for topics scheduled for deletion [topicsToBeDeleted] are on the newly restarted brokers [newBrokers]. Signaling restart of topic deletion for these topics
In the end, onBrokerStartup registerBrokerModificationsHandler for the new brokers.
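One of the steps above selects the partitions being reassigned that have a replica on the new brokers. A minimal sketch of that selection follows. It assumes that each reassignment can be reduced to the list of new replica broker IDs; TopicPartition is a simplified local case class and reassignmentsToResume is a hypothetical helper name, not a KafkaController method.

```scala
object BrokerStartupSketch {
  // Hypothetical, simplified stand-in for Kafka's TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Partitions being reassigned whose new replica list includes one of the new brokers.
  def reassignmentsToResume(
      partitionsBeingReassigned: Map[TopicPartition, Seq[Int]],
      newBrokers: Seq[Int]): Set[TopicPartition] = {
    val newBrokersSet = newBrokers.toSet
    partitionsBeingReassigned.collect {
      case (tp, newReplicas) if newReplicas.exists(newBrokersSet.contains) => tp
    }.toSet
  }
}
```

For every partition in the result, the text above says that onPartitionReassignment is then invoked.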
Note: onBrokerStartup is used exclusively when KafkaController is requested to process a BrokerChange controller event.
Controller Election — elect Method
elect(): Unit
elect requests the KafkaZkClient for the active controller ID (or assumes -1 if not available) and saves it to the activeControllerId internal registry.
elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:
Broker [activeControllerId] has been elected as the controller, so stopping the election process.
Otherwise, with no active controller, elect requests the KafkaZkClient to registerControllerAndIncrementControllerEpoch (with the broker ID).
elect saves the controller epoch and the zookeeper epoch as the epoch and epochZkVersion of the ControllerContext, respectively.
elect saves the broker ID as the activeControllerId internal registry.
elect prints out the following INFO message to the logs:
[brokerId] successfully elected as the controller. Epoch incremented to [epoch] and epoch zk version is now [epochZkVersion]
In the end, elect onControllerFailover.
Note: elect is used when ControllerEventThread is requested to process Startup and Reelect controller events.
elect and ControllerMovedException
In case of a ControllerMovedException, elect maybeResign and prints out either DEBUG or WARN message to the logs per the activeControllerId internal registry:
Broker [activeControllerId] was elected as controller instead of broker [brokerId]
A controller has been elected but just resigned, this will result in another round of election
elect and Throwable
In case of a Throwable, elect prints out the following ERROR message to the logs and triggerControllerMove.
Error while electing or becoming controller on broker [brokerId]. Trigger controller movement immediately
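The happy path of the election described above can be sketched as follows. This is an illustration under assumptions: the ZkClient trait is a hypothetical reduction of KafkaZkClient to the two calls elect needs, the failovers counter stands in for onControllerFailover, and the ControllerMovedException and Throwable branches are left out.

```scala
object ElectSketch {
  // Hypothetical stand-in for the slice of KafkaZkClient that elect needs.
  trait ZkClient {
    def getControllerId: Option[Int]
    // Returns (controller epoch, epoch zk version) after registering the broker.
    def registerControllerAndIncrementControllerEpoch(brokerId: Int): (Int, Int)
  }

  final class Controller(brokerId: Int, zk: ZkClient) {
    var activeControllerId: Int = -1
    var epoch: Int = 0
    var epochZkVersion: Int = 0
    var failovers: Int = 0

    // True when this broker hosts the active controller.
    def isActive: Boolean = activeControllerId == brokerId

    def elect(): Unit = {
      activeControllerId = zk.getControllerId.getOrElse(-1)
      if (activeControllerId == -1) {
        // No active controller: register this broker and take over.
        val (newEpoch, newZkVersion) = zk.registerControllerAndIncrementControllerEpoch(brokerId)
        epoch = newEpoch
        epochZkVersion = newZkVersion
        activeControllerId = brokerId
        failovers += 1 // stands in for onControllerFailover
      }
      // Otherwise a controller is already elected and the election stops here.
    }
  }
}
```

With an existing controller ID the sketch leaves the epoch untouched, which mirrors stopping the election process early.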
Is Broker The Active Controller? — isActive Method
isActive: Boolean
isActive indicates whether the current broker (by the broker ID) hosts the active KafkaController (given the activeControllerId) or not.
Note: isActive is on (true) after the KafkaController of a Kafka broker has been elected.
registerIsrChangeNotificationListener Internal Method
registerIsrChangeNotificationListener(): Option[Seq[String]]
registerIsrChangeNotificationListener…FIXME
Note: registerIsrChangeNotificationListener is used when…FIXME
Starting Up — startup Method
startup(): Unit
startup requests the KafkaZkClient to register a StateChangeHandler (under the name controller-state-change-handler) that does the following:
- On afterInitializingSession, the StateChangeHandler simply puts RegisterBrokerAndReelect event on the event queue of the ControllerEventManager
- On beforeInitializingSession, the StateChangeHandler simply puts Expire event on the event queue of the ControllerEventManager
startup then puts Startup event at the end of the event queue of the ControllerEventManager and immediately requests it to start.
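The startup sequence above can be sketched with an in-memory event queue. The sketch is an assumption-laden illustration: the StateChangeHandler trait is a reduced, hypothetical form of the contract, the Controller class keeps the handler and the queue in plain fields instead of delegating to KafkaZkClient and ControllerEventManager, and the started flag stands in for starting the ControllerEventManager.

```scala
import scala.collection.mutable

object StartupSketch {
  sealed trait ControllerEvent
  case object Startup extends ControllerEvent
  case object Expire extends ControllerEvent
  case object RegisterBrokerAndReelect extends ControllerEvent

  // Hypothetical, reduced form of the StateChangeHandler contract.
  trait StateChangeHandler {
    def name: String
    def beforeInitializingSession(): Unit
    def afterInitializingSession(): Unit
  }

  final class Controller {
    val eventQueue = mutable.Queue.empty[ControllerEvent]
    var handler: Option[StateChangeHandler] = None
    var started = false

    def startup(): Unit = {
      // Stands in for registering the handler with the KafkaZkClient.
      handler = Some(new StateChangeHandler {
        val name = "controller-state-change-handler"
        def beforeInitializingSession(): Unit = eventQueue += Expire
        def afterInitializingSession(): Unit = eventQueue += RegisterBrokerAndReelect
      })
      eventQueue += Startup
      started = true // stands in for starting the ControllerEventManager
    }
  }
}
```

Session callbacks only enqueue events in this sketch. All real work happens when the events are taken off the queue and processed.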
Note: startup is used exclusively when KafkaServer is requested to start.
Registering SessionExpirationListener To Control Session Recreation — registerSessionExpirationListener Internal Method
registerSessionExpirationListener(): Unit
registerSessionExpirationListener requests ZkUtils to subscribe to state changes with a SessionExpirationListener (with the KafkaController and ControllerEventManager).
Note: SessionExpirationListener puts Reelect event on the event queue of ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
Registering ControllerChangeListener for /controller ZNode Changes — registerControllerChangeListener Internal Method
registerControllerChangeListener(): Unit
registerControllerChangeListener requests ZkUtils to subscribe to data changes for /controller znode with a ControllerChangeListener (with the KafkaController and ControllerEventManager).
registerBrokerChangeListener Internal Method
registerBrokerChangeListener(): Option[Seq[String]]
registerBrokerChangeListener requests ZkUtils to subscribeChildChanges for /brokers/ids path with BrokerChangeListener.
Note: registerBrokerChangeListener is used exclusively when KafkaController does onControllerFailover.
Getting Active Controller ID (from JSON under /controller znode) — getControllerID Method
getControllerID(): Int
getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.
Internally, getControllerID requests ZkUtils for data associated with /controller znode.
If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.
$ ./bin/zookeeper-shell.sh :2181 get /controller
{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0
Otherwise, when no /controller znode is available, getControllerID returns -1.
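The lookup above can be sketched as extracting the brokerid field from the znode data and falling back to -1. This sketch takes the znode data as an Option[String] argument instead of asking Zookeeper, and it uses a regular expression rather than a JSON parser, which is a simplification and not how Kafka parses the data. The ControllerIdSketch object is hypothetical.

```scala
object ControllerIdSketch {
  // Matches the numeric brokerid field of the controller info JSON.
  private val BrokerId = "\"brokerid\"\\s*:\\s*(\\d+)".r

  // controllerZnodeData is None when the /controller znode is not available.
  def getControllerID(controllerZnodeData: Option[String]): Int =
    controllerZnodeData
      .flatMap(json => BrokerId.findFirstMatchIn(json))
      .map(_.group(1).toInt)
      .getOrElse(-1)
}
```

Passing the JSON shown in the zookeeper-shell output above yields the broker ID 0, and passing None yields -1.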
Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — registerTopicDeletionListener Internal Method
registerTopicDeletionListener(): Option[Seq[String]]
registerTopicDeletionListener requests ZkUtils to subscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.
Note: registerTopicDeletionListener is used exclusively when KafkaController does onControllerFailover.
De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — deregisterTopicDeletionListener Internal Method
deregisterTopicDeletionListener(): Unit
deregisterTopicDeletionListener requests ZkUtils to unsubscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.
Note: deregisterTopicDeletionListener is used exclusively when KafkaController resigns as the active controller.
onReplicasBecomeOffline Internal Method
onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit
onReplicasBecomeOffline…FIXME
Note: onReplicasBecomeOffline is used when…FIXME
onPartitionReassignment Internal Method
onPartitionReassignment(
topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext): Unit
onPartitionReassignment…FIXME
Note: onPartitionReassignment is used when KafkaController is requested to onBrokerStartup, maybeTriggerPartitionReassignment and process a PartitionReassignmentIsrChange event.
onBrokerUpdate Internal Method
onBrokerUpdate(updatedBrokerId: Int): Unit
onBrokerUpdate…FIXME
Note: onBrokerUpdate is used when…FIXME
updateBrokerInfo Internal Method
updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit
updateBrokerInfo…FIXME
Note: updateBrokerInfo is used exclusively when DynamicListenerConfig is requested to reconfigure.
registerBrokerModificationsHandler Internal Method
registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit
registerBrokerModificationsHandler…FIXME
Note: registerBrokerModificationsHandler is used when KafkaController is requested to onBrokerStartup and onControllerFailover (indirectly through initializeControllerContext).
Initializing ControllerContext — initializeControllerContext Internal Method
initializeControllerContext(): Unit
initializeControllerContext…FIXME
In the end, initializeControllerContext prints out the following INFO messages to the logs (with the current state based on the ControllerContext):
Currently active brokers in the cluster: [liveBrokerIds]
Currently shutting brokers in the cluster: [shuttingDownBrokerIds]
Current list of topics in the cluster: [allTopics]
Note: initializeControllerContext is used exclusively when KafkaController is requested to onControllerFailover.
unregisterBrokerModificationsHandler Internal Method
unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit
unregisterBrokerModificationsHandler…FIXME
Note: unregisterBrokerModificationsHandler is used when KafkaController is requested to onControllerResignation and onBrokerFailure.
onBrokerFailure Internal Method
onBrokerFailure(deadBrokers: Seq[Int]): Unit
onBrokerFailure…FIXME
Note: onBrokerFailure is used exclusively when KafkaController is requested to handle a BrokerChange controller event.
maybeTriggerPartitionReassignment Internal Method
maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]): Unit
maybeTriggerPartitionReassignment…FIXME
Note: maybeTriggerPartitionReassignment is used when KafkaController is requested to onControllerFailover and process the PartitionReassignment controller event.
incrementControllerEpoch Internal Method
incrementControllerEpoch(): Unit
incrementControllerEpoch…FIXME
Note: incrementControllerEpoch is used exclusively when KafkaController is requested to onControllerFailover.
fetchPendingPreferredReplicaElections Internal Method
fetchPendingPreferredReplicaElections(): Set[TopicPartition]
fetchPendingPreferredReplicaElections…FIXME
Note: fetchPendingPreferredReplicaElections is used exclusively when KafkaController is requested to onControllerFailover.
initializePartitionReassignment Internal Method
initializePartitionReassignment(): Unit
initializePartitionReassignment…FIXME
Note: initializePartitionReassignment is used exclusively when KafkaController is requested to initializeControllerContext.
fetchTopicDeletionsInProgress Internal Method
fetchTopicDeletionsInProgress(): (Set[String], Set[String])
fetchTopicDeletionsInProgress…FIXME
Note: fetchTopicDeletionsInProgress is used exclusively when KafkaController is requested to onControllerFailover.
updateLeaderAndIsrCache Internal Method
updateLeaderAndIsrCache(partitions: Seq[TopicPartition]): Unit
Unless given, updateLeaderAndIsrCache defaults to allPartitions of the ControllerContext for the partitions.
updateLeaderAndIsrCache requests the KafkaZkClient to getTopicPartitionStates (with the given partitions) and updates the partitionLeadershipInfo of the ControllerContext.
Note: updateLeaderAndIsrCache is used when KafkaController is requested to initializeControllerContext (with no partitions) and process an IsrChangeNotification controller event (with partitions given).
areReplicasInIsr Internal Method
areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean
areReplicasInIsr…FIXME
Note: areReplicasInIsr is used exclusively when KafkaController is requested to onPartitionReassignment.
updateAssignedReplicasForPartition Internal Method
updateAssignedReplicasForPartition(
partition: TopicPartition,
replicas: Seq[Int]): Unit
updateAssignedReplicasForPartition…FIXME
Note: updateAssignedReplicasForPartition is used exclusively when KafkaController is requested to onPartitionReassignment.
registerPartitionModificationsHandlers Internal Method
registerPartitionModificationsHandlers(topics: Seq[String]): Unit
registerPartitionModificationsHandlers…FIXME
Note: registerPartitionModificationsHandlers is used when KafkaController is requested to initializeControllerContext and a TopicChange controller event is processed.
unregisterPartitionModificationsHandlers Internal Method
unregisterPartitionModificationsHandlers(topics: Seq[String]): Unit
unregisterPartitionModificationsHandlers…FIXME
unregisterPartitionReassignmentIsrChangeHandlers Internal Method
unregisterPartitionReassignmentIsrChangeHandlers(): Unit
unregisterPartitionReassignmentIsrChangeHandlers…FIXME
Note: unregisterPartitionReassignmentIsrChangeHandlers is used exclusively when KafkaController is requested to onControllerResignation.
readControllerEpochFromZooKeeper Internal Method
readControllerEpochFromZooKeeper(): Unit
readControllerEpochFromZooKeeper…FIXME
Note: readControllerEpochFromZooKeeper is used exclusively when KafkaController is requested to onControllerFailover.
removePartitionsFromReassignedPartitions Internal Method
removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]): Unit
removePartitionsFromReassignedPartitions…FIXME
Note: removePartitionsFromReassignedPartitions is used when KafkaController is requested to onPartitionReassignment and maybeTriggerPartitionReassignment.
removePartitionsFromPreferredReplicaElection Internal Method
removePartitionsFromPreferredReplicaElection(
partitionsToBeRemoved: Set[TopicPartition],
isTriggeredByAutoRebalance : Boolean): Unit
removePartitionsFromPreferredReplicaElection…FIXME
Note: removePartitionsFromPreferredReplicaElection is used exclusively when KafkaController is requested to onPreferredReplicaElection (and the election type is not AdminClientTriggered).
Preferred Replica Leader Election — onPreferredReplicaElection Internal Method
onPreferredReplicaElection(
partitions: Set[TopicPartition],
electionType: ElectionType): Map[TopicPartition, Throwable]
onPreferredReplicaElection prints out the following INFO message to the logs:
Starting preferred replica leader election for partitions [partitions]
onPreferredReplicaElection requests the PartitionStateMachine to handle partition state changes for the partitions (with OnlinePartition target state and PreferredReplicaPartitionLeaderElectionStrategy).
(only for election types that are not AdminClientTriggered) In the end, onPreferredReplicaElection removePartitionsFromPreferredReplicaElection.
(only for election types that are not AdminClientTriggered) In case of an error
updateLeaderEpoch Internal Method
updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]
updateLeaderEpoch…FIXME
Note: updateLeaderEpoch is used exclusively when KafkaController is requested to updateLeaderEpochAndSendRequest.
moveReassignedPartitionLeaderIfRequired Internal Method
moveReassignedPartitionLeaderIfRequired(
topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext): Unit
moveReassignedPartitionLeaderIfRequired…FIXME
Note: moveReassignedPartitionLeaderIfRequired is used exclusively when KafkaController is requested to onPartitionReassignment.
onControllerFailover Internal Method
onControllerFailover(): Unit
onControllerFailover prints out the following INFO message to the logs:
Registering handlers
onControllerFailover requests the KafkaZkClient to registerZNodeChildChangeHandlers:
onControllerFailover requests the KafkaZkClient to registerZNodeChangeHandlerAndCheckExistence:
onControllerFailover prints out the following INFO message to the logs:
Deleting log dir event notifications
onControllerFailover requests the KafkaZkClient to deleteLogDirEventNotifications (with the epochZkVersion of the ControllerContext).
onControllerFailover prints out the following INFO message to the logs:
Deleting isr change notifications
onControllerFailover requests the KafkaZkClient to deleteIsrChangeNotifications (with the epochZkVersion of the ControllerContext).
onControllerFailover prints out the following INFO message to the logs:
Initializing controller context
onControllerFailover initializeControllerContext.
onControllerFailover prints out the following INFO message to the logs:
Fetching topic deletions in progress
onControllerFailover fetchTopicDeletionsInProgress.
onControllerFailover prints out the following INFO message to the logs:
Initializing topic deletion manager
onControllerFailover requests the TopicDeletionManager to initialize (with the topics to be deleted and ineligible for deletion).
onControllerFailover prints out the following INFO message to the logs:
Sending update metadata request
onControllerFailover sendUpdateMetadataRequest (with the liveOrShuttingDownBrokerIds of the ControllerContext).
onControllerFailover requests the ReplicaStateMachine to start up.
onControllerFailover requests the PartitionStateMachine to start up.
onControllerFailover prints out the following INFO message to the logs:
Ready to serve as the new controller with epoch [epoch]
onControllerFailover maybeTriggerPartitionReassignment (with the partitionsBeingReassigned of the ControllerContext).
onControllerFailover requests the TopicDeletionManager to tryTopicDeletion.
onControllerFailover onPreferredReplicaElection with the fetchPendingPreferredReplicaElections.
onControllerFailover prints out the following INFO message to the logs:
Starting the controller scheduler
onControllerFailover requests the KafkaScheduler to startup.
With auto.leader.rebalance.enable enabled (default: true), onControllerFailover scheduleAutoLeaderRebalanceTask with the initial delay of 5 seconds.
With delegation.token.master.key password set (default: (empty)), onControllerFailover prints out the following INFO message to the logs:
starting the token expiry check scheduler
onControllerFailover requests the tokenCleanScheduler KafkaScheduler to startup and requests it to schedule the delete-expired-tokens task (FIXME).
Note: onControllerFailover is used when KafkaController is requested to elect (and a broker is successfully elected as the active controller).
scheduleAutoLeaderRebalanceTask Internal Method
scheduleAutoLeaderRebalanceTask(
delay: Long,
unit: TimeUnit): Unit
scheduleAutoLeaderRebalanceTask simply requests the KafkaScheduler to schedule a task called auto-leader-rebalance-task with the given initial delay.
The auto-leader-rebalance-task simply requests the ControllerEventManager to enqueue an AutoPreferredReplicaLeaderElection controller event.
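The scheduling step can be sketched with the JDK's ScheduledExecutorService standing in for the KafkaScheduler. This is a minimal illustration under stated assumptions, not the controller's actual code: RebalanceSchedulingSketch, eventQueue, and the thread name are hypothetical, and a plain blocking queue stands in for the ControllerEventManager.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory, TimeUnit}

object RebalanceSchedulingSketch {
  sealed trait ControllerEvent
  case object AutoPreferredReplicaLeaderElection extends ControllerEvent

  // Hypothetical stand-in for the ControllerEventManager's event queue
  val eventQueue = new LinkedBlockingQueue[ControllerEvent]()

  // Single daemon thread, standing in for the KafkaScheduler (an assumption of this sketch)
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "kafka-scheduler-sketch")
      t.setDaemon(true)
      t
    }
  })

  // Schedules a one-shot task that enqueues the event after the given initial delay
  def scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit = {
    scheduler.schedule(new Runnable {
      def run(): Unit = eventQueue.put(AutoPreferredReplicaLeaderElection)
    }, delay, unit)
    ()
  }
}
```

The task is one-shot in this sketch, so whatever consumes the event has to schedule the next run itself.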
|
Note
|
scheduleAutoLeaderRebalanceTask is used when KafkaController is requested to onControllerFailover and processAutoPreferredReplicaLeaderElection.
|
processAutoPreferredReplicaLeaderElection Internal Method
processAutoPreferredReplicaLeaderElection(): Unit
|
Note
|
processAutoPreferredReplicaLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.
|
processAutoPreferredReplicaLeaderElection prints out the following INFO message to the logs:
Processing automatic preferred replica leader election
processAutoPreferredReplicaLeaderElection checkAndTriggerAutoLeaderRebalance.
In the end, processAutoPreferredReplicaLeaderElection scheduleAutoLeaderRebalanceTask with the initial delay based on leader.imbalance.check.interval.seconds configuration property (default: 300 seconds).
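The guard, check, and reschedule flow described above can be sketched as follows. The names AutoRebalanceLoopSketch, check, and reschedule are hypothetical, and the use of try/finally (so that the next run is scheduled even when the check throws) is an assumption of this sketch rather than something the text above confirms.

```scala
object AutoRebalanceLoopSketch {
  // isActive: whether this broker is the active controller
  // check: stand-in for checkAndTriggerAutoLeaderRebalance
  // reschedule: stand-in for scheduleAutoLeaderRebalanceTask, called with a delay in seconds
  def process(
      isActive: Boolean,
      check: () => Unit,
      reschedule: Long => Unit,
      intervalSeconds: Long = 300L): Unit =
    if (isActive) {
      try check()
      finally reschedule(intervalSeconds) // assumed: reschedule regardless of the check's outcome
    }
}
```

On a broker that is not the active controller, neither function is invoked, which matches the early return described in the note above.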
|
Note
|
processAutoPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to process an AutoPreferredReplicaLeaderElection event.
|
checkAndTriggerAutoLeaderRebalance Internal Method
checkAndTriggerAutoLeaderRebalance(): Unit
checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:
Checking need to trigger auto leader balancing
checkAndTriggerAutoLeaderRebalance…FIXME
checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:
Preferred replicas by broker [preferredReplicasForTopicsByBrokers]
For every broker with one or more partition leaders, checkAndTriggerAutoLeaderRebalance…FIXME
checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:
Topics not in preferred replica for broker [leaderBroker] [topicsNotInPreferredReplica]
checkAndTriggerAutoLeaderRebalance calculates an imbalance ratio of the broker which is the number of topicsNotInPreferredReplica divided by the total number of partitions (topicPartitionsForBroker).
checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:
Leader imbalance ratio for broker [leaderBroker] is [imbalanceRatio]
When the imbalance ratio is greater than the desired ratio (per leader.imbalance.per.broker.percentage configuration property, default: 10%), checkAndTriggerAutoLeaderRebalance onPreferredReplicaElection for…FIXME (with AutoTriggered election type).
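The ratio and threshold comparison can be illustrated with a small sketch. LeaderImbalanceSketch and its method names are hypothetical, the parameter names mirror the values mentioned in the log messages above, and the guard for a broker with zero partitions is an assumption added to keep the example total.

```scala
object LeaderImbalanceSketch {
  // Fraction of a broker's partitions whose current leader is not the preferred replica
  def imbalanceRatio(topicsNotInPreferredReplica: Int, topicPartitionsForBroker: Int): Double =
    if (topicPartitionsForBroker == 0) 0.0 // assumed guard, avoids division by zero
    else topicsNotInPreferredReplica.toDouble / topicPartitionsForBroker

  // The percentage property is converted to a fraction before the comparison
  def exceedsThreshold(ratio: Double, leaderImbalancePerBrokerPercentage: Int = 10): Boolean =
    ratio > leaderImbalancePerBrokerPercentage.toDouble / 100
}
```

For example, a broker that should lead 12 partitions but has 3 of them led elsewhere has a ratio of 0.25, which is above the default 10% threshold and would trigger an election in this sketch.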
|
Note
|
checkAndTriggerAutoLeaderRebalance is used exclusively when KafkaController is requested to processAutoPreferredReplicaLeaderElection.
|
onNewPartitionCreation Internal Method
onNewPartitionCreation(
newPartitions: Set[TopicPartition]): Unit
onNewPartitionCreation…FIXME
|
Note
|
onNewPartitionCreation is used when TopicChange and PartitionModifications controller events are processed.
|
onReplicaElection Internal Method
onReplicaElection(
partitions: Set[TopicPartition],
electionType: ElectionType,
electionTrigger: ElectionTrigger): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
onReplicaElection…FIXME
|
Note
|
onReplicaElection is used when…FIXME
|
Handling Log Directory Failures for Brokers — onBrokerLogDirFailure Internal Method
onBrokerLogDirFailure(
brokerIds: Seq[Int]): Unit
onBrokerLogDirFailure prints out the following INFO message to the logs:
Handling log directory failure for brokers [brokerIds]
onBrokerLogDirFailure requests the ControllerContext for the replicas on the brokers and then requests the ReplicaStateMachine to handle state changes for the replicas to enter OnlineReplica state.
|
Note
|
onBrokerLogDirFailure is used exclusively when KafkaController is requested to process a LogDirEventNotification controller event (on controller-event-thread).
|
stopOldReplicasOfReassignedPartition Internal Method
stopOldReplicasOfReassignedPartition(
topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
oldReplicas: Set[Int]): Unit
stopOldReplicasOfReassignedPartition…FIXME
|
Note
|
stopOldReplicasOfReassignedPartition is used when…FIXME
|
startNewReplicasForReassignedPartition Internal Method
startNewReplicasForReassignedPartition(
topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
newReplicas: Set[Int]): Unit
startNewReplicasForReassignedPartition…FIXME
|
Note
|
startNewReplicasForReassignedPartition is used when…FIXME
|
enableDefaultUncleanLeaderElection Method
enableDefaultUncleanLeaderElection(): Unit
|
Note
|
enableDefaultUncleanLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.
|
enableDefaultUncleanLeaderElection simply requests the ControllerEventManager to enqueue an UncleanLeaderElectionEnable event.
|
Note
|
enableDefaultUncleanLeaderElection is used when DynamicLogConfig is requested to reconfigure (for unclean.leader.election.enable configuration property).
|
Preferred Replica Leader Election — electPreferredLeaders Method
electPreferredLeaders(
partitions: Set[TopicPartition],
callback: ElectPreferredLeadersCallback = { (_, _) => }): Unit
electPreferredLeaders simply requests the ControllerEventManager to enqueue a PreferredReplicaLeaderElection event (with AdminClientTriggered election type).
|
Note
|
electPreferredLeaders is used exclusively when ReplicaManager is requested to trigger preferred replica leader election.
|
maybeResign Internal Method
maybeResign(): Unit
maybeResign…FIXME
|
Note
|
maybeResign is used when KafkaController is requested to…FIXME
|
processControlledShutdown Internal Method
processControlledShutdown(
id: Int,
brokerEpoch: Long,
controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit
processControlledShutdown…FIXME
|
Note
|
processControlledShutdown is used exclusively when KafkaController is requested to process a ControlledShutdown controller event.
|
doControlledShutdown Internal Method
doControlledShutdown(
id: Int,
brokerEpoch: Long): Set[TopicPartition]
doControlledShutdown…FIXME
|
Note
|
doControlledShutdown is used exclusively when KafkaController is requested to processControlledShutdown.
|
processUncleanLeaderElectionEnable Internal Method
processUncleanLeaderElectionEnable(): Unit
|
Note
|
processUncleanLeaderElectionEnable does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.
|
processUncleanLeaderElectionEnable prints out the following INFO message to the logs:
Unclean leader election has been enabled by default
processUncleanLeaderElectionEnable requests the PartitionStateMachine to triggerOnlinePartitionStateChange.
|
Note
|
processUncleanLeaderElectionEnable is used exclusively when KafkaController is requested to process an UncleanLeaderElectionEnable controller event.
|
processTopicUncleanLeaderElectionEnable Internal Method
processTopicUncleanLeaderElectionEnable(topic: String): Unit
processTopicUncleanLeaderElectionEnable…FIXME
|
Note
|
processTopicUncleanLeaderElectionEnable is used when KafkaController is requested to process a TopicUncleanLeaderElectionEnable controller event.
|
Processing LeaderAndIsrResponseReceived Controller Event (On controller-event-thread) — processLeaderAndIsrResponseReceived Internal Method
processLeaderAndIsrResponseReceived(
leaderAndIsrResponseObj: AbstractResponse,
brokerId: Int): Unit
processLeaderAndIsrResponseReceived…FIXME
|
Note
|
processLeaderAndIsrResponseReceived is used exclusively when KafkaController is requested to process a LeaderAndIsrResponseReceived controller event (on the controller-event-thread).
|
processTopicDeletionStopReplicaResponseReceived Internal Method
processTopicDeletionStopReplicaResponseReceived(
replicaId: Int,
requestError: Errors,
partitionErrors: Map[TopicPartition, Errors]): Unit
processTopicDeletionStopReplicaResponseReceived…FIXME
|
Note
|
processTopicDeletionStopReplicaResponseReceived is used exclusively when KafkaController is requested to process a TopicDeletionStopReplicaResponseReceived controller event.
|
Processing BrokerChange Controller Event (On controller-event-thread) — processBrokerChange Internal Method
processBrokerChange(): Unit
|
Note
|
processBrokerChange does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.
|
processBrokerChange requests the KafkaZkClient for the brokers in a Kafka cluster and compares the broker list with the current brokers (of the ControllerContext).
At this point in time, processBrokerChange knows what brokers are new, dead or bounced.
processBrokerChange prints out the following INFO message to the logs:
Newly added brokers: [ids], deleted brokers: [ids], bounced brokers: [ids], all live brokers: [ids]
processBrokerChange notifies (updates) the ControllerChannelManager:
- For every newly-added broker, processBrokerChange requests to register it
- For bounced brokers, processBrokerChange requests to deregister them all first followed by registering them
- For every deleted broker, processBrokerChange requests to deregister it
processBrokerChange updates the ControllerContext:
- For newly-added brokers (if there were any), processBrokerChange requests to addLiveBrokersAndEpochs followed by onBrokerStartup
- For bounced brokers (if there were any), processBrokerChange first requests to remove them followed by onBrokerFailure and then requests to addLiveBrokersAndEpochs followed by onBrokerStartup
- For deleted brokers (if there were any), processBrokerChange requests to remove them followed by onBrokerFailure
In the end, only when there were any updates (new, dead or bounced brokers), processBrokerChange prints out the following INFO message to the logs:
Updated broker epochs cache: [liveBrokerIdAndEpochs]
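The comparison of the broker list from Zookeeper with the brokers known to the ControllerContext can be sketched with plain set operations. BrokerChangeSketch and BrokerDiff are hypothetical names, and treating a broker as bounced when its ID is present on both sides but with a higher epoch in Zookeeper is an assumption of this sketch.

```scala
object BrokerChangeSketch {
  final case class BrokerDiff(added: Set[Int], dead: Set[Int], bounced: Set[Int])

  // current: broker ID -> epoch as read from Zookeeper
  // known:   broker ID -> epoch as cached in the controller's context
  def diff(current: Map[Int, Long], known: Map[Int, Long]): BrokerDiff = {
    val currentIds = current.keySet
    val knownIds = known.keySet
    val added = currentIds -- knownIds // registered in Zookeeper, unknown to the controller
    val dead = knownIds -- currentIds  // known to the controller, gone from Zookeeper
    // Assumed rule: same ID on both sides with a newer epoch means the broker restarted
    val bounced = (currentIds & knownIds).filter(id => current(id) > known(id))
    BrokerDiff(added, dead, bounced)
  }
}
```

A bounced broker is handled as a removal followed by an addition, which is why the lists above deregister and remove it before registering and adding it again.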
|
Note
|
processBrokerChange is used exclusively when KafkaController is requested to process a BrokerChange controller event (on the controller-event-thread).
|
processBrokerModification Internal Method
processBrokerModification(
brokerId: Int): Unit
processBrokerModification…FIXME
|
Note
|
processBrokerModification is used exclusively when KafkaController is requested to process a BrokerModifications controller event.
|
processControllerChange Internal Method
processControllerChange(): Unit
processControllerChange…FIXME
|
Note
|
processControllerChange is used exclusively when KafkaController is requested to process a ControllerChange controller event.
|
processReelect Internal Method
processReelect(): Unit
processReelect…FIXME
|
Note
|
processReelect is used when KafkaController is requested to processRegisterBrokerAndReelect and process a Reelect controller event.
|
processRegisterBrokerAndReelect Internal Method
processRegisterBrokerAndReelect(): Unit
processRegisterBrokerAndReelect…FIXME
|
Note
|
processRegisterBrokerAndReelect is used exclusively when KafkaController is requested to process a RegisterBrokerAndReelect controller event.
|
processExpire Internal Method
processExpire(): Unit
processExpire…FIXME
|
Note
|
processExpire is used exclusively when KafkaController is requested to process an Expire controller event.
|
processTopicChange Internal Method
processTopicChange(): Unit
processTopicChange…FIXME
|
Note
|
processTopicChange is used exclusively when KafkaController is requested to process a TopicChange controller event.
|
Processing LogDirEventNotification Controller Event (On controller-event-thread) — processLogDirEventNotification Internal Method
processLogDirEventNotification(): Unit
|
Note
|
processLogDirEventNotification does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.
|
processLogDirEventNotification requests the KafkaZkClient for the sequence numbers of the LogDirEvent notifications (from Zookeeper).
processLogDirEventNotification requests the KafkaZkClient to convert the LogDirEvent notifications to broker IDs and then handles the log directory failures for the brokers.
In the end, processLogDirEventNotification requests the KafkaZkClient to delete LogDirEvent notifications (from Zookeeper).
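The fetch, convert, handle, and delete sequence can be sketched against an in-memory stand-in for the notification znodes. LogDirEventSketch and NotificationStore are hypothetical, the sequence-number strings in the usage are made up, and deleting the notifications in a finally block is an assumption of this sketch.

```scala
object LogDirEventSketch {
  // Hypothetical in-memory stand-in for the notification znodes: sequence number -> broker ID
  final class NotificationStore(initial: Map[String, Int]) {
    private var entries = initial
    def sequenceNumbers: Seq[String] = entries.keys.toSeq.sorted
    def brokerIds(seqs: Seq[String]): Seq[Int] = seqs.flatMap(entries.get)
    def delete(seqs: Seq[String]): Unit = entries = entries -- seqs
    def isEmpty: Boolean = entries.isEmpty
  }

  def processLogDirEventNotification(
      isActive: Boolean,
      store: NotificationStore,
      onBrokerLogDirFailure: Seq[Int] => Unit): Unit =
    if (isActive) {
      val seqs = store.sequenceNumbers
      try onBrokerLogDirFailure(store.brokerIds(seqs))
      finally store.delete(seqs) // assumed: notifications are removed even if handling fails
    }
}
```

Only the sequence numbers read at the start are deleted, so a notification that arrives while the handler runs would stay in the store for the next event.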
|
Note
|
processLogDirEventNotification is used exclusively when KafkaController is requested to process a LogDirEventNotification controller event.
|
processPartitionModifications Internal Method
processPartitionModifications(
topic: String): Unit
processPartitionModifications…FIXME
|
Note
|
processPartitionModifications is used exclusively when KafkaController is requested to process a PartitionModifications controller event.
|
processTopicDeletion Internal Method
processTopicDeletion(): Unit
processTopicDeletion…FIXME
|
Note
|
processTopicDeletion is used exclusively when KafkaController is requested to process a TopicDeletion controller event.
|
processPartitionReassignmentIsrChange Internal Method
processPartitionReassignmentIsrChange(
partition: TopicPartition): Unit
processPartitionReassignmentIsrChange…FIXME
|
Note
|
processPartitionReassignmentIsrChange is used exclusively when KafkaController is requested to process a PartitionReassignmentIsrChange controller event.
|
processIsrChangeNotification Internal Method
processIsrChangeNotification(): Unit
processIsrChangeNotification…FIXME
|
Note
|
processIsrChangeNotification is used exclusively when KafkaController is requested to process an IsrChangeNotification controller event.
|
processStartup Internal Method
processStartup(): Unit
processStartup requests the KafkaZkClient to registerZNodeChangeHandlerAndCheckExistence (with the ControllerChangeHandler).
In the end, processStartup starts controller election.
|
Note
|
processStartup is used exclusively when KafkaController is requested to process a Startup controller event (on the controller-event-thread).
|
processZkPartitionReassignment Internal Method
processZkPartitionReassignment(): Unit
processZkPartitionReassignment…FIXME
|
Note
|
processZkPartitionReassignment is used when KafkaController is requested to process a ZkPartitionReassignment controller event and initializePartitionReassignments (for KafkaController to process Startup and Reelect controller events).
|
initializePartitionReassignments Internal Method
initializePartitionReassignments(): Unit
initializePartitionReassignments…FIXME
|
Note
|
initializePartitionReassignments is used when KafkaController is requested to onControllerFailover (for KafkaController to process Startup and Reelect controller events).
|
processReplicaLeaderElection Internal Method
processReplicaLeaderElection(
partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
electionType: ElectionType,
electionTrigger: ElectionTrigger,
callback: ElectLeadersCallback): Unit
processReplicaLeaderElection…FIXME
|
Note
|
processReplicaLeaderElection is used when KafkaController is requested to process a ReplicaLeaderElection controller event.
|
processListPartitionReassignments Internal Method
processListPartitionReassignments(
partitionsOpt: Option[Set[TopicPartition]],
callback: ListReassignmentsCallback): Unit
processListPartitionReassignments…FIXME
|
Note
|
processListPartitionReassignments is used when KafkaController is requested to process a ListPartitionReassignments controller event.
|
processApiPartitionReassignment Internal Method
processApiPartitionReassignment(
reassignments: Map[TopicPartition, Option[Seq[Int]]],
callback: AlterReassignmentsCallback): Unit
processApiPartitionReassignment…FIXME
|
Note
|
processApiPartitionReassignment is used when KafkaController is requested to process an ApiPartitionReassignment controller event.
|
Preempting PreferredReplicaLeaderElection — preemptPreferredReplicaLeaderElection Internal Method
preemptPreferredReplicaLeaderElection(
partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
callback: ElectPreferredLeadersCallback = (_, _) => {}): Unit
preemptPreferredReplicaLeaderElection…FIXME
|
Note
|
preemptPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to preempt a PreferredReplicaLeaderElection controller event.
|
Internal Properties
| Name | Description |
|---|---|
|
|
|
|
|
A
|
|
ControllerEventManager (with rateAndTimeMetrics of the ControllerContext, the updateMetrics as the eventProcessedListener and the maybeResign as the controllerMovedListener)
|
|
KafkaScheduler with 1 daemon thread with kafka-scheduler prefix |
|
StateChangeLogger with the broker ID and |
|
KafkaScheduler with 1 daemon thread with delegation-token-cleaner prefix |
|