KafkaController
KafkaController is the default and only known ControllerEventProcessor to process and preempt controller events.
KafkaController is in one of the ControllerStates (that is the state of the ControllerEventManager).
KafkaController uses the KafkaZkClient to be notified about changes in the state of a Kafka cluster (that are reflected in changes in znodes of Apache Zookeeper) and propagate the state changes to other brokers.
KafkaController uses listeners as a notification system to monitor znodes in Zookeeper and react accordingly.
KafkaController emulates a state machine using controller events.
Event | ControllerState | process Handler |
---|---|---|
ApiPartitionReassignment | | |
AutoPreferredReplicaLeaderElection | | |
BrokerChange | | |
BrokerModifications | | |
ControlledShutdown | | |
ControllerChange | | |
Expire | | |
ListPartitionReassignments | | |
LogDirEventNotification | | |
PartitionModifications | | |
PartitionReassignmentIsrChange | | |
Reelect | | |
RegisterBrokerAndReelect | | |
ReplicaLeaderElection | | |
ShutdownEventThread | | |
Startup | | |
TopicChange | | |
TopicDeletion | | |
TopicDeletionStopReplicaResponseReceived | | |
TopicUncleanLeaderElectionEnable | | |
UncleanLeaderElectionEnable | | |
ZkPartitionReassignment | | |
Listener | Description |
---|---|
IsrChangeNotificationListener | Registered in registerIsrChangeNotificationListener. De-registered in deregisterIsrChangeNotificationListener. |
TopicDeletionListener | Registered in registerTopicDeletionListener. De-registered in deregisterTopicDeletionListener. |
KafkaController uses [Controller id=[brokerId]] as the logging prefix (aka logIdent).
Tip
|
Enable Add the following line to
Refer to Logging. Please note that Kafka comes with a preconfigured
That means that the logs of |
Creating KafkaController Instance
KafkaController takes the following to be created:
KafkaController initializes the internal properties.
KafkaController and ControllerContext
controllerContext: ControllerContext
When created, KafkaController creates a new ControllerContext.
KafkaController and ControllerChannelManager
controllerChannelManager: ControllerChannelManager
When created, KafkaController creates a new ControllerChannelManager.
ControllerChannelManager is used to create separate ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and ZkPartitionStateMachine.
ControllerChannelManager is requested to start up when KafkaController is requested to start controller election (and a broker is successfully elected as the active controller).
KafkaController uses the ControllerChannelManager to add or remove brokers when processing broker changes in Zookeeper (a new or updated znode under /brokers/ids path).
ControllerChannelManager is requested to shut down when KafkaController is requested to resign as the active controller.
KafkaController and ReplicaStateMachine (ZkReplicaStateMachine)
replicaStateMachine: ReplicaStateMachine
When created, KafkaController creates a new ZkReplicaStateMachine.
ZkReplicaStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.
ZkReplicaStateMachine is requested to handle state changes of partition replicas at the following events:
- onBrokerLogDirFailure to transition replicas to OnlineReplica state
- onBrokerStartup to transition replicas to OnlineReplica state
- onReplicasBecomeOffline to transition replicas to OfflineReplica state
- onNewPartitionCreation to transition replicas to NewReplica state first and then to OnlineReplica
- onPartitionReassignment to transition replicas to OnlineReplica state
- stopOldReplicasOfReassignedPartition to transition replicas to OfflineReplica state first and then to ReplicaDeletionStarted, ReplicaDeletionSuccessful, and NonExistentReplica in the end
- startNewReplicasForReassignedPartition to transition replicas to NewReplica state
- doControlledShutdown to transition replicas to OfflineReplica state
KafkaController uses the ZkReplicaStateMachine to create the TopicDeletionManager.
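The replica transitions listed above can be summarized as a lookup table from the KafkaController callback to the target states it requests, in order. This is only a sketch: the object name ReplicaTransitionsSketch and the targetStates map are hypothetical and exist purely for illustration, while the state names are the ones used in the list.

```scala
object ReplicaTransitionsSketch {
  // Replica states named in the list above; this sealed trait is a stand-in, not Kafka's own type.
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Hypothetical table: callback name -> target states requested, in order.
  val targetStates: Map[String, Seq[ReplicaState]] = Map(
    "onBrokerLogDirFailure" -> Seq(OnlineReplica),
    "onBrokerStartup" -> Seq(OnlineReplica),
    "onReplicasBecomeOffline" -> Seq(OfflineReplica),
    "onNewPartitionCreation" -> Seq(NewReplica, OnlineReplica),
    "onPartitionReassignment" -> Seq(OnlineReplica),
    "stopOldReplicasOfReassignedPartition" ->
      Seq(OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, NonExistentReplica),
    "startNewReplicasForReassignedPartition" -> Seq(NewReplica),
    "doControlledShutdown" -> Seq(OfflineReplica)
  )
}
```

Looking up ReplicaTransitionsSketch.targetStates("onNewPartitionCreation") gives the two-step sequence NewReplica followed by OnlineReplica.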
KafkaController and PartitionStateMachine (ZkPartitionStateMachine)
partitionStateMachine: PartitionStateMachine
When created, KafkaController creates a new ZkPartitionStateMachine.
ZkPartitionStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.
ZkPartitionStateMachine is requested to triggerOnlinePartitionStateChange at the following events:
ZkPartitionStateMachine is requested to handleStateChanges at the following events:
- onNewPartitionCreation to transition partitions to NewPartition state first and then to OnlinePartition
- onPreferredReplicaElection to transition partitions to OnlinePartition
- moveReassignedPartitionLeaderIfRequired to transition partitions to OnlinePartition
- doControlledShutdown to transition partitions to OnlinePartition state
KafkaController uses the ZkPartitionStateMachine to create the TopicDeletionManager.
Preempting Controller Events — preempt
Method
preempt(event: ControllerEvent): Unit
Note: preempt is part of the ControllerEventProcessor Contract to preempt controller events.
preempt …FIXME
Processing Controller Events — process
Method
process(event: ControllerEvent): Unit
Note: process is part of the ControllerEventProcessor Contract to process controller events.
process handles the ControllerEvent using ControllerEvent handlers.
In the end, process calls updateMetrics.
In case of a ControllerMovedException, process prints out the following INFO message to the logs and calls maybeResign.
Controller moved to another broker when processing [event].
In case of any other error (Throwable), process simply prints out the following ERROR message to the logs:
Error processing event [event]
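The error handling described above can be sketched as follows. This is a minimal illustration, not the Kafka source: ProcessSketch, EventProcessor, the String-typed event, and the in-memory log buffer are all hypothetical stand-ins, and the exception class is redefined locally so the example is self-contained. It assumes that "in the end" means updateMetrics runs after either outcome.

```scala
object ProcessSketch {
  // Local stand-in for Kafka's ControllerMovedException (assumption for illustration).
  final class ControllerMovedException(msg: String) extends RuntimeException(msg)

  // handler plays the role of the per-event handler; events are plain strings here.
  final class EventProcessor(handler: String => Unit) {
    val log = scala.collection.mutable.ListBuffer.empty[String]
    var metricsUpdates = 0
    var resigned = false

    private def maybeResign(): Unit = resigned = true
    private def updateMetrics(): Unit = metricsUpdates += 1

    def process(event: String): Unit = {
      try {
        handler(event)
      } catch {
        case _: ControllerMovedException =>
          log += s"INFO Controller moved to another broker when processing $event."
          maybeResign()
        case _: Throwable =>
          log += s"ERROR Error processing event $event"
      }
      // Runs whether the handler succeeded or failed.
      updateMetrics()
    }
  }
}
```

The ControllerMovedException case is listed first so that it is not swallowed by the generic Throwable case.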
initiateReassignReplicasForTopicPartition
Method
initiateReassignReplicasForTopicPartition
initiateReassignReplicasForTopicPartition …FIXME
Note: initiateReassignReplicasForTopicPartition is used when…FIXME
deregisterPartitionReassignmentIsrChangeListeners
Method
deregisterPartitionReassignmentIsrChangeListeners
deregisterPartitionReassignmentIsrChangeListeners …FIXME
Note: deregisterPartitionReassignmentIsrChangeListeners is used when…FIXME
resetControllerContext
Method
resetControllerContext
resetControllerContext …FIXME
Note: resetControllerContext is used when…FIXME
deregisterBrokerChangeListener
Method
deregisterBrokerChangeListener
deregisterBrokerChangeListener …FIXME
Note: deregisterBrokerChangeListener is used when…FIXME
deregisterTopicChangeListener
Method
deregisterTopicChangeListener
deregisterTopicChangeListener …FIXME
Note: deregisterTopicChangeListener is used when…FIXME
Resigning As Active Controller — onControllerResignation
Method
onControllerResignation(): Unit
onControllerResignation starts by printing out the following DEBUG message to the logs:
Resigning
onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:
onControllerResignation requests TopicDeletionManager to reset.
onControllerResignation requests KafkaScheduler to shutdown.
onControllerResignation resets the following internal counters:
onControllerResignation calls deregisterPartitionReassignmentIsrChangeListeners.
onControllerResignation requests PartitionStateMachine to shutdown.
onControllerResignation calls deregisterTopicChangeListener.
onControllerResignation calls deregisterPartitionModificationsListener for every listener in partitionModificationsListeners.
onControllerResignation calls deregisterTopicDeletionListener.
onControllerResignation requests ReplicaStateMachine to shutdown.
onControllerResignation calls deregisterBrokerChangeListener.
onControllerResignation calls resetControllerContext.
In the end, onControllerResignation prints out the following DEBUG message to the logs:
Resigned
Unsubscribing from Child Changes to /isr_change_notification ZNode — deregisterIsrChangeNotificationListener
Internal Method
deregisterIsrChangeNotificationListener(): Unit
deregisterIsrChangeNotificationListener prints out the following DEBUG message to the logs:
De-registering IsrChangeNotificationListener
deregisterIsrChangeNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /isr_change_notification znode with IsrChangeNotificationListener.
Note: deregisterIsrChangeNotificationListener is used exclusively when KafkaController resigns as the active controller.
Unsubscribing from Child Changes to /log_dir_event_notification ZNode — deregisterLogDirEventNotificationListener
Internal Method
deregisterLogDirEventNotificationListener(): Unit
deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:
De-registering logDirEventNotificationListener
deregisterLogDirEventNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /log_dir_event_notification znode with LogDirEventNotificationListener.
Note: deregisterLogDirEventNotificationListener is used exclusively when KafkaController resigns as the active controller.
Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode — deregisterPreferredReplicaElectionListener
Method
deregisterPreferredReplicaElectionListener(): Unit
deregisterPreferredReplicaElectionListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/preferred_replica_election znode with PreferredReplicaElectionListener.
Note: deregisterPreferredReplicaElectionListener is used exclusively when KafkaController resigns as the active controller.
Unsubscribing from Data Changes to /admin/reassign_partitions ZNode — deregisterPartitionReassignmentListener
Method
deregisterPartitionReassignmentListener(): Unit
deregisterPartitionReassignmentListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/reassign_partitions znode with PartitionReassignmentListener.
Note: deregisterPartitionReassignmentListener is used exclusively when KafkaController resigns as the active controller.
triggerControllerMove
Internal Method
triggerControllerMove(): Unit
triggerControllerMove …FIXME
Note: triggerControllerMove is used when KafkaController is requested to handleIllegalState and elect an active controller (and failed).
handleIllegalState
Internal Method
handleIllegalState(e: IllegalStateException): Nothing
handleIllegalState …FIXME
Note: handleIllegalState is used when KafkaController catches an IllegalStateException in updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest and when processing a ControlledShutdown event.
sendUpdateMetadataRequest
Method
sendUpdateMetadataRequest(): Unit
sendUpdateMetadataRequest requests the ControllerBrokerRequestBatch to newBatch and addUpdateMetadataRequestForBrokers.
In the end, sendUpdateMetadataRequest requests the ControllerBrokerRequestBatch to sendRequestsToBrokers with the current epoch.
In case of IllegalStateException, sendUpdateMetadataRequest calls handleIllegalState (that triggers controller movement).
updateLeaderEpochAndSendRequest
Internal Method
updateLeaderEpochAndSendRequest(
partition: TopicPartition,
replicasToReceiveRequest: Seq[Int],
newAssignedReplicas: Seq[Int]): Unit
updateLeaderEpochAndSendRequest updates leader epoch for the partition and branches off per result: a LeaderIsrAndControllerEpoch or none at all.
updateLeaderEpochAndSendRequest and LeaderIsrAndControllerEpoch
When updating leader epoch for the partition returns a LeaderIsrAndControllerEpoch, updateLeaderEpochAndSendRequest requests the ControllerBrokerRequestBatch to prepare a new batch. updateLeaderEpochAndSendRequest requests the ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers followed by sendRequestsToBrokers.
In the end, updateLeaderEpochAndSendRequest prints out the following TRACE message to the logs:
Sent LeaderAndIsr request [updatedLeaderIsrAndControllerEpoch] with new assigned replica list [newAssignedReplicas] to leader [leader] for partition being reassigned [partition]
updateLeaderEpochAndSendRequest and No LeaderIsrAndControllerEpoch
When updating leader epoch for the partition returns None, updateLeaderEpochAndSendRequest prints out the following ERROR message to the logs:
Failed to send LeaderAndIsr request with new assigned replica list [newAssignedReplicas] to leader for partition being reassigned [partition]
Note: updateLeaderEpochAndSendRequest is used when KafkaController is requested to onPartitionReassignment and moveReassignedPartitionLeaderIfRequired.
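The two branches of updateLeaderEpochAndSendRequest map naturally onto a pattern match over an Option. The sketch below is an assumption-laden illustration: LeaderIsrAndControllerEpochSketch is a simplified, hypothetical case class (not Kafka's LeaderIsrAndControllerEpoch), the partition is a plain string, the updateLeaderEpoch and send parameters stand in for the real collaborators, and the log message is returned rather than logged.

```scala
object LeaderEpochSketch {
  // Simplified, hypothetical shape; the real Kafka class is structured differently.
  final case class LeaderIsrAndControllerEpochSketch(leader: Int, isr: List[Int], leaderEpoch: Int, controllerEpoch: Int)

  def updateLeaderEpochAndSendRequest(
      partition: String,
      newAssignedReplicas: Seq[Int],
      updateLeaderEpoch: String => Option[LeaderIsrAndControllerEpochSketch],
      send: LeaderIsrAndControllerEpochSketch => Unit): String = {
    val replicas = newAssignedReplicas.mkString(",")
    updateLeaderEpoch(partition) match {
      case Some(updated) =>
        // Stands in for newBatch, addLeaderAndIsrRequestForBrokers and sendRequestsToBrokers.
        send(updated)
        s"TRACE Sent LeaderAndIsr request $updated with new assigned replica list $replicas " +
          s"to leader ${updated.leader} for partition being reassigned $partition"
      case None =>
        s"ERROR Failed to send LeaderAndIsr request with new assigned replica list $replicas " +
          s"to leader for partition being reassigned $partition"
    }
  }
}
```

Nothing is sent in the None branch; only the ERROR message is produced.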
Shutting Down — shutdown
Method
shutdown(): Unit
shutdown requests the ControllerEventManager to close followed by onControllerResignation.
Note: shutdown is used exclusively when KafkaServer is requested to shutdown.
updateMetrics
Internal Method
updateMetrics(): Unit
updateMetrics …FIXME
Note: updateMetrics is used exclusively when KafkaController is created (and creates the ControllerEventManager).
onBrokerStartup
Method
onBrokerStartup(
newBrokers: Seq[Int]): Unit
onBrokerStartup prints out the following INFO message to the logs:
New broker startup callback for [newBrokers]
onBrokerStartup requests the ControllerContext for the replicasOnOfflineDirs and removes the given broker IDs (in newBrokers).
onBrokerStartup calls sendUpdateMetadataRequest for the liveOrShuttingDownBrokerIds (of the ControllerContext).
onBrokerStartup requests the ControllerContext for the replicas on the given newBrokers.
onBrokerStartup requests the ReplicaStateMachine to handleStateChanges for the replicas on the new brokers and OnlineReplica target state.
onBrokerStartup requests the PartitionStateMachine to triggerOnlinePartitionStateChange.
onBrokerStartup requests the ControllerContext for the partitionsBeingReassigned and collects the partitions that have replicas on the new brokers. For every partition with a replica on the new brokers, onBrokerStartup calls onPartitionReassignment.
onBrokerStartup collects replicas (on the new brokers) that are scheduled to be deleted by requesting the TopicDeletionManager to see whether isTopicQueuedUpForDeletion. If there are any, onBrokerStartup prints out the following INFO message to the logs and requests the TopicDeletionManager to resumeDeletionForTopics.
Some replicas [replicasForTopicsToBeDeleted] for topics scheduled for deletion [topicsToBeDeleted] are on the newly restarted brokers [newBrokers]. Signaling restart of topic deletion for these topics
In the end, onBrokerStartup calls registerBrokerModificationsHandler for the new brokers.
Note: onBrokerStartup is used exclusively when KafkaController is requested to process a BrokerChange controller event.
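The step that collects partitions being reassigned with a replica on the new brokers is essentially a filter. Here is a minimal sketch of that one step, assuming simplified types: TopicPartition is a hypothetical (topic, partition) tuple alias, and the replica assignment is passed in as a plain Map rather than read from the ControllerContext.

```scala
object BrokerStartupSketch {
  // Hypothetical stand-in for Kafka's TopicPartition: (topic name, partition number).
  type TopicPartition = (String, Int)

  // Keeps only the partitions under reassignment that have at least one replica on a new broker.
  def partitionsWithReplicasOnNewBrokers(
      partitionsBeingReassigned: Set[TopicPartition],
      replicaAssignment: Map[TopicPartition, Seq[Int]],
      newBrokers: Set[Int]): Set[TopicPartition] =
    partitionsBeingReassigned.filter { tp =>
      replicaAssignment.getOrElse(tp, Seq.empty).exists(newBrokers.contains)
    }
}
```

Each partition in the result would then be handed to onPartitionReassignment.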
Controller Election — elect
Method
elect(): Unit
elect requests the KafkaZkClient for the active controller ID (or assumes -1 if not available) and saves it to the activeControllerId internal registry.
elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:
Broker [activeControllerId] has been elected as the controller, so stopping the election process.
Otherwise, with no active controller, elect requests the KafkaZkClient to registerControllerAndIncrementControllerEpoch (with the broker ID).
elect saves the controller epoch and the zookeeper epoch as the epoch and epochZkVersion of the ControllerContext, respectively.
elect saves the broker ID as the activeControllerId internal registry.
elect prints out the following INFO message to the logs:
[brokerId] successfully elected as the controller. Epoch incremented to [epoch] and epoch zk version is now [epochZkVersion]
In the end, elect calls onControllerFailover.
Note: elect is used when ControllerEventThread is requested to process Startup and Reelect controller events.
elect and ControllerMovedException
In case of a ControllerMovedException, elect calls maybeResign and prints out either a DEBUG message (when the activeControllerId internal registry holds a broker ID) or a WARN message (when it is -1) to the logs:
Broker [activeControllerId] was elected as controller instead of broker [brokerId]
A controller has been elected but just resigned, this will result in another round of election
elect and Throwable
In case of a Throwable, elect prints out the following ERROR message to the logs and calls triggerControllerMove.
Error while electing or becoming controller on broker [brokerId]. Trigger controller movement immediately
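The happy path of elect can be sketched as below. Everything here is a simplified assumption made for illustration: ZkClientSketch is a hypothetical trait that only mirrors the two operations the text mentions, the Controller class keeps the ControllerContext fields as plain vars, onControllerFailover is reduced to a flag, and the ControllerMovedException and Throwable branches are left out.

```scala
object ElectSketch {
  // Hypothetical stand-in exposing only the two operations the text mentions.
  trait ZkClientSketch {
    def getControllerId: Option[Int]
    def registerControllerAndIncrementControllerEpoch(brokerId: Int): (Int, Int)
  }

  final class Controller(brokerId: Int, zk: ZkClientSketch) {
    var activeControllerId: Int = -1
    var epoch: Int = 0
    var epochZkVersion: Int = 0
    var failedOver = false

    // True only when this broker hosts the active controller.
    def isActive: Boolean = activeControllerId == brokerId

    private def onControllerFailover(): Unit = failedOver = true

    def elect(): Unit = {
      activeControllerId = zk.getControllerId.getOrElse(-1)
      if (activeControllerId == -1) {
        // No active controller: try to register this broker and bump the epoch.
        val (newEpoch, newZkVersion) = zk.registerControllerAndIncrementControllerEpoch(brokerId)
        epoch = newEpoch
        epochZkVersion = newZkVersion
        activeControllerId = brokerId
        onControllerFailover()
      }
      // Otherwise another broker is already the controller and the election stops here.
    }
  }
}
```

The isActive check in the sketch is the same comparison of the broker ID against activeControllerId that the isActive section describes.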
Is Broker The Active Controller? — isActive
Method
isActive: Boolean
isActive indicates whether the current broker (by the broker ID) hosts the active KafkaController (given the activeControllerId) or not.
Note: isActive is true after the KafkaController of a Kafka broker has been elected.
registerIsrChangeNotificationListener
Internal Method
registerIsrChangeNotificationListener(): Option[Seq[String]]
registerIsrChangeNotificationListener …FIXME
Note: registerIsrChangeNotificationListener is used when…FIXME
Starting Up — startup
Method
startup(): Unit
startup requests the KafkaZkClient to register a StateChangeHandler (under the name controller-state-change-handler) that does the following:
- On afterInitializingSession, the StateChangeHandler simply puts RegisterBrokerAndReelect event on the event queue of the ControllerEventManager
- On beforeInitializingSession, the StateChangeHandler simply puts Expire event on the event queue of the ControllerEventManager
startup then puts Startup event at the end of the event queue of the ControllerEventManager and immediately requests it to start.
Note: startup is used exclusively when KafkaServer is requested to start.
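The wiring that startup performs can be sketched with a small in-memory event queue. The names StateChangeHandlerSketch and EventManagerSketch are hypothetical stand-ins for the real handler contract and the ControllerEventManager, and the register parameter plays the role of the KafkaZkClient registration call; only the event names and the handler name come from the text above.

```scala
import java.util.concurrent.LinkedBlockingQueue

object StartupSketch {
  sealed trait ControllerEvent
  case object Startup extends ControllerEvent
  case object Expire extends ControllerEvent
  case object RegisterBrokerAndReelect extends ControllerEvent

  // Hypothetical handler contract with the two callbacks named in the text.
  trait StateChangeHandlerSketch {
    def name: String
    def beforeInitializingSession(): Unit
    def afterInitializingSession(): Unit
  }

  // Hypothetical event manager: a FIFO queue plus a started flag.
  final class EventManagerSketch {
    val queue = new LinkedBlockingQueue[ControllerEvent]()
    var started = false
    def put(event: ControllerEvent): Unit = queue.put(event)
    def start(): Unit = started = true
  }

  def startup(eventManager: EventManagerSketch, register: StateChangeHandlerSketch => Unit): Unit = {
    register(new StateChangeHandlerSketch {
      val name = "controller-state-change-handler"
      def beforeInitializingSession(): Unit = eventManager.put(Expire)
      def afterInitializingSession(): Unit = eventManager.put(RegisterBrokerAndReelect)
    })
    // Startup goes to the end of the queue, then the manager is started.
    eventManager.put(Startup)
    eventManager.start()
  }
}
```

With this shape, a session expiry followed by a new session results in an Expire event and then a RegisterBrokerAndReelect event on the queue.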
Registering SessionExpirationListener To Control Session Recreation — registerSessionExpirationListener
Internal Method
registerSessionExpirationListener(): Unit
registerSessionExpirationListener requests ZkUtils to subscribe to state changes with a SessionExpirationListener (with the KafkaController and ControllerEventManager).
Note: SessionExpirationListener puts Reelect event on the event queue of ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
Registering ControllerChangeListener for /controller ZNode Changes — registerControllerChangeListener
Internal Method
registerControllerChangeListener(): Unit
registerControllerChangeListener requests ZkUtils to subscribe to data changes for /controller znode with a ControllerChangeListener (with the KafkaController and ControllerEventManager).
registerBrokerChangeListener
Internal Method
registerBrokerChangeListener(): Option[Seq[String]]
registerBrokerChangeListener requests ZkUtils to subscribeChildChanges for /brokers/ids path with BrokerChangeListener.
Note: registerBrokerChangeListener is used exclusively when KafkaController does onControllerFailover.
Getting Active Controller ID (from JSON under /controller znode) — getControllerID
Method
getControllerID(): Int
getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.
Internally, getControllerID requests ZkUtils for data associated with /controller znode.
If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.
$ ./bin/zookeeper-shell.sh :2181 get /controller
{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0
Otherwise, when no /controller znode is available, getControllerID returns -1.
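The extraction of the brokerid field can be illustrated with the JSON shown above. This sketch deliberately uses a regular expression instead of a JSON parser to stay dependency-free, which is a simplification and not how a production parser should work; ControllerIdSketch and its znodeData parameter (the optional contents of the /controller znode) are hypothetical names.

```scala
object ControllerIdSketch {
  // Matches "brokerid":<int> anywhere in the znode data (simplification of real JSON parsing).
  private val BrokerId = "\"brokerid\"\\s*:\\s*(-?\\d+)".r

  // znodeData is None when there is no /controller znode.
  def getControllerId(znodeData: Option[String]): Int =
    znodeData
      .flatMap(json => BrokerId.findFirstMatchIn(json))
      .map(_.group(1).toInt)
      .getOrElse(-1)
}
```

Passing None models the case of a missing /controller znode, for which the result is -1.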
Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — registerTopicDeletionListener
Internal Method
registerTopicDeletionListener(): Option[Seq[String]]
registerTopicDeletionListener requests ZkUtils to subscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.
Note: registerTopicDeletionListener is used exclusively when KafkaController does onControllerFailover.
De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — deregisterTopicDeletionListener
Internal Method
deregisterTopicDeletionListener(): Unit
deregisterTopicDeletionListener requests ZkUtils to unsubscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.
Note: deregisterTopicDeletionListener is used exclusively when KafkaController resigns as the active controller.
onReplicasBecomeOffline
Internal Method
onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit
onReplicasBecomeOffline …FIXME
Note: onReplicasBecomeOffline is used when…FIXME
onPartitionReassignment
Internal Method
onPartitionReassignment(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit
onPartitionReassignment …FIXME
Note: onPartitionReassignment is used when KafkaController is requested to onBrokerStartup, maybeTriggerPartitionReassignment and process a PartitionReassignmentIsrChange event.
onBrokerUpdate
Internal Method
onBrokerUpdate(updatedBrokerId: Int): Unit
onBrokerUpdate …FIXME
Note: onBrokerUpdate is used when…FIXME
updateBrokerInfo
Internal Method
updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit
updateBrokerInfo …FIXME
Note: updateBrokerInfo is used exclusively when DynamicListenerConfig is requested to reconfigure.
registerBrokerModificationsHandler
Internal Method
registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit
registerBrokerModificationsHandler …FIXME
Note: registerBrokerModificationsHandler is used when KafkaController is requested to onBrokerStartup and onControllerFailover (indirectly through initializeControllerContext).
Initializing ControllerContext — initializeControllerContext
Internal Method
initializeControllerContext(): Unit
initializeControllerContext …FIXME
In the end, initializeControllerContext prints out the following INFO messages to the logs (with the current state based on the ControllerContext):
Currently active brokers in the cluster: [liveBrokerIds]
Currently shutting brokers in the cluster: [shuttingDownBrokerIds]
Current list of topics in the cluster: [allTopics]
Note: initializeControllerContext is used exclusively when KafkaController is requested to onControllerFailover.
unregisterBrokerModificationsHandler
Internal Method
unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit
unregisterBrokerModificationsHandler …FIXME
Note: unregisterBrokerModificationsHandler is used when KafkaController is requested to onControllerResignation and onBrokerFailure.
onBrokerFailure
Internal Method
onBrokerFailure(deadBrokers: Seq[Int]): Unit
onBrokerFailure …FIXME
Note: onBrokerFailure is used exclusively when KafkaController is requested to handle a BrokerChange controller event.
maybeTriggerPartitionReassignment
Internal Method
maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]): Unit
maybeTriggerPartitionReassignment …FIXME
Note: maybeTriggerPartitionReassignment is used when KafkaController is requested to onControllerFailover and process the PartitionReassignment controller event.
incrementControllerEpoch
Internal Method
incrementControllerEpoch(): Unit
incrementControllerEpoch …FIXME
Note: incrementControllerEpoch is used exclusively when KafkaController is requested to onControllerFailover.
fetchPendingPreferredReplicaElections
Internal Method
fetchPendingPreferredReplicaElections(): Set[TopicPartition]
fetchPendingPreferredReplicaElections …FIXME
Note: fetchPendingPreferredReplicaElections is used exclusively when KafkaController is requested to onControllerFailover.
initializePartitionReassignment
Internal Method
initializePartitionReassignment(): Unit
initializePartitionReassignment …FIXME
Note: initializePartitionReassignment is used exclusively when KafkaController is requested to initializeControllerContext.
fetchTopicDeletionsInProgress
Internal Method
fetchTopicDeletionsInProgress(): (Set[String], Set[String])
fetchTopicDeletionsInProgress …FIXME
Note: fetchTopicDeletionsInProgress is used exclusively when KafkaController is requested to onControllerFailover.
updateLeaderAndIsrCache
Internal Method
updateLeaderAndIsrCache(partitions: Seq[TopicPartition]): Unit
Unless given, updateLeaderAndIsrCache defaults to allPartitions of the ControllerContext for the partitions.
updateLeaderAndIsrCache requests the KafkaZkClient to getTopicPartitionStates (with the given partitions) and updates the partitionLeadershipInfo of the ControllerContext.
Note: updateLeaderAndIsrCache is used when KafkaController is requested to initializeControllerContext (with no partitions) and process an IsrChangeNotification controller event (with partitions given).
areReplicasInIsr
Internal Method
areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean
areReplicasInIsr …FIXME
Note: areReplicasInIsr is used exclusively when KafkaController is requested to onPartitionReassignment.
updateAssignedReplicasForPartition
Internal Method
updateAssignedReplicasForPartition(
  partition: TopicPartition,
  replicas: Seq[Int]): Unit
updateAssignedReplicasForPartition …FIXME
Note: updateAssignedReplicasForPartition is used exclusively when KafkaController is requested to onPartitionReassignment.
registerPartitionModificationsHandlers
Internal Method
registerPartitionModificationsHandlers(topics: Seq[String]): Unit
registerPartitionModificationsHandlers …FIXME
Note: registerPartitionModificationsHandlers is used when KafkaController is requested to initializeControllerContext and a TopicChange controller event is processed.
unregisterPartitionModificationsHandlers
Internal Method
unregisterPartitionModificationsHandlers(topics: Seq[String]): Unit
unregisterPartitionModificationsHandlers …FIXME
unregisterPartitionReassignmentIsrChangeHandlers
Internal Method
unregisterPartitionReassignmentIsrChangeHandlers(): Unit
unregisterPartitionReassignmentIsrChangeHandlers …FIXME
Note: unregisterPartitionReassignmentIsrChangeHandlers is used exclusively when KafkaController is requested to onControllerResignation.
readControllerEpochFromZooKeeper
Internal Method
readControllerEpochFromZooKeeper(): Unit
readControllerEpochFromZooKeeper …FIXME
Note: readControllerEpochFromZooKeeper is used exclusively when KafkaController is requested to onControllerFailover.
removePartitionsFromReassignedPartitions
Internal Method
removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]): Unit
removePartitionsFromReassignedPartitions …FIXME
Note: removePartitionsFromReassignedPartitions is used when KafkaController is requested to onPartitionReassignment and maybeTriggerPartitionReassignment.
removePartitionsFromPreferredReplicaElection
Internal Method
removePartitionsFromPreferredReplicaElection(
  partitionsToBeRemoved: Set[TopicPartition],
  isTriggeredByAutoRebalance: Boolean): Unit
removePartitionsFromPreferredReplicaElection …FIXME
Note: removePartitionsFromPreferredReplicaElection is used exclusively when KafkaController is requested to onPreferredReplicaElection (and the election type is not AdminClientTriggered).
Preferred Replica Leader Election — onPreferredReplicaElection
Internal Method
onPreferredReplicaElection(
partitions: Set[TopicPartition],
electionType: ElectionType): Map[TopicPartition, Throwable]
onPreferredReplicaElection prints out the following INFO message to the logs:
Starting preferred replica leader election for partitions [partitions]
onPreferredReplicaElection requests the PartitionStateMachine to handle partition state changes for the partitions (with OnlinePartition target state and PreferredReplicaPartitionLeaderElectionStrategy).
(only for election types that are not AdminClientTriggered) In the end, onPreferredReplicaElection calls removePartitionsFromPreferredReplicaElection.
(only for election types that are not AdminClientTriggered) In case of an error
updateLeaderEpoch
Internal Method
updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]
updateLeaderEpoch …FIXME
Note: updateLeaderEpoch is used exclusively when KafkaController is requested to updateLeaderEpochAndSendRequest.
moveReassignedPartitionLeaderIfRequired
Internal Method
moveReassignedPartitionLeaderIfRequired(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit
moveReassignedPartitionLeaderIfRequired …FIXME
Note: moveReassignedPartitionLeaderIfRequired is used exclusively when KafkaController is requested to onPartitionReassignment.
onControllerFailover
Internal Method
onControllerFailover(): Unit
onControllerFailover prints out the following INFO message to the logs:
Registering handlers
onControllerFailover requests the KafkaZkClient to registerZNodeChildChangeHandlers:
onControllerFailover requests the KafkaZkClient to registerZNodeChangeHandlerAndCheckExistence:
onControllerFailover prints out the following INFO message to the logs:
Deleting log dir event notifications
onControllerFailover requests the KafkaZkClient to deleteLogDirEventNotifications (with the epochZkVersion of the ControllerContext).
onControllerFailover prints out the following INFO message to the logs:
Deleting isr change notifications
onControllerFailover requests the KafkaZkClient to deleteIsrChangeNotifications (with the epochZkVersion of the ControllerContext).
onControllerFailover prints out the following INFO message to the logs:
Initializing controller context
onControllerFailover calls initializeControllerContext.
onControllerFailover prints out the following INFO message to the logs:
Fetching topic deletions in progress
onControllerFailover calls fetchTopicDeletionsInProgress.
onControllerFailover prints out the following INFO message to the logs:
Initializing topic deletion manager
onControllerFailover requests the TopicDeletionManager to initialize (with the topics to be deleted and ineligible for deletion).
onControllerFailover prints out the following INFO message to the logs:
Sending update metadata request
onControllerFailover calls sendUpdateMetadataRequest (with the liveOrShuttingDownBrokerIds of the ControllerContext).
onControllerFailover requests the ReplicaStateMachine to start up.
onControllerFailover requests the PartitionStateMachine to start up.
onControllerFailover prints out the following INFO message to the logs:
Ready to serve as the new controller with epoch [epoch]
onControllerFailover calls maybeTriggerPartitionReassignment (with the partitionsBeingReassigned of the ControllerContext).
onControllerFailover requests the TopicDeletionManager to tryTopicDeletion.
onControllerFailover calls onPreferredReplicaElection with the result of fetchPendingPreferredReplicaElections.
onControllerFailover prints out the following INFO message to the logs:
Starting the controller scheduler
onControllerFailover requests the KafkaScheduler to startup.
With auto.leader.rebalance.enable enabled (default: true), onControllerFailover calls scheduleAutoLeaderRebalanceTask with the initial delay of 5 seconds.
With delegation.token.master.key password set (default: (empty)), onControllerFailover prints out the following INFO message to the logs:
starting the token expiry check scheduler
onControllerFailover requests the tokenCleanScheduler KafkaScheduler to startup and requests it to schedule the delete-expired-tokens task (FIXME).
Note: onControllerFailover is used when KafkaController is requested to elect (and a broker is successfully elected as the active controller).
scheduleAutoLeaderRebalanceTask
Internal Method
scheduleAutoLeaderRebalanceTask(
delay: Long,
unit: TimeUnit): Unit
scheduleAutoLeaderRebalanceTask
simply requests the KafkaScheduler to schedule a task called auto-leader-rebalance-task with the given initial delay.
The auto-leader-rebalance-task
simply requests the ControllerEventManager to enqueue an AutoPreferredReplicaLeaderElection controller event.
Note
|
scheduleAutoLeaderRebalanceTask is used when KafkaController is requested to onControllerFailover and to processAutoPreferredReplicaLeaderElection.
|
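The scheduling described above can be sketched with plain JDK classes. This is a minimal illustration, not Kafka's implementation: a ScheduledExecutorService stands in for the KafkaScheduler, and the EventQueue class and the Event hierarchy are hypothetical stand-ins for the ControllerEventManager and its controller events.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ScheduledExecutorService, TimeUnit}

object AutoRebalanceSchedulingSketch {
  // Hypothetical stand-ins for controller events (not Kafka's own classes).
  sealed trait Event
  case object AutoPreferredReplicaLeaderElection extends Event

  // Hypothetical stand-in for the queue of the ControllerEventManager.
  final class EventQueue {
    private val queue = new LinkedBlockingQueue[Event]()
    def put(event: Event): Unit = queue.put(event)
    // Waits up to the timeout for the next event; None if nothing arrived.
    def poll(timeout: Long, unit: TimeUnit): Option[Event] = Option(queue.poll(timeout, unit))
  }

  // One-shot task: once the delay elapses, enqueue the event and do nothing else.
  def scheduleAutoLeaderRebalanceTask(
      scheduler: ScheduledExecutorService,
      events: EventQueue,
      delay: Long,
      unit: TimeUnit): Unit = {
    val task: Runnable = () => events.put(AutoPreferredReplicaLeaderElection)
    scheduler.schedule(task, delay, unit)
    ()
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val events = new EventQueue
    scheduleAutoLeaderRebalanceTask(scheduler, events, 50, TimeUnit.MILLISECONDS)
    println(events.poll(2, TimeUnit.SECONDS))
    scheduler.shutdown()
  }
}
```

The task is scheduled once rather than at a fixed rate, which matches the text: the next run is scheduled again only after the event has been processed.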
processAutoPreferredReplicaLeaderElection
Internal Method
processAutoPreferredReplicaLeaderElection(): Unit
Note
|
processAutoPreferredReplicaLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController ) is an active controller.
|
processAutoPreferredReplicaLeaderElection
prints out the following INFO message to the logs:
Processing automatic preferred replica leader election
processAutoPreferredReplicaLeaderElection
checkAndTriggerAutoLeaderRebalance.
In the end, processAutoPreferredReplicaLeaderElection
calls scheduleAutoLeaderRebalanceTask with the initial delay based on the leader.imbalance.check.interval.seconds configuration property (default: 300 seconds).
Note
|
processAutoPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to process an AutoPreferredReplicaLeaderElection event.
|
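The control flow of processAutoPreferredReplicaLeaderElection (guard, check, reschedule) could look roughly like the sketch below. The Recorder class and all parameters are hypothetical and exist only to make the order of steps observable. Using try/finally to guarantee the rescheduling is an assumption about how "in the end" is enforced.

```scala
import scala.collection.mutable.ListBuffer

object AutoElectionProcessingSketch {
  // Hypothetical helper that records the steps in the order they happen.
  final class Recorder {
    val steps: ListBuffer[String] = ListBuffer.empty[String]
  }

  def processAutoPreferredReplicaLeaderElection(
      isActive: Boolean,
      checkIntervalSeconds: Long,
      recorder: Recorder)(check: () => Unit): Unit = {
    // Only the active controller does any work.
    if (isActive) {
      try {
        recorder.steps += "check"
        check()
      } finally {
        // Assumption: the next run is scheduled even if the check fails.
        recorder.steps += s"reschedule in ${checkIntervalSeconds}s"
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val recorder = new Recorder
    processAutoPreferredReplicaLeaderElection(isActive = true, checkIntervalSeconds = 300, recorder)(() => ())
    println(recorder.steps.toList)
  }
}
```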
checkAndTriggerAutoLeaderRebalance
Internal Method
checkAndTriggerAutoLeaderRebalance(): Unit
checkAndTriggerAutoLeaderRebalance
prints out the following TRACE message to the logs:
Checking need to trigger auto leader balancing
checkAndTriggerAutoLeaderRebalance
…FIXME
checkAndTriggerAutoLeaderRebalance
prints out the following DEBUG message to the logs:
Preferred replicas by broker [preferredReplicasForTopicsByBrokers]
For every broker with one or more partition leaders, checkAndTriggerAutoLeaderRebalance
…FIXME
checkAndTriggerAutoLeaderRebalance
prints out the following DEBUG message to the logs:
Topics not in preferred replica for broker [leaderBroker] [topicsNotInPreferredReplica]
checkAndTriggerAutoLeaderRebalance
calculates the imbalance ratio of the broker, which is the number of topicsNotInPreferredReplica divided by the total number of partitions (topicPartitionsForBroker).
checkAndTriggerAutoLeaderRebalance
prints out the following TRACE message to the logs:
Leader imbalance ratio for broker [leaderBroker] is [imbalanceRatio]
With the imbalance ratio greater than the desired ratio (per the leader.imbalance.per.broker.percentage configuration property, default: 10%), checkAndTriggerAutoLeaderRebalance
onPreferredReplicaElection for…FIXME (with AutoTriggered election type).
Note
|
checkAndTriggerAutoLeaderRebalance is used exclusively when KafkaController is requested to processAutoPreferredReplicaLeaderElection.
|
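The imbalance ratio arithmetic can be illustrated with a small, self-contained sketch. PartitionInfo and both functions are hypothetical names. Grouping partitions by their preferred replica, taken here as the first replica in the assignment, is an assumption about how the per-broker partition sets are built.

```scala
object ImbalanceRatioSketch {
  // Hypothetical partition model; the preferred replica is assumed to be the first assigned replica.
  final case class PartitionInfo(topic: String, partition: Int, replicas: Seq[Int], leader: Int) {
    def preferredReplica: Int = replicas.head
  }

  // Per broker: (partitions it should lead but does not) / (all partitions it should lead).
  def imbalanceRatios(partitions: Seq[PartitionInfo]): Map[Int, Double] =
    partitions.groupBy(_.preferredReplica).map { case (broker, preferred) =>
      val notLedByPreferred = preferred.count(p => p.leader != broker)
      broker -> (notLedByPreferred.toDouble / preferred.size)
    }

  // Brokers whose ratio exceeds the threshold given in percent (e.g. 10 for 10%).
  def brokersToRebalance(partitions: Seq[PartitionInfo], thresholdPercent: Int): Set[Int] =
    imbalanceRatios(partitions).collect {
      case (broker, ratio) if ratio > thresholdPercent.toDouble / 100 => broker
    }.toSet

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      PartitionInfo("t", 0, Seq(1, 2), leader = 1),
      PartitionInfo("t", 1, Seq(1, 2), leader = 2),
      PartitionInfo("t", 2, Seq(2, 1), leader = 2)
    )
    println(imbalanceRatios(partitions))
    println(brokersToRebalance(partitions, thresholdPercent = 10))
  }
}
```

With the default of 10%, a broker that is the preferred leader of four partitions but currently leads only three has a ratio of 0.25 and would qualify for a preferred replica election.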
onNewPartitionCreation
Internal Method
onNewPartitionCreation(
newPartitions: Set[TopicPartition]): Unit
onNewPartitionCreation
…FIXME
Note
|
onNewPartitionCreation is used when TopicChange and PartitionModifications controller events are processed.
|
onReplicaElection
Internal Method
onReplicaElection(
partitions: Set[TopicPartition],
electionType: ElectionType,
electionTrigger: ElectionTrigger): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]
onReplicaElection
…FIXME
Note
|
onReplicaElection is used when…FIXME
|
Handling Log Directory Failures for Brokers — onBrokerLogDirFailure
Internal Method
onBrokerLogDirFailure(
brokerIds: Seq[Int]): Unit
onBrokerLogDirFailure
prints out the following INFO message to the logs:
Handling log directory failure for brokers [brokerIds]
onBrokerLogDirFailure
requests the ControllerContext for the replicas on the brokers and then requests the ReplicaStateMachine to handle state changes for the replicas to enter the OnlineReplica state.
Note
|
onBrokerLogDirFailure is used exclusively when KafkaController is requested to process a LogDirEventNotification controller event (on controller-event-thread).
|
stopOldReplicasOfReassignedPartition
Internal Method
stopOldReplicasOfReassignedPartition(
topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
oldReplicas: Set[Int]): Unit
stopOldReplicasOfReassignedPartition
…FIXME
Note
|
stopOldReplicasOfReassignedPartition is used when…FIXME
|
startNewReplicasForReassignedPartition
Internal Method
startNewReplicasForReassignedPartition(
topicPartition: TopicPartition,
reassignedPartitionContext: ReassignedPartitionsContext,
newReplicas: Set[Int]): Unit
startNewReplicasForReassignedPartition
…FIXME
Note
|
startNewReplicasForReassignedPartition is used when…FIXME
|
enableDefaultUncleanLeaderElection
Method
enableDefaultUncleanLeaderElection(): Unit
Note
|
enableDefaultUncleanLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController ) is an active controller.
|
enableDefaultUncleanLeaderElection
simply requests the ControllerEventManager to enqueue an UncleanLeaderElectionEnable event.
Note
|
enableDefaultUncleanLeaderElection is used when DynamicLogConfig is requested to reconfigure (for unclean.leader.election.enable configuration property).
|
Preferred Replica Leader Election — electPreferredLeaders
Method
electPreferredLeaders(
partitions: Set[TopicPartition],
callback: ElectPreferredLeadersCallback = { (_, _) => }): Unit
electPreferredLeaders
simply requests the ControllerEventManager to enqueue a PreferredReplicaLeaderElection event (with AdminClientTriggered election type).
Note
|
electPreferredLeaders is used exclusively when ReplicaManager is requested to trigger preferred replica leader election.
|
maybeResign
Internal Method
maybeResign(): Unit
maybeResign
…FIXME
Note
|
maybeResign is used when KafkaController is requested to…FIXME
|
processControlledShutdown
Internal Method
processControlledShutdown(
id: Int,
brokerEpoch: Long,
controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit
processControlledShutdown
…FIXME
Note
|
processControlledShutdown is used exclusively when KafkaController is requested to process a ControlledShutdown controller event.
|
doControlledShutdown
Internal Method
doControlledShutdown(
id: Int,
brokerEpoch: Long): Set[TopicPartition]
doControlledShutdown
…FIXME
Note
|
doControlledShutdown is used exclusively when KafkaController is requested to processControlledShutdown.
|
processUncleanLeaderElectionEnable
Internal Method
processUncleanLeaderElectionEnable(): Unit
Note
|
processUncleanLeaderElectionEnable does nothing (and simply returns) unless the Kafka broker (KafkaController ) is an active controller.
|
processUncleanLeaderElectionEnable
prints out the following INFO message to the logs:
Unclean leader election has been enabled by default
processUncleanLeaderElectionEnable
requests the PartitionStateMachine to triggerOnlinePartitionStateChange.
Note
|
processUncleanLeaderElectionEnable is used exclusively when KafkaController is requested to process an UncleanLeaderElectionEnable controller event.
|
processTopicUncleanLeaderElectionEnable
Internal Method
processTopicUncleanLeaderElectionEnable(topic: String): Unit
processTopicUncleanLeaderElectionEnable
…FIXME
Note
|
processTopicUncleanLeaderElectionEnable is used when KafkaController is requested to process a TopicUncleanLeaderElectionEnable controller event.
|
Processing LeaderAndIsrResponseReceived Controller Event (On controller-event-thread) — processLeaderAndIsrResponseReceived
Internal Method
processLeaderAndIsrResponseReceived(
leaderAndIsrResponseObj: AbstractResponse,
brokerId: Int): Unit
processLeaderAndIsrResponseReceived
…FIXME
Note
|
processLeaderAndIsrResponseReceived is used exclusively when KafkaController is requested to process a LeaderAndIsrResponseReceived controller event (on the controller-event-thread).
|
processTopicDeletionStopReplicaResponseReceived
Internal Method
processTopicDeletionStopReplicaResponseReceived(
replicaId: Int,
requestError: Errors,
partitionErrors: Map[TopicPartition, Errors]): Unit
processTopicDeletionStopReplicaResponseReceived
…FIXME
Note
|
processTopicDeletionStopReplicaResponseReceived is used exclusively when KafkaController is requested to process a TopicDeletionStopReplicaResponseReceived controller event.
|
Processing BrokerChange Controller Event (On controller-event-thread) — processBrokerChange
Internal Method
processBrokerChange(): Unit
Note
|
processBrokerChange does nothing (and simply returns) unless the Kafka broker (KafkaController ) is an active controller.
|
processBrokerChange
requests the KafkaZkClient for the brokers in a Kafka cluster and compares the broker list with the current brokers (of the ControllerContext).
At this point in time, processBrokerChange
knows what brokers are new, dead or bounced.
processBrokerChange
prints out the following INFO message to the logs:
Newly added brokers: [ids], deleted brokers: [ids], bounced brokers: [ids], all live brokers: [ids]
processBrokerChange
notifies (updates) the ControllerChannelManager:
- For every newly-added broker, processBrokerChange requests to register it
- For bounced brokers, processBrokerChange requests to deregister them all first followed by registering them
- For every deleted broker, processBrokerChange requests to deregister it
processBrokerChange
updates the ControllerContext:
- For newly-added brokers (if there were any), processBrokerChange requests to addLiveBrokersAndEpochs followed by onBrokerStartup
- For bounced brokers (if there were any), processBrokerChange first requests to remove them followed by onBrokerFailure and then requests to addLiveBrokersAndEpochs followed by onBrokerStartup
- For deleted brokers (if there were any), processBrokerChange requests to remove them followed by onBrokerFailure
In the end, only when there were any updates (new, dead or bounced brokers), processBrokerChange
prints out the following INFO message to the logs:
Updated broker epochs cache: [liveBrokerIdAndEpochs]
Note
|
processBrokerChange is used exclusively when KafkaController is requested to process a BrokerChange controller event (on the controller-event-thread).
|
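The comparison that yields the new, dead and bounced brokers can be sketched with set operations. BrokerChanges and classify are hypothetical names. Detecting a bounced broker by a broker epoch that is higher than the cached one is an assumption, suggested by the "Updated broker epochs cache" message.

```scala
object BrokerChangeSketch {
  final case class BrokerChanges(added: Set[Int], deleted: Set[Int], bounced: Set[Int])

  // known:   broker id -> epoch as cached by the controller
  // current: broker id -> epoch as just read from ZooKeeper
  def classify(known: Map[Int, Long], current: Map[Int, Long]): BrokerChanges = {
    val added = current.keySet -- known.keySet
    val deleted = known.keySet -- current.keySet
    // Assumption: a broker present on both sides with a newer epoch has restarted in between.
    val bounced = (current.keySet & known.keySet).filter(id => current(id) > known(id))
    BrokerChanges(added, deleted, bounced)
  }

  def main(args: Array[String]): Unit = {
    val known = Map(1 -> 10L, 2 -> 20L, 3 -> 30L)
    val current = Map(2 -> 20L, 3 -> 35L, 4 -> 40L)
    println(classify(known, current))
  }
}
```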
processBrokerModification
Internal Method
processBrokerModification(
brokerId: Int): Unit
processBrokerModification
…FIXME
Note
|
processBrokerModification is used exclusively when KafkaController is requested to process a BrokerModifications controller event.
|
processControllerChange
Internal Method
processControllerChange(): Unit
processControllerChange
…FIXME
Note
|
processControllerChange is used exclusively when KafkaController is requested to process a ControllerChange controller event.
|
processReelect
Internal Method
processReelect(): Unit
processReelect
…FIXME
Note
|
processReelect is used when KafkaController is requested to processRegisterBrokerAndReelect and process a Reelect controller event.
|
processRegisterBrokerAndReelect
Internal Method
processRegisterBrokerAndReelect(): Unit
processRegisterBrokerAndReelect
…FIXME
Note
|
processRegisterBrokerAndReelect is used exclusively when KafkaController is requested to process a RegisterBrokerAndReelect controller event.
|
processExpire
Internal Method
processExpire(): Unit
processExpire
…FIXME
Note
|
processExpire is used exclusively when KafkaController is requested to process an Expire controller event.
|
processTopicChange
Internal Method
processTopicChange(): Unit
processTopicChange
…FIXME
Note
|
processTopicChange is used exclusively when KafkaController is requested to process a TopicChange controller event.
|
Processing LogDirEventNotification Controller Event (On controller-event-thread) — processLogDirEventNotification
Internal Method
processLogDirEventNotification(): Unit
Note
|
processLogDirEventNotification does nothing (and simply returns) unless the Kafka broker (KafkaController ) is an active controller.
|
processLogDirEventNotification
requests the KafkaZkClient for the sequence numbers of the LogDirEvent notifications (from Zookeeper).
processLogDirEventNotification
requests the KafkaZkClient to convert the LogDirEvent notifications to broker IDs and then handles the log directory failures for the brokers.
In the end, processLogDirEventNotification
requests the KafkaZkClient to delete LogDirEvent notifications (from Zookeeper).
Note
|
processLogDirEventNotification is used exclusively when KafkaController is requested to process a LogDirEventNotification controller event.
|
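The read, handle and delete sequence above can be sketched against an in-memory store. NotificationStore is a hypothetical stand-in for the notification znodes reached through the KafkaZkClient, and the handler parameter stands in for onBrokerLogDirFailure. Deleting the notifications in a finally block is an assumption about how "in the end" is enforced.

```scala
import scala.collection.mutable

object LogDirEventSketch {
  // Hypothetical in-memory stand-in for the notification znodes: sequence number -> broker id.
  final class NotificationStore {
    private val notifications = mutable.LinkedHashMap.empty[String, Int]
    def add(sequenceNumber: String, brokerId: Int): Unit = notifications.update(sequenceNumber, brokerId)
    def sequenceNumbers: Seq[String] = notifications.keys.toList
    def brokerIds(sequenceNumbers: Seq[String]): Seq[Int] =
      sequenceNumbers.flatMap(n => notifications.get(n)).distinct
    def delete(sequenceNumbers: Seq[String]): Unit = sequenceNumbers.foreach(n => notifications.remove(n))
    def isEmpty: Boolean = notifications.isEmpty
  }

  def processLogDirEventNotification(isActive: Boolean, store: NotificationStore)(
      onBrokerLogDirFailure: Seq[Int] => Unit): Unit =
    // Only the active controller does any work.
    if (isActive) {
      val sequenceNumbers = store.sequenceNumbers
      try onBrokerLogDirFailure(store.brokerIds(sequenceNumbers))
      finally store.delete(sequenceNumbers) // only the notifications that were read are removed
    }

  def main(args: Array[String]): Unit = {
    val store = new NotificationStore
    store.add("0000000001", 1)
    store.add("0000000002", 3)
    processLogDirEventNotification(isActive = true, store)(ids => println(s"Handling log directory failure for brokers $ids"))
    println(store.isEmpty)
  }
}
```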
processPartitionModifications
Internal Method
processPartitionModifications(
topic: String): Unit
processPartitionModifications
…FIXME
Note
|
processPartitionModifications is used exclusively when KafkaController is requested to process a PartitionModifications controller event.
|
processTopicDeletion
Internal Method
processTopicDeletion(): Unit
processTopicDeletion
…FIXME
Note
|
processTopicDeletion is used exclusively when KafkaController is requested to process a TopicDeletion controller event.
|
processPartitionReassignmentIsrChange
Internal Method
processPartitionReassignmentIsrChange(
partition: TopicPartition): Unit
processPartitionReassignmentIsrChange
…FIXME
Note
|
processPartitionReassignmentIsrChange is used exclusively when KafkaController is requested to process a PartitionReassignmentIsrChange controller event.
|
processIsrChangeNotification
Internal Method
processIsrChangeNotification(): Unit
processIsrChangeNotification
…FIXME
Note
|
processIsrChangeNotification is used exclusively when KafkaController is requested to process an IsrChangeNotification controller event.
|
processStartup
Internal Method
processStartup(): Unit
processStartup
requests the KafkaZkClient to registerZNodeChangeHandlerAndCheckExistence (with the ControllerChangeHandler).
In the end, processStartup
starts controller election.
Note
|
processStartup is used exclusively when KafkaController is requested to process a Startup controller event (on the controller-event-thread).
|
processZkPartitionReassignment
Internal Method
processZkPartitionReassignment(): Unit
processZkPartitionReassignment
…FIXME
Note
|
processZkPartitionReassignment is used when KafkaController is requested to process a ZkPartitionReassignment controller event and initializePartitionReassignments (for KafkaController to process Startup and Reelect controller events).
|
initializePartitionReassignments
Internal Method
initializePartitionReassignments(): Unit
initializePartitionReassignments
…FIXME
Note
|
initializePartitionReassignments is used when KafkaController is requested to onControllerFailover (for KafkaController to process Startup and Reelect controller events).
|
processReplicaLeaderElection
Internal Method
processReplicaLeaderElection(
partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
electionType: ElectionType,
electionTrigger: ElectionTrigger,
callback: ElectLeadersCallback): Unit
processReplicaLeaderElection
…FIXME
Note
|
processReplicaLeaderElection is used when KafkaController is requested to process a ReplicaLeaderElection controller event.
|
processListPartitionReassignments
Internal Method
processListPartitionReassignments(
partitionsOpt: Option[Set[TopicPartition]],
callback: ListReassignmentsCallback): Unit
processListPartitionReassignments
…FIXME
Note
|
processListPartitionReassignments is used when KafkaController is requested to process a ListPartitionReassignments controller event.
|
processApiPartitionReassignment
Internal Method
processApiPartitionReassignment(
reassignments: Map[TopicPartition, Option[Seq[Int]]],
callback: AlterReassignmentsCallback): Unit
processApiPartitionReassignment
…FIXME
Note
|
processApiPartitionReassignment is used when KafkaController is requested to process an ApiPartitionReassignment controller event.
|
Preempting PreferredReplicaLeaderElection — preemptPreferredReplicaLeaderElection
Internal Method
preemptPreferredReplicaLeaderElection(
partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
callback: ElectPreferredLeadersCallback = (_, _) => {}): Unit
preemptPreferredReplicaLeaderElection
…FIXME
Note
|
preemptPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to preempt a PreferredReplicaLeaderElection controller event.
|
Internal Properties
Name | Description |
---|---|
|
|
|
|
|
A
|
|
ControllerEventManager (with rateAndTimeMetrics of the ControllerContext, the updateMetrics as the eventProcessedListener and the maybeResign as the controllerMovedListener)
|
|
KafkaScheduler with 1 daemon thread with kafka-scheduler prefix |
|
StateChangeLogger with the broker ID and |
|
KafkaScheduler with 1 daemon thread with delegation-token-cleaner prefix |
|
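A scheduler "with 1 daemon thread" and a thread name prefix, as listed in the table above, can be approximated with the JDK alone. This is a sketch only: it uses ScheduledThreadPoolExecutor rather than KafkaScheduler, newScheduler is a hypothetical helper, and the prefix-plus-counter naming scheme is an assumption.

```scala
import java.util.concurrent.{Callable, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object PrefixedSchedulerSketch {
  // Builds a scheduler whose threads are named prefix + counter and are daemons by default.
  def newScheduler(threads: Int, threadNamePrefix: String, daemon: Boolean = true): ScheduledThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory: ThreadFactory = (r: Runnable) => {
      val thread = new Thread(r, threadNamePrefix + counter.getAndIncrement())
      thread.setDaemon(daemon) // daemon threads do not keep the JVM alive on shutdown
      thread
    }
    new ScheduledThreadPoolExecutor(threads, factory)
  }

  def main(args: Array[String]): Unit = {
    val scheduler = newScheduler(threads = 1, threadNamePrefix = "kafka-scheduler-")
    val whoAmI: Callable[String] = () => Thread.currentThread.getName
    println(scheduler.schedule(whoAmI, 0L, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS))
    scheduler.shutdown()
  }
}
```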