KafkaServer

KafkaServer is a Kafka broker that wires (creates and starts) Kafka services together.

Figure 1. KafkaServer’s Startup and Auxiliary Services

KafkaServer registers itself in the JMX system under kafka.server.
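To make the JMX registration concrete, here is a minimal sketch that uses only the JDK's management API. It registers a stand-in bean under the kafka.server domain, using the app-info object name pattern mentioned in the startup section. The names JmxSketch, AppInfoStub and AppInfoStubMBean are hypothetical and are not Kafka's classes.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

object JmxSketch {
  // Hypothetical management interface. The "MBean" suffix follows the
  // JDK's Standard MBean naming convention (class name + "MBean").
  trait AppInfoStubMBean {
    def getVersion: String
  }

  // Hypothetical stand-in bean; this is not Kafka's AppInfo.
  final class AppInfoStub(version: String) extends AppInfoStubMBean {
    override def getVersion: String = version
  }

  // Builds an object name in the kafka.server domain for a broker ID.
  def appInfoName(brokerId: Int): ObjectName =
    new ObjectName(s"kafka.server:type=app-info,id=$brokerId")

  // Registers the stand-in bean with the platform MBean server.
  // Registering the same name twice throws InstanceAlreadyExistsException.
  def register(brokerId: Int, version: String): ObjectName = {
    val name = appInfoName(brokerId)
    ManagementFactory.getPlatformMBeanServer.registerMBean(new AppInfoStub(version), name)
    name
  }

  // Reads the Version attribute (exposed by getVersion) back over JMX.
  def readVersion(name: ObjectName): AnyRef =
    ManagementFactory.getPlatformMBeanServer.getAttribute(name, "Version")

  // Removes the bean again, e.g. on shutdown.
  def unregister(name: ObjectName): Unit =
    ManagementFactory.getPlatformMBeanServer.unregisterMBean(name)
}
```

A tool such as jconsole would show a bean registered this way under the kafka.server domain.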

Table 1. KafkaServer’s Internal Properties (e.g. Registries and Counters)
Name                        Description

adminManager                AdminManager

apis                        KafkaApis

authorizer                  Optional Authorizer

brokerMetadataCheckpoints

brokerState                 BrokerState

_brokerTopicStats           BrokerTopicStats

_clusterId                  Cluster ID

credentialProvider          CredentialProvider

dynamicConfigHandlers

dynamicConfigManager        DynamicConfigManager

groupCoordinator            Created and started up when KafkaServer is requested to start up.
                            Shut down when KafkaServer is requested to shut down.
                            Used exclusively when KafkaServer is requested to start up (and creates a KafkaApis).

isStartingUp                Flag for…FIXME

kafkaController             KafkaController

kafkaHealthcheck            KafkaHealthcheck

kafkaScheduler              KafkaScheduler with the number of daemon threads as configured using background.threads configuration property (default: 10)

logContext                  LogContext

logDirFailureChannel        LogDirFailureChannel

logManager                  Created and immediately started when KafkaServer is requested to start up.
                            Shut down when KafkaServer is requested to shut down.
                            Used when:

metadataCache               MetadataCache

replicaManager              ReplicaManager used to create:

reporters                   Collection of MetricsReporter
                            Used when…FIXME

requestHandlerPool          KafkaRequestHandlerPool

socketServer                SocketServer

transactionCoordinator      TransactionCoordinator

quotaManagers               QuotaManagers

shutdownLatch               Java’s java.util.concurrent.CountDownLatch

startupComplete             Flag for…FIXME

zkUtils                     ZkUtils
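The kafkaScheduler entry in the table describes a scheduler backed by a configurable number of daemon threads. The following is a minimal sketch of that idea built on the JDK's ScheduledThreadPoolExecutor. It is not KafkaScheduler itself, and SchedulerSketch, daemonScheduler and the thread name prefix are assumptions made for illustration.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object SchedulerSketch {
  // Builds a scheduled thread pool whose workers are daemon threads,
  // so they never keep the JVM alive on their own.
  // `threads` plays the role of the background.threads setting.
  def daemonScheduler(threads: Int, namePrefix: String): ScheduledThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, namePrefix + counter.getAndIncrement())
        t.setDaemon(true)
        t
      }
    }
    new ScheduledThreadPoolExecutor(threads, factory)
  }
}
```

With the default of 10 threads this would be created as SchedulerSketch.daemonScheduler(10, "kafka-scheduler-"), where the prefix is only an illustrative value.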

getOrGenerateClusterId Internal Method

Caution
FIXME

Connecting to Zookeeper — initZk Internal Method

Caution
FIXME

Creating KafkaServer Instance

KafkaServer takes the following when created:

Caution
FIXME
Note
KafkaServer is created when KafkaServerStartable is created.

Starting Up — startup Method

startup(): Unit

startup starts a single Kafka server.

Internally, startup first prints out the following INFO message to the logs:

INFO starting (kafka.server.KafkaServer)

startup sets BrokerState as Starting.

startup requests KafkaScheduler to start.

startup connects to ZooKeeper (and initializes ZkUtils).

startup calls getOrGenerateClusterId and records the result as the cluster ID.

You should see the following INFO message in the logs:

INFO Cluster ID = [clusterId] (kafka.server.KafkaServer)

startup creates the LogContext with [KafkaServer id=[brokerId]] prefix.

startup creates and configures metrics.

  1. Requests KafkaConfig for configured instances of metric reporters

  2. Adds a JmxReporter (with kafka.server prefix)

  3. Creates the MetricConfig

  4. Initializes Metrics internal registry

startup registers broker topic metrics (by initializing BrokerTopicStats).

startup initializes QuotaManagers.

startup notifies cluster resource listeners (i.e. KafkaMetricsReporters and the configured instances of metric reporters).

startup creates the LogDirFailureChannel.

startup creates the LogManager and requests it to start up.

startup creates the MetadataCache (for the broker ID).

startup creates the CredentialProvider (per sasl.enabled.mechanisms property).

startup creates the SocketServer (for KafkaConfig, Metrics and CredentialProvider) and requests it to start up.

startup creates the ReplicaManager and requests it to start up.

startup creates the KafkaController (for KafkaConfig, ZkUtils, Metrics and the optional threadNamePrefix) and requests it to start up.

startup creates the GroupCoordinator (for KafkaConfig, ZkUtils and ReplicaManager) and requests it to start up.

startup creates the TransactionCoordinator (for KafkaConfig, ReplicaManager, a new dedicated KafkaScheduler with transaction-log-manager- thread name prefix, ZkUtils, Metrics and MetadataCache) and requests it to start up.

startup creates an Authorizer (if defined using authorizer.class.name property) and configures it.

Note
At this point KafkaServer may start processing requests.

startup starts the HTTP interface of mx4j (if configured).

startup creates the DynamicConfigManager (for ZkUtils and dynamicConfigHandlers) and requests it to start up.

startup configures the advertised listeners (if defined).

startup creates the KafkaHealthcheck (for broker ID, the advertised listeners, ZkUtils, broker.rack and inter.broker.protocol.version Kafka properties) and requests it to start up.

startup checkpoints the broker ID.

startup sets BrokerState as RunningAsBroker, creates the CountDownLatch, enables the startupComplete flag and disables the isStartingUp flag.

startup registers AppInfo as an MBean with the MBean server as kafka.server:type=app-info,id=[brokerId].

In the end, you should see the following INFO message in the logs:

INFO [Kafka Server [brokerId]], started (kafka.server.KafkaServer)
Note
The INFO message above is prefixed with the so-called log ident, which carries the value of the broker.id property and always has the format [Kafka Server [brokerId]]. The message is printed once the Kafka server has fully started.
Note
startup is used exclusively when KafkaServerStartable starts up.
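The final bookkeeping steps of startup (broker state, the two flags and the latch) can be sketched with plain JDK concurrency primitives. This is a simplified model under stated assumptions, not KafkaServer's code: LifecycleSketch, Server, the NotRunning state and the awaitShutdown helper are names chosen for illustration.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

object LifecycleSketch {
  // Hypothetical subset of broker states.
  sealed trait State
  case object NotRunning extends State
  case object Starting extends State
  case object RunningAsBroker extends State

  final class Server {
    @volatile var state: State = NotRunning
    val isStartingUp = new AtomicBoolean(false)
    val startupComplete = new AtomicBoolean(false)
    @volatile private var shutdownLatch = new CountDownLatch(1)

    def startup(): Unit = {
      // Proceed only if not already started and no other startup is running.
      if (!startupComplete.get && isStartingUp.compareAndSet(false, true)) {
        state = Starting
        // ... services would be created and started here ...
        state = RunningAsBroker
        shutdownLatch = new CountDownLatch(1)
        startupComplete.set(true)
        isStartingUp.set(false)
      }
    }

    def shutdown(): Unit = {
      // Only a fully started server is shut down; the latch releases waiters.
      if (startupComplete.compareAndSet(true, false)) {
        state = NotRunning
        shutdownLatch.countDown()
      }
    }

    // Blocks the caller until shutdown() has completed.
    def awaitShutdown(): Unit = shutdownLatch.await()
  }
}
```

A caller would typically invoke startup() and then block in awaitShutdown() until another thread calls shutdown().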

Sending Updated Cluster Metadata to ClusterResourceListeners — notifyClusterListeners Internal Method

notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit

notifyClusterListeners creates a ClusterResourceListeners (with those objects from the input clusterListeners that are of type ClusterResourceListener) and sends the updated cluster metadata to them.

Note
notifyClusterListeners is used exclusively when KafkaServer starts up (with clusterListeners as kafkaMetricsReporters and the MetricsReporter reporters from metric.reporters Kafka property).
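The filtering step of notifyClusterListeners can be illustrated with a small, self-contained sketch. The ClusterResource and ClusterResourceListener types below are simplified stand-ins declared locally, not the Kafka classes of the same name, and the extra clusterId parameter and the returned count are assumptions added to keep the example testable.

```scala
object ClusterListenersSketch {
  // Simplified stand-ins for Kafka's ClusterResource and ClusterResourceListener.
  final case class ClusterResource(clusterId: String)

  trait ClusterResourceListener {
    def onUpdate(resource: ClusterResource): Unit
  }

  // Keeps only the candidates that implement the listener type and
  // sends each of them the cluster metadata. Returns how many were notified.
  def notifyClusterListeners(clusterId: String, candidates: Seq[AnyRef]): Int = {
    val listeners = candidates.collect { case l: ClusterResourceListener => l }
    listeners.foreach(_.onUpdate(ClusterResource(clusterId)))
    listeners.size
  }
}
```

Objects in the input sequence that do not implement the listener type are silently skipped, which is why the input can be as loosely typed as Seq[AnyRef].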

Creating ReplicaManager — createReplicaManager Internal Method

createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager

createReplicaManager simply creates the ReplicaManager (passing in the references to the services, e.g. Metrics, KafkaScheduler, LogManager, QuotaManagers, MetadataCache, LogDirFailureChannel).

Note
createReplicaManager is used exclusively when KafkaServer is requested to start up.

Shutting Down — shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note

shutdown is used when:

  • KafkaServer is requested to start up (and there was an exception)

  • KafkaServerStartable is requested to shut down
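The first case above, shutting down after a failed startup, follows a common pattern that can be sketched as follows. This is an illustration only: the Service trait and startAll are hypothetical, and stopping the already started services in reverse order is a design choice of this sketch, not a statement about what KafkaServer's shutdown does.

```scala
import scala.collection.mutable.ListBuffer

object FailureSketch {
  // Hypothetical service abstraction.
  trait Service {
    def name: String
    def start(): Unit
    def stop(): Unit
  }

  // Starts the services in order. If any start() throws, the services
  // started so far are stopped in reverse order and the exception is rethrown.
  def startAll(services: Seq[Service]): Unit = {
    val started = ListBuffer.empty[Service]
    try {
      services.foreach { s =>
        s.start()
        started += s
      }
    } catch {
      case e: Throwable =>
        started.reverseIterator.foreach(_.stop())
        throw e
    }
  }
}
```

Rethrowing after the cleanup lets the caller still see the original failure.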

initZkClient Internal Method

initZkClient(time: Time): Unit

initZkClient prints out the following INFO message to the logs:

Connecting to zookeeper on [zkConnect]

(only if the chroot path is used) initZkClient…​FIXME

initZkClient…​FIXME (secureAclsEnabled)

In the end, initZkClient requests the KafkaZkClient to createTopLevelPaths.

Note
initZkClient is used exclusively when KafkaServer is requested to start up.

controlledShutdown Internal Method

controlledShutdown(): Unit

controlledShutdown…​FIXME

Note
controlledShutdown is used when…​FIXME

Checkpointing Broker — checkpointBrokerId Internal Method

checkpointBrokerId(brokerId: Int): Unit

For every directory in KafkaConfig.logDirs that is online (isLogDirOnline according to the LogManager), checkpointBrokerId finds the corresponding BrokerMetadataCheckpoint (with the path to the meta.properties file) in the brokerMetadataCheckpoints registry and requests it to read the broker metadata.

For the log directories that have no meta.properties file yet, checkpointBrokerId requests their BrokerMetadataCheckpoints to write the broker metadata.

Note
checkpointBrokerId is used exclusively when KafkaServer is requested to start up.
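The read-then-write-if-missing logic can be sketched with java.util.Properties. This is a simplified model, not BrokerMetadataCheckpoint: the CheckpointSketch object and its helpers are hypothetical, the version and broker.id keys are an assumption about the file layout, and the sketch writes the file directly rather than taking any care over atomic replacement.

```scala
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.Properties

object CheckpointSketch {
  final case class BrokerMetadata(brokerId: Int)

  private def metaFile(logDir: File): File = new File(logDir, "meta.properties")

  // Reads the broker ID from <logDir>/meta.properties, if present.
  def read(logDir: File): Option[BrokerMetadata] = {
    val file = metaFile(logDir)
    if (!file.exists()) None
    else {
      val props = new Properties()
      val in = new FileInputStream(file)
      try props.load(in) finally in.close()
      Option(props.getProperty("broker.id")).map(id => BrokerMetadata(id.trim.toInt))
    }
  }

  // Writes the broker metadata to <logDir>/meta.properties.
  def write(logDir: File, metadata: BrokerMetadata): Unit = {
    val props = new Properties()
    props.setProperty("version", "0") // assumed format version
    props.setProperty("broker.id", metadata.brokerId.toString)
    val out = new FileOutputStream(metaFile(logDir))
    try props.store(out, "") finally out.close()
  }

  // Writes the broker ID into every online directory from which no broker
  // metadata could be read, and returns those directories.
  def checkpointBrokerId(brokerId: Int, onlineLogDirs: Seq[File]): Seq[File] = {
    val missing = onlineLogDirs.filter(dir => read(dir).isEmpty)
    missing.foreach(dir => write(dir, BrokerMetadata(brokerId)))
    missing
  }
}
```

Calling checkpointBrokerId a second time with the same directories writes nothing, because every directory then already has readable metadata.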

Getting Broker ID and Initial Offline Directories — getBrokerIdAndOfflineDirs Internal Method

getBrokerIdAndOfflineDirs: (Int, Seq[String])

getBrokerIdAndOfflineDirs…​FIXME

Note
getBrokerIdAndOfflineDirs is used exclusively when KafkaServer is requested to start up.

generateBrokerId Internal Method

generateBrokerId: Int

generateBrokerId…​FIXME

Note
generateBrokerId is used exclusively when KafkaServer is requested to getBrokerIdAndOfflineDirs.

createBrokerInfo Internal Method

createBrokerInfo: BrokerInfo

createBrokerInfo…​FIXME

Note

createBrokerInfo is used when:
