Metadata

Metadata describes a Kafka cluster. It is created for KafkaConsumer and KafkaProducer.

Table 1. Metadata’s Properties When Created by Clients

Client          refreshBackoffMs    metadataExpireMs       allowAutoTopicCreation    topicExpiryEnabled
KafkaConsumer   retry.backoff.ms    metadata.max.age.ms    on                        off
KafkaProducer   retry.backoff.ms    metadata.max.age.ms    on                        on
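As a minimal sketch of where the two timing values in Table 1 come from, the following builds client properties with plain java.util.Properties. The class name ClientMetadataConfig, the broker address and the helper method are illustrative assumptions, not part of Kafka. The resulting Properties would be passed to a KafkaConsumer or KafkaProducer together with the other required settings (e.g. serializers).

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka) that sets the two properties
// Metadata reads as refreshBackoffMs and metadataExpireMs.
final class ClientMetadataConfig {
    static Properties metadataProps(long refreshBackoffMs, long metadataExpireMs) {
        Properties props = new Properties();
        // Assumed broker address, for illustration only.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Becomes Metadata's refreshBackoffMs.
        props.setProperty("retry.backoff.ms", Long.toString(refreshBackoffMs));
        // Becomes Metadata's metadataExpireMs.
        props.setProperty("metadata.max.age.ms", Long.toString(metadataExpireMs));
        return props;
    }
}
```

For example, ClientMetadataConfig.metadataProps(100L, 300000L) yields properties with a 100 ms backoff and a 5-minute metadata expiry.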

A (seemingly) common usage pattern is as follows:

  1. Request Metadata for an update (that simply turns the needUpdate flag on)

  2. Request (indirectly) KafkaClient to wake up if blocked on I/O

  3. Request Metadata to wait for metadata change (i.e. until the metadata version has changed)
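The three steps above can be sketched with a simplified stand-in for Metadata. MetadataModel and UsagePattern are hypothetical classes written for this illustration, the wakeup callback stands in for waking up KafkaClient, and throwing java.util.concurrent.TimeoutException on timeout is an assumption of the sketch rather than documented behavior.

```java
import java.util.concurrent.TimeoutException;

// Simplified stand-in for Metadata (hypothetical, not the Kafka class).
final class MetadataModel {
    private boolean needUpdate = false;
    private int version = 0;

    // Step 1: turn the needUpdate flag on and return the current version.
    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() {
        return needUpdate;
    }

    // Called by the I/O thread once fresh metadata has arrived.
    synchronized void update() {
        needUpdate = false;
        version++;
        notifyAll(); // wake up callers blocked in awaitUpdate
    }

    // Step 3: block until the version moves past lastVersion or time runs out.
    synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                throw new TimeoutException("No metadata update within " + maxWaitMs + " ms");
            }
            wait(remaining);
        }
    }
}

final class UsagePattern {
    // wakeup stands in for asking KafkaClient to wake up (assumption).
    static void refresh(MetadataModel metadata, Runnable wakeup, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        int lastVersion = metadata.requestUpdate();   // step 1
        wakeup.run();                                 // step 2
        metadata.awaitUpdate(lastVersion, maxWaitMs); // step 3
    }
}
```

Because the caller remembers the version returned in step 1, it does not matter whether the update lands before or after the caller starts waiting: the loop only checks that the version has moved on.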

Metadata manages metadata update listeners that want to be notified about metadata updates. Listeners can be registered and deregistered.

Note
ConsumerCoordinator is the only entity to register a metadata update listener.

Table 2. Metadata’s Internal Properties (e.g. Registries and Counters)
Name Description

cluster

Cluster with a subset of the nodes and topic partitions in a Kafka cluster.

  • Empty (with no nodes and no topic partitions) when Metadata is created

  • Updated when:

    • DefaultMetadataUpdater handles MetadataResponse

    • KafkaConsumer, KafkaProducer, KafkaAdminClient and AdminClient are created and update the cluster with a "bootstrap" cluster with bootstrap brokers

  • Can be accessed using fetch

lastRefreshMs

The time (in milliseconds) of the last update attempt, whether successful or failed

lastSuccessfulRefreshMs

The time (in milliseconds) of the last successful update

needMetadataForAllTopics

Flag…​FIXME

needUpdate

Flag that controls whether a metadata update has been requested (enabled) or not (disabled).

  • Starts turned off when Metadata is created

  • Turned on exclusively when Metadata is requested for an update

  • Turned off when Metadata is updated

Use updateRequested to know the current value.

version

Metadata version

Tip

Enable the INFO, DEBUG or TRACE logging level for the org.apache.kafka.clients.Metadata logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.Metadata=TRACE

Refer to Logging.

Checking if Metadata Update was Requested — updateRequested Method

synchronized boolean updateRequested()

updateRequested…​FIXME

Note

updateRequested is used when:

Recording Update Request Failure — failedUpdate Method

synchronized void failedUpdate(long now, AuthenticationException authenticationException)

failedUpdate…​FIXME

Note
failedUpdate is used when…​FIXME

getClusterForCurrentTopics Internal Method

Cluster getClusterForCurrentTopics(Cluster cluster)

getClusterForCurrentTopics…​FIXME

Note
getClusterForCurrentTopics is used exclusively when Metadata is requested to update.

timeToNextUpdate Method

synchronized long timeToNextUpdate(long nowMs)

timeToNextUpdate…​FIXME

Note

timeToNextUpdate is used when:

add Method

synchronized void add(String topic)

add…​FIXME

Note
add is used when…​FIXME

Requesting Metadata Update — requestUpdate Method

int requestUpdate()

requestUpdate simply turns the needUpdate flag on and returns the current version.

Note

requestUpdate is used when:

Waiting for Metadata Update (i.e. Metadata Version Change) — awaitUpdate Method

synchronized void awaitUpdate(
  final int lastVersion,
  final long maxWaitMs) throws InterruptedException

awaitUpdate…​FIXME

Note
awaitUpdate is used when…​FIXME

Getting Current Cluster Metadata — fetch Method

synchronized Cluster fetch()

fetch returns current cluster information.

Note
fetch is used when…​FIXME

Setting Topics to Maintain — setTopics Method

Caution
FIXME

Updating Cluster Metadata — update Method

void update(
  Cluster newCluster,
  Set<String> unavailableTopics,
  long now)

update turns the needUpdate flag off and increments version.

update sets lastRefreshMs and lastSuccessfulRefreshMs internal registries to the given now.

(only when topicExpiryEnabled is enabled, e.g. KafkaProducer) update…​FIXME

update notifies the Metadata.Listeners that the metadata has been updated.

When the needMetadataForAllTopics flag is on, update calls getClusterForCurrentTopics for the given Cluster and turns the needUpdate flag off (that may have been turned on…FIXME).

update sets the cluster to the given Cluster when the needMetadataForAllTopics flag is off.

For a non-bootstrap cluster only, update prints out the following INFO message to the logs with the cluster ID and notifies clusterResourceListeners that the cluster has changed.

Cluster ID: [clusterId]

In the end, update prints out the following DEBUG message to the logs:

Updated cluster metadata version [version] to [cluster]
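The bookkeeping that update performs can be sketched as follows. UpdateSketch is a hypothetical model written for this illustration: a cluster is reduced to a set of topic names, and the filtering step assumes that getClusterForCurrentTopics keeps only the topics of interest, which this page does not confirm. Topic expiry, listeners and logging are left out.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the state changes made by update.
final class UpdateSketch {
    boolean needUpdate = true;
    boolean needMetadataForAllTopics = false;
    int version = 0;
    long lastRefreshMs = 0L;
    long lastSuccessfulRefreshMs = 0L;
    final Set<String> topics = new HashSet<>(); // topics of interest
    Set<String> cluster = new HashSet<>();      // stand-in for Cluster

    synchronized void update(Set<String> newCluster, long now) {
        needUpdate = false;            // the requested update has arrived
        lastRefreshMs = now;
        lastSuccessfulRefreshMs = now;
        version++;
        if (needMetadataForAllTopics) {
            // Assumption: getClusterForCurrentTopics keeps only topics of interest.
            Set<String> current = new HashSet<>(newCluster);
            current.retainAll(topics);
            cluster = current;
        } else {
            cluster = new HashSet<>(newCluster);
        }
    }
}
```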
Note

update is used when:

Creating Metadata Instance

Metadata takes the following when created:

Metadata initializes the internal registries and counters.

Conditionally Requesting Update For New Topics (for KafkaConsumer) — needMetadataForAllTopics Method

synchronized void needMetadataForAllTopics(boolean needMetadataForAllTopics)

needMetadataForAllTopics calls requestUpdateForNewTopics when the input needMetadataForAllTopics flag is enabled (i.e. true) and the current needMetadataForAllTopics is disabled (i.e. false).

needMetadataForAllTopics then sets the internal needMetadataForAllTopics flag to the input value.

Note

needMetadataForAllTopics is used when KafkaConsumer:

requestUpdateForNewTopics Internal Method

synchronized void requestUpdateForNewTopics()

requestUpdateForNewTopics sets lastRefreshMs to 0 and requests an update (using requestUpdate).
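A minimal sketch of how needMetadataForAllTopics and requestUpdateForNewTopics fit together, using a hypothetical AllTopicsSketch class and an arbitrary starting value for lastRefreshMs. Resetting lastRefreshMs to 0 presumably lets the next refresh skip the backoff, but that reading is an assumption since timeToNextUpdate is not described on this page.

```java
// Hypothetical, simplified model (not the Kafka class).
final class AllTopicsSketch {
    private boolean needMetadataForAllTopics = false;
    private boolean needUpdate = false;
    private long lastRefreshMs = 1_000L; // pretend an earlier refresh happened
    private int version = 0;

    synchronized void needMetadataForAllTopics(boolean requested) {
        // Only a switch from off to on triggers an update request.
        if (requested && !this.needMetadataForAllTopics) {
            requestUpdateForNewTopics();
        }
        this.needMetadataForAllTopics = requested;
    }

    private synchronized void requestUpdateForNewTopics() {
        lastRefreshMs = 0; // forget the last refresh time
        requestUpdate();
    }

    synchronized int requestUpdate() {
        needUpdate = true;
        return version;
    }

    synchronized boolean updateRequested() { return needUpdate; }
    synchronized long lastRefreshMs() { return lastRefreshMs; }
    synchronized boolean allTopics() { return needMetadataForAllTopics; }
}
```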

Note

requestUpdateForNewTopics is used when Metadata:

Registering Metadata.Listener — addListener Method

void addListener(Listener listener)

addListener simply adds the given Listener to the listeners internal registry.

Note
addListener is used exclusively when ConsumerCoordinator is requested to addMetadataListener (when created).
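The listener registry can be pictured with the following sketch. ListenerRegistrySketch and its one-argument Listener interface are simplifications invented for this illustration; the real Metadata.Listener callback receives cluster information rather than a version number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the listeners internal registry.
final class ListenerRegistrySketch {
    // Simplified callback (assumption): receives only the new version.
    interface Listener {
        void onMetadataUpdate(int version);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private int version = 0;

    synchronized void addListener(Listener listener) {
        listeners.add(listener);
    }

    synchronized void removeListener(Listener listener) {
        listeners.remove(listener);
    }

    // Every registered listener is told about the update.
    synchronized void update() {
        version++;
        for (Listener listener : listeners) {
            listener.onMetadataUpdate(version);
        }
    }
}
```

A listener that is removed before the next update simply stops receiving notifications.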
