KafkaController

KafkaController is created and immediately started when KafkaServer is requested to start up.

KafkaController.png
Figure 1. KafkaController

KafkaController is the default and only known ControllerEventProcessor to process and preempt controller events.

KafkaController is in one of the ControllerStates (that is the state of the ControllerEventManager).

KafkaController uses the KafkaZkClient to be notified about changes in the state of a Kafka cluster (that are reflected in changes in znodes of Apache Zookeeper) and propagate the state changes to other brokers.

KafkaController uses listeners as a notification system to monitor znodes in Zookeeper and react accordingly.

KafkaController emulates a state machine using controller events.

Table 1. KafkaController’s Controller Events
Event ControllerState process Handler

ApiPartitionReassignment

processApiPartitionReassignment

AutoPreferredReplicaLeaderElection

processAutoPreferredReplicaLeaderElection

BrokerChange

BrokerChange

processBrokerChange

BrokerModifications

processBrokerModification

ControlledShutdown

  • ID

  • controlledShutdownCallback: Try[Set[TopicAndPartition]] ⇒ Unit

ControlledShutdown

processControlledShutdown

ControllerChange

  • newControllerId: Int

ControllerChange

  1. Assigns the current controller ID as the input newControllerId

  2. (only when the broker is no longer an active controller) Resigns as the active controller

Note
Similar to Reelect event with the only difference that it does not trigger election

Expire

processExpire

IsrChangeNotification

processIsrChangeNotification

LeaderAndIsrResponseReceived

processLeaderAndIsrResponseReceived

ListPartitionReassignments

processListPartitionReassignments

LogDirEventNotification

processLogDirEventNotification

PartitionModifications

processPartitionModifications

PartitionReassignmentIsrChange

processPartitionReassignmentIsrChange

Reelect

ControllerChange

RegisterBrokerAndReelect

processRegisterBrokerAndReelect

ReplicaLeaderElection

processReplicaLeaderElection

ShutdownEventThread

Startup

ControllerChange

TopicChange

processTopicChange

TopicDeletion

processTopicDeletion

TopicDeletionStopReplicaResponseReceived

processTopicDeletionStopReplicaResponseReceived

TopicUncleanLeaderElectionEnable

processTopicUncleanLeaderElectionEnable

UncleanLeaderElectionEnable

processUncleanLeaderElectionEnable

ZkPartitionReassignment

processZkPartitionReassignment

Table 2. KafkaController’s ZNodeChangeHandler and ZNodeChildChangeHandlers
Name Description

brokerChangeHandler

ZNodeChildChangeHandler of /brokers/ids path

On handleChildChange, brokerChangeHandler simply sends BrokerChange event to the ControllerEventManager.

isrChangeNotificationHandler

ZNodeChildChangeHandler of /isr_change_notification path

On handleChildChange, isrChangeNotificationHandler simply sends IsrChangeNotification event to the ControllerEventManager.

logDirEventNotificationHandler

ZNodeChildChangeHandler of /log_dir_event_notification path

On handleChildChange, logDirEventNotificationHandler simply sends LogDirEventNotification event to the ControllerEventManager.

partitionModificationsHandlers

ZNodeChangeHandlers per topic of /brokers/topics/[topic] path

On handleDataChange, partitionModificationsHandlers simply send PartitionModifications event to the ControllerEventManager.

partitionReassignmentHandler

ZNodeChangeHandler of /admin/reassign_partitions path

On handleCreation, partitionReassignmentHandler simply sends PartitionReassignment event to the ControllerEventManager.

preferredReplicaElectionHandler

ZNodeChangeHandler of /admin/preferred_replica_election path

On handleCreation, preferredReplicaElectionHandler simply sends PreferredReplicaLeaderElection event to the ControllerEventManager.

topicChangeHandler

ZNodeChildChangeHandler of /brokers/topics path

On handleChildChange, topicChangeHandler simply sends TopicChange event to the ControllerEventManager.

topicDeletionHandler

ZNodeChildChangeHandler of /admin/delete_topics path

On handleChildChange, topicDeletionHandler simply sends TopicDeletion event to the ControllerEventManager.

Table 3. KafkaController’s Listeners
Listener Description

brokerChangeListener

BrokerChangeListener for this KafkaController and eventManager

isrChangeNotificationListener

IsrChangeNotificationListener for this KafkaController and eventManager

Registered in registerIsrChangeNotificationListener when KafkaController does onControllerFailover.

De-registered in deregisterIsrChangeNotificationListener when KafkaController resigns as the active controller.

logDirEventNotificationListener

LogDirEventNotificationListener

partitionModificationsListeners

PartitionModificationsListener by name

partitionReassignmentListener

PartitionReassignmentListener for this KafkaController and ControllerEventManager

preferredReplicaElectionListener

PreferredReplicaElectionListener for this KafkaController and ControllerEventManager

topicDeletionListener

TopicDeletionListener (for this KafkaController and ControllerEventManager)

Registered in registerTopicDeletionListener when KafkaController does onControllerFailover.

De-registered in deregisterTopicDeletionListener when KafkaController resigns as the active controller.

KafkaController uses [Controller id=[brokerId]] as the logging prefix (aka logIdent).

Tip

Enable ALL logging level for kafka.controller.KafkaController logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.KafkaController=ALL

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

That means that the logs of KafkaController go to logs/controller.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).

Creating KafkaController Instance

KafkaController takes the following to be created:

KafkaController initializes the internal properties.

KafkaController and ControllerContext

controllerContext: ControllerContext

When created, KafkaController creates a new ControllerContext.

KafkaController and ControllerChannelManager

controllerChannelManager: ControllerChannelManager

When created, KafkaController creates a new ControllerChannelManager.

ControllerChannelManager is used to create separate ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and ZkPartitionStateMachine.

ControllerChannelManager is requested to start up when KafkaController is requested to start controller election (and a broker is successfully elected as the active controller).

KafkaController uses the ControllerChannelManager to add or remove brokers when processing broker changes in Zookeeper (a new or updated znode under /brokers/ids path).

ControllerChannelManager is requested to shut down when KafkaController is requested to resign as the active controller.

KafkaController and ReplicaStateMachine (ZkReplicaStateMachine)

replicaStateMachine: ReplicaStateMachine

When created, KafkaController creates a new ZkReplicaStateMachine.

ZkReplicaStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.

ZkReplicaStateMachine is requested to handle state changes of partition replicas at the following events:

KafkaController uses the ZkReplicaStateMachine to create the TopicDeletionManager.

KafkaController and PartitionStateMachine (ZkPartitionStateMachine)

partitionStateMachine: PartitionStateMachine

When created, KafkaController creates a new ZkPartitionStateMachine.

ZkPartitionStateMachine is requested to start up at onControllerFailover (when a broker is successfully elected as the controller) and shut down at controller resignation.

ZkPartitionStateMachine is requested to triggerOnlinePartitionStateChange at the following events:

ZkPartitionStateMachine is requested to handleStateChanges at the following events:

KafkaController uses the ZkPartitionStateMachine to create the TopicDeletionManager.

Preempting Controller Events — preempt Method

preempt(event: ControllerEvent): Unit
Note
preempt is part of the ControllerEventProcessor Contract to preempt controller events.

preempt…​FIXME

Processing Controller Events — process Method

process(event: ControllerEvent): Unit
Note
process is part of the ControllerEventProcessor Contract to process controller events.

process handles the ControllerEvent using ControllerEvent handlers.

Table 4. ControllerEvent Handlers
Name Description

ShutdownEventThread

AutoPreferredReplicaLeaderElection

processAutoPreferredReplicaLeaderElection

ReplicaLeaderElection

processReplicaLeaderElection

UncleanLeaderElectionEnable

processUncleanLeaderElectionEnable

TopicUncleanLeaderElectionEnable

processTopicUncleanLeaderElectionEnable

ControlledShutdown

processControlledShutdown

LeaderAndIsrResponseReceived

processLeaderAndIsrResponseReceived

TopicDeletionStopReplicaResponseReceived

processTopicDeletionStopReplicaResponseReceived

BrokerChange

processBrokerChange

BrokerModifications

processBrokerModification

ControllerChange

processControllerChange

Reelect

processReelect

RegisterBrokerAndReelect

processRegisterBrokerAndReelect

Expire

processExpire

TopicChange

processTopicChange

LogDirEventNotification

processLogDirEventNotification

PartitionModifications

processPartitionModifications

TopicDeletion

processTopicDeletion

ApiPartitionReassignment

processApiPartitionReassignment

ZkPartitionReassignment

processZkPartitionReassignment

ListPartitionReassignments

processListPartitionReassignments

PartitionReassignmentIsrChange

processPartitionReassignmentIsrChange

IsrChangeNotification

processIsrChangeNotification

Startup

processStartup

In the end, process updateMetrics.

In case of a ControllerMovedException, process prints out the following INFO message to the logs and maybeResign.

Controller moved to another broker when processing [event].

In case of any error (Throwable), process simply prints out the following ERROR message to the logs:

Error processing event [event]

initiateReassignReplicasForTopicPartition Method

initiateReassignReplicasForTopicPartition

initiateReassignReplicasForTopicPartition…​FIXME

Note
initiateReassignReplicasForTopicPartition is used when…​FIXME

deregisterPartitionReassignmentIsrChangeListeners Method

deregisterPartitionReassignmentIsrChangeListeners

deregisterPartitionReassignmentIsrChangeListeners…​FIXME

Note
deregisterPartitionReassignmentIsrChangeListeners is used when…​FIXME

resetControllerContext Method

resetControllerContext

resetControllerContext…​FIXME

Note
resetControllerContext is used when…​FIXME

deregisterBrokerChangeListener Method

deregisterBrokerChangeListener

deregisterBrokerChangeListener…​FIXME

Note
deregisterBrokerChangeListener is used when…​FIXME

deregisterTopicChangeListener Method

deregisterTopicChangeListener

deregisterTopicChangeListener…​FIXME

Note
deregisterTopicChangeListener is used when…​FIXME

Resigning As Active Controller — onControllerResignation Method

onControllerResignation(): Unit

onControllerResignation starts by printing out the following DEBUG message to the logs:

Resigning

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:

onControllerResignation requests TopicDeletionManager to reset.

onControllerResignation requests KafkaScheduler to shutdown.

onControllerResignation resets the following internal counters:

onControllerResignation requests PartitionStateMachine to shutdown.

onControllerResignation deregisterTopicChangeListener.

onControllerResignation deregisterTopicDeletionListener.

onControllerResignation requests ReplicaStateMachine to shutdown.

onControllerResignation deregisterBrokerChangeListener.

onControllerResignation resetControllerContext.

In the end, onControllerResignation prints out the following DEBUG message to the logs:

Resigned
Note

onControllerResignation is used when:

Unsubscribing from Child Changes to /isr_change_notification ZNode — deregisterIsrChangeNotificationListener Internal Method

deregisterIsrChangeNotificationListener(): Unit

deregisterIsrChangeNotificationListener prints out the following DEBUG message to the logs:

De-registering IsrChangeNotificationListener

deregisterIsrChangeNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /isr_change_notification znode with IsrChangeNotificationListener.

Note
deregisterIsrChangeNotificationListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Child Changes to /log_dir_event_notification ZNode — deregisterLogDirEventNotificationListener Internal Method

deregisterLogDirEventNotificationListener(): Unit

deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:

De-registering logDirEventNotificationListener

deregisterLogDirEventNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /log_dir_event_notification znode with LogDirEventNotificationListener.

Note
deregisterLogDirEventNotificationListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode — deregisterPreferredReplicaElectionListener Method

deregisterPreferredReplicaElectionListener(): Unit

deregisterPreferredReplicaElectionListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/preferred_replica_election znode with PreferredReplicaElectionListener.

Note
deregisterPreferredReplicaElectionListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Data Changes to /admin/reassign_partitions ZNode — deregisterPartitionReassignmentListener Method

deregisterPartitionReassignmentListener(): Unit

deregisterPartitionReassignmentListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/reassign_partitions znode with PartitionReassignmentListener.

Note
deregisterPartitionReassignmentListener is used exclusively when KafkaController resigns as the active controller.

triggerControllerMove Internal Method

triggerControllerMove(): Unit

triggerControllerMove…​FIXME

Note
triggerControllerMove is used when KafkaController is requested to handleIllegalState and elect an active controller (and failed).

handleIllegalState Internal Method

handleIllegalState(e: IllegalStateException): Nothing

handleIllegalState…​FIXME

Note
handleIllegalState is used when KafkaController catches an IllegalStateException in updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest and when processing a ControlledShutdown event.

sendUpdateMetadataRequest Method

sendUpdateMetadataRequest(): Unit

sendUpdateMetadataRequest requests the ControllerBrokerRequestBatch to newBatch and addUpdateMetadataRequestForBrokers.

In the end, sendUpdateMetadataRequest requests the ControllerBrokerRequestBatch to sendRequestsToBrokers with the current epoch.

In case of IllegalStateException, sendUpdateMetadataRequest handleIllegalState (that triggers controller movement).

Note

sendUpdateMetadataRequest is used when:

updateLeaderEpochAndSendRequest Internal Method

updateLeaderEpochAndSendRequest(
  partition: TopicPartition,
  replicasToReceiveRequest: Seq[Int],
  newAssignedReplicas: Seq[Int]): Unit

updateLeaderEpochAndSendRequest updates leader epoch for the partition and branches off per result: a LeaderIsrAndControllerEpoch or none at all.

updateLeaderEpochAndSendRequest and LeaderIsrAndControllerEpoch

When updating leader epoch for the partition returns a LeaderIsrAndControllerEpoch, updateLeaderEpochAndSendRequest requests the ControllerBrokerRequestBatch to prepare a new batch. updateLeaderEpochAndSendRequest requests the ControllerBrokerRequestBatch to addLeaderAndIsrRequestForBrokers followed by sendRequestsToBrokers.

In the end, updateLeaderEpochAndSendRequest prints out the following TRACE message to the logs:

Sent LeaderAndIsr request [updatedLeaderIsrAndControllerEpoch] with new assigned replica list [newAssignedReplicas] to leader [leader] for partition being reassigned [partition]

updateLeaderEpochAndSendRequest and No LeaderIsrAndControllerEpoch

When updating leader epoch for the partition returns None, updateLeaderEpochAndSendRequest prints out the following ERROR message to the logs:

Failed to send LeaderAndIsr request with new assigned replica list [newAssignedReplicas] to leader for partition being reassigned [partition]
Note
updateLeaderEpochAndSendRequest is used when KafkaController is requested to onPartitionReassignment and moveReassignedPartitionLeaderIfRequired.

Shutting Down — shutdown Method

shutdown(): Unit

shutdown requests the ControllerEventManager to close followed by onControllerResignation.

Note
shutdown is used exclusively when KafkaServer is requested to shutdown.

updateMetrics Internal Method

updateMetrics(): Unit

updateMetrics…​FIXME

Note
updateMetrics is used exclusively when KafkaController is created (and creates the ControllerEventManager).

onBrokerStartup Method

onBrokerStartup(
  newBrokers: Seq[Int]): Unit

onBrokerStartup prints out the following INFO message to the logs:

New broker startup callback for [newBrokers]

onBrokerStartup requests the ControllerContext for the replicasOnOfflineDirs and removes the given broker IDs (in newBrokers).

onBrokerStartup requests the ControllerContext for the replicas on the given newBrokers.

onBrokerStartup requests the ReplicaStateMachine to handleStateChanges for the replicas on the new brokers and OnlineReplica target state.

onBrokerStartup requests the PartitionStateMachine to triggerOnlinePartitionStateChange.

onBrokerStartup requests the ControllerContext for the partitionsBeingReassigned and collects the partitions that have replicas on the new brokers. For every partition with a replica on the new brokers, onBrokerStartup onPartitionReassignment.

onBrokerStartup collects replicas (on the new brokers) that are scheduled to be deleted by requesting the TopicDeletionManager to see whether isTopicQueuedUpForDeletion. If there are any, onBrokerStartup prints out the following INFO message to the logs and requests the TopicDeletionManager to resumeDeletionForTopics.

Some replicas [replicasForTopicsToBeDeleted] for topics scheduled for deletion [topicsToBeDeleted] are on the newly restarted brokers [newBrokers]. Signaling restart of topic deletion for these topics

In the end, onBrokerStartup registerBrokerModificationsHandler for the new brokers.

Note
onBrokerStartup is used exclusively when KafkaController is requested to process a BrokerChange controller event.

Controller Election — elect Method

elect(): Unit

elect requests the KafkaZkClient for the active controller ID (or assumes -1 if not available) and saves it to the activeControllerId internal registry.

elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:

Broker [activeControllerId] has been elected as the controller, so stopping the election process.

Otherwise, with no active controller, elect requests the KafkaZkClient to registerControllerAndIncrementControllerEpoch (with the broker ID).

elect saves the controller epoch and the zookeeper epoch as the epoch and epochZkVersion of the ControllerContext, respectively.

elect saves the broker ID as the activeControllerId internal registry.

elect prints out the following INFO message to the logs:

[brokerId] successfully elected as the controller. Epoch incremented to [epoch] and epoch zk version is now [epochZkVersion]

In the end, elect onControllerFailover.

Note
elect is used when ControllerEventThread is requested to process Startup and Reelect controller events.

elect and ControllerMovedException

In case of a ControllerMovedException, elect maybeResign and prints out either DEBUG or WARN message to the logs per the activeControllerId internal registry:

Broker [activeControllerId] was elected as controller instead of broker [brokerId]
A controller has been elected but just resigned, this will result in another round of election

elect and Throwable

In case of a Throwable, elect prints out the following ERROR message to the logs and triggerControllerMove.

Error while electing or becoming controller on broker [brokerId]. Trigger controller movement immediately

Is Broker The Active Controller? — isActive Method

isActive: Boolean

isActive indicates whether the current broker (by the broker ID) hosts the active KafkaController (given the activeControllerId) or not.

Note
isActive is on (true) after the KafkaController of a Kafka broker has been elected.
Note

isActive is used (as a valve to stop processing early) when:

registerIsrChangeNotificationListener Internal Method

registerIsrChangeNotificationListener(): Option[Seq[String]]

registerIsrChangeNotificationListener…​FIXME

Note
registerIsrChangeNotificationListener is used when…​FIXME

deregisterIsrChangeNotificationListener Internal Method

deregisterIsrChangeNotificationListener(): Unit

deregisterIsrChangeNotificationListener…​FIXME

Note
deregisterIsrChangeNotificationListener is used when…​FIXME

Starting Up — startup Method

startup(): Unit

startup requests the KafkaZkClient to register a StateChangeHandler (under the name controller-state-change-handler) that is does the following:

  • On afterInitializingSession, the StateChangeHandler simply puts RegisterBrokerAndReelect event on the event queue of the ControllerEventManager

  • On beforeInitializingSession, the StateChangeHandler simply puts Expire event on the event queue of the ControllerEventManager

startup then puts Startup event at the end of the event queue of the ControllerEventManager and immediately requests it to start.

Note
startup is used exclusively when KafkaServer is requested to start.

Registering SessionExpirationListener To Control Session Recreation — registerSessionExpirationListener Internal Method

registerSessionExpirationListener(): Unit

registerSessionExpirationListener requests ZkUtils to subscribe to state changes with a SessionExpirationListener (with the KafkaController and ControllerEventManager).

Note
SessionExpirationListener puts Reelect event on the event queue of ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
Note
registerSessionExpirationListener is used exclusively when Startup event is processed (after ControllerEventThread is started).

Registering ControllerChangeListener for /controller ZNode Changes — registerControllerChangeListener Internal Method

registerControllerChangeListener(): Unit

registerControllerChangeListener requests ZkUtils to subscribe to data changes for /controller znode with a ControllerChangeListener (with the KafkaController and ControllerEventManager).

Note

ControllerChangeListener emits:

  1. ControllerChange event with the current controller ID (on the event queue of ControllerEventManager) every time the data of a znode changes

  2. Reelect event when the data associated with a znode has been deleted

Note
registerControllerChangeListener is used exclusively when Startup event is processed (after ControllerEventThread is started).

registerBrokerChangeListener Internal Method

registerBrokerChangeListener(): Option[Seq[String]]

registerBrokerChangeListener requests ZkUtils to subscribeChildChanges for /brokers/ids path with BrokerChangeListener.

Note
registerBrokerChangeListener is used exclusively when KafkaController does onControllerFailover.

Getting Active Controller ID (from JSON under /controller znode) — getControllerID Method

getControllerID(): Int

getControllerID returns the ID of the active Kafka controller that is associated with /controller znode in JSON format or -1 otherwise.

Internally, getControllerID requests ZkUtils for data associated with /controller znode.

If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.

$ ./bin/zookeeper-shell.sh :2181 get /controller

{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0

Otherwise, when no /controller znode is available, getControllerID returns -1.

Note

getControllerID is used when:

  1. Processing Reelect controller event

  2. elect

Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — registerTopicDeletionListener Internal Method

registerTopicDeletionListener(): Option[Seq[String]]

registerTopicDeletionListener requests ZkUtils to subscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.

Note
registerTopicDeletionListener is used exclusively when KafkaController does onControllerFailover.

De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — deregisterTopicDeletionListener Internal Method

deregisterTopicDeletionListener(): Unit

deregisterTopicDeletionListener requests ZkUtils to unsubscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.

Note
deregisterTopicDeletionListener is used exclusively when KafkaController resigns as the active controller.

onReplicasBecomeOffline Internal Method

onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit

onReplicasBecomeOffline…​FIXME

Note
onReplicasBecomeOffline is used when…​FIXME

onPartitionReassignment Internal Method

onPartitionReassignment(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit

onPartitionReassignment…​FIXME

Note
onPartitionReassignment is used when KafkaController is requested to onBrokerStartup, maybeTriggerPartitionReassignment and process a PartitionReassignmentIsrChange event.

onBrokerUpdate Internal Method

onBrokerUpdate(updatedBrokerId: Int): Unit

onBrokerUpdate…​FIXME

Note
onBrokerUpdate is used when…​FIXME

updateBrokerInfo Internal Method

updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit

updateBrokerInfo…​FIXME

Note
updateBrokerInfo is used exclusively when DynamicListenerConfig is requested to reconfigure.

registerBrokerModificationsHandler Internal Method

registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit

registerBrokerModificationsHandler…​FIXME

Note
registerBrokerModificationsHandler is used when KafkaController is requested to onBrokerStartup and onControllerFailover (indirectly through initializeControllerContext).

Initializing ControllerContext — initializeControllerContext Internal Method

initializeControllerContext(): Unit

initializeControllerContext…​FIXME

In the end, initializeControllerContext prints out the following INFO messages to the logs (with the current state based on the ControllerContext):

Currently active brokers in the cluster: [liveBrokerIds]
Currently shutting brokers in the cluster: [shuttingDownBrokerIds]
Current list of topics in the cluster: [allTopics]
Note
initializeControllerContext is used exclusively when KafkaController is requested to onControllerFailover.

unregisterBrokerModificationsHandler Internal Method

unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit

unregisterBrokerModificationsHandler…​FIXME

Note
unregisterBrokerModificationsHandler is used when KafkaController is requested to onControllerResignation and onBrokerFailure.

onBrokerFailure Internal Method

onBrokerFailure(deadBrokers: Seq[Int]): Unit

onBrokerFailure…​FIXME

Note
onBrokerFailure is used exclusively when KafkaController is requested to handle a BrokerChange controller event.

maybeTriggerPartitionReassignment Internal Method

maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]): Unit

maybeTriggerPartitionReassignment…​FIXME

Note
maybeTriggerPartitionReassignment is used when KafkaController is requested to onControllerFailover and process the PartitionReassignment controller event.

incrementControllerEpoch Internal Method

incrementControllerEpoch(): Unit

incrementControllerEpoch…​FIXME

Note
incrementControllerEpoch is used exclusively when KafkaController is requested to onControllerFailover.

fetchPendingPreferredReplicaElections Internal Method

fetchPendingPreferredReplicaElections(): Set[TopicPartition]

fetchPendingPreferredReplicaElections…​FIXME

Note
fetchPendingPreferredReplicaElections is used exclusively when KafkaController is requested to onControllerFailover.

initializePartitionReassignment Internal Method

initializePartitionReassignment(): Unit

initializePartitionReassignment…​FIXME

Note
initializePartitionReassignment is used exclusively when KafkaController is requested to initializeControllerContext.

fetchTopicDeletionsInProgress Internal Method

fetchTopicDeletionsInProgress(): (Set[String], Set[String])

fetchTopicDeletionsInProgress…​FIXME

Note
fetchTopicDeletionsInProgress is used exclusively when KafkaController is requested to onControllerFailover.

updateLeaderAndIsrCache Internal Method

updateLeaderAndIsrCache(partitions: Seq[TopicPartition]

Unless given, updateLeaderAndIsrCache defaults to allPartitions of the ControllerContext for the partitions.

updateLeaderAndIsrCache requests the KafkaZkClient to getTopicPartitionStates (with the given partitions) and updates the partitionLeadershipInfo of the ControllerContext.

Note
updateLeaderAndIsrCache is used when KafkaController is requested to initializeControllerContext (with no partitions) and process an IsrChangeNotification controller event (with partitions given).

areReplicasInIsr Internal Method

areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean

areReplicasInIsr…​FIXME

Note
areReplicasInIsr is used exclusively when KafkaController is requested to onPartitionReassignment.

updateAssignedReplicasForPartition Internal Method

updateAssignedReplicasForPartition(
  partition: TopicPartition,
  replicas: Seq[Int]): Unit

updateAssignedReplicasForPartition…​FIXME

Note
updateAssignedReplicasForPartition is used exclusively when KafkaController is requested to onPartitionReassignment.

registerPartitionModificationsHandlers Internal Method

registerPartitionModificationsHandlers(topics: Seq[String]): Unit

registerPartitionModificationsHandlers…​FIXME

Note
registerPartitionModificationsHandlers is used when KafkaController is requested to initializeControllerContext and a TopicChange controller event is processed.

unregisterPartitionModificationsHandlers Internal Method

unregisterPartitionModificationsHandlers(topics: Seq[String]): Unit

unregisterPartitionModificationsHandlers…​FIXME

Note

unregisterPartitionModificationsHandlers is used when:

unregisterPartitionReassignmentIsrChangeHandlers Internal Method

unregisterPartitionReassignmentIsrChangeHandlers(): Unit

unregisterPartitionReassignmentIsrChangeHandlers…​FIXME

Note
unregisterPartitionReassignmentIsrChangeHandlers is used exclusively when KafkaController is requested to onControllerResignation.

readControllerEpochFromZooKeeper Internal Method

readControllerEpochFromZooKeeper(): Unit

readControllerEpochFromZooKeeper…​FIXME

Note
readControllerEpochFromZooKeeper is used exclusively when KafkaController is requested to onControllerFailover.

removePartitionsFromReassignedPartitions Internal Method

removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]): Unit

removePartitionsFromReassignedPartitions…​FIXME

Note
removePartitionsFromReassignedPartitions is used when KafkaController is requested to onPartitionReassignment and maybeTriggerPartitionReassignment.

removePartitionsFromPreferredReplicaElection Internal Method

removePartitionsFromPreferredReplicaElection(
  partitionsToBeRemoved: Set[TopicPartition],
  isTriggeredByAutoRebalance : Boolean): Unit

removePartitionsFromPreferredReplicaElection…​FIXME

Note
removePartitionsFromPreferredReplicaElection is used exclusively when KafkaController is requested to onPreferredReplicaElection (and the election type is not AdminClientTriggered).

Preferred Replica Leader Election — onPreferredReplicaElection Internal Method

onPreferredReplicaElection(
  partitions: Set[TopicPartition],
  electionType: ElectionType): Map[TopicPartition, Throwable]

onPreferredReplicaElection prints out the following INFO message to the logs:

Starting preferred replica leader election for partitions [partitions]

onPreferredReplicaElection requests the PartitionStateMachine to handle partition state changes for the partitions (with OnlinePartition target state and PreferredReplicaPartitionLeaderElectionStrategy).

(only for election types that are not AdminClientTriggered) In the end, onPreferredReplicaElection removePartitionsFromPreferredReplicaElection.

(only for election types that are not AdminClientTriggered) In case of an error

Note

onPreferredReplicaElection is used when KafkaController is requested for the following:

updateLeaderEpoch Internal Method

updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]

updateLeaderEpoch…​FIXME

Note
updateLeaderEpoch is used exclusively when KafkaController is requested to updateLeaderEpochAndSendRequest.

moveReassignedPartitionLeaderIfRequired Internal Method

moveReassignedPartitionLeaderIfRequired(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit

moveReassignedPartitionLeaderIfRequired…​FIXME

Note
moveReassignedPartitionLeaderIfRequired is used exclusively when KafkaController is requested to onPartitionReassignment.

onControllerFailover Internal Method

onControllerFailover(): Unit

onControllerFailover prints out the following INFO message to the logs:

Registering handlers

onControllerFailover requests the KafkaZkClient to registerZNodeChildChangeHandlers:

onControllerFailover requests the KafkaZkClient to registerZNodeChangeHandlerAndCheckExistence:

onControllerFailover prints out the following INFO message to the logs:

Deleting log dir event notifications

onControllerFailover requests the KafkaZkClient to deleteLogDirEventNotifications (with the epochZkVersion of the ControllerContext).

onControllerFailover prints out the following INFO message to the logs:

Deleting isr change notifications

onControllerFailover requests the KafkaZkClient to deleteIsrChangeNotifications (with the epochZkVersion of the ControllerContext).

onControllerFailover prints out the following INFO message to the logs:

Initializing controller context

onControllerFailover initializeControllerContext.

onControllerFailover prints out the following INFO message to the logs:

Fetching topic deletions in progress

onControllerFailover fetchTopicDeletionsInProgress.

onControllerFailover prints out the following INFO message to the logs:

Initializing topic deletion manager

onControllerFailover requests the TopicDeletionManager to initialize (with the topics to be deleted and ineligible for deletion).

onControllerFailover prints out the following INFO message to the logs:

Sending update metadata request

onControllerFailover requests the ReplicaStateMachine to start up.

onControllerFailover requests the PartitionStateMachine to start up.

onControllerFailover prints out the following INFO message to the logs:

Ready to serve as the new controller with epoch [epoch]

onControllerFailover requests the TopicDeletionManager to tryTopicDeletion.

onControllerFailover prints out the following INFO message to the logs:

Starting the controller scheduler

onControllerFailover requests the KafkaScheduler to startup.

With auto.leader.rebalance.enable enabled (default: true), onControllerFailover scheduleAutoLeaderRebalanceTask with the initial delay of 5 seconds.

With delegation.token.master.key password set (default: (empty)), onControllerFailover prints out the following INFO message to the logs:

starting the token expiry check scheduler

onControllerFailover requests the tokenCleanScheduler KafkaScheduler to startup and requests it to schedule the delete-expired-tokens task (FIXME).

Note
onControllerFailover is used when KafkaController is requested to elect (and a broker is successfully elected as the active controller).

scheduleAutoLeaderRebalanceTask Internal Method

scheduleAutoLeaderRebalanceTask(
  delay: Long,
  unit: TimeUnit): Unit

scheduleAutoLeaderRebalanceTask simply requests the KafkaScheduler to schedule a task called auto-leader-rebalance-task with the given initial delay.

The auto-leader-rebalance-task simply requests the ControllerEventManager to enqueue an AutoPreferredReplicaLeaderElection controller event.

Note
scheduleAutoLeaderRebalanceTask is used when KafkaController is requested to onControllerFailover and processAutoPreferredReplicaLeaderElection

processAutoPreferredReplicaLeaderElection Internal Method

processAutoPreferredReplicaLeaderElection(): Unit
Note
processAutoPreferredReplicaLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.

processAutoPreferredReplicaLeaderElection prints out the following INFO message to the logs:

Processing automatic preferred replica leader election

processAutoPreferredReplicaLeaderElection checkAndTriggerAutoLeaderRebalance.

In the end, processAutoPreferredReplicaLeaderElection scheduleAutoLeaderRebalanceTask with the initial delay based on leader.imbalance.check.interval.seconds configuration property (default: 300 seconds).

Note
processAutoPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to process a AutoPreferredReplicaLeaderElection event.

checkAndTriggerAutoLeaderRebalance Internal Method

checkAndTriggerAutoLeaderRebalance(): Unit

checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:

Checking need to trigger auto leader balancing

checkAndTriggerAutoLeaderRebalance…​FIXME

checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:

Preferred replicas by broker [preferredReplicasForTopicsByBrokers]

For every broker with one or more partition leaders, checkAndTriggerAutoLeaderRebalance…​FIXME

checkAndTriggerAutoLeaderRebalance prints out the following DEBUG message to the logs:

Topics not in preferred replica for broker [leaderBroker] [topicsNotInPreferredReplica]

checkAndTriggerAutoLeaderRebalance calculates an imbalance ratio of the broker which is the number of topicsNotInPreferredReplica divided by the total number of partitions (topicPartitionsForBroker).

checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:

Leader imbalance ratio for broker [leaderBroker] is [imbalanceRatio]

With the imbalance ratio greater than the desired ratio (per leader.imbalance.per.broker.percentage configuration property with the default: 10%), checkAndTriggerAutoLeaderRebalance onPreferredReplicaElection for…​FIXME (with AutoTriggered election type).

Note
checkAndTriggerAutoLeaderRebalance is used exclusively when KafkaController is requested to processAutoPreferredReplicaLeaderElection.

onNewPartitionCreation Internal Method

onNewPartitionCreation(
  newPartitions: Set[TopicPartition]): Unit

onNewPartitionCreation…​FIXME

Note
onNewPartitionCreation is used when TopicChange and PartitionModifications controller events are processed.

onReplicaElection Internal Method

onReplicaElection(
  partitions: Set[TopicPartition],
  electionType: ElectionType,
  electionTrigger: ElectionTrigger): Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

onReplicaElection…​FIXME

Note
onReplicaElection is used when…​FIXME

Handling Log Directory Failures for Brokers — onBrokerLogDirFailure Internal Method

onBrokerLogDirFailure(
  brokerIds: Seq[Int]): Unit

onBrokerLogDirFailure prints out the following INFO message to the logs:

Handling log directory failure for brokers [brokerIds]

onBrokerLogDirFailure requests the ControllerContext for the replicas on the brokers and then requests the ReplicaStateMachine to handle state changes for the replicas to enter OnlineReplica state.

Note
onBrokerLogDirFailure is used exclusively when KafkaController is requested to process a LogDirEventNotification controller event (on controller-event-thread).

stopOldReplicasOfReassignedPartition Internal Method

stopOldReplicasOfReassignedPartition(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext,
  oldReplicas: Set[Int]): Unit

stopOldReplicasOfReassignedPartition…​FIXME

Note
stopOldReplicasOfReassignedPartition is used when…​FIXME

startNewReplicasForReassignedPartition Internal Method

startNewReplicasForReassignedPartition(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext,
  newReplicas: Set[Int]): Unit

startNewReplicasForReassignedPartition…​FIXME

Note
startNewReplicasForReassignedPartition is used when…​FIXME

enableDefaultUncleanLeaderElection Method

enableDefaultUncleanLeaderElection(): Unit
Note
enableDefaultUncleanLeaderElection does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.

enableDefaultUncleanLeaderElection simply requests the ControllerEventManager to enqueue a UncleanLeaderElectionEnable event.

Note
enableDefaultUncleanLeaderElection is used when DynamicLogConfig is requested to reconfigure (for unclean.leader.election.enable configuration property).

Preferred Replica Leader Election — electPreferredLeaders Method

electPreferredLeaders(
  partitions: Set[TopicPartition],
  callback: ElectPreferredLeadersCallback = { (_, _) => }): Unit

electPreferredLeaders simply requests the ControllerEventManager to enqueue an PreferredReplicaLeaderElection event (with AdminClientTriggered election type)

Note
electPreferredLeaders is used exclusively when ReplicaManager is requested to trigger preferred replica leader election.

maybeResign Internal Method

maybeResign(): Unit

maybeResign…​FIXME

Note
maybeResign is used when KafkaController is requested to…​FIXME

processControlledShutdown Internal Method

processControlledShutdown(
  id: Int,
  brokerEpoch: Long,
  controlledShutdownCallback: Try[Set[TopicPartition]] => Unit): Unit

processControlledShutdown…​FIXME

Note
processControlledShutdown is used exclusively when KafkaController is requested to process a ControlledShutdown controller event.

doControlledShutdown Internal Method

doControlledShutdown(
  id: Int,
  brokerEpoch: Long): Set[TopicPartition]

doControlledShutdown…​FIXME

Note
doControlledShutdown is used exclusively when KafkaController is requested to processControlledShutdown.

processUncleanLeaderElectionEnable Internal Method

processUncleanLeaderElectionEnable(): Unit
Note
processUncleanLeaderElectionEnable does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.

processUncleanLeaderElectionEnable prints out the following INFO message to the logs:

Unclean leader election has been enabled by default

processUncleanLeaderElectionEnable requests the PartitionStateMachine to triggerOnlinePartitionStateChange.

Note
processUncleanLeaderElectionEnable is used exclusively when KafkaController is requested to process a UncleanLeaderElectionEnable controller event.

processTopicUncleanLeaderElectionEnable Internal Method

processTopicUncleanLeaderElectionEnable(topic: String): Unit

processTopicUncleanLeaderElectionEnable…​FIXME

Note
processTopicUncleanLeaderElectionEnable is used when KafkaController is requested to process a TopicUncleanLeaderElectionEnable controller event.

Processing LeaderAndIsrResponseReceived Controller Event (On controller-event-thread) — processLeaderAndIsrResponseReceived Internal Method

processLeaderAndIsrResponseReceived(
  leaderAndIsrResponseObj: AbstractResponse,
  brokerId: Int): Unit

processLeaderAndIsrResponseReceived…​FIXME

Note
processLeaderAndIsrResponseReceived is used exclusively when KafkaController is requested to process a LeaderAndIsrResponseReceived controller event (on the controller-event-thread).

processTopicDeletionStopReplicaResponseReceived Internal Method

processTopicDeletionStopReplicaResponseReceived(
  replicaId: Int,
  requestError: Errors,
  partitionErrors: Map[TopicPartition, Errors]): Unit

processTopicDeletionStopReplicaResponseReceived…​FIXME

Note
processTopicDeletionStopReplicaResponseReceived is used exclusively when KafkaController is requested to process a TopicDeletionStopReplicaResponseReceived controller event.

Processing BrokerChange Controller Event (On controller-event-thread) — processBrokerChange Internal Method

processBrokerChange(): Unit
Note
processBrokerChange does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.

processBrokerChange requests the KafkaZkClient for the brokers in a Kafka cluster and compares the broker list with the current brokers (of the ControllerContext).

At this point in time, processBrokerChange knows what brokers are new, dead or bounced.

processBrokerChange prints out the following INFO message to the logs:

Newly added brokers: [ids], deleted brokers: [ids], bounced brokers: [ids], all live brokers: [ids]

processBrokerChange notifies (updates) the ControllerChannelManager:

processBrokerChange updates the ControllerContext:

In the end, only when they were any updates (new, dead or bounced brokers), processBrokerChange prints out the following INFO message to the logs:

Updated broker epochs cache: [liveBrokerIdAndEpochs]
Note
processBrokerChange is used exclusively when KafkaController is requested to process a BrokerChange controller event (on the controller-event-thread).

processBrokerModification Internal Method

processBrokerModification(
  brokerId: Int): Unit

processBrokerModification…​FIXME

Note
processBrokerModification is used exclusively when KafkaController is requested to process a BrokerModifications controller event.

processControllerChange Internal Method

processControllerChange(): Unit

processControllerChange…​FIXME

Note
processControllerChange is used exclusively when KafkaController is requested to process a ControllerChange controller event.

processReelect Internal Method

processReelect(): Unit

processReelect…​FIXME

Note
processReelect is used when KafkaController is requested to processRegisterBrokerAndReelect and process a Reelect controller event.

processRegisterBrokerAndReelect Internal Method

processRegisterBrokerAndReelect(): Unit

processRegisterBrokerAndReelect…​FIXME

Note
processRegisterBrokerAndReelect is used exclusively when KafkaController is requested to process a RegisterBrokerAndReelect controller event.

processExpire Internal Method

processExpire(): Unit

processExpire…​FIXME

Note
processExpire is used exclusively when KafkaController is requested to process a Expire controller event.

processTopicChange Internal Method

processTopicChange(): Unit

processTopicChange…​FIXME

Note
processTopicChange is used exclusively when KafkaController is requested to process a TopicChange controller event.

Processing LogDirEventNotification Controller Event (On controller-event-thread) — processLogDirEventNotification Internal Method

processLogDirEventNotification(): Unit
Note
processLogDirEventNotification does nothing (and simply returns) unless the Kafka broker (KafkaController) is an active controller.

processLogDirEventNotification requests the KafkaZkClient for the LogDirEvent notifications (from Zookeeper) (sequence numbers).

In the end, processLogDirEventNotification requests the KafkaZkClient to delete LogDirEvent notifications (from Zookeeper).

Note
processLogDirEventNotification is used exclusively when KafkaController is requested to process a LogDirEventNotification controller event.

processPartitionModifications Internal Method

processPartitionModifications(
  topic: String): Unit

processPartitionModifications…​FIXME

Note
processPartitionModifications is used exclusively when KafkaController is requested to process a PartitionModifications controller event.

processTopicDeletion Internal Method

processTopicDeletion(): Unit

processTopicDeletion…​FIXME

Note
processTopicDeletion is used exclusively when KafkaController is requested to process a TopicDeletion controller event.

processPartitionReassignmentIsrChange Internal Method

processPartitionReassignmentIsrChange(
  partition: TopicPartition): Unit

processPartitionReassignmentIsrChange…​FIXME

Note
processPartitionReassignmentIsrChange is used exclusively when KafkaController is requested to process a PartitionReassignmentIsrChange controller event.

processIsrChangeNotification Internal Method

processIsrChangeNotification(): Unit

processIsrChangeNotification…​FIXME

Note
processIsrChangeNotification is used exclusively when KafkaController is requested to process a IsrChangeNotification controller event.

processStartup Internal Method

processStartup(): Unit

In the end, processStartup starts controller election.

Note
processStartup is used exclusively when KafkaController is requested to process a Startup controller event (on the controller-event-thread).

processZkPartitionReassignment Internal Method

processZkPartitionReassignment(): Unit

processZkPartitionReassignment…​FIXME

Note
processZkPartitionReassignment is used when KafkaController is requested to process a ZkPartitionReassignment controller event and initializePartitionReassignments (for KafkaController to process Startup and Reelect controller events).

initializePartitionReassignments Internal Method

initializePartitionReassignments(): Unit

initializePartitionReassignments…​FIXME

Note
initializePartitionReassignments is used when KafkaController is requested to onControllerFailover (for KafkaController to process Startup and Reelect controller events).

processReplicaLeaderElection Internal Method

processReplicaLeaderElection(
  partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
  electionType: ElectionType,
  electionTrigger: ElectionTrigger,
  callback: ElectLeadersCallback): Unit

processReplicaLeaderElection…​FIXME

Note
processReplicaLeaderElection is used when KafkaController is requested to process a ReplicaLeaderElection controller event.

processListPartitionReassignments Internal Method

processListPartitionReassignments(
  partitionsOpt: Option[Set[TopicPartition]],
  callback: ListReassignmentsCallback): Unit

processListPartitionReassignments…​FIXME

Note
processListPartitionReassignments is used when KafkaController is requested to process a ListPartitionReassignments controller event.

processApiPartitionReassignment Internal Method

processApiPartitionReassignment(
  reassignments: Map[TopicPartition, Option[Seq[Int]]],
  callback: AlterReassignmentsCallback): Unit

processApiPartitionReassignment…​FIXME

Note
processApiPartitionReassignment is used when KafkaController is requested to process a ApiPartitionReassignment controller event.

Preempting PreferredReplicaLeaderElection — preemptPreferredReplicaLeaderElection Internal Method

preemptPreferredReplicaLeaderElection(
  partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
  callback: ElectPreferredLeadersCallback = (_, _) => {}): Unit

preemptPreferredReplicaLeaderElection…​FIXME

Note
preemptPreferredReplicaLeaderElection is used exclusively when KafkaController is requested to preempt a PreferredReplicaLeaderElection controller event.

Internal Properties

Name Description

activeControllerId

The ID of the active KafkaController

  • Initialized to -1

brokerRequestBatch

controllerChangeHandler

A ZNodeChangeHandler (for the KafkaController and the ControllerEventManager) that listens to change events on /controller znode.

controllerChangeHandler emits controller events as follows:

  • ControllerChange when the znode is created or the znode data changed

  • Reelect when the znode is deleted

eventManager

kafkaScheduler

KafkaScheduler with 1 daemon thread with kafka-scheduler prefix

stateChangeLogger

StateChangeLogger with the broker ID and inControllerContext flag enabled

tokenCleanScheduler

KafkaScheduler with 1 daemon thread with delegation-token-cleaner prefix

topicDeletionManager

TopicDeletionManager

results matching ""

    No results matching ""