Metadata

Metadata describes a Kafka cluster…​FIXME

Metadata is created when a KafkaConsumer, KafkaProducer, KafkaAdminClient or AdminClient is created.

Table 1. Metadata’s Properties When Created by Clients

Client            refreshBackoffMs   metadataExpireMs      allowAutoTopicCreation   topicExpiryEnabled
KafkaConsumer     retry.backoff.ms   metadata.max.age.ms   enabled                  disabled
KafkaProducer     retry.backoff.ms   metadata.max.age.ms   enabled                  enabled
KafkaAdminClient  retry.backoff.ms   metadata.max.age.ms
AdminClient
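The refreshBackoffMs and metadataExpireMs columns are filled from ordinary client configuration properties. The sketch below only shows where those two keys would be set, using plain java.util.Properties. The class name, the method name and the numeric values are assumptions made for illustration, and handing the result to a real client (which also needs bootstrap servers and other settings) is left out.

```java
import java.util.Properties;

// Hypothetical helper; only the two property keys come from the table above.
class MetadataClientProps {

    // Builds the subset of client properties that ends up in Metadata.
    static Properties metadataProps(long retryBackoffMs, long metadataMaxAgeMs) {
        Properties props = new Properties();
        // Used as Metadata's refreshBackoffMs
        props.setProperty("retry.backoff.ms", Long.toString(retryBackoffMs));
        // Used as Metadata's metadataExpireMs
        props.setProperty("metadata.max.age.ms", Long.toString(metadataMaxAgeMs));
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values only, not recommendations.
        Properties props = metadataProps(100L, 300_000L);
        System.out.println(props);
    }
}
```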

A (seemingly) common usage pattern is as follows:

  1. Request Metadata for an update (that simply turns the needUpdate flag on)

  2. Request (indirectly) KafkaClient to wake up if blocked on I/O

  3. Request Metadata to wait for metadata change (i.e. until the metadata version has changed)
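The three steps above can be sketched with a small stand-in class. This is not Kafka's Metadata: MetadataPatternSketch, runPattern and the helper thread are hypothetical, the method names only mirror the ones described on this page, and the timeout is reported with an IllegalStateException as a simplifying assumption. The helper thread plays the role of the woken-up client that delivers fresh metadata.

```java
// Simplified, hypothetical model of the request / wake-up / await pattern.
class MetadataPatternSketch {
    private boolean needUpdate = false;
    private int version = 0;

    // Step 1: turn the needUpdate flag on and hand back the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    synchronized int version() {
        return version;
    }

    // Called by the simulated I/O thread once "fresh metadata" is available.
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll(); // wake up anyone blocked in awaitUpdate
    }

    // Step 3: block until the version moves past lastVersion or maxWaitMs elapses.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new IllegalStateException("No metadata update within " + maxWaitMs + " ms");
            }
            wait(remaining);
        }
    }

    static int runPattern(MetadataPatternSketch metadata) {
        int lastVersion = metadata.requestUpdate();          // step 1
        // Step 2 stand-in: a separate thread acts as the woken-up client.
        Thread ioThread = new Thread(() -> {
            if (metadata.updateRequested()) {
                metadata.update();
            }
        });
        ioThread.start();
        try {
            metadata.awaitUpdate(lastVersion, 5_000L);       // step 3
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for metadata", e);
        }
        return metadata.version();
    }

    public static void main(String[] args) {
        System.out.println("version after update: " + runPattern(new MetadataPatternSketch()));
    }
}
```

Returning the version from requestUpdate lets the caller pass it straight to awaitUpdate, so an update that lands between the two calls is not missed.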

Table 2. Metadata’s Internal Properties (e.g. Registries and Counters)
Name Description

cluster

Cluster with a subset of the nodes and topic partitions in a Kafka cluster.

  • Empty (with no nodes and no topic partitions) when Metadata is created

  • Updated when:

    • DefaultMetadataUpdater handles MetadataResponse

    • KafkaConsumer, KafkaProducer, KafkaAdminClient and AdminClient are created and update the cluster with a "bootstrap" cluster with bootstrap brokers

  • Can be accessed using fetch

listeners

lastRefreshMs

The time (in millis) of the last update attempt, whether it succeeded or failed

lastSuccessfulRefreshMs

needMetadataForAllTopics

Flag…​FIXME

needUpdate

Flag that controls whether a metadata update has been requested (enabled) or not (disabled).

  • Starts turned off when Metadata is created

  • Turned on exclusively when Metadata is requested for an update

  • Turned off when Metadata is updated

Use updateRequested to know the current value.

version

Metadata version

Tip

Enable the DEBUG or TRACE logging level for the org.apache.kafka.clients.Metadata logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.Metadata=TRACE

Refer to Logging.

Checking if Metadata Update was Requested — updateRequested Method

synchronized boolean updateRequested()

updateRequested…​FIXME

Note

updateRequested is used when:

Recording Update Request Failure — failedUpdate Method

synchronized void failedUpdate(long now, AuthenticationException authenticationException)

failedUpdate…​FIXME

Note
failedUpdate is used when…​FIXME

getClusterForCurrentTopics Internal Method

Cluster getClusterForCurrentTopics(Cluster cluster)

getClusterForCurrentTopics…​FIXME

Note
getClusterForCurrentTopics is used when…​FIXME

timeToNextUpdate Method

synchronized long timeToNextUpdate(long nowMs)

timeToNextUpdate…​FIXME

Note

timeToNextUpdate is used when:

add Method

synchronized void add(String topic)

add…​FIXME

Note
add is used when…​FIXME

requestUpdate Method

synchronized int requestUpdate()

requestUpdate…​FIXME

Note
requestUpdate is used when…​FIXME

Waiting for Metadata Update (i.e. Metadata Version Change) — awaitUpdate Method

synchronized void awaitUpdate(
  final int lastVersion,
  final long maxWaitMs) throws InterruptedException

awaitUpdate…​FIXME

Note
awaitUpdate is used when…​FIXME

Getting Current Cluster Information — fetch Method

synchronized Cluster fetch()

fetch returns current cluster information.

Note
fetch is used when…​FIXME

Setting Topics to Maintain — setTopics Method

Caution
FIXME

Updating Cluster Metadata — update Method

synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now)

update turns needUpdate flag off and increments version.

update sets lastRefreshMs and lastSuccessfulRefreshMs internal registries to the input now.

(only when topicExpiryEnabled is enabled, e.g. KafkaProducer) update…​FIXME

update notifies listeners that the metadata has been updated.

update calls getClusterForCurrentTopics for the cluster when the needMetadataForAllTopics flag is on and turns the needUpdate flag off (that may have been turned on…​FIXME).

update sets the cluster to the input cluster.

update prints out the cluster ID and notifies clusterResourceListeners that the cluster has changed (only for a non-bootstrap cluster).

Cluster ID: [clusterId]

Note

update is used when:

  • DefaultMetadataUpdater handles MetadataResponse

  • KafkaConsumer is created (and updates the cluster with a "bootstrap" cluster with bootstrap servers)

  • KafkaProducer is created (and updates the cluster with a "bootstrap" cluster with bootstrap servers)

  • KafkaAdminClient is created (and updates the cluster with a "bootstrap" cluster with bootstrap brokers)

  • AdminClient is created (and updates the cluster with a "bootstrap" cluster with bootstrap brokers)
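The bookkeeping that update performs can be sketched with a reduced model. Everything here is a hypothetical stand-in: the class MetadataUpdateSketch, the use of a String in place of Cluster, and listeners modeled as Consumer callbacks. The failedUpdate method is inferred only from the lastRefreshMs description above (it is touched on failed updates too) and is not a statement about Kafka's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the state changes described for update.
class MetadataUpdateSketch {
    private boolean needUpdate = true;
    private int version = 0;
    private long lastRefreshMs = 0L;
    private long lastSuccessfulRefreshMs = 0L;
    private String cluster = ""; // stand-in for Kafka's Cluster
    private final List<Consumer<String>> listeners = new ArrayList<>();

    synchronized void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    synchronized void update(String newCluster, long now) {
        needUpdate = false;              // the pending request is satisfied
        version++;                       // waiters compare against this counter
        lastRefreshMs = now;
        lastSuccessfulRefreshMs = now;
        for (Consumer<String> listener : listeners) {
            listener.accept(newCluster); // notify listeners of the new metadata
        }
        cluster = newCluster;
    }

    // Assumption: a failed attempt only moves lastRefreshMs.
    synchronized void failedUpdate(long now) {
        lastRefreshMs = now;
    }

    synchronized boolean updateRequested() { return needUpdate; }
    synchronized int version() { return version; }
    synchronized long lastRefreshMs() { return lastRefreshMs; }
    synchronized long lastSuccessfulRefreshMs() { return lastSuccessfulRefreshMs; }
    synchronized String fetch() { return cluster; }

    public static void main(String[] args) {
        MetadataUpdateSketch metadata = new MetadataUpdateSketch();
        metadata.addListener(c -> System.out.println("updated to " + c));
        metadata.update("bootstrap", System.currentTimeMillis());
        System.out.println("version=" + metadata.version());
    }
}
```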

Creating Metadata Instance

Metadata takes the following when created:

Metadata initializes the internal registries and counters.

Conditionally Requesting Update For New Topics (for KafkaConsumer) — needMetadataForAllTopics Method

synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics)

needMetadataForAllTopics calls requestUpdateForNewTopics when the input needMetadataForAllTopics flag is enabled (i.e. true) and the current needMetadataForAllTopics is disabled (i.e. false).

needMetadataForAllTopics sets needMetadataForAllTopics to be the input needMetadataForAllTopics.

Note

needMetadataForAllTopics is used when KafkaConsumer:

requestUpdateForNewTopics Internal Method

synchronized void requestUpdateForNewTopics()

requestUpdateForNewTopics sets lastRefreshMs to 0 and requests update.
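How needMetadataForAllTopics and requestUpdateForNewTopics interact can be sketched as follows. The class MetadataAllTopicsSketch is hypothetical and keeps only the three fields involved. The starting lastRefreshMs value is made up so that the reset to 0 is visible, and "requests update" is modeled as simply turning needUpdate on.

```java
// Hypothetical, reduced model of the off-to-on transition described above.
class MetadataAllTopicsSketch {
    private boolean needMetadataForAllTopics = false;
    private boolean needUpdate = false;
    private long lastRefreshMs = 1_000L; // pretend an earlier refresh happened

    synchronized void needMetadataForAllTopics(boolean flag) {
        // Only an off-to-on transition triggers a new update request.
        if (flag && !this.needMetadataForAllTopics) {
            requestUpdateForNewTopics();
        }
        this.needMetadataForAllTopics = flag;
    }

    private synchronized void requestUpdateForNewTopics() {
        lastRefreshMs = 0L;  // reset the refresh timestamp
        needUpdate = true;   // stand-in for requesting an update
    }

    synchronized boolean updateRequested() { return needUpdate; }
    synchronized long lastRefreshMs() { return lastRefreshMs; }

    public static void main(String[] args) {
        MetadataAllTopicsSketch metadata = new MetadataAllTopicsSketch();
        metadata.needMetadataForAllTopics(true);
        System.out.println("update requested: " + metadata.updateRequested());
    }
}
```

Resetting lastRefreshMs to 0 presumably keeps the refresh backoff from delaying the update for the newly required topics.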

Note

requestUpdateForNewTopics is used when Metadata:
