KafkaServer — Kafka Broker

KafkaServer is a Kafka broker that manages Kafka services.

Figure 1. KafkaServer’s Startup and Auxiliary Services

When created, KafkaServer registers itself in the JMX system under the kafka.server node.

Tip

Enable ALL logging level for kafka.server.KafkaServer logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.server.KafkaServer=ALL

Refer to Logging.

Creating KafkaServer Instance

KafkaServer takes the following when created:

Caution
FIXME
Note
KafkaServer is created when KafkaServerStartable is created.
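Although KafkaServer is usually created and started up by KafkaServerStartable (i.e. the kafka-server-start.sh shell script), a broker can also be embedded in a Scala application. The following is a minimal sketch, assuming a Zookeeper ensemble at localhost:2181; the configuration values are examples only:

import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

// Minimal broker configuration (example values)
val props = new Properties()
props.put("broker.id", "0")
props.put("zookeeper.connect", "localhost:2181")
props.put("listeners", "PLAINTEXT://localhost:9092")
props.put("log.dirs", "/tmp/kafka-logs-0")

// Create a KafkaServer and start the broker (see startup below)
val server = new KafkaServer(KafkaConfig.fromProps(props))
server.startup()
// ...
server.shutdown()
server.awaitShutdown()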

Starting Up — startup Method

startup(): Unit

startup starts the Kafka broker.

Internally, startup first prints out the following INFO message to the logs:

starting

startup sets BrokerState as Starting.

startup requests KafkaScheduler to start.

startup connects to Zookeeper (and initializes ZkUtils).

startup gets or generates the cluster ID (getOrGenerateClusterId) that is recorded as the internal cluster ID.

You should see the following INFO message in the logs:

Cluster ID = [clusterId]

startup sets the brokerId of the KafkaConfig to the broker ID.

startup creates the LogContext with [KafkaServer id=[brokerId]] prefix.

startup requests the KafkaConfig for the DynamicBrokerConfig that is in turn requested to initialize (fetching broker configuration from Zookeeper).

startup creates a new KafkaScheduler (with the number of threads as specified by background.threads configuration property) and immediately requests it to start up.

startup creates and configures metrics.

Caution
FIXME

startup registers broker topic metrics (by initializing BrokerTopicStats).

startup initializes QuotaManagers.

startup notifies cluster resource listeners (i.e. KafkaMetricsReporters and the configured instances of metric reporters).

startup creates the LogDirFailureChannel

startup creates the LogManager and requests it to start up.

startup creates the MetadataCache (for the broker ID).

startup creates the CredentialProvider (per sasl.enabled.mechanisms property).

startup creates the SocketServer (for KafkaConfig, Metrics and CredentialProvider) and requests it to start up.

startup creates the ReplicaManager and requests it to start up.

startup creates the KafkaController (for KafkaConfig, ZkUtils, Metrics and the optional threadNamePrefix) and requests it to start up.

startup creates the GroupCoordinator (for KafkaConfig, ZkUtils and ReplicaManager) and requests it to start up.

startup creates the TransactionCoordinator (for KafkaConfig, ReplicaManager, a new dedicated KafkaScheduler with transaction-log-manager- thread name prefix, ZkUtils, Metrics and MetadataCache) and requests it to start up.

startup creates an Authorizer (if defined using the authorizer.class.name property) and requests it to configure itself.

Note
At this point KafkaServer may start processing requests.

startup starts the HTTP interface of mx4j (if configured).

startup creates the DynamicConfigManager (for ZkUtils and dynamicConfigHandlers) and requests it to start up.

startup configures the advertised listeners (if defined).

startup creates the KafkaHealthcheck (for broker ID, the advertised listeners, ZkUtils, broker.rack and inter.broker.protocol.version Kafka properties) and requests it to start up.

startup checkpoints the broker ID.

startup sets BrokerState as RunningAsBroker, creates the CountDownLatch, enables the startupComplete flag, and disables the isStartingUp flag.

startup registers AppInfo as an MBean with the MBean server as kafka.server:type=app-info,id=[brokerId].

In the end, you should see the following INFO message in the logs:

INFO [Kafka Server [brokerId]], started (kafka.server.KafkaServer)
Note
The INFO message above uses the so-called log ident with the value of the broker.id property and is always in the format [Kafka Server [brokerId]], once a Kafka server has fully started.
Note
startup is used exclusively when KafkaServerStartable is requested to start up.

Authorizer

authorizer: Option[Authorizer] = None

authorizer is an Authorizer that is created based on the authorizer.class.name configuration property (default: empty, i.e. no authorizer).
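For example, to use the built-in SimpleAclAuthorizer, add the following line to the broker configuration (config/server.properties):

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer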

Authorizer is used to create the data-plane and control-plane KafkaApis (for authorizing operations).

Sending Updated Cluster Metadata to ClusterResourceListeners — notifyClusterListeners Internal Method

notifyClusterListeners(clusterListeners: Seq[AnyRef]): Unit

notifyClusterListeners creates a ClusterResourceListeners (with the objects from the input clusterListeners of type ClusterResourceListener) and sends the updated cluster metadata to them.
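As an illustration, a metrics reporter can receive such cluster metadata updates by also implementing the ClusterResourceListener interface. The class below is a made-up example (not part of Kafka):

import java.util
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
import org.apache.kafka.common.metrics.{KafkaMetric, MetricsReporter}

class ClusterAwareReporter extends MetricsReporter with ClusterResourceListener {
  // Called with the updated cluster metadata (e.g. the cluster ID)
  override def onUpdate(clusterResource: ClusterResource): Unit =
    println(s"Cluster metadata updated: clusterId=${clusterResource.clusterId}")
  override def init(metrics: util.List[KafkaMetric]): Unit = ()
  override def metricChange(metric: KafkaMetric): Unit = ()
  override def metricRemoval(metric: KafkaMetric): Unit = ()
  override def close(): Unit = ()
  override def configure(configs: util.Map[String, _]): Unit = ()
}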

Note

notifyClusterListeners is used when:

Creating ReplicaManager — createReplicaManager Internal Method

createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager

createReplicaManager simply creates the ReplicaManager (passing in the references to the services, e.g. Metrics, KafkaScheduler, LogManager, QuotaManagers, MetadataCache, LogDirFailureChannel).

Note
createReplicaManager is used exclusively when KafkaServer is requested to start up.

getOrGenerateClusterId Internal Method

getOrGenerateClusterId(zkClient: KafkaZkClient): String

getOrGenerateClusterId requests the given KafkaZkClient for the cluster ID and, when there is none yet, requests it to createOrGetClusterId with a new random UUID (encoded as Base64).
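The following is a minimal sketch of how such a cluster ID can be generated, assuming a URL-safe Base64 encoding without padding (illustrative only, not necessarily the exact helper Kafka uses):

import java.nio.ByteBuffer
import java.util.{Base64, UUID}

def generateClusterId(): String = {
  val uuid = UUID.randomUUID()
  // Turn the 128-bit UUID into 16 bytes and Base64-encode them
  val bytes = ByteBuffer.allocate(16)
    .putLong(uuid.getMostSignificantBits)
    .putLong(uuid.getLeastSignificantBits)
    .array()
  Base64.getUrlEncoder.withoutPadding().encodeToString(bytes)
}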

Note
getOrGenerateClusterId is used exclusively when KafkaServer is requested to start up (and initializes the internal cluster ID).

Shutting Down — shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note

shutdown is used when:

  • KafkaServer is requested to start up (and there was an exception)

  • KafkaServerStartable is requested to shut down

initZkClient Internal Method

initZkClient(time: Time): Unit

initZkClient prints out the following INFO message to the logs:

Connecting to zookeeper on [zkConnect]

(only if the chroot path is used) initZkClient…​FIXME
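A chroot path is in use when the zookeeper.connect property has a path suffix, e.g.:

zookeeper.connect=localhost:2181/kafka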

initZkClient…​FIXME (secureAclsEnabled)

In the end, initZkClient requests the KafkaZkClient to createTopLevelPaths.

Note
initZkClient is used exclusively when KafkaServer is requested to start up.

controlledShutdown Internal Method

controlledShutdown(): Unit

controlledShutdown…​FIXME

Note
controlledShutdown is used when KafkaServer is requested to shut down.

Checkpointing Broker ID — checkpointBrokerId Internal Method

checkpointBrokerId(brokerId: Int): Unit

For every directory in KafkaConfig.logDirs that is isLogDirOnline (according to the LogManager), checkpointBrokerId finds the corresponding BrokerMetadataCheckpoint (with the path to the meta.properties file) in the brokerMetadataCheckpoints registry and requests it to read the broker metadata.

Unless the meta.properties file was already available, checkpointBrokerId requests the BrokerMetadataCheckpoints (of the log directories with no meta files) to write the broker metadata.
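meta.properties is a plain Java properties file in every log directory. A typical content looks as follows (the broker ID value is an example):

version=0
broker.id=0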

Note
checkpointBrokerId is used exclusively when KafkaServer is requested to start up.

Getting Broker ID and Initial Offline Directories — getBrokerIdAndOfflineDirs Internal Method

getBrokerIdAndOfflineDirs: (Int, Seq[String])

getBrokerIdAndOfflineDirs…​FIXME

Note
getBrokerIdAndOfflineDirs is used exclusively when KafkaServer is requested to start up.

generateBrokerId Internal Method

generateBrokerId: Int

generateBrokerId…​FIXME

Note
generateBrokerId is used exclusively when KafkaServer is requested to getBrokerIdAndOfflineDirs.

createBrokerInfo Internal Method

createBrokerInfo: BrokerInfo

createBrokerInfo…​FIXME

Note

createBrokerInfo is used when:

Cluster ID — _clusterId Internal Property

_clusterId: String

KafkaServer uses a Cluster ID that is a random UUID (encoded as Base64).

When requested to start up, KafkaServer initializes the internal _clusterId which is immediately printed out as an INFO message to the logs:

Cluster ID = [clusterId]

Cluster ID is persisted in Zookeeper in /cluster/id znode (in JSON format).
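For example (with a made-up cluster ID):

{"version":"1","id":"p8Yb1EDsQzmSmvDixWuFPg"}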

Cluster ID is registered as kafka.server:type=KafkaServer,name=ClusterId MBean in the JMX system.

Cluster ID is used for the following:

Default Configuration Properties of Logs (for LogManager and AdminManager) — copyKafkaConfigToLog Internal Utility

copyKafkaConfigToLog(
  kafkaConfig: KafkaConfig): Map[String, Object]

copyKafkaConfigToLog sets the topic-level configuration properties based on the given KafkaConfig.

Note
copyKafkaConfigToLog uses the same configuration properties as TopicConfigSynonyms. The keys of the LogConfig configuration properties are simply aliases of the corresponding TopicConfig keys (for example, the topic-level cleanup.policy is the synonym of the broker-level log.cleanup.policy).
Table 1. Topic-Level Configuration Properties

| LogConfig | KafkaConfig |
| --- | --- |
| CleanupPolicyProp | logCleanupPolicy |
| CompressionTypeProp | compressionType |
| DeleteRetentionMsProp | logCleanerDeleteRetentionMs |
| FileDeleteDelayMsProp | logDeleteDelayMs |
| FlushMessagesProp | logFlushIntervalMessages |
| FlushMsProp | logFlushIntervalMs |
| IndexIntervalBytesProp | logIndexIntervalBytes |
| MaxCompactionLagMsProp | logCleanerMaxCompactionLagMs |
| MaxMessageBytesProp | messageMaxBytes |
| MessageDownConversionEnableProp | logMessageDownConversionEnable |
| MessageFormatVersionProp | logMessageFormatVersion |
| MessageTimestampDifferenceMaxMsProp | logMessageTimestampDifferenceMaxMs |
| MessageTimestampTypeProp | logMessageTimestampType |
| MinCleanableDirtyRatioProp | logCleanerMinCleanRatio |
| MinCompactionLagMsProp | logCleanerMinCompactionLagMs |
| MinInSyncReplicasProp | minInSyncReplicas |
| PreAllocateEnableProp | logPreAllocateEnable |
| RetentionBytesProp | logRetentionBytes |
| RetentionMsProp | logRetentionTimeMillis |
| SegmentBytesProp | logSegmentBytes |
| SegmentIndexBytesProp | logIndexSizeMaxBytes |
| SegmentJitterMsProp | logRollTimeJitterMillis |
| SegmentMsProp | logRollTimeMillis |
| UncleanLeaderElectionEnableProp | uncleanLeaderElectionEnable |

Note

copyKafkaConfigToLog is used when:

Internal Properties

Name Description

adminManager

AdminManager

apis

brokerMetadataCheckpoints

brokerState

BrokerState

_brokerTopicStats

Created when KafkaServer is requested to start up

Used (as brokerTopicStats method) to create the data-plane KafkaApis, the control-plane KafkaApis, the ReplicaManager, the LogManager when KafkaServer is requested to start up

controlPlaneRequestHandlerPool

controlPlaneRequestProcessor

Control-plane KafkaApis request processor (handler) for the optional control-plane KafkaRequestHandlerPool of the SocketServer

credentialProvider

CredentialProvider

dataPlaneRequestProcessor

Data-plane KafkaApis request handler for the data-plane RequestChannel of the SocketServer

dataPlaneRequestHandlerPool

dynamicConfigHandlers

ConfigHandlers by name:

Initialized when KafkaServer is requested to start up for the only purpose of creating the DynamicConfigManager.

dynamicConfigManager

DynamicConfigManager

groupCoordinator

GroupCoordinator (for the only purpose of creating the KafkaApis)

Created and immediately started up when KafkaServer is requested to start up

Shut down when KafkaServer is requested to shut down

isStartingUp

Flag for…​FIXME

kafkaController

KafkaController

kafkaHealthcheck

KafkaHealthcheck

kafkaScheduler

KafkaScheduler with the number of daemon threads as configured using background.threads configuration property (default: 10)

logContext

LogContext

logDirFailureChannel

LogDirFailureChannel

logManager

Used when:


Created and immediately started when KafkaServer is requested to start up.

Shut down when KafkaServer is requested to shut down.

metadataCache

MetadataCache that is created for the sake of creating the following services (at startup):

replicaManager

ReplicaManager used to create:


reporters

Collection of MetricsReporter

Used when…​FIXME

requestHandlerPool

KafkaRequestHandlerPool

socketServer

transactionCoordinator

TransactionCoordinator

quotaManagers

shutdownLatch

java.util.concurrent.CountDownLatch

startupComplete

Flag for…​FIXME

zkUtils

ZkUtils
