KafkaProducer — Main Class For Kafka Producers

KafkaProducer is the default Producer client in Apache Kafka.

KafkaProducer is the class that a Kafka developer uses to send messages to a Kafka cluster.

// Necessary imports
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Creating a KafkaProducer
import java.util.Properties
val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")
val producer = new KafkaProducer[String, String](props)

// Creating a record to be sent
import org.apache.kafka.clients.producer.ProducerRecord
val r = new ProducerRecord[String, String]("0", "this is a message")

// Sending the record (with no Callback)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val metadataF: Future[RecordMetadata] = producer.send(r)

KafkaProducer uses the ProducerConfig.INTERCEPTOR_CLASSES_CONFIG configuration property to define ProducerInterceptors that are notified (about successes and failures) every time KafkaProducer is requested to send a record to a topic.

KafkaProducer uses the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property to define the Partitioner that is used (to compute the partition for a record) when KafkaProducer is requested to partition (when requested to send a ProducerRecord to a Kafka cluster asynchronously).

Internally, KafkaProducer uses the Kafka producer I/O thread that is responsible for sending produce requests to a Kafka cluster (on kafka-producer-network-thread daemon thread of execution). The thread is started right when KafkaProducer is created.

Figure 1. KafkaProducer

KafkaProducer uses producer-metrics for…​FIXME


Enable ALL logging levels for org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside.

Add the following line to config/log4j.properties:


Refer to Logging.

Performance Metrics

KafkaProducer exposes performance metrics.

Table 1. KafkaProducer's Metrics
Metric Name Recording Level Description



Recorded when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously and BufferExhaustedException is reported.


Recorded when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously and there were exceptions reported (e.g. ApiException, InterruptedException, BufferExhaustedException and KafkaException)


Recorded when NetworkClient is requested to parseStructMaybeUpdateThrottleTimeMetrics (and a request was throttled due to quota violation)


Recorded when NetworkClient is requested to parseStructMaybeUpdateThrottleTimeMetrics (and a request was throttled due to quota violation)

Sending ProducerRecord to Kafka Cluster Asynchronously With Producer Interceptors — send Method

Future<RecordMetadata> send(
  ProducerRecord<K, V> record) (1)
Future<RecordMetadata> send(
  ProducerRecord<K, V> record,
  Callback callback)
  1. Uses no callback (null)

send is a part of Producer Contract to send a ProducerRecord to a Kafka cluster asynchronously.

In the end, send sends a ProducerRecord to a Kafka cluster asynchronously (with the intercepted ProducerRecord and the Callback).

Sending ProducerRecord to Kafka Cluster Asynchronously — doSend Internal Method

Future<RecordMetadata> doSend(
  ProducerRecord<K, V> record,
  Callback callback)



doSend is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously with producer interceptors.

Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method

ClusterResourceListeners configureClusterResourceListeners(
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  List<?>... candidateLists)

configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keySerializer and valueSerializer.


configureClusterResourceListeners is used exclusively when KafkaProducer is created (to create the Metadata) with the following input arguments:

Requesting Partitions for Topic — partitionsFor Method

List<PartitionInfo> partitionsFor(String topic)
partitionsFor is a part of Producer Contract.

partitionsFor waits on cluster metadata for the input topic and max.block.ms time. Once retrieved, partitionsFor requests Cluster for the partitions.

Waiting for Cluster Metadata (with Partitions for Topic) — waitOnMetadata Internal Recursive Method

ClusterAndWaitTime waitOnMetadata(
  String topic,
  Integer partition,
  long maxWaitMs) throws InterruptedException

waitOnMetadata adds the input topic to Metadata.

waitOnMetadata first checks if the available cluster metadata could be current enough.

waitOnMetadata requests Metadata for the current cluster information and then requests the cluster for the number of partitions of the input topic.

If the cluster metadata is not current enough (i.e. the number of partitions is unavailable or the partition is above the current count), waitOnMetadata prints out the following TRACE message to the logs:

Requesting metadata update for topic [topic].

waitOnMetadata requests Metadata for update and requests Sender to wake up.

waitOnMetadata then requests Metadata to wait for a metadata update and then Metadata for the current cluster information.

waitOnMetadata keeps doing it until the number of partitions of the input topic is available.

waitOnMetadata reports a TimeoutException when maxWaitMs has elapsed.

Failed to update metadata after [maxWaitMs] ms.

waitOnMetadata reports a TopicAuthorizationException when the access to the topic is unauthorized.

waitOnMetadata reports a KafkaException when the partition is above the number of available partitions.

Invalid partition given with record: [partition] is not in the range [0...[partitionsCount]).
waitOnMetadata is used when KafkaProducer is requested for the partitions of a topic and to send a ProducerRecord to a Kafka cluster asynchronously.

Creating KafkaProducer Instance

KafkaProducer takes the following when created:

KafkaProducer initializes the internal registries and counters.

While being created, KafkaProducer saves the ProducerConfig in the producerConfig internal registry and the time becomes SYSTEM.

KafkaProducer sets the clientId as the ProducerConfig.CLIENT_ID_CONFIG or uses producer-[id].

KafkaProducer prints out the following TRACE message to the logs:

Starting the Kafka producer

KafkaProducer creates a MetricConfig with the following:

KafkaProducer uses the ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG as the MetricsReporters and adds the JmxReporter (with kafka.producer prefix).

KafkaProducer sets the metrics as a new Metrics (with the MetricConfig, the MetricsReporters and the time).

KafkaProducer sets the keySerializer as follows:

KafkaProducer sets the valueSerializer as follows:

KafkaProducer sets the following:

KafkaProducer creates a new ApiVersions for the apiVersions.

KafkaProducer creates a new RecordAccumulator for the accumulator with the following configuration properties:

KafkaProducer sets the metadata as follows:

KafkaProducer creates a new NetworkClient (unless the input KafkaClient was given) with the following configuration properties:

KafkaProducer creates a new Sender as the sender with the following configuration properties:

KafkaProducer sets the transactionManager as configureTransactionState.

KafkaProducer configureRetries for the number of retries.

KafkaProducer configureInflightRequests for the maximum number of in-flight requests.

KafkaProducer configureAcks for acks.

KafkaProducer creates a new ProducerMetrics (with the metrics).

KafkaProducer starts the kafka-producer-network-thread daemon thread of execution for the sender.

KafkaProducer requests the ProducerConfig to logUnused.

KafkaProducer registers the AppInfo MBean (with kafka.producer JMX prefix, the clientId and the metrics).

In the end, KafkaProducer prints out the following DEBUG message to the logs:

Kafka producer started

In case of any errors, KafkaProducer closes itself with 0 millis timeout and throws a KafkaException:

Failed to construct kafka producer

Computing Partition For ProducerRecord — partition Internal Method

int partition(
  ProducerRecord<K, V> record,
  byte[] serializedKey,
  byte[] serializedValue,
  Cluster cluster)

partition requests the ProducerRecord for the partition and return it when specified. Otherwise, partition requests the Partitioner for the partition.

partition is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously.

beginTransaction Method

void beginTransaction() throws ProducerFencedException
beginTransaction is part of the Producer Contract to…​FIXME.

beginTransaction simply requests the internal TransactionManager to beginTransaction.

beginTransaction throws an IllegalStateException when the TransactionManager is undefined (null).

Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property

configureTransactionState Static Internal Method

TransactionManager configureTransactionState(
  ProducerConfig config,
  LogContext logContext,
  Logger log)


configureTransactionState is used exclusively when KafkaProducer is created (and initializes a TransactionManager).

Closing Kafka Producer — close Method

void close() (1)
void close(Duration timeout) (2)
// private API
void close(Duration timeout, boolean swallowException)
  1. Uses the maximum timeout (Long.MAX_VALUE)

  2. Disables swallowException flag (false)

close is a part of Producer Contract.


Flushing Accumulated Records — flush Method

void flush()
flush is a part of Producer Contract.


initTransactions Method

void initTransactions()
initTransactions is a part of Producer Contract.


sendOffsetsToTransaction Method

void sendOffsetsToTransaction(
  Map<TopicPartition, OffsetAndMetadata> offsets,
  String consumerGroupId)
throws ProducerFencedException
sendOffsetsToTransaction is a part of Producer Contract.


commitTransaction Method

void commitTransaction()
throws ProducerFencedException
commitTransaction is a part of Producer Contract.


abortTransaction Method

void abortTransaction()
throws ProducerFencedException
abortTransaction is a part of Producer Contract.


newSender Method

Sender newSender(
  LogContext logContext,
  KafkaClient kafkaClient,
  ProducerMetadata metadata)


newSender is used when KafkaProducer is created.

Internal Properties

Name Description



Client ID per CLIENT_ID_CONFIG (if defined) or producer-[number]

Used when:


ProducerInterceptors that are notified (about successes and failures) when KafkaProducer is requested to send a record to a topic

The ProducerInterceptors are initialized when the KafkaProducer is created using ProducerConfig.INTERCEPTOR_CLASSES_CONFIG configuration property.


kafka-producer-network-thread daemon thread of execution for the sender


Time KafkaProducer uses to block when requesting partitions for a topic.

Use max.block.ms Kafka property to set the value.



Partitioner that is used (to compute the partition for a record) when KafkaProducer is requested to partition (when requested to send a ProducerRecord to a Kafka cluster asynchronously)

The Partitioner is initialized when the KafkaProducer is created using ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property.



Kafka producer I/O thread (aka Sender) that is started when KafkaProducer is created.


Time abstraction (with SYSTEM being the default).


