KafkaController

KafkaController is a Kafka service that runs on every Kafka broker.

Only one KafkaController can be active (elected) among all the Kafka brokers in a Kafka cluster. The process of promoting a KafkaController to be the active one is called Kafka Controller Election.

The active KafkaController (the broker that serves as the controller of a Kafka cluster) is responsible for managing the states of partitions and replicas and for performing administrative tasks like reassigning partitions.

KafkaController is created and immediately started when KafkaServer is requested to start up.

Figure 1. KafkaController

KafkaController is in one of the ControllerStates (that is the state of the ControllerEventManager).

KafkaController uses the KafkaZkClient for the following:

KafkaController uses listeners as a notification system to monitor znodes in Zookeeper and react accordingly.

KafkaController emulates a state machine using controller events.
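The idea of driving a state machine with events can be illustrated with a small model. This is a minimal Python sketch, not Kafka's Scala code: ControllerEvent and EventManager here are simplified stand-ins, and the "Idle" state name and the run_pending method are assumptions made for the illustration.

```python
from collections import deque
from dataclasses import dataclass
from typing import Callable, Deque, List


@dataclass
class ControllerEvent:
    """Hypothetical stand-in for a controller event."""
    name: str                    # e.g. "Startup"
    state: str                   # ControllerState reported while the event runs
    process: Callable[[], None]  # the event's process handler


class EventManager:
    """Simplified model: a FIFO queue of events plus the current state."""

    def __init__(self) -> None:
        self.queue: Deque[ControllerEvent] = deque()
        self.state = "Idle"           # assumed name for "no event in progress"
        self.history: List[str] = []  # "event:state" strings, in processing order

    def put(self, event: ControllerEvent) -> None:
        self.queue.append(event)

    def run_pending(self) -> None:
        # Events are processed one at a time, in arrival order.
        while self.queue:
            event = self.queue.popleft()
            self.state = event.state
            self.history.append(f"{event.name}:{self.state}")
            try:
                event.process()
            finally:
                self.state = "Idle"
```

Because every event is handled by a single consumer, the handlers never run concurrently with each other, which is what lets the controller treat its internal registries as if they were single-threaded.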

Table 1. KafkaController’s Controller Events
Event, ControllerState and process Handler

BrokerChange

  • ControllerState: BrokerChange

ControllerChange

  • Parameter: newControllerId: Int

  • ControllerState: ControllerChange

  • process handler:

  1. Assigns the current controller ID as the input newControllerId

  2. (only when the broker is no longer an active controller) Resigns as the active controller

Note
Similar to Reelect event with the only difference that it does not trigger election

ControlledShutdown

  • Parameters: ID, controlledShutdownCallback: Try[Set[TopicAndPartition]] ⇒ Unit

  • ControllerState: ControlledShutdown

IsrChangeNotification

LogDirEventNotification

PartitionReassignment

PreferredReplicaLeaderElection

Reelect

  • ControllerState: ControllerChange

Startup

  • ControllerState: ControllerChange

TopicChange

TopicDeletion

Table 2. KafkaController’s ZNodeChangeHandler and ZNodeChildChangeHandlers
Name Description

brokerChangeHandler

ZNodeChildChangeHandler of /brokers/ids path

On handleChildChange, brokerChangeHandler simply sends BrokerChange event to the ControllerEventManager.

isrChangeNotificationHandler

ZNodeChildChangeHandler of /isr_change_notification path

On handleChildChange, isrChangeNotificationHandler simply sends IsrChangeNotification event to the ControllerEventManager.

logDirEventNotificationHandler

ZNodeChildChangeHandler of /log_dir_event_notification path

On handleChildChange, logDirEventNotificationHandler simply sends LogDirEventNotification event to the ControllerEventManager.

partitionModificationsHandlers

ZNodeChangeHandlers per topic of /brokers/topics/[topic] path

On handleDataChange, partitionModificationsHandlers simply send PartitionModifications event to the ControllerEventManager.

partitionReassignmentHandler

ZNodeChangeHandler of /admin/reassign_partitions path

On handleCreation, partitionReassignmentHandler simply sends PartitionReassignment event to the ControllerEventManager.

preferredReplicaElectionHandler

ZNodeChangeHandler of /admin/preferred_replica_election path

On handleCreation, preferredReplicaElectionHandler simply sends PreferredReplicaLeaderElection event to the ControllerEventManager.

topicChangeHandler

ZNodeChildChangeHandler of /brokers/topics path

On handleChildChange, topicChangeHandler simply sends TopicChange event to the ControllerEventManager.

topicDeletionHandler

ZNodeChildChangeHandler of /admin/delete_topics path

On handleChildChange, topicDeletionHandler simply sends TopicDeletion event to the ControllerEventManager.
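The handlers in Table 2 all follow the same pattern: a ZooKeeper notification for a given path is translated into one controller event. The following Python sketch restates the table as a lookup. The function name event_for and the ROUTES dictionary are hypothetical and exist only for this illustration.

```python
from typing import Optional

CHILD_CHANGE = "handleChildChange"
DATA_CHANGE = "handleDataChange"
CREATION = "handleCreation"

# (znode path, callback) -> controller event, as listed in Table 2.
ROUTES = {
    ("/brokers/ids", CHILD_CHANGE): "BrokerChange",
    ("/isr_change_notification", CHILD_CHANGE): "IsrChangeNotification",
    ("/log_dir_event_notification", CHILD_CHANGE): "LogDirEventNotification",
    ("/admin/reassign_partitions", CREATION): "PartitionReassignment",
    ("/admin/preferred_replica_election", CREATION): "PreferredReplicaLeaderElection",
    ("/brokers/topics", CHILD_CHANGE): "TopicChange",
    ("/admin/delete_topics", CHILD_CHANGE): "TopicDeletion",
}

TOPICS_PREFIX = "/brokers/topics/"


def event_for(path: str, callback: str) -> Optional[str]:
    """Return the controller event a notification maps to, or None."""
    topic = path[len(TOPICS_PREFIX):] if path.startswith(TOPICS_PREFIX) else ""
    # Per-topic znodes (/brokers/topics/[topic]) have their own handlers.
    if topic and "/" not in topic:
        return "PartitionModifications" if callback == DATA_CHANGE else None
    return ROUTES.get((path, callback))
```

For example, event_for("/brokers/topics/orders", "handleDataChange") yields "PartitionModifications", while a callback that no handler listens for yields None.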

Table 3. KafkaController’s Internal Properties (e.g. Registries and Counters)
Name Description

activeControllerId

The ID of the active KafkaController

  • Initialized to -1

controllerChangeHandler

A ZNodeChangeHandler (for the KafkaController and the ControllerEventManager) that listens to change events on /controller znode.

controllerChangeHandler emits controller events as follows:

  • ControllerChange when the znode is created or the znode data changed

  • Reelect when the znode is deleted

controllerContext

eventManager

kafkaScheduler

KafkaScheduler with 1 daemon thread with kafka-scheduler prefix

partitionStateMachine

replicaStateMachine

ReplicaStateMachine

stateChangeLogger

StateChangeLogger with the broker ID and inControllerContext flag enabled

tokenCleanScheduler

KafkaScheduler with 1 daemon thread with delegation-token-cleaner prefix

topicDeletionManager

TopicDeletionManager

Table 4. KafkaController’s Listeners
Listener Description

brokerChangeListener

BrokerChangeListener for this KafkaController and eventManager

isrChangeNotificationListener

IsrChangeNotificationListener for this KafkaController and eventManager

Registered in registerIsrChangeNotificationListener when KafkaController does onControllerFailover.

De-registered in deregisterIsrChangeNotificationListener when KafkaController resigns as the active controller.

logDirEventNotificationListener

LogDirEventNotificationListener

partitionModificationsListeners

PartitionModificationsListener by name

partitionReassignmentListener

PartitionReassignmentListener for this KafkaController and ControllerEventManager

preferredReplicaElectionListener

PreferredReplicaElectionListener for this KafkaController and ControllerEventManager

topicDeletionListener

TopicDeletionListener (for this KafkaController and ControllerEventManager)

Registered in registerTopicDeletionListener when KafkaController does onControllerFailover.

De-registered in deregisterTopicDeletionListener when KafkaController resigns as the active controller.

KafkaController uses [Controller id=[brokerId]] as the logging prefix (aka logIdent).

Tip

Enable WARN, INFO or DEBUG logging levels for kafka.controller.KafkaController logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.KafkaController=DEBUG

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.controller logger in config/log4j.properties:

log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.controller=TRACE, controllerAppender
log4j.additivity.kafka.controller=false

That means that the logs of KafkaController go to logs/controller.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).

initiateReassignReplicasForTopicPartition Method

initiateReassignReplicasForTopicPartition

initiateReassignReplicasForTopicPartition…​FIXME

Note
initiateReassignReplicasForTopicPartition is used when…​FIXME

deregisterPartitionReassignmentIsrChangeListeners Method

deregisterPartitionReassignmentIsrChangeListeners

deregisterPartitionReassignmentIsrChangeListeners…​FIXME

Note
deregisterPartitionReassignmentIsrChangeListeners is used when…​FIXME

resetControllerContext Method

resetControllerContext

resetControllerContext…​FIXME

Note
resetControllerContext is used when…​FIXME

deregisterBrokerChangeListener Method

deregisterBrokerChangeListener

deregisterBrokerChangeListener…​FIXME

Note
deregisterBrokerChangeListener is used when…​FIXME

deregisterTopicChangeListener Method

deregisterTopicChangeListener

deregisterTopicChangeListener…​FIXME

Note
deregisterTopicChangeListener is used when…​FIXME

Resigning As Active Controller — onControllerResignation Method

onControllerResignation(): Unit

onControllerResignation starts by printing out the following DEBUG message to the logs:

Resigning

onControllerResignation unsubscribes from intercepting Zookeeper events for the following znodes in order:

onControllerResignation requests TopicDeletionManager to reset.

onControllerResignation requests KafkaScheduler to shutdown.

onControllerResignation resets the following internal counters:

onControllerResignation requests PartitionStateMachine to shutdown.

onControllerResignation deregisterTopicChangeListener.

onControllerResignation deregisterTopicDeletionListener.

onControllerResignation requests ReplicaStateMachine to shutdown.

onControllerResignation deregisterBrokerChangeListener.

onControllerResignation resetControllerContext.

In the end, onControllerResignation prints out the following DEBUG message to the logs:

Resigned

Note

onControllerResignation is used when:

Unsubscribing from Child Changes to /isr_change_notification ZNode — deregisterIsrChangeNotificationListener Internal Method

deregisterIsrChangeNotificationListener(): Unit

deregisterIsrChangeNotificationListener prints out the following DEBUG message to the logs:

De-registering IsrChangeNotificationListener

deregisterIsrChangeNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /isr_change_notification znode with IsrChangeNotificationListener.

Note
deregisterIsrChangeNotificationListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Child Changes to /log_dir_event_notification ZNode — deregisterLogDirEventNotificationListener Internal Method

deregisterLogDirEventNotificationListener(): Unit

deregisterLogDirEventNotificationListener prints out the following DEBUG message to the logs:

De-registering logDirEventNotificationListener

deregisterLogDirEventNotificationListener requests ZkUtils to unsubscribe from intercepting changes to /log_dir_event_notification znode with LogDirEventNotificationListener.

Note
deregisterLogDirEventNotificationListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Data Changes to /admin/preferred_replica_election ZNode — deregisterPreferredReplicaElectionListener Method

deregisterPreferredReplicaElectionListener(): Unit

deregisterPreferredReplicaElectionListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/preferred_replica_election znode with PreferredReplicaElectionListener.

Note
deregisterPreferredReplicaElectionListener is used exclusively when KafkaController resigns as the active controller.

Unsubscribing from Data Changes to /admin/reassign_partitions ZNode — deregisterPartitionReassignmentListener Method

deregisterPartitionReassignmentListener(): Unit

deregisterPartitionReassignmentListener requests ZkUtils to unsubscribe from intercepting data changes to /admin/reassign_partitions znode with PartitionReassignmentListener.

Note
deregisterPartitionReassignmentListener is used exclusively when KafkaController resigns as the active controller.

triggerControllerMove Internal Method

triggerControllerMove(): Unit

triggerControllerMove…​FIXME

Note

triggerControllerMove is used when:

  1. KafkaController handleIllegalState

  2. KafkaController caught an exception while electing or becoming a controller

handleIllegalState Internal Method

handleIllegalState(e: IllegalStateException): Nothing

handleIllegalState…​FIXME

Note
handleIllegalState is used when KafkaController catches an IllegalStateException in updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest and ControlledShutdown event.

sendUpdateMetadataRequest Method

sendUpdateMetadataRequest(): Unit

sendUpdateMetadataRequest…​FIXME

Note

sendUpdateMetadataRequest is used when:

updateLeaderEpochAndSendRequest Internal Method

updateLeaderEpochAndSendRequest(): Unit

updateLeaderEpochAndSendRequest…​FIXME

Note
updateLeaderEpochAndSendRequest is used when KafkaController is requested to onPartitionReassignment and moveReassignedPartitionLeaderIfRequired.

Shutting Down — shutdown Method

shutdown(): Unit

shutdown requests the ControllerEventManager to close and then executes onControllerResignation.

Note
shutdown is used exclusively when KafkaServer is requested to shutdown.

updateMetrics Internal Method

Caution
FIXME

onBrokerStartup Method

onBrokerStartup(newBrokers: Seq[Int]): Unit

onBrokerStartup…​FIXME

Note
onBrokerStartup is used exclusively when KafkaController processes BrokerChange controller event.

Controller Election — elect Method

elect(): Unit

elect requests the KafkaZkClient for the active controller ID.

elect stops the controller election if there is an active controller ID available and prints out the following DEBUG message to the logs:

Broker [activeControllerId] has been elected as the controller, so stopping the election process.

Otherwise, elect requests the KafkaZkClient to create an ephemeral znode at /controller path with the znode data in JSON:

{"version":1,"brokerid":[brokerId],"timestamp":[timestamp]}

Note
elect always uses 1 for the version.

Note
elect is used when ControllerEventThread is requested to process Startup and Reelect controller events (while processing controller events).

Controller Elected

If successful, elect prints out the following INFO message to the logs and records the current broker ID as the activeControllerId.

[brokerId] successfully elected as the controller

In the end, elect does onControllerFailover.

Controller Has Already Been Elected (NodeExistsException)

If unsuccessful (and a NodeExistsException was reported), elect requests the KafkaZkClient for the active controller ID.

elect then prints out the following DEBUG message to the logs:

Broker [activeControllerId] was elected as controller instead of broker [brokerId]

If however the active controller ID is still unavailable, elect prints out the following WARN message to the logs:

A controller has been elected but just resigned, this will result in another round of election

Other Errors (Throwable)

If unsuccessful (and a Throwable was reported), elect prints out the following ERROR message to the logs and does triggerControllerMove:

Error while electing or becoming controller on broker [brokerId]
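The election flow described above can be sketched in a few lines. This is a simplified Python model under stated assumptions, not the actual implementation: FakeZk is a hypothetical in-memory replacement for KafkaZkClient, NodeExistsError stands in for ZooKeeper's NodeExistsException, and the Throwable branch (triggerControllerMove) is left out.

```python
import json
import time
from typing import Dict, List

CONTROLLER_PATH = "/controller"
NO_CONTROLLER = -1  # activeControllerId is initialized to -1


class NodeExistsError(Exception):
    """Stand-in for ZooKeeper's NodeExistsException."""


class FakeZk:
    """Hypothetical in-memory replacement for KafkaZkClient (illustration only)."""

    def __init__(self) -> None:
        self.znodes: Dict[str, str] = {}

    def get_controller_id(self) -> int:
        data = self.znodes.get(CONTROLLER_PATH)
        return json.loads(data)["brokerid"] if data else NO_CONTROLLER

    def create_ephemeral(self, path: str, data: str) -> None:
        if path in self.znodes:
            raise NodeExistsError(path)
        self.znodes[path] = data


def elect(zk: FakeZk, broker_id: int, log: List[str]) -> int:
    """Run one election attempt and return the active controller ID seen afterwards."""
    active = zk.get_controller_id()
    if active != NO_CONTROLLER:
        log.append(f"DEBUG Broker {active} has been elected as the controller, "
                   "so stopping the election process.")
        return active
    payload = json.dumps({"version": 1, "brokerid": broker_id,
                          "timestamp": str(int(time.time() * 1000))})
    try:
        zk.create_ephemeral(CONTROLLER_PATH, payload)
    except NodeExistsError:
        # Another broker created the znode between the read and the create.
        active = zk.get_controller_id()
        if active != NO_CONTROLLER:
            log.append(f"DEBUG Broker {active} was elected as controller "
                       f"instead of broker {broker_id}")
        else:
            log.append("WARN A controller has been elected but just resigned, "
                       "this will result in another round of election")
        return active
    log.append(f"INFO {broker_id} successfully elected as the controller")
    return broker_id  # the real controller would now run onControllerFailover
```

The point of the sketch is that the ephemeral znode acts as a lock: whichever broker creates /controller first wins, and every other broker only learns who the winner is.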

Is KafkaController The Active Controller? — isActive Method

isActive: Boolean

isActive is true when the ID of the current broker matches the activeControllerId, i.e. when the current broker is the active controller.

Note
isActive is on (true) after the KafkaController of a Kafka broker has been elected.
Note

isActive is used (as a valve to stop processing early) when:

registerIsrChangeNotificationListener Internal Method

registerIsrChangeNotificationListener(): Option[Seq[String]]

registerIsrChangeNotificationListener…​FIXME

Note
registerIsrChangeNotificationListener is used when…​FIXME

deregisterIsrChangeNotificationListener Internal Method

deregisterIsrChangeNotificationListener(): Unit

deregisterIsrChangeNotificationListener…​FIXME

Note
deregisterIsrChangeNotificationListener is used when…​FIXME

Creating KafkaController Instance

KafkaController takes the following when created:

KafkaController initializes the internal registries and counters.

Starting Up — startup Method

startup(): Unit

startup requests the KafkaZkClient to register a StateChangeHandler (under the name controller-state-change-handler) that does the following:

  • On afterInitializingSession, the StateChangeHandler simply puts RegisterBrokerAndReelect event on the event queue of the ControllerEventManager

  • On beforeInitializingSession, the StateChangeHandler simply puts Expire event on the event queue of the ControllerEventManager

startup then puts Startup event at the end of the event queue of the ControllerEventManager and immediately requests it to start.

Note
startup is used exclusively when KafkaServer is requested to start.
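The startup sequence can be modeled as follows. This Python sketch only mirrors the ordering described above; the Controller class, its event_queue of plain strings and the started flag are simplifications assumed for the example.

```python
from collections import deque
from typing import Deque, Dict


class StateChangeHandler:
    """Callbacks invoked around ZooKeeper session (re)initialization."""

    name = "controller-state-change-handler"

    def __init__(self, queue: Deque[str]) -> None:
        self.queue = queue

    def before_initializing_session(self) -> None:
        self.queue.append("Expire")

    def after_initializing_session(self) -> None:
        self.queue.append("RegisterBrokerAndReelect")


class Controller:
    """Hypothetical, heavily simplified controller used only for this sketch."""

    def __init__(self) -> None:
        self.event_queue: Deque[str] = deque()
        self.state_change_handlers: Dict[str, StateChangeHandler] = {}
        self.started = False

    def startup(self) -> None:
        handler = StateChangeHandler(self.event_queue)
        # 1. Register the handler (with the ZooKeeper client in the real code).
        self.state_change_handlers[handler.name] = handler
        # 2. Put Startup at the end of the event queue.
        self.event_queue.append("Startup")
        # 3. Start the event manager.
        self.started = True
```

A later session expiration would then enqueue Expire followed by RegisterBrokerAndReelect behind whatever is already waiting in the queue.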

Registering SessionExpirationListener To Control Session Recreation — registerSessionExpirationListener Internal Method

registerSessionExpirationListener(): Unit

registerSessionExpirationListener requests ZkUtils to subscribe to state changes with a SessionExpirationListener (with the KafkaController and ControllerEventManager).

Note
SessionExpirationListener puts Reelect event on the event queue of ControllerEventManager every time the Zookeeper session has expired and a new session has been created.
Note
registerSessionExpirationListener is used exclusively when Startup event is processed (after ControllerEventThread is started).

Registering ControllerChangeListener for /controller ZNode Changes — registerControllerChangeListener Internal Method

registerControllerChangeListener(): Unit

registerControllerChangeListener requests ZkUtils to subscribe to data changes for /controller znode with a ControllerChangeListener (with the KafkaController and ControllerEventManager).

Note

ControllerChangeListener emits:

  1. ControllerChange event with the current controller ID (on the event queue of ControllerEventManager) every time the data of a znode changes

  2. Reelect event when the data associated with a znode has been deleted

Note
registerControllerChangeListener is used exclusively when Startup event is processed (after ControllerEventThread is started).
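How the listener's two notifications relate to the controller's state can be sketched like this. It is a minimal Python model, assuming a plain deque as the event queue and a resignations counter in place of onControllerResignation; the process_controller_change method follows the ControllerChange handler steps listed in Table 1.

```python
from collections import deque
from typing import Deque, Optional, Tuple

Event = Tuple[str, Optional[int]]


class ControllerChangeListener:
    """Turns /controller znode notifications into controller events."""

    def __init__(self, queue: Deque[Event]) -> None:
        self.queue = queue

    def handle_data_change(self, controller_id: int) -> None:
        self.queue.append(("ControllerChange", controller_id))

    def handle_data_deleted(self) -> None:
        self.queue.append(("Reelect", None))


class Controller:
    """Hypothetical controller holding only the state needed for the sketch."""

    def __init__(self, broker_id: int) -> None:
        self.broker_id = broker_id
        self.active_controller_id = -1
        self.resignations = 0

    @property
    def is_active(self) -> bool:
        return self.active_controller_id == self.broker_id

    def process_controller_change(self, new_controller_id: int) -> None:
        was_active = self.is_active
        self.active_controller_id = new_controller_id
        if was_active and not self.is_active:
            self.resignations += 1  # stands in for onControllerResignation
```

Note that a broker resigns only when it was active before the change and is not active after it, so a ControllerChange seen by a broker that was never the controller is just a bookkeeping update.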

registerBrokerChangeListener Internal Method

registerBrokerChangeListener(): Option[Seq[String]]

registerBrokerChangeListener requests ZkUtils to subscribeChildChanges for /brokers/ids path with BrokerChangeListener.

Note
registerBrokerChangeListener is used exclusively when KafkaController does onControllerFailover.

Getting Active Controller ID (from JSON under /controller znode) — getControllerID Method

getControllerID(): Int

getControllerID returns the ID of the active Kafka controller as recorded (in JSON format) in the /controller znode, or -1 if the znode is not available.

Internally, getControllerID requests ZkUtils for data associated with /controller znode.

If available, getControllerID parses the data (being the current controller info in JSON format) to extract brokerid field.

$ ./bin/zookeeper-shell.sh :2181 get /controller

{"version":1,"brokerid":0,"timestamp":"1543499076007"}
cZxid = 0x60
ctime = Thu Nov 29 14:44:36 CET 2018
mZxid = 0x60
mtime = Thu Nov 29 14:44:36 CET 2018
pZxid = 0x60
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100073f07ba0003
dataLength = 54
numChildren = 0

Otherwise, when no /controller znode is available, getControllerID returns -1.
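The parsing step can be shown with a short example. This is a Python sketch of the same logic rather than the Scala implementation; the function takes the znode data directly (or None when the znode does not exist), which is an assumption made to keep the example self-contained.

```python
import json
from typing import Optional


def get_controller_id(controller_znode_data: Optional[str]) -> int:
    """Extract brokerid from the /controller znode data, or return -1."""
    if controller_znode_data is None:
        return -1  # no /controller znode, hence no active controller
    return int(json.loads(controller_znode_data)["brokerid"])
```

Called with the data shown in the zookeeper-shell output above, the function returns 0, the value of the brokerid field.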

Note

getControllerID is used when:

  1. Processing Reelect controller event

  2. elect

Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — registerTopicDeletionListener Internal Method

registerTopicDeletionListener(): Option[Seq[String]]

registerTopicDeletionListener requests ZkUtils to subscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.

Note
registerTopicDeletionListener is used exclusively when KafkaController does onControllerFailover.

De-Registering TopicDeletionListener for Child Changes to /admin/delete_topics ZNode — deregisterTopicDeletionListener Internal Method

deregisterTopicDeletionListener(): Unit

deregisterTopicDeletionListener requests ZkUtils to unsubscribeChildChanges to /admin/delete_topics znode with TopicDeletionListener.

Note
deregisterTopicDeletionListener is used exclusively when KafkaController resigns as the active controller.

processUpdateNotifications Internal Method

processUpdateNotifications(partitions: Seq[TopicPartition]): Unit

processUpdateNotifications…​FIXME

Note
processUpdateNotifications is used when…​FIXME

onReplicasBecomeOffline Internal Method

onReplicasBecomeOffline(newOfflineReplicas: Set[PartitionAndReplica]): Unit

onReplicasBecomeOffline…​FIXME

Note
onReplicasBecomeOffline is used when…​FIXME

onPartitionReassignment Internal Method

onPartitionReassignment(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit

onPartitionReassignment…​FIXME

Note
onPartitionReassignment is used when…​FIXME

onBrokerUpdate Internal Method

onBrokerUpdate(updatedBrokerId: Int): Unit

onBrokerUpdate…​FIXME

Note
onBrokerUpdate is used when…​FIXME

updateBrokerInfo Internal Method

updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit

updateBrokerInfo…​FIXME

Note
updateBrokerInfo is used exclusively when DynamicListenerConfig is requested to reconfigure.

registerBrokerModificationsHandler Internal Method

registerBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit

registerBrokerModificationsHandler…​FIXME

Note
registerBrokerModificationsHandler is used when KafkaController is requested to onBrokerStartup and onControllerFailover (indirectly through initializeControllerContext).

initializeControllerContext Internal Method

initializeControllerContext(): Unit

initializeControllerContext…​FIXME

Note
initializeControllerContext is used exclusively when KafkaController is requested to onControllerFailover.

unregisterBrokerModificationsHandler Internal Method

unregisterBrokerModificationsHandler(brokerIds: Iterable[Int]): Unit

unregisterBrokerModificationsHandler…​FIXME

Note
unregisterBrokerModificationsHandler is used when KafkaController is requested to onControllerResignation and onBrokerFailure.

onBrokerFailure Internal Method

onBrokerFailure(deadBrokers: Seq[Int]): Unit

onBrokerFailure…​FIXME

Note
onBrokerFailure is used exclusively when KafkaController is requested to handle a BrokerChange controller event.

maybeTriggerPartitionReassignment Internal Method

maybeTriggerPartitionReassignment(topicPartitions: Set[TopicPartition]): Unit

maybeTriggerPartitionReassignment…​FIXME

Note
maybeTriggerPartitionReassignment is used when KafkaController is requested to onControllerFailover and process the PartitionReassignment controller event.

incrementControllerEpoch Internal Method

incrementControllerEpoch(): Unit

incrementControllerEpoch…​FIXME

Note
incrementControllerEpoch is used exclusively when KafkaController is requested to onControllerFailover.

fetchPendingPreferredReplicaElections Internal Method

fetchPendingPreferredReplicaElections(): Set[TopicPartition]

fetchPendingPreferredReplicaElections…​FIXME

Note
fetchPendingPreferredReplicaElections is used exclusively when KafkaController is requested to onControllerFailover.

initializePartitionReassignment Internal Method

initializePartitionReassignment(): Unit

initializePartitionReassignment…​FIXME

Note
initializePartitionReassignment is used exclusively when KafkaController is requested to initializeControllerContext.

fetchTopicDeletionsInProgress Internal Method

fetchTopicDeletionsInProgress(): (Set[String], Set[String])

fetchTopicDeletionsInProgress…​FIXME

Note
fetchTopicDeletionsInProgress is used exclusively when KafkaController is requested to onControllerFailover.

updateLeaderAndIsrCache Internal Method

updateLeaderAndIsrCache(partitions: Seq[TopicPartition]): Unit

Unless given, partitions defaults to allPartitions of the ControllerContext.

updateLeaderAndIsrCache requests the KafkaZkClient to getTopicPartitionStates for the partitions.

For every pair of a TopicPartition and the LeaderIsrAndControllerEpoch, updateLeaderAndIsrCache adds them to the partitionLeadershipInfo of the ControllerContext.

Note
updateLeaderAndIsrCache is used when KafkaController is requested to initializeControllerContext and process an IsrChangeNotification controller event.
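The cache refresh can be sketched as follows. This is an illustrative Python model: ControllerContext is reduced to the two fields mentioned above, and fetch_states is a hypothetical callable standing in for the getTopicPartitionStates call on KafkaZkClient.

```python
from typing import Callable, Dict, Iterable, Optional, Set, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition)
State = dict                      # stands in for LeaderIsrAndControllerEpoch


class ControllerContext:
    """Only the fields needed for this sketch."""

    def __init__(self, all_partitions: Iterable[TopicPartition]) -> None:
        self.all_partitions: Set[TopicPartition] = set(all_partitions)
        self.partition_leadership_info: Dict[TopicPartition, State] = {}


def update_leader_and_isr_cache(
    context: ControllerContext,
    fetch_states: Callable[[Iterable[TopicPartition]], Dict[TopicPartition, State]],
    partitions: Optional[Iterable[TopicPartition]] = None,
) -> None:
    # Unless given, refresh every partition known to the context.
    if partitions is None:
        partitions = context.all_partitions
    # Record the fetched state of every partition in the leadership cache.
    for topic_partition, state in fetch_states(partitions).items():
        context.partition_leadership_info[topic_partition] = state
```

Passing an explicit list of partitions limits the refresh to those partitions, which is the cheaper path when only a few of them changed.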

areReplicasInIsr Internal Method

areReplicasInIsr(partition: TopicPartition, replicas: Seq[Int]): Boolean

areReplicasInIsr…​FIXME

Note
areReplicasInIsr is used exclusively when KafkaController is requested to onPartitionReassignment.

updateAssignedReplicasForPartition Internal Method

updateAssignedReplicasForPartition(
  partition: TopicPartition,
  replicas: Seq[Int]): Unit

updateAssignedReplicasForPartition…​FIXME

Note
updateAssignedReplicasForPartition is used exclusively when KafkaController is requested to onPartitionReassignment.

registerPartitionModificationsHandlers Internal Method

registerPartitionModificationsHandlers(topics: Seq[String]): Unit

registerPartitionModificationsHandlers…​FIXME

Note
registerPartitionModificationsHandlers is used when KafkaController is requested to initializeControllerContext and a TopicChange controller event is processed.

unregisterPartitionModificationsHandlers Internal Method

unregisterPartitionModificationsHandlers(topics: Seq[String]): Unit

unregisterPartitionModificationsHandlers…​FIXME

Note

unregisterPartitionModificationsHandlers is used when:

unregisterPartitionReassignmentIsrChangeHandlers Internal Method

unregisterPartitionReassignmentIsrChangeHandlers(): Unit

unregisterPartitionReassignmentIsrChangeHandlers…​FIXME

Note
unregisterPartitionReassignmentIsrChangeHandlers is used exclusively when KafkaController is requested to onControllerResignation.

readControllerEpochFromZooKeeper Internal Method

readControllerEpochFromZooKeeper(): Unit

readControllerEpochFromZooKeeper…​FIXME

Note
readControllerEpochFromZooKeeper is used exclusively when KafkaController is requested to onControllerFailover.

removePartitionsFromReassignedPartitions Internal Method

removePartitionsFromReassignedPartitions(partitionsToBeRemoved: Set[TopicPartition]): Unit

removePartitionsFromReassignedPartitions…​FIXME

Note
removePartitionsFromReassignedPartitions is used when KafkaController is requested to onPartitionReassignment and maybeTriggerPartitionReassignment.

removePartitionsFromPreferredReplicaElection Internal Method

removePartitionsFromPreferredReplicaElection(
  partitionsToBeRemoved: Set[TopicPartition],
  isTriggeredByAutoRebalance: Boolean): Unit

removePartitionsFromPreferredReplicaElection…​FIXME

Note
removePartitionsFromPreferredReplicaElection is used exclusively when KafkaController is requested to onPreferredReplicaElection.

onPreferredReplicaElection Internal Method

onPreferredReplicaElection(
  partitions: Set[TopicPartition],
  isTriggeredByAutoRebalance: Boolean = false): Unit

onPreferredReplicaElection…​FIXME

Note
onPreferredReplicaElection is used when KafkaController is requested to onControllerFailover, checkAndTriggerAutoLeaderRebalance and process a PreferredReplicaLeaderElection controller event.

updateLeaderEpoch Internal Method

updateLeaderEpoch(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch]

updateLeaderEpoch…​FIXME

Note
updateLeaderEpoch is used exclusively when KafkaController is requested to updateLeaderEpochAndSendRequest.

moveReassignedPartitionLeaderIfRequired Internal Method

moveReassignedPartitionLeaderIfRequired(
  topicPartition: TopicPartition,
  reassignedPartitionContext: ReassignedPartitionsContext): Unit

moveReassignedPartitionLeaderIfRequired…​FIXME

Note
moveReassignedPartitionLeaderIfRequired is used exclusively when KafkaController is requested to onPartitionReassignment.

onControllerFailover Internal Method

onControllerFailover(): Unit

onControllerFailover prints out the following INFO message to the logs:

Registering handlers

onControllerFailover requests the KafkaZkClient to registerZNodeChildChangeHandlers:

onControllerFailover requests the KafkaZkClient to registerZNodeChangeHandlerAndCheckExistence:

onControllerFailover prints out the following INFO message to the logs:

Deleting log dir event notifications

onControllerFailover requests the KafkaZkClient to deleteLogDirEventNotifications (with the epochZkVersion of the ControllerContext).

onControllerFailover prints out the following INFO message to the logs:

Deleting isr change notifications

onControllerFailover requests the KafkaZkClient to deleteIsrChangeNotifications (with the epochZkVersion of the ControllerContext).

onControllerFailover prints out the following INFO message to the logs:

Initializing controller context

onControllerFailover initializeControllerContext.

onControllerFailover prints out the following INFO message to the logs:

Fetching topic deletions in progress

onControllerFailover fetchTopicDeletionsInProgress.

onControllerFailover prints out the following INFO message to the logs:

Initializing topic deletion manager

onControllerFailover requests the TopicDeletionManager to initialize (with the topics to be deleted and ineligible for deletion).

onControllerFailover prints out the following INFO message to the logs:

Sending update metadata request

onControllerFailover requests the ReplicaStateMachine to start up.

onControllerFailover requests the PartitionStateMachine to start up.

onControllerFailover prints out the following INFO message to the logs:

Ready to serve as the new controller with epoch [epoch]

onControllerFailover requests the TopicDeletionManager to tryTopicDeletion.

onControllerFailover prints out the following INFO message to the logs:

Starting the controller scheduler

onControllerFailover requests the kafkaScheduler KafkaScheduler to startup.

With auto.leader.rebalance.enable enabled, onControllerFailover scheduleAutoLeaderRebalanceTask with the delay of 5 seconds.

With delegation.token.master.key password set, onControllerFailover prints out the following INFO message to the logs:

starting the token expiry check scheduler

onControllerFailover requests the tokenCleanScheduler KafkaScheduler to startup and requests it to schedule the delete-expired-tokens task (FIXME).

Note
onControllerFailover is used exclusively when KafkaController is requested to elect (and a broker is successfully elected as the controller).

scheduleAutoLeaderRebalanceTask Internal Method

scheduleAutoLeaderRebalanceTask(delay: Long, unit: TimeUnit): Unit

scheduleAutoLeaderRebalanceTask simply requests the KafkaScheduler to schedule a one-off task called auto-leader-rebalance-task with the given initial delay (5 seconds when requested by onControllerFailover).

The auto-leader-rebalance-task simply requests the ControllerEventManager to enqueue an AutoPreferredReplicaLeaderElection event.
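A one-off delayed task that does nothing but enqueue an event can be sketched with the Python standard library. This is an analogy rather than Kafka's scheduler: threading.Timer stands in for KafkaScheduler, a deque stands in for the event queue, and the function name is a hypothetical Python rendering of the method above.

```python
import threading
from collections import deque
from typing import Deque


def schedule_auto_leader_rebalance_task(
    event_queue: Deque[str], delay_seconds: float
) -> threading.Timer:
    """Schedule a one-off task that enqueues the rebalance event after a delay."""

    def task() -> None:
        event_queue.append("AutoPreferredReplicaLeaderElection")

    timer = threading.Timer(delay_seconds, task)
    timer.name = "auto-leader-rebalance-task"
    timer.daemon = True  # do not keep the process alive just for this task
    timer.start()
    return timer
```

The task itself does no rebalancing work. It only hands an event to the queue so that the actual check runs on the event-processing thread like every other controller event.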

Note

scheduleAutoLeaderRebalanceTask is used when:

checkAndTriggerAutoLeaderRebalance Internal Method

checkAndTriggerAutoLeaderRebalance(): Unit

checkAndTriggerAutoLeaderRebalance prints out the following TRACE message to the logs:

Checking need to trigger auto leader balancing

Note
checkAndTriggerAutoLeaderRebalance is used exclusively when ControllerEventThread is requested to process an AutoPreferredReplicaLeaderElection event (while processing controller events).

startChannelManager Internal Method

startChannelManager(): Unit

startChannelManager…​FIXME

Note
startChannelManager is used exclusively when KafkaController is requested to initializeControllerContext.

onNewPartitionCreation Internal Method

onNewPartitionCreation(newPartitions: Set[TopicPartition]): Unit

onNewPartitionCreation…​FIXME

Note
onNewPartitionCreation is used when TopicChange and PartitionModifications controller events are processed.
