KafkaProducer — Main Class For Kafka Producers

KafkaProducer is the main public class that you and other Kafka developers use to write Kafka producers that publish records to a Kafka cluster.

KafkaProducer.png
Figure 1. KafkaProducer

KafkaProducer is a part of the public API and is created with properties and (key and value) serializers as configuration.

KafkaProducer follows the Producer contract.

Table 1. KafkaProducer’s Internal Properties (e.g. Registries and Counters)
Name Description

maxBlockTimeMs

Time KafkaProducer uses to block when requesting partitions for a topic.

Note
Use max.block.ms Kafka property to set the value.

metadata

sender

Sender with the following:

  • …​FIXME

ioThread

Thread of execution…​FIXME

Tip

Enable DEBUG or TRACE logging levels for org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.KafkaProducer=TRACE

Refer to Logging.

Asynchronously Sending Record to Topic and Notifying Producer Interceptors — send Method

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
Note
send is a part of Producer Contract.

send…​FIXME

Asynchronously Sending Record to Topic — doSend Internal Method

Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)

doSend…​FIXME

Note
doSend is used exclusively when KafkaProducer asynchronously sends a record to a topic and notifies producer interceptors.

Creating KafkaProducer Instance

KafkaProducer takes the following when created:

  • FIXME

KafkaProducer initializes the internal registries and counters.

Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method

ClusterResourceListeners configureClusterResourceListeners(
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  List<?>... candidateLists)

configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keySerializer and valueSerializer.

Note

configureClusterResourceListeners is used exclusively when KafkaProducer is created (to create the Metadata) with the following input arguments:

Requesting Partitions for Topic — partitionsFor Method

List<PartitionInfo> partitionsFor(String topic)
Note
partitionsFor is a part of Producer Contract.

partitionsFor waits on cluster metadata for the input topic and max.block.ms time. Once retrieved, partitionsFor requests Cluster for the partitions.

Waiting for Cluster Metadata (with Partitions for Topic) — waitOnMetadata Internal Recursive Method

ClusterAndWaitTime waitOnMetadata(
  String topic,
  Integer partition,
  long maxWaitMs) throws InterruptedException

waitOnMetadata adds the input topic to Metadata.

waitOnMetadata first checks if the available cluster metadata could be current enough.

waitOnMetadata requests Metadata for the current cluster information and then requests the cluster for the number of partitions of the input topic.

If the cluster metadata is not current enough (i.e. the number of partitions is unavailable or the partition is above the current count), waitOnMetadata prints out the following TRACE message to the logs:

Requesting metadata update for topic [topic].

waitOnMetadata requests Metadata for update and requests Sender to wake up.

waitOnMetadata then requests Metadata to wait for a metadata update and then Metadata for the current cluster information.

waitOnMetadata keeps doing it until the number of partitions of the input topic is available.

waitOnMetadata reports a TimeoutException when maxWaitMs has elapsed.

Failed to update metadata after [maxWaitMs] ms.

waitOnMetadata reports a TopicAuthorizationException when the access to the topic is unauthorized.

waitOnMetadata reports a KafkaException when the partition is above the number of available partitions.

Invalid partition given with record: [partition] is not in the range [0...[partitionsCount]).
Note
waitOnMetadata is used when KafkaProducer requests partitions for a topic and asynchronously sends a record to a topic.

results matching ""

    No results matching ""