ControllerChannelManager

ControllerChannelManager manages connections to all brokers for KafkaController.

ControllerChannelManager is created when KafkaController is created (to add or remove brokers as they "announce" themselves in Zookeeper) for ControllerBrokerRequestBatches of the KafkaController itself, the ZkReplicaStateMachine and ZkPartitionStateMachine (so they can send broker and partition changes out to all brokers in a Kafka cluster).

When started, ControllerChannelManager establishes connection to every broker and starts a corresponding RequestSendThread to keep sending queued controller requests.

ControllerChannelManager uses [Channel manager on controller [brokerId]] as the logging prefix (aka logIdent).

Tip

Enable ALL logging levels for kafka.controller.ControllerChannelManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.controller.ControllerChannelManager=ALL

Refer to Logging.

Connection Metadata of All Brokers — brokerStateInfo Internal Registry

brokerStateInfo: HashMap[Int, ControllerBrokerStateInfo]

brokerStateInfo is connection metadata by broker ID.

Request threads for brokers are all started when ControllerChannelManager is requested to start up.

A new broker is added when ControllerChannelManager is requested to add a broker (when addBroker).

A broker is removed when ControllerChannelManager is requested to remove a broker (when removeBroker or shutting down).

Use the TotalQueueSize gauge metric for the queue depth (i.e. how many controller requests are waiting to be sent out to all brokers).

Performance Metrics

ControllerChannelManager is a KafkaMetricsGroup with the following performance metrics.

Table 1. ControllerChannelManager’s Performance Metrics
Metric Name Description

QueueSize

Controller requests (AbstractControlRequests) queue size (per broker)

RequestRateAndQueueTimeMs

For every broker

TotalQueueSize

Total number of controller requests (AbstractControlRequests) to be sent out to brokers

The performance metrics are registered in kafka.controller:type=ControllerChannelManager group.

ControllerChannelManager jconsole.png
Figure 1. ControllerChannelManager in jconsole

Creating ControllerChannelManager Instance

ControllerChannelManager takes the following to be created:

Registering New Broker — addNewBroker Internal Method

addNewBroker(
  broker: Broker): Unit

addNewBroker prints out the following DEBUG message to the logs:

Controller [brokerId] trying to connect to broker [id]

addNewBroker finds the name of the listener to use for communication with the broker based on control.plane.listener.name configuration property (if defined) or inter.broker.listener.name.

addNewBroker finds the security protocol to use for communication with the broker based on control.plane.listener.name and listener.security.protocol.map configuration properties (if defined) or security.inter.broker.protocol.

addNewBroker requests the Broker for the node for the listener name.

addNewBroker creates a new LogContext to use the prefix:

[Controller id=[brokerId], targetBrokerId=[brokerNode]]

addNewBroker creates a NetworkClient. Firstly, addNewBroker creates a ChannelBuilder (for the security protocol, the listener name, SERVER JAAS context type and SASL-related properties) and, if it is a Reconfigurable, adds it to the KafkaConfig as reconfigurable. addNewBroker then creates a Selector with controller-channel metric group (and broker-id of the broker node).

addNewBroker builds a thread name per the optional threadNamePrefix:

[threadNamePrefix]:Controller-[brokerId]-to-broker-[id]-send-thread

addNewBroker creates a new RequestRateAndQueueTimeMs timer metric with the id of the broker to connect to.

addNewBroker creates a daemon RequestSendThread for the broker ID (of the controller broker), the ControllerContext, the NetworkClient, the RequestRateAndQueueTimeMs metric, the StateChangeLogger, and the thread name.

addNewBroker creates a QueueSize gauge metric (with the id of the broker to connect) that is the number of the AbstractControlRequest messages in the queue.

In the end, addNewBroker registers (adds) the id of the broker to connect with a new ControllerBrokerStateInfo to the brokerStateInfo internal registry.

Note
addNewBroker is used when ControllerChannelManager is requested to start up (and connect to brokers) and add a broker.

Registering Newly-Added Broker — addBroker Method

addBroker(broker: Broker): Unit

addBroker…​FIXME

Note
addBroker is used when KafkaController is requested to process a BrokerChange controller event.

Deregistering Broker — removeBroker Method

removeBroker(brokerId: Int): Unit

removeBroker finds the broker metadata in the brokerStateInfo internal registry that is then used to removeExistingBroker.

Note
removeBroker is used exclusively when KafkaController is requested to process a BrokerChange controller event.

Starting Up — startup Method

startup(): Unit

startup…​FIXME

Note
startup is used when KafkaController is requested to initializeControllerContext.

Shutting Down — shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

Sending AbstractControlRequest Out to Broker — sendRequest Method

sendRequest(
  brokerId: Int,
  request: AbstractControlRequest.Builder[_ <: AbstractControlRequest],
  callback: AbstractResponse => Unit = null)

sendRequest…​FIXME

Note
sendRequest is used exclusively when ControllerBrokerRequestBatch is requested to send a controller request to a broker.

removeExistingBroker Internal Method

removeExistingBroker(
  brokerState: ControllerBrokerStateInfo): Unit

removeExistingBroker…​FIXME

Note
removeExistingBroker is used when…​FIXME

Starting RequestSendThread — startRequestSendThread Internal Method

startRequestSendThread(
  brokerId: Int): Unit

startRequestSendThread finds the RequestSendThread in the broker metadata in the brokerStateInfo internal registry and, if the thread has not started yet, startRequestSendThread starts it.

Note
startRequestSendThread is used when ControllerChannelManager is requested to start up and addBroker.

ControllerBrokerStateInfo

ControllerBrokerStateInfo is a broker metadata that holds the following:

results matching ""

    No results matching ""