KafkaProducer — Main Class For Kafka Producers

KafkaProducer is the main public class that you and other Kafka developers use to write Kafka producers that publish records to a Kafka cluster.

Figure 1. KafkaProducer

KafkaProducer is a part of the public API and is created with properties and (key and value) serializers as configuration.

KafkaProducer follows the Producer contract.

Table 1. KafkaProducer’s Internal Properties (e.g. Registries and Counters)

| Name | Description |
| ---- | ----------- |
| `maxBlockTimeMs` | Time KafkaProducer uses to block when requesting partitions for a topic. Use the `max.block.ms` Kafka property to set the value. |
| `sender` | Sender with the following: …​FIXME |
| `ioThread` | Thread of execution…​FIXME |

Enable DEBUG or TRACE logging levels for org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.KafkaProducer=DEBUG

Refer to Logging.

Asynchronously Sending Record to Topic and Notifying Producer Interceptors — send Method

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

send is part of the Producer Contract.
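The send-with-callback pattern (a Future returned immediately, a callback notified on completion) can be sketched without a Kafka cluster. The following self-contained Java sketch models that shape with stand-in types; the names and the `offset=42` result are illustrative, not Kafka's actual API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// A minimal model of send(record, callback): the caller gets a Future back
// immediately, and the callback fires when the send completes (or fails).
public class AsyncSendSketch {
    // Stand-in for Kafka's Callback(metadata, exception) shape (hypothetical types).
    static <T> Future<T> send(ExecutorService pool, Callable<T> work,
                              BiConsumer<T, Exception> callback) {
        return pool.submit(() -> {
            try {
                T result = work.call();
                callback.accept(result, null);   // success path
                return result;
            } catch (Exception e) {
                callback.accept(null, e);        // failure path
                throw e;
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> f = send(pool, () -> "offset=42",
                (metadata, exception) -> System.out.println("callback: " + metadata));
        System.out.println("future: " + f.get()); // blocks until the "send" completes
        pool.shutdown();
    }
}
```

The caller can either block on the Future (synchronous style) or rely purely on the callback (fire-and-forget with notification), which mirrors the two ways `send` is typically used.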


Asynchronously Sending Record to Topic — doSend Internal Method

Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)


doSend is used exclusively when KafkaProducer asynchronously sends a record to a topic and notifies producer interceptors.

Creating KafkaProducer Instance

KafkaProducer takes the following when created:


KafkaProducer initializes the internal registries and counters.
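As a sketch of the construction-time configuration mentioned above — properties plus key and value serializers — the following self-contained Java snippet builds a typical Properties object (the broker address and all values are assumptions for illustration):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        // Minimal configuration a KafkaProducer is typically created with:
        // connection details plus key and value serializers.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "60000"); // how long send/partitionsFor may block

        // A producer would then be created with:
        //   new KafkaProducer<String, String>(props)
        // (requires kafka-clients on the classpath and a reachable broker)
        System.out.println("max.block.ms=" + props.getProperty("max.block.ms"));
    }
}
```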

Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method

ClusterResourceListeners configureClusterResourceListeners(
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  List<?>... candidateLists)

configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keySerializer and valueSerializer.
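The filter-and-register step described above can be modeled in a self-contained way: scan the candidate lists and the two serializers, and keep only the objects that actually implement the listener interface. The interface below is a simplified stand-in, not Kafka's real ClusterResourceListener:

```java
import java.util.ArrayList;
import java.util.List;

public class ClusterResourceListenersSketch {
    // Simplified stand-in for Kafka's ClusterResourceListener (hypothetical shape).
    interface ClusterResourceListener { void onUpdate(String clusterId); }

    // Model of configureClusterResourceListeners: register every candidate
    // (including the serializers) that implements ClusterResourceListener.
    static List<ClusterResourceListener> configure(Object keySerializer,
                                                   Object valueSerializer,
                                                   List<?>... candidateLists) {
        List<ClusterResourceListener> registered = new ArrayList<>();
        for (List<?> candidates : candidateLists) {
            for (Object c : candidates) {
                if (c instanceof ClusterResourceListener) {
                    registered.add((ClusterResourceListener) c);
                }
            }
        }
        if (keySerializer instanceof ClusterResourceListener) {
            registered.add((ClusterResourceListener) keySerializer);
        }
        if (valueSerializer instanceof ClusterResourceListener) {
            registered.add((ClusterResourceListener) valueSerializer);
        }
        return registered;
    }

    public static void main(String[] args) {
        ClusterResourceListener l = clusterId -> System.out.println("seen " + clusterId);
        List<ClusterResourceListener> regs =
            configure("keySer", "valueSer", List.of(l, "not a listener"));
        System.out.println("registered=" + regs.size());
    }
}
```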


configureClusterResourceListeners is used exclusively when KafkaProducer is created (to create the Metadata) with the following input arguments:

Requesting Partitions for Topic — partitionsFor Method

List<PartitionInfo> partitionsFor(String topic)

partitionsFor is part of the Producer Contract.

partitionsFor waits for the cluster metadata of the input topic, for up to max.block.ms. Once the metadata is available, partitionsFor requests the Cluster for the partitions of the topic.

Waiting for Cluster Metadata (with Partitions for Topic) — waitOnMetadata Internal Recursive Method

ClusterAndWaitTime waitOnMetadata(
  String topic,
  Integer partition,
  long maxWaitMs) throws InterruptedException

waitOnMetadata adds the input topic to Metadata.

waitOnMetadata first checks whether the available cluster metadata is already current enough.

waitOnMetadata requests Metadata for the current cluster information and then requests the cluster for the number of partitions of the input topic.

If the cluster metadata is not current enough (i.e. the number of partitions is unavailable or the partition is above the current count), waitOnMetadata prints out the following TRACE message to the logs:

Requesting metadata update for topic [topic].

waitOnMetadata requests Metadata for update and requests Sender to wake up.

waitOnMetadata then requests Metadata to wait for a metadata update and then Metadata for the current cluster information.

waitOnMetadata keeps doing it until the number of partitions of the input topic is available.

waitOnMetadata reports a TimeoutException when maxWaitMs has elapsed.

Failed to update metadata after [maxWaitMs] ms.

waitOnMetadata reports a TopicAuthorizationException when the access to the topic is unauthorized.

waitOnMetadata reports a KafkaException when the partition is above the number of available partitions.

Invalid partition given with record: [partition] is not in the range [0...[partitionsCount]).

waitOnMetadata is used when KafkaProducer requests partitions for a topic and asynchronously sends a record to a topic.
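The retry loop described above — keep requesting a metadata update until the topic's partition count is known (and covers the requested partition), or the max.block.ms budget runs out — can be sketched with stand-in types. This is a simplified, self-contained model, not Kafka's implementation; in particular, the real code blocks on Metadata.awaitUpdate and wakes up the Sender instead of polling:

```java
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

public class WaitOnMetadataSketch {
    // partitionCount is a stand-in for the Metadata/Cluster lookup; it returns 0
    // while the partition count for the topic is still unknown.
    static int waitOnMetadata(IntSupplier partitionCount, Integer partition, long maxWaitMs)
            throws TimeoutException, InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            int count = partitionCount.getAsInt();
            if (count > 0 && (partition == null || partition < count)) {
                return count; // metadata is current enough
            }
            if (System.currentTimeMillis() >= deadline) {
                throw new TimeoutException(
                    "Failed to update metadata after " + maxWaitMs + " ms.");
            }
            // Real code requests a metadata update, wakes up the Sender, and
            // blocks on Metadata.awaitUpdate; here we simply poll briefly.
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate metadata that becomes available on the third poll.
        final int[] polls = {0};
        int count = waitOnMetadata(() -> ++polls[0] >= 3 ? 4 : 0, 2, 1_000);
        System.out.println("partitions=" + count);
    }
}
```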
