KafkaServer — Kafka Broker
KafkaServer is a Kafka broker that manages Kafka services.
When created, KafkaServer registers itself in the JMX system under the kafka.server node.
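The JMX registration can be inspected at runtime. Below is a minimal sketch that lists the MBeans registered under the kafka.server domain, assuming remote JMX was enabled for the broker (e.g. the JMX_PORT environment variable set to 9999):

```scala
import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
import scala.jdk.CollectionConverters._

object ListKafkaServerMBeans extends App {
  // assumes the broker was started with remote JMX on port 9999 (e.g. JMX_PORT=9999)
  val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi")
  val connector = JMXConnectorFactory.connect(url)
  try {
    val mbeanServer = connector.getMBeanServerConnection
    // every MBean the broker registered under the kafka.server domain
    val names = mbeanServer.queryNames(new ObjectName("kafka.server:*"), null).asScala
    names.toSeq.map(_.toString).sorted.foreach(println)
  } finally connector.close()
}
```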
Tip
|
Enable logging for the kafka.server.KafkaServer logger to see what happens inside.
Refer to Logging. |
Creating KafkaServer Instance
KafkaServer takes the following when created:
- A collection of KafkaMetricsReporters (defaults to no reporters)
Caution
|
FIXME |
Note
|
KafkaServer is created when KafkaServerStartable is created.
|
Starting Up — startup Method
startup(): Unit
startup starts the Kafka broker.
Internally, startup first prints out the following INFO message to the logs:
starting
startup sets BrokerState as Starting.
startup requests KafkaScheduler to start.
startup connects to Zookeeper (and initializes ZkUtils).
startup getOrGenerateClusterId (and records the result as the cluster ID).
You should see the following INFO message in the logs:
Cluster ID = [clusterId]
startup requests the KafkaConfig to set the brokerId to the broker ID.
startup creates the LogContext with the [KafkaServer id=[brokerId]] prefix.
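LogContext (from the Kafka clients library) simply prefixes every message of the loggers it creates. A small sketch of the idea (the broker ID value is assumed for illustration):

```scala
import org.apache.kafka.common.utils.LogContext

object LogContextDemo extends App {
  val brokerId = 0 // assumed broker ID, for illustration only
  val logContext = new LogContext(s"[KafkaServer id=$brokerId] ")
  // every message of this logger is automatically prefixed
  val log = logContext.logger(getClass)
  log.info("starting") // logged as: [KafkaServer id=0] starting
}
```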
startup requests the KafkaConfig for the DynamicBrokerConfig, which is in turn requested to initialize (fetching broker configuration from Zookeeper).
startup creates a new KafkaScheduler (with the number of threads as specified by the background.threads configuration property) and immediately requests it to start up.
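KafkaScheduler is in essence a scheduled thread pool with daemon threads. A stand-alone sketch of the same pattern (the thread count and names are illustrative, not the broker's actual values):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object BackgroundSchedulerDemo extends App {
  val backgroundThreads = 10 // illustrative; the broker reads background.threads from its configuration
  val counter = new AtomicInteger(0)

  val threadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"kafka-scheduler-demo-${counter.getAndIncrement()}")
      t.setDaemon(true) // background (daemon) threads
      t
    }
  }

  val scheduler: ScheduledExecutorService =
    Executors.newScheduledThreadPool(backgroundThreads, threadFactory)

  // a periodic background task (think log retention or flush checks)
  scheduler.scheduleAtFixedRate(() => println("background task"), 0L, 1L, TimeUnit.SECONDS)

  Thread.sleep(3000)
  scheduler.shutdown()
}
```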
startup creates and configures metrics.
Caution
|
FIXME |
startup registers broker topic metrics (by initializing BrokerTopicStats).
startup initializes QuotaManagers.
startup notifies cluster resource listeners (i.e. KafkaMetricsReporters and the configured instances of metric reporters).
startup creates the LogDirFailureChannel.
startup creates the LogManager and requests it to start up.
startup creates the MetadataCache (for the broker ID).
startup creates the CredentialProvider (per the sasl.enabled.mechanisms property).
startup creates the SocketServer (for KafkaConfig, Metrics and CredentialProvider) and requests it to start up.
startup creates the ReplicaManager and requests it to start up.
startup creates the KafkaController (for KafkaConfig, ZkUtils, Metrics and the optional threadNamePrefix) and requests it to start up.
startup creates the AdminManager (for KafkaConfig, Metrics, MetadataCache and ZkUtils).
startup creates the GroupCoordinator (for KafkaConfig, ZkUtils and ReplicaManager) and requests it to start up.
startup creates the TransactionCoordinator (for KafkaConfig, ReplicaManager, a new dedicated KafkaScheduler with the transaction-log-manager- thread name prefix, ZkUtils, Metrics and MetadataCache) and requests it to start up.
startup creates an Authorizer (if defined using the authorizer.class.name property) and requests it to configure itself.
startup creates the KafkaApis (for SocketServer, ReplicaManager, AdminManager, GroupCoordinator, TransactionCoordinator, KafkaController, ZkUtils, broker ID, KafkaConfig, MetadataCache, Metrics, Authorizer, QuotaManagers, BrokerTopicStats and cluster ID).
Note
|
At this point KafkaServer may start processing requests.
|
startup creates the KafkaRequestHandlerPool (for the broker ID, SocketServer, KafkaApis and num.io.threads).
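Conceptually, the request handler pool is a fixed number of threads that take requests off a shared queue and hand them to KafkaApis. A simplified, self-contained sketch of that pattern (the request type and names are made up for illustration):

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object RequestHandlerPoolDemo extends App {
  final case class Request(id: Int) // stand-in for a Kafka request

  val requestQueue = new ArrayBlockingQueue[Request](16) // stand-in for the RequestChannel
  val numIoThreads = 4 // illustrative; the broker uses the num.io.threads configuration property

  @volatile var running = true

  val handlers = (0 until numIoThreads).map { i =>
    val t = new Thread(() => {
      while (running) {
        val request = requestQueue.poll(300, TimeUnit.MILLISECONDS)
        if (request != null)
          println(s"handler-$i handled ${request.id}") // KafkaApis.handle(request) in the broker
      }
    }, s"request-handler-$i")
    t.setDaemon(true)
    t.start()
    t
  }

  (1 to 10).foreach(n => requestQueue.put(Request(n)))
  Thread.sleep(1000)
  running = false
}
```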
startup starts the HTTP interface of mx4j (if configured).
startup creates the DynamicConfigManager (for ZkUtils and dynamicConfigHandlers) and requests it to start up.
startup configures the advertised listeners (if defined).
startup creates the KafkaHealthcheck (for the broker ID, the advertised listeners, ZkUtils, and the broker.rack and inter.broker.protocol.version Kafka properties) and requests it to start up.
startup checkpoints the broker ID.
startup sets BrokerState as RunningAsBroker, creates the CountDownLatch, enables the startupComplete flag and disables the isStartingUp flag.
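The flags and the CountDownLatch form a small startup/shutdown state machine that lets awaitShutdown block until the broker has shut down. A bare-bones sketch of the pattern (simplified; not the broker's actual code):

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

class StartupShutdownLatchDemo {
  private val startupComplete = new AtomicBoolean(false)
  private val isShuttingDown = new AtomicBoolean(false)
  private val isStartingUp = new AtomicBoolean(false)
  private var shutdownLatch = new CountDownLatch(1)

  def startup(): Unit = {
    isStartingUp.set(true)
    shutdownLatch = new CountDownLatch(1)
    // ... start services here ...
    startupComplete.set(true)
    isStartingUp.set(false)
  }

  def shutdown(): Unit =
    if (startupComplete.get && isShuttingDown.compareAndSet(false, true)) {
      // ... stop services here ...
      startupComplete.set(false)
      isShuttingDown.set(false)
      shutdownLatch.countDown() // wakes up awaitShutdown
    }

  // blocks until shutdown() has completed
  def awaitShutdown(): Unit = shutdownLatch.await()
}

object StartupShutdownLatchDemoApp extends App {
  val server = new StartupShutdownLatchDemo
  server.startup()
  new Thread(() => { Thread.sleep(500); server.shutdown() }).start()
  server.awaitShutdown()
  println("shut down")
}
```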
startup registers AppInfo as an MBean with the MBean server as kafka.server:type=app-info,id=[brokerId].
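Registering such an MBean boils down to a single call on the platform MBean server. A small sketch with a hypothetical info bean (the attribute and broker ID are illustrative only):

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// standard MBean pattern: the management interface must be named <ClassName>MBean
trait AppInfoDemoMBean {
  def getVersion: String
}

class AppInfoDemo(version: String) extends AppInfoDemoMBean {
  override def getVersion: String = version
}

object RegisterAppInfo extends App {
  val brokerId = 0 // assumed broker ID, for illustration
  val name = new ObjectName(s"kafka.server:type=app-info,id=$brokerId")
  ManagementFactory.getPlatformMBeanServer.registerMBean(new AppInfoDemo("demo"), name)
  println(s"registered $name")
}
```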
In the end, you should see the following INFO message in the logs:
INFO [Kafka Server [brokerId]], started (kafka.server.KafkaServer)
Note
|
The INFO message above uses the so-called log ident with the value of the broker.id property and is always in the format [Kafka Server [brokerId]] once a Kafka server has fully started.
|
Note
|
startup is used exclusively when KafkaServerStartable is requested to start up.
|
Authorizer
authorizer: Option[Authorizer] = None
authorizer is an Authorizer that is created based on the authorizer.class.name configuration property (default: empty).
The Authorizer is used to create the data-plane and control-plane KafkaApis (for authorizing operations).
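Loading a pluggable Authorizer from authorizer.class.name is a matter of reflective instantiation followed by configuration. A generic sketch of that pattern using the Configurable interface from the Kafka clients API (the demo authorizer class is a placeholder, not a real authorizer):

```scala
import java.util.Collections
import org.apache.kafka.common.Configurable

// a trivial stand-in for a pluggable authorizer, used only to exercise loadAuthorizer below
class DemoAuthorizer extends Configurable {
  override def configure(configs: java.util.Map[String, _]): Unit =
    println(s"configured with ${configs.size()} entries")
}

object AuthorizerLoaderDemo extends App {
  // instantiate the class named by authorizer.class.name and, if it is Configurable, configure it
  def loadAuthorizer(className: String, configs: java.util.Map[String, AnyRef]): AnyRef = {
    val instance = Class.forName(className).getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    instance match {
      case configurable: Configurable => configurable.configure(configs)
      case _ =>
    }
    instance
  }

  val authorizer = loadAuthorizer(classOf[DemoAuthorizer].getName, Collections.emptyMap[String, AnyRef])
  println(authorizer.getClass.getName)
}
```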
Sending Updated Cluster Metadata to ClusterResourceListeners — notifyClusterListeners Internal Method
notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit
notifyClusterListeners creates a ClusterResourceListeners (with the objects from the input clusterListeners that are of type ClusterResourceListener) and sends the updated cluster metadata to them.
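ClusterResourceListener and ClusterResource are part of the public Kafka clients API, so notifying the listeners amounts to filtering the candidates by that trait and passing the current ClusterResource. A simplified sketch (the listener and cluster ID are illustrative):

```scala
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}

object NotifyClusterListenersDemo extends App {
  // a metrics reporter (or interceptor) that wants to know the cluster ID
  class PrintingListener extends ClusterResourceListener {
    override def onUpdate(clusterResource: ClusterResource): Unit =
      println(s"cluster metadata updated: ${clusterResource.clusterId()}")
  }

  def notifyClusterListeners(clusterId: String, clusterListeners: Seq[AnyRef]): Unit =
    clusterListeners
      .collect { case listener: ClusterResourceListener => listener }
      .foreach(_.onUpdate(new ClusterResource(clusterId)))

  notifyClusterListeners("demo-cluster-id", Seq(new PrintingListener, "not a listener"))
}
```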
Creating ReplicaManager — createReplicaManager Internal Method
createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager
createReplicaManager simply creates the ReplicaManager (passing in references to the services, e.g. Metrics, KafkaScheduler, LogManager, QuotaManagers, MetadataCache and LogDirFailureChannel).
Note
|
createReplicaManager is used exclusively when KafkaServer is requested to start up.
|
getOrGenerateClusterId Internal Method
getOrGenerateClusterId(zkClient: KafkaZkClient): String
getOrGenerateClusterId simply requests the given KafkaZkClient for the cluster ID or, if none is available yet, to createOrGetClusterId with a random UUID (Base64-encoded).
Note
|
getOrGenerateClusterId is used exclusively when KafkaServer is requested to start up (and initializes the internal cluster ID).
|
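Generating a cluster ID comes down to encoding a random UUID as a URL-safe Base64 string. A minimal sketch of that encoding (the exact helper the broker uses is not shown; this is only the underlying idea):

```scala
import java.nio.ByteBuffer
import java.util.{Base64, UUID}

object ClusterIdDemo extends App {
  def randomClusterId(): String = {
    val uuid = UUID.randomUUID()
    // 16 raw bytes of the UUID, Base64 (URL-safe, no padding) => a 22-character ID
    val bytes = ByteBuffer.allocate(16)
      .putLong(uuid.getMostSignificantBits)
      .putLong(uuid.getLeastSignificantBits)
      .array()
    Base64.getUrlEncoder.withoutPadding().encodeToString(bytes)
  }

  println(s"Cluster ID = ${randomClusterId()}")
}
```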
initZkClient Internal Method
initZkClient(time: Time): Unit
initZkClient prints out the following INFO message to the logs:
Connecting to zookeeper on [zkConnect]
(only if the chroot path is used) initZkClient …FIXME
initZkClient …FIXME (secureAclsEnabled)
initZkClient creates a KafkaZkClient (with the following configuration properties: KafkaConfig.zkConnect, KafkaConfig.secureAclsEnabled, KafkaConfig.zkSessionTimeoutMs, KafkaConfig.zkConnectionTimeoutMs and KafkaConfig.zkMaxInFlightRequests).
In the end, initZkClient requests the KafkaZkClient to createTopLevelPaths.
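Creating the top-level paths is plain ZooKeeper work: for every persistent path the broker relies on, create it if it does not exist yet. A rough sketch with the raw ZooKeeper client (only a few of the paths are shown and error handling is omitted):

```scala
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.KeeperState

object CreateTopLevelPathsDemo extends App {
  val connected = new CountDownLatch(1)
  val zk = new ZooKeeper("localhost:2181", 6000, event =>
    if (event.getState == KeeperState.SyncConnected) connected.countDown())
  connected.await()

  // a few of the persistent paths a broker relies on (not the complete list)
  val paths = Seq("/brokers", "/brokers/ids", "/brokers/topics", "/config", "/config/topics")

  paths.foreach { path =>
    if (zk.exists(path, false) == null)
      zk.create(path, Array.emptyByteArray, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
  }

  zk.close()
}
```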
Note
|
initZkClient is used exclusively when KafkaServer is requested to start up.
|
controlledShutdown Internal Method
controlledShutdown(): Unit
controlledShutdown …FIXME
Note
|
controlledShutdown is used when KafkaServer is requested to shut down.
|
Checkpointing Broker ID — checkpointBrokerId Internal Method
checkpointBrokerId(brokerId: Int): Unit
For every directory in KafkaConfig.logDirs that is isLogDirOnline (according to the LogManager), checkpointBrokerId finds the corresponding BrokerMetadataCheckpoint (with the path to the meta.properties file) in the brokerMetadataCheckpoints registry and requests it to read the broker metadata.
Unless the meta.properties file was already available, checkpointBrokerId requests the BrokerMetadataCheckpoints (of the log directories with no meta files) to write the broker metadata.
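The broker metadata checkpoint is a small properties file (meta.properties) in each log directory. A sketch of reading and writing such a file with java.util.Properties (the location and the version/broker.id layout are assumptions for illustration):

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.Properties

object BrokerMetadataCheckpointDemo extends App {
  val metaFile = new File("/tmp/kafka-logs-demo/meta.properties") // illustrative location
  metaFile.getParentFile.mkdirs()

  // write the broker metadata (unless it is already there)
  if (!metaFile.exists()) {
    val props = new Properties()
    props.setProperty("version", "0")
    props.setProperty("broker.id", "0") // assumed broker ID
    val out = new FileOutputStream(metaFile)
    try props.store(out, "broker metadata checkpoint (demo)") finally out.close()
  }

  // read it back
  val props = new Properties()
  val in = new FileInputStream(metaFile)
  try props.load(in) finally in.close()
  println(s"broker.id = ${props.getProperty("broker.id")}")
}
```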
Note
|
checkpointBrokerId is used exclusively when KafkaServer is requested to start up.
|
Getting Broker ID and Initial Offline Directories — getBrokerIdAndOfflineDirs Internal Method
getBrokerIdAndOfflineDirs: (Int, Seq[String])
getBrokerIdAndOfflineDirs …FIXME
Note
|
getBrokerIdAndOfflineDirs is used exclusively when KafkaServer is requested to start up.
|
generateBrokerId Internal Method
generateBrokerId: Int
generateBrokerId …FIXME
Note
|
generateBrokerId is used exclusively when KafkaServer is requested to getBrokerIdAndOfflineDirs.
|
createBrokerInfo Internal Method
createBrokerInfo: BrokerInfo
createBrokerInfo …FIXME
Cluster ID — _clusterId Internal Property
_clusterId: String
KafkaServer uses a cluster ID that is a random UUID (encoded to Base64).
When requested to start up, KafkaServer initializes the internal _clusterId which is immediately printed out as an INFO message to the logs:
Cluster ID = [clusterId]
Cluster ID is persisted in Zookeeper in /cluster/id znode (in JSON format).
Cluster ID is registered as kafka.server:type=KafkaServer,name=ClusterId MBean in the JMX system.
Cluster ID is used for the following:
- Creating the KafkaApis (for dataPlaneRequestProcessor and controlPlaneRequestChannelOpt) at startup
- Sending an updated cluster metadata to ClusterResourceListeners
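Because the cluster ID lives in the /cluster/id znode, it can be read back directly from ZooKeeper. A quick sketch with the raw ZooKeeper client (the JSON payload is printed as-is rather than parsed):

```scala
import java.nio.charset.StandardCharsets
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.Watcher.Event.KeeperState

object ReadClusterIdDemo extends App {
  val connected = new CountDownLatch(1)
  val zk = new ZooKeeper("localhost:2181", 6000, event =>
    if (event.getState == KeeperState.SyncConnected) connected.countDown())
  connected.await()

  // the /cluster/id znode holds the cluster ID as a small JSON document
  val data = zk.getData("/cluster/id", false, null)
  println(new String(data, StandardCharsets.UTF_8))

  zk.close()
}
```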
Default Configuration Properties of Logs (for LogManager and AdminManager) — copyKafkaConfigToLog Internal Utility
copyKafkaConfigToLog(kafkaConfig: KafkaConfig): Map[String, Object]
copyKafkaConfigToLog sets the topic-level configuration properties based on the given KafkaConfig.
Note
|
copyKafkaConfigToLog uses the same configuration properties as TopicConfigSynonyms. The keys of the configuration properties of LogConfig are simply aliases of the corresponding TopicConfig keys.
|
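The idea behind copyKafkaConfigToLog is a key-by-key copy driven by a broker-to-topic property name mapping. A simplified sketch with a handful of well-known synonym pairs (only a few of the many mappings are listed):

```scala
object CopyKafkaConfigToLogDemo extends App {
  // a few broker-level properties (KafkaConfig keys, left) and their topic-level aliases (LogConfig keys, right)
  val topicConfigSynonyms: Map[String, String] = Map(
    "log.segment.bytes"   -> "segment.bytes",
    "log.retention.ms"    -> "retention.ms",
    "log.retention.bytes" -> "retention.bytes",
    "log.cleanup.policy"  -> "cleanup.policy"
  )

  // copies the values of the broker configuration into a topic-level (log) configuration
  def copyKafkaConfigToLog(brokerConfig: Map[String, AnyRef]): Map[String, AnyRef] =
    for {
      (brokerKey, topicKey) <- topicConfigSynonyms
      value <- brokerConfig.get(brokerKey)
    } yield topicKey -> value

  val brokerConfig = Map[String, AnyRef](
    "log.segment.bytes"  -> Integer.valueOf(1073741824),
    "log.cleanup.policy" -> "delete")

  println(copyKafkaConfigToLog(brokerConfig))
  // Map(segment.bytes -> 1073741824, cleanup.policy -> delete)
}
```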
Internal Properties
Name | Description |
---|---|
controlPlaneRequestHandlerPool | Control-plane KafkaRequestHandlerPool for the control-plane KafkaApis |
controlPlaneRequestProcessor | Control-plane KafkaApis request processor (handler) for the optional control-plane KafkaRequestHandlerPool of the SocketServer |
dataPlaneRequestProcessor | Data-plane KafkaApis request handler for the data-plane RequestChannel of the SocketServer |
dataPlaneRequestHandlerPool | Data-plane KafkaRequestHandlerPool for the data-plane KafkaApis |
dynamicConfigHandlers | ConfigHandlers by name |
groupCoordinator | GroupCoordinator (for the only purpose of creating the KafkaApis); created and immediately started up when KafkaServer is requested to start up |
kafkaScheduler | KafkaScheduler with the number of daemon threads as configured using the background.threads configuration property |
metadataCache | MetadataCache that is created for the sake of creating other services at startup |
replicaManager | ReplicaManager used to create other services |
 | Collection of MetricsReporter; used when…FIXME |