KafkaServer — Kafka Broker
KafkaServer is a Kafka broker that manages Kafka services.
When created, KafkaServer registers itself in the JMX system under the kafka.server node.
Tip: Enable logging for the kafka.server.KafkaServer logger to see what happens inside. Refer to Logging.
Creating KafkaServer Instance
KafkaServer takes the following when created:
- A collection of KafkaMetricsReporters (defaults to no reporters)
Caution: FIXME
Note: KafkaServer is created when KafkaServerStartable is created.
Starting Up — startup Method
startup(): Unit
startup starts the Kafka broker.
Internally, startup first prints out the following INFO message to the logs:
starting
startup sets BrokerState as Starting.
startup requests KafkaScheduler to start.
startup connects to Zookeeper (and initializes ZkUtils).
startup calls getOrGenerateClusterId and records the result as the cluster ID.
You should see the following INFO message in the logs:
Cluster ID = [clusterId]
startup requests the KafkaConfig to set the brokerId to the broker ID.
startup creates the LogContext with [KafkaServer id=[brokerId]] prefix.
startup requests the KafkaConfig for the DynamicBrokerConfig that is in turn requested to initialize (fetching broker configuration from Zookeeper).
startup creates a new KafkaScheduler (with the number of threads as specified by background.threads configuration property) and immediately requests it to start up.
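As a rough illustration of the step above, the following Java sketch builds a scheduler backed by a fixed number of daemon threads using only the JDK. It is not Kafka's KafkaScheduler: the class BackgroundSchedulerSketch, the start method and the thread-name prefix are hypothetical, and the thread count passed in merely stands in for the value of background.threads.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BackgroundSchedulerSketch {
    /** Creates a scheduled pool whose worker threads are daemons named prefix + index. */
    static ScheduledThreadPoolExecutor start(int backgroundThreads, String threadNamePrefix) {
        AtomicInteger counter = new AtomicInteger(0);
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, threadNamePrefix + counter.getAndIncrement());
            thread.setDaemon(true); // daemon threads do not keep the JVM alive
            return thread;
        };
        return new ScheduledThreadPoolExecutor(backgroundThreads, factory);
    }

    public static void main(String[] args) throws InterruptedException {
        // The thread count is an example value standing in for background.threads.
        ScheduledThreadPoolExecutor scheduler = start(2, "sketch-scheduler-");
        scheduler.schedule(
            () -> System.out.println("running on " + Thread.currentThread().getName()),
            0, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Daemon threads fit background housekeeping because they never block JVM exit on their own.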
startup creates and configures metrics.
Caution: FIXME
startup registers broker topic metrics (by initializing BrokerTopicStats).
startup initializes QuotaManagers.
startup notifies cluster resource listeners (i.e. KafkaMetricsReporters and the configured instances of metric reporters).
startup creates the LogDirFailureChannel.
startup creates the LogManager and requests it to start up.
startup creates the MetadataCache (for the broker ID).
startup creates the CredentialProvider (per sasl.enabled.mechanisms property).
startup creates the SocketServer (for KafkaConfig, Metrics and CredentialProvider) and requests it to start up.
startup creates the ReplicaManager and requests it to start up.
startup creates the KafkaController (for KafkaConfig, ZkUtils, Metrics and the optional threadNamePrefix) and requests it to start up.
startup creates the AdminManager (for KafkaConfig, Metrics, MetadataCache and ZkUtils).
startup creates the GroupCoordinator (for KafkaConfig, ZkUtils and ReplicaManager) and requests it to start up.
startup creates the TransactionCoordinator (for KafkaConfig, ReplicaManager, a new dedicated KafkaScheduler with transaction-log-manager- thread name prefix, ZkUtils, Metrics and MetadataCache) and requests it to start up.
startup creates an Authorizer (if defined using authorizer.class.name property) and requests it to configure.
startup creates the KafkaApis (for SocketServer, ReplicaManager, AdminManager, GroupCoordinator, TransactionCoordinator, KafkaController, ZkUtils, broker ID, KafkaConfig, MetadataCache, Metrics, Authorizer, QuotaManagers, BrokerTopicStats, cluster ID).
Note: At this point KafkaServer may start processing requests.
startup creates the KafkaRequestHandlerPool (for broker ID, SocketServer, KafkaApis and num.io.threads).
startup starts the HTTP interface of mx4j (if configured).
startup creates the DynamicConfigManager (for ZkUtils and dynamicConfigHandlers) and requests it to start up.
startup configures the advertised listeners (if defined).
startup creates the KafkaHealthcheck (for broker ID, the advertised listeners, ZkUtils, broker.rack and inter.broker.protocol.version Kafka properties) and requests it to start up.
startup checkpoints the broker ID.
startup sets BrokerState as RunningAsBroker, creates the CountDownLatch, enables the startupComplete flag, and disables the isStartingUp flag.
startup registers AppInfo as an MBean with the MBean server as kafka.server:type=app-info,id=[brokerId].
In the end, you should see the following INFO message in the logs:
INFO [Kafka Server [brokerId]], started (kafka.server.KafkaServer)
Note: The INFO message above uses the so-called log ident with the value of the broker.id property. After a Kafka server has fully started, the ident is always in the format [Kafka Server [brokerId]].
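The format of the final INFO message can be checked mechanically, for example by a test or a script that waits for the broker to come up. The sketch below is only an illustration and not part of Kafka: the class StartedLogLineSketch and the method startedBrokerId are hypothetical, and the regular expression assumes the exact message format quoted above.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StartedLogLineSketch {
    // Matches the log ident followed by the "started" message and captures the broker ID.
    private static final Pattern STARTED =
        Pattern.compile("\\[Kafka Server (\\d+)\\], started");

    /** Returns the broker ID if the line is the "started" message, otherwise empty. */
    static OptionalInt startedBrokerId(String logLine) {
        Matcher matcher = STARTED.matcher(logLine);
        return matcher.find()
            ? OptionalInt.of(Integer.parseInt(matcher.group(1)))
            : OptionalInt.empty();
    }

    public static void main(String[] args) {
        String line = "INFO [Kafka Server 0], started (kafka.server.KafkaServer)";
        System.out.println(startedBrokerId(line));
    }
}
```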
Note: startup is used exclusively when KafkaServerStartable is requested to start up.
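The state handling around startup (BrokerState, the CountDownLatch, and the startupComplete and isStartingUp flags) can be sketched with plain JDK concurrency primitives. This is a simplified model under stated assumptions, not KafkaServer's code: the class BrokerLifecycleSketch, its State enum and its accessor methods are hypothetical, and the creation of the actual services is reduced to a comment.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

final class BrokerLifecycleSketch {
    enum State { NOT_RUNNING, STARTING, RUNNING_AS_BROKER }

    private final AtomicBoolean isStartingUp = new AtomicBoolean(false);
    private final AtomicBoolean startupComplete = new AtomicBoolean(false);
    private volatile State state = State.NOT_RUNNING;
    private volatile CountDownLatch shutdownLatch;

    /** Returns true if this call performed the startup, false if it was a no-op. */
    boolean startup() {
        if (startupComplete.get()) return false;                    // already started
        if (!isStartingUp.compareAndSet(false, true)) return false; // startup in progress elsewhere
        state = State.STARTING;
        // ... the individual services would be created and started here ...
        state = State.RUNNING_AS_BROKER;
        shutdownLatch = new CountDownLatch(1); // released later, when the broker shuts down
        startupComplete.set(true);
        isStartingUp.set(false);
        return true;
    }

    State state() { return state; }
    boolean isStartupComplete() { return startupComplete.get(); }
    boolean isStartingUp() { return isStartingUp.get(); }

    public static void main(String[] args) {
        BrokerLifecycleSketch broker = new BrokerLifecycleSketch();
        System.out.println(broker.startup() + " " + broker.state());
    }
}
```

The compare-and-set on isStartingUp is what makes a concurrent second call to startup a no-op rather than a second initialization.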
Authorizer
authorizer: Option[Authorizer] = None
authorizer is an Authorizer based on authorizer.class.name configuration property (default: (empty)).
Authorizer is used to create the data-plane and control-plane KafkaApis (for authorizing operations).
Sending Updated Cluster Metadata to ClusterResourceListeners — notifyClusterListeners Internal Method
notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit
notifyClusterListeners creates a ClusterResourceListeners (with the objects from the input clusterListeners of type ClusterResourceListener) and sends the updated cluster metadata to them.
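The filter-then-notify behaviour described above can be sketched as follows. The ClusterResourceListener interface in the block is a local stand-in declared only for this example (its onUpdate takes a plain String instead of Kafka's cluster metadata object), and the class ClusterListenersSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class ClusterListenersSketch {
    /** Local stand-in for the listener callback; not Kafka's interface. */
    interface ClusterResourceListener {
        void onUpdate(String clusterId);
    }

    /**
     * Keeps only the candidates that implement the listener interface,
     * notifies each of them, and returns how many were notified.
     */
    static int notifyClusterListeners(List<Object> candidates, String clusterId) {
        List<ClusterResourceListener> listeners = new ArrayList<>();
        for (Object candidate : candidates) {
            if (candidate instanceof ClusterResourceListener) {
                listeners.add((ClusterResourceListener) candidate);
            }
        }
        for (ClusterResourceListener listener : listeners) {
            listener.onUpdate(clusterId);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        ClusterResourceListener reporter = seen::add;
        int notified = notifyClusterListeners(List.of("not a listener", reporter), "example-cluster-id");
        System.out.println(notified + " listener(s) notified: " + seen);
    }
}
```

Accepting arbitrary objects and filtering by type lets metric reporters opt in to cluster metadata simply by implementing the listener interface.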
Creating ReplicaManager — createReplicaManager Internal Method
createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager
createReplicaManager simply creates the ReplicaManager (passing in the references to the services, e.g. Metrics, KafkaScheduler, LogManager, QuotaManagers, MetadataCache, LogDirFailureChannel).
Note: createReplicaManager is used exclusively when KafkaServer is requested to start up.
getOrGenerateClusterId Internal Method
getOrGenerateClusterId(zkClient: KafkaZkClient): String
getOrGenerateClusterId requests the given KafkaZkClient for the cluster ID and, if there is none, requests it to createOrGetClusterId with a random UUID (encoded as Base64).
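A minimal sketch of the "random UUID as Base64" part, using only the JDK. The class ClusterIdSketch and its methods are hypothetical, the Optional argument stands in for whatever ID is already stored in Zookeeper, and the choice of URL-safe Base64 without padding is an assumption of this example.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.UUID;

final class ClusterIdSketch {
    /** Encodes the 128 bits of a UUID as URL-safe Base64 without padding (22 characters). */
    static String toBase64(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buffer.array());
    }

    /** Generates a fresh candidate cluster ID. */
    static String generate() {
        return toBase64(UUID.randomUUID());
    }

    /** Returns the stored ID when there is one, otherwise a newly generated one. */
    static String getOrGenerate(Optional<String> stored) {
        return stored.orElseGet(ClusterIdSketch::generate);
    }

    public static void main(String[] args) {
        System.out.println("Cluster ID = " + getOrGenerate(Optional.empty()));
    }
}
```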
Note: getOrGenerateClusterId is used exclusively when KafkaServer is requested to start up (and initializes the internal cluster ID).
initZkClient Internal Method
initZkClient(time: Time): Unit
initZkClient prints out the following INFO message to the logs:
Connecting to zookeeper on [zkConnect]
(only if the chroot path is used) initZkClient…FIXME
initZkClient…FIXME (secureAclsEnabled)
initZkClient creates a KafkaZkClient (with the following configuration properties: KafkaConfig.zkConnect, KafkaConfig.secureAclsEnabled, KafkaConfig.zkSessionTimeoutMs, KafkaConfig.zkConnectionTimeoutMs, KafkaConfig.zkMaxInFlightRequests).
In the end, initZkClient requests the KafkaZkClient to createTopLevelPaths.
Note: initZkClient is used exclusively when KafkaServer is requested to start up.
controlledShutdown Internal Method
controlledShutdown(): Unit
controlledShutdown…FIXME
Note: controlledShutdown is used when KafkaServer is requested to shut down.
Checkpointing Broker — checkpointBrokerId Internal Method
checkpointBrokerId(brokerId: Int): Unit
For every directory in KafkaConfig.logDirs that is online (as reported by the LogManager's isLogDirOnline), checkpointBrokerId finds the corresponding BrokerMetadataCheckpoint (with the path to the meta.properties file) in the brokerMetadataCheckpoints registry and requests it to read the file.
Unless the meta.properties file was already available, checkpointBrokerId requests the BrokerMetadataCheckpoints (of the log directories with no meta files) to write the broker metadata.
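The read-then-write-if-missing logic can be sketched with java.util.Properties. This is an illustration, not the BrokerMetadataCheckpoint implementation: the class BrokerIdCheckpointSketch and its methods are hypothetical, the version and broker.id keys are assumed to be the file's content, and the write is a plain overwrite rather than whatever safeguards the real checkpoint applies.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.Properties;

final class BrokerIdCheckpointSketch {
    static final String META_FILE = "meta.properties";

    /** Reads broker.id from the directory's meta.properties, if the file and the key exist. */
    static Optional<Integer> read(Path logDir) {
        Path file = logDir.resolve(META_FILE);
        if (!Files.exists(file)) return Optional.empty();
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.ofNullable(props.getProperty("broker.id")).map(Integer::parseInt);
    }

    /** Writes meta.properties into every directory without a readable broker.id; returns the number written. */
    static int checkpointBrokerId(List<Path> onlineLogDirs, int brokerId) {
        int written = 0;
        for (Path dir : onlineLogDirs) {
            if (read(dir).isPresent()) continue; // metadata already available
            Properties props = new Properties();
            props.setProperty("version", "0");   // assumed key, see the lead-in
            props.setProperty("broker.id", Integer.toString(brokerId));
            try (OutputStream out = Files.newOutputStream(dir.resolve(META_FILE))) {
                props.store(out, null);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            written++;
        }
        return written;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("kafka-logs-sketch");
        System.out.println("written: " + checkpointBrokerId(List.of(dir), 0));
        System.out.println("broker.id: " + read(dir));
    }
}
```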
Note: checkpointBrokerId is used exclusively when KafkaServer is requested to start up.
Getting Broker ID and Initial Offline Directories — getBrokerIdAndOfflineDirs Internal Method
getBrokerIdAndOfflineDirs: (Int, Seq[String])
getBrokerIdAndOfflineDirs…FIXME
Note: getBrokerIdAndOfflineDirs is used exclusively when KafkaServer is requested to start up.
generateBrokerId Internal Method
generateBrokerId: Int
generateBrokerId…FIXME
Note: generateBrokerId is used exclusively when KafkaServer is requested to getBrokerIdAndOfflineDirs.
createBrokerInfo Internal Method
createBrokerInfo: BrokerInfo
createBrokerInfo…FIXME
Cluster ID — _clusterId Internal Property
_clusterId: String
KafkaServer uses a cluster ID that is a random UUID (encoded as Base64).
When requested to start up, KafkaServer initializes the internal _clusterId which is immediately printed out as an INFO message to the logs:
Cluster ID = [clusterId]
Cluster ID is persisted in Zookeeper in /cluster/id znode (in JSON format).
Cluster ID is registered as kafka.server:type=KafkaServer,name=ClusterId MBean in the JMX system.
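A JMX client could look the MBean up by its object name. The sketch below uses only javax.management from the JDK; the class ClusterIdMBeanSketch is hypothetical, and reading the value through an attribute named Value is an assumption about how the gauge is exposed. In a JVM that does not host a broker the lookup simply comes back empty.

```java
import java.lang.management.ManagementFactory;
import java.util.Optional;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

final class ClusterIdMBeanSketch {
    /** Object name under which the cluster ID is registered, as quoted in the text. */
    static ObjectName clusterIdName() {
        try {
            return new ObjectName("kafka.server:type=KafkaServer,name=ClusterId");
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads the cluster ID from the given MBean server, or empty if it is not available there. */
    static Optional<Object> readClusterId(MBeanServer server) {
        ObjectName name = clusterIdName();
        if (!server.isRegistered(name)) return Optional.empty();
        try {
            // Assumption: the value is exposed through an attribute called "Value".
            return Optional.ofNullable(server.getAttribute(name, "Value"));
        } catch (JMException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        System.out.println(clusterIdName() + " -> " + readClusterId(server));
    }
}
```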
Cluster ID is used for the following:
- Creating KafkaApis (for dataPlaneRequestProcessor and controlPlaneRequestChannelOpt) at startup
- Sending an updated cluster metadata to ClusterResourceListeners
Default Configuration Properties of Logs (for LogManager and AdminManager) — copyKafkaConfigToLog Internal Utility
copyKafkaConfigToLog(kafkaConfig: KafkaConfig): Map[String, Object]
copyKafkaConfigToLog sets the topic-level configuration properties based on the given KafkaConfig.
Note: copyKafkaConfigToLog uses the same configuration properties as TopicConfigSynonyms. The keys of the configuration properties of LogConfig are simply aliases of TopicConfig.
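The idea of deriving topic-level defaults from broker-level settings can be sketched as a key translation. The block is a simplification under stated assumptions: the class TopicDefaultsSketch and the method copyToTopicDefaults are hypothetical, the three key pairs are only an illustrative subset of the synonyms, and a plain Map stands in for KafkaConfig.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TopicDefaultsSketch {
    /** Illustrative subset: broker-level key -> topic-level key it provides the default for. */
    static final Map<String, String> SYNONYMS = Map.of(
        "log.segment.bytes", "segment.bytes",
        "log.retention.bytes", "retention.bytes",
        "log.cleanup.policy", "cleanup.policy");

    /** Copies every broker setting that has a topic-level synonym, under the topic-level key. */
    static Map<String, Object> copyToTopicDefaults(Map<String, Object> brokerConfig) {
        Map<String, Object> logProps = new LinkedHashMap<>();
        brokerConfig.forEach((key, value) -> {
            String topicKey = SYNONYMS.get(key);
            if (topicKey != null) {
                logProps.put(topicKey, value);
            }
        });
        return logProps;
    }

    public static void main(String[] args) {
        Map<String, Object> broker = Map.of("log.segment.bytes", 1073741824, "broker.id", 0);
        System.out.println(copyToTopicDefaults(broker));
    }
}
```

Settings without a topic-level counterpart, such as broker.id in the usage above, are left out of the result.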
Internal Properties
| Name | Description |
|---|---|
| | Created when… Used (as… |
| | Control-plane KafkaRequestHandlerPool for the control-plane KafkaApis |
| | Control-plane KafkaApis request processor (handler) for the optional control-plane KafkaRequestHandlerPool of the SocketServer |
| | Data-plane KafkaApis request handler for the data-plane RequestChannel of the SocketServer |
| | Data-plane KafkaRequestHandlerPool for the data-plane KafkaApis |
| | ConfigHandlers by name: … Initialized when… |
| | GroupCoordinator (for the only purpose of creating the KafkaApis). Created and immediately started up when… |
| | KafkaScheduler with the number of daemon threads as configured using background.threads configuration property (default: …) |
| | Used when: … |
| | MetadataCache that is created for the sake of creating the following services (at startup): … |
| | ReplicaManager used to create: … |
| | Collection of MetricsReporter. Used when…FIXME |