KafkaProducer — Main Class For Kafka Producers

KafkaProducer is the main implementation of the Producer contract in Apache Kafka (the other one, MockProducer, is meant for testing).

KafkaProducer is the class that Kafka developers use to send messages (records) to a Kafka cluster.

// Necessary imports
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// Creating a KafkaProducer (String keys and values)
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)

// Creating a record to be sent to the topic "0" (no key)
val r = new ProducerRecord[String, String]("0", "this is a message")

// Sending the record (asynchronously, send returns a java.util.concurrent.Future[RecordMetadata])
producer.send(r)

// Closing the producer (waits for previously sent records to complete)
producer.close()

KafkaProducer uses the ProducerConfig.INTERCEPTOR_CLASSES_CONFIG configuration property to define ProducerInterceptors that are notified (about successes and failures) every time KafkaProducer is requested to send a record to a topic.
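For illustration, here is a minimal sketch of a ProducerInterceptor that counts acknowledged and failed sends. It assumes Scala 2.13 and kafka-clients on the classpath. The class CountingInterceptor and its counters are hypothetical. The methods onSend, onAcknowledgement, close and configure are the ones that the ProducerInterceptor interface (together with Configurable) declares.

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}

// Hypothetical interceptor that counts successful and failed sends
class CountingInterceptor extends ProducerInterceptor[String, String] {
  val successes = new AtomicLong()
  val failures = new AtomicLong()

  // Called before the record is serialized and partitioned; passes the record through unchanged
  override def onSend(record: ProducerRecord[String, String]): ProducerRecord[String, String] = record

  // Called when a send completes: exception is null on success
  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception == null) successes.incrementAndGet() else failures.incrementAndGet()

  override def close(): Unit = ()

  // No configuration needed for this sketch
  override def configure(configs: java.util.Map[String, _]): Unit = ()
}
```

To register the interceptor, call props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[CountingInterceptor].getName) before you create the KafkaProducer. KafkaProducer instantiates the interceptor itself, so the counters are only reachable from inside that instance. In practice they would be exposed through logging or metrics.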

KafkaProducer uses the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property to define the Partitioner. The Partitioner computes the partition of a record when KafkaProducer is requested to partition it (while sending the record to a topic asynchronously).
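A custom Partitioner is plugged in the same way. The sketch below uses a hypothetical class, KeyHashPartitioner, that hashes the serialized key with Kafka's Utils.murmur2 and Utils.toPositive helpers. As a simplifying assumption, it sends records without a key to partition 0. The partition choice is factored into a pure helper so that it can be checked without a Cluster. The sketch assumes Scala 2.13 and kafka-clients on the classpath.

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster
import org.apache.kafka.common.utils.Utils

// Hypothetical partitioner: murmur2 hash of the key bytes, partition 0 for records without a key
class KeyHashPartitioner extends Partitioner {
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int =
    KeyHashPartitioner.choose(keyBytes, cluster.partitionsForTopic(topic).size())

  override def close(): Unit = ()
  override def configure(configs: JMap[String, _]): Unit = ()
}

object KeyHashPartitioner {
  // Pure helper so the partition choice can be tested without a Cluster
  def choose(keyBytes: Array[Byte], numPartitions: Int): Int =
    if (keyBytes == null) 0
    else Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
}
```

To register the partitioner, call props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[KeyHashPartitioner].getName).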

Internally, KafkaProducer uses the Kafka producer I/O thread (Sender), which is responsible for sending produce requests to a Kafka cluster. The Sender runs on the kafka-producer-network-thread daemon thread of execution, which starts as soon as KafkaProducer is created.
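One way to observe the I/O thread is to look it up by name right after the KafkaProducer is created. This sketch assumes Scala 2.13 (for scala.jdk.CollectionConverters) and kafka-clients 2.0 or later (for close(Duration)). The object NetworkThreadDemo is hypothetical. No broker needs to be running, because creating a KafkaProducer does not wait for a connection.

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

object NetworkThreadDemo {
  // Creates a producer with the given client.id and returns (name, isDaemon) of its I/O thread(s)
  def networkThreads(clientId: String): Seq[(String, Boolean)] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)

    val producer = new KafkaProducer[String, String](props)
    try {
      // The thread is started in the constructor, so it is already alive here
      Thread.getAllStackTraces().keySet().asScala.toSeq
        .filter(t => t.getName.startsWith("kafka-producer-network-thread") && t.getName.contains(clientId))
        .map(t => (t.getName, t.isDaemon))
    } finally producer.close(Duration.ofSeconds(5))
  }
}
```

The returned names typically look like kafka-producer-network-thread | thread-demo, so the thread name carries the clientId. Every entry should be a daemon thread.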

Figure 1. KafkaProducer

KafkaProducer exposes performance metrics.

Table 1. KafkaProducer’s Metrics

| Metric Name | Recording Level | Description |
|---|---|---|
| buffer-exhausted-records | INFO | Recorded when KafkaProducer is requested to doSend and a BufferExhaustedException is reported. |
| errors | | Recorded when KafkaProducer is requested to doSend and an exception is reported (e.g. ApiException, InterruptedException, BufferExhaustedException or KafkaException). |
| produce-throttle-time-avg | | Recorded when NetworkClient is requested to parseStructMaybeUpdateThrottleTimeMetrics (and a request was throttled due to a quota violation). |
| produce-throttle-time-max | | Recorded when NetworkClient is requested to parseStructMaybeUpdateThrottleTimeMetrics (and a request was throttled due to a quota violation). |
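The metrics can be read at runtime with KafkaProducer.metrics(). The sketch below lists the group, name and current value of each metric. It uses a hypothetical object, ProducerMetricsDump, and assumes Scala 2.13 and kafka-clients 2.0 or later. Which metric names appear depends on the Kafka version, including whether the sensors in Table 1 are exposed under exactly those names.

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

object ProducerMetricsDump {
  // Lists (group, name, value) for every metric the producer currently registers
  def snapshot(producer: KafkaProducer[String, String]): Seq[(String, String, Any)] =
    producer.metrics().asScala.toSeq.map { case (name, metric) =>
      (name.group(), name.name(), metric.metricValue())
    }

  // Creates a producer (no broker needed), takes one snapshot and closes the producer
  def snapshotOfFreshProducer(bootstrap: String): Seq[(String, String, Any)] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    try snapshot(producer) finally producer.close(Duration.ofSeconds(5))
  }
}
```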

Table 2. KafkaProducer’s Internal Properties (e.g. Registries and Counters)

| Name | Description |
|---|---|
| clientId | Client ID per CLIENT_ID_CONFIG (if defined) or producer-[number]. Used when: |
| interceptors | ProducerInterceptors that are notified (about successes and failures) when KafkaProducer is requested to send a record to a topic. The ProducerInterceptors are initialized when the KafkaProducer is created, using the ProducerConfig.INTERCEPTOR_CLASSES_CONFIG configuration property. |
| ioThread | kafka-producer-network-thread daemon thread of execution for the sender |
| maxBlockTimeMs | Maximum time KafkaProducer blocks when requesting partitions for a topic. Set with the max.block.ms Kafka property. |
| metadata | |
| partitioner | Partitioner that computes the partition for a record when KafkaProducer is requested to partition (while sending a record to a topic asynchronously). The Partitioner is initialized when the KafkaProducer is created, using the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property. |
| producerConfig | |
| sender | Kafka producer I/O thread (aka Sender) that is started when KafkaProducer is created. |
| time | Time abstraction with SYSTEM as the default. |
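The clientId appears as the client-id tag of the producer's metrics. That tag gives a simple way to check the fallback to producer-[number] when client.id is not set. The names ClientIdDemo and orders-producer are hypothetical. The sketch assumes Scala 2.13 and kafka-clients 2.0 or later.

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

object ClientIdDemo {
  // Minimal configuration; no broker needs to be reachable to create the producer
  def baseProps(): Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  // Creates a producer and collects the distinct client-id tags of its metrics
  def clientIds(props: Properties): Set[String] = {
    val producer = new KafkaProducer[String, String](props)
    try producer.metrics().asScala.keys.flatMap(name => Option(name.tags().get("client-id"))).toSet
    finally producer.close(Duration.ofSeconds(5))
  }
}
```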

Tip

Enable DEBUG or TRACE logging level for the org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.KafkaProducer=TRACE

Refer to Logging.

(Asynchronously) Sending Record to Topic and Notifying Producer Interceptors — send Method

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
Note
send is a part of Producer Contract.

send…FIXME
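Because send is asynchronous, a caller typically passes a Callback to learn the outcome. This sketch uses a hypothetical object, SendWithCallback, and assumes Scala 2.13 and kafka-clients 2.0 or later. It lowers max.block.ms to 2 seconds. Without a reachable broker, the callback should then report the TimeoutException from waiting on metadata, instead of send blocking for the default 60 seconds.

```scala
import java.time.Duration
import java.util.Properties
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SendWithCallback {
  // Sends a single record and waits for its callback: Right(offset) on success, Left(error) otherwise
  def sendOnce(bootstrap: String, topic: String, value: String): Either[Exception, Long] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    // Upper bound on how long send may block, e.g. while waiting on metadata
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "2000")

    val producer = new KafkaProducer[String, String](props)
    val done = new CountDownLatch(1)
    val outcome = new AtomicReference[Either[Exception, Long]]()
    try {
      producer.send(new ProducerRecord[String, String](topic, value), new Callback {
        // Invoked when the send completes: exception is null on success
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          outcome.set(if (exception == null) Right(metadata.offset()) else Left(exception))
          done.countDown()
        }
      })
      if (done.await(30, TimeUnit.SECONDS)) outcome.get()
      else Left(new IllegalStateException("no callback within 30 seconds"))
    } finally producer.close(Duration.ofSeconds(5))
  }
}
```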

Asynchronously Sending Record to Topic — doSend Internal Method

Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)

doSend…FIXME

Note
doSend is used exclusively when KafkaProducer is requested to asynchronously send a record to a topic (and notify the producer interceptors).

Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method

ClusterResourceListeners configureClusterResourceListeners(
  Serializer<K> keySerializer,
  Serializer<V> valueSerializer,
  List<?>... candidateLists)

configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keySerializer and valueSerializer.
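The hypothetical serializer below is an example of a candidate that configureClusterResourceListeners would pick up. It also implements ClusterResourceListener, so onUpdate notifies it with the ClusterResource once cluster metadata is available. The sketch assumes Scala 2.13 and kafka-clients on the classpath.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
import org.apache.kafka.common.serialization.Serializer

// Hypothetical String serializer that also records the cluster id it is notified about
class ClusterAwareStringSerializer extends Serializer[String] with ClusterResourceListener {
  @volatile var clusterId: Option[String] = None

  // Called with the cluster id once cluster metadata has been received
  override def onUpdate(clusterResource: ClusterResource): Unit =
    clusterId = Option(clusterResource.clusterId())

  override def serialize(topic: String, data: String): Array[Byte] =
    if (data == null) null else data.getBytes(StandardCharsets.UTF_8)

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()
}
```

You could configure it as the value serializer with props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ClusterAwareStringSerializer].getName). The instance that KafkaProducer creates would then learn the cluster id after the first metadata response.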

Note

configureClusterResourceListeners is used exclusively when KafkaProducer is created (to create the Metadata) with the following input arguments:

Requesting Partitions for Topic — partitionsFor Method

List<PartitionInfo> partitionsFor(String topic)
Note
partitionsFor is a part of Producer Contract.

partitionsFor waits on cluster metadata for the input topic for up to max.block.ms. Once the metadata is available, partitionsFor requests the Cluster for the partitions of the topic.
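From the caller's side, partitionsFor either returns the PartitionInfos of the topic or fails with a TimeoutException after max.block.ms. The sketch below makes both outcomes explicit with an Either. It uses a hypothetical object, PartitionsForDemo, and assumes Scala 2.13 and kafka-clients 2.0 or later.

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.serialization.StringSerializer

object PartitionsForDemo {
  // Right(sorted partition ids) if metadata arrives within maxBlockMs, Left(timeout) otherwise
  def partitionIds(bootstrap: String, topic: String, maxBlockMs: Long): Either[TimeoutException, Seq[Int]] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    // partitionsFor waits on metadata for at most max.block.ms
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString)

    val producer = new KafkaProducer[String, String](props)
    try Right(producer.partitionsFor(topic).asScala.map(_.partition()).toSeq.sorted)
    catch { case e: TimeoutException => Left(e) }
    finally producer.close(Duration.ofSeconds(5))
  }
}
```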

Waiting for Cluster Metadata (with Partitions for Topic) — waitOnMetadata Internal Recursive Method

ClusterAndWaitTime waitOnMetadata(
  String topic,
  Integer partition,
  long maxWaitMs) throws InterruptedException

waitOnMetadata adds the input topic to Metadata.

waitOnMetadata first checks if the available cluster metadata could be current enough.

waitOnMetadata requests Metadata for the current cluster information and then requests the cluster for the number of partitions of the input topic.

If the cluster metadata is not current enough (i.e. the number of partitions is unavailable or the partition is not less than the current count), waitOnMetadata prints out the following TRACE message to the logs:

Requesting metadata update for topic [topic].

waitOnMetadata requests Metadata for an update and requests the Sender to wake up.

waitOnMetadata then requests Metadata to wait for the metadata update and, once it arrives, for the current cluster information.

waitOnMetadata repeats these steps until the number of partitions of the input topic is available.

waitOnMetadata reports a TimeoutException with the following message when maxWaitMs has elapsed:

Failed to update metadata after [maxWaitMs] ms.

waitOnMetadata reports a TopicAuthorizationException when the access to the topic is unauthorized.

waitOnMetadata reports a KafkaException with the following message when the partition is not less than the number of available partitions:

Invalid partition given with record: [partition] is not in the range [0...[partitionsCount]).
Note
waitOnMetadata is used when KafkaProducer requests partitions for a topic and asynchronously sends a record to a topic.
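The waiting loop can be modelled without a cluster. The sketch below is a simplified, self-contained simulation, not Kafka's code. Its assumptions are as follows:

- FakeMetadata is a hypothetical stand-in for Metadata. It hands out one metadata snapshot per update and advances a simulated clock.
- java.util.concurrent.TimeoutException and IllegalArgumentException stand in for Kafka's TimeoutException and KafkaException.
- The TopicAuthorizationException check is left out.

The loop follows the steps described above. It takes the fast path on cached metadata, or else it requests an update, waits, checks the elapsed time and repeats until the partition count is known. Finally, it validates the partition.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical stand-in for Kafka's Metadata. Every awaitUpdate() "receives" the next
// snapshot (topic -> number of partitions) and advances a simulated clock by stepMs.
final class FakeMetadata(snapshots: List[Map[String, Int]], stepMs: Long) {
  private var pending = snapshots
  private var current = Map.empty[String, Int]
  private var topics = Set.empty[String]
  var nowMs: Long = 0L
  var updateRequests: Int = 0

  def add(topic: String): Unit = topics += topic
  def registeredTopics: Set[String] = topics
  def partitionCount(topic: String): Option[Int] = current.get(topic)
  def requestUpdate(): Unit = updateRequests += 1
  def awaitUpdate(): Unit = {
    nowMs += stepMs
    pending match {
      case next :: rest => current = next; pending = rest
      case Nil          => () // no newer metadata arrives
    }
  }
}

object WaitOnMetadataSketch {
  // Returns the partition count of topic, waiting for metadata updates for at most maxWaitMs
  def waitOnMetadata(metadata: FakeMetadata, topic: String, partition: Option[Int], maxWaitMs: Long): Int = {
    metadata.add(topic)
    val cached = metadata.partitionCount(topic)
    // Fast path: cached metadata is current enough for this record
    if (cached.exists(count => partition.forall(_ < count))) cached.get
    else {
      val begin = metadata.nowMs

      @annotation.tailrec
      def loop(): Int = {
        println(s"Requesting metadata update for topic $topic.")
        metadata.requestUpdate() // KafkaProducer also wakes up the Sender at this point
        metadata.awaitUpdate()
        val elapsed = metadata.nowMs - begin
        if (elapsed >= maxWaitMs)
          throw new TimeoutException(s"Failed to update metadata after $maxWaitMs ms.")
        metadata.partitionCount(topic) match {
          case Some(count) => count
          case None        => loop()
        }
      }

      val count = loop()
      partition.foreach { p =>
        if (p >= count)
          throw new IllegalArgumentException(
            s"Invalid partition given with record: $p is not in the range [0...$count).")
      }
      count
    }
  }
}
```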

Creating KafkaProducer Instance

KafkaProducer takes the following when created:

KafkaProducer initializes the internal registries and counters.

While being created, KafkaProducer saves the ProducerConfig in the producerConfig internal registry and sets the time to SYSTEM.

KafkaProducer sets the clientId to the value of ProducerConfig.CLIENT_ID_CONFIG if defined, or to producer-[id] otherwise.

KafkaProducer prints out the following TRACE message to the logs:

Starting the Kafka producer

KafkaProducer creates a MetricConfig with the following:

KafkaProducer uses the ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG as the MetricsReporters and adds the JmxReporter (with kafka.producer prefix).

KafkaProducer sets the metrics as a new Metrics (with the MetricConfig, the MetricsReporters and the time).

KafkaProducer sets the keySerializer as follows:

KafkaProducer sets the valueSerializer as follows:

KafkaProducer sets the following:

KafkaProducer creates a new ApiVersions for the apiVersions.

KafkaProducer creates a new RecordAccumulator for the accumulator with the following configuration properties:

KafkaProducer sets the metadata as follows:

KafkaProducer creates a new NetworkClient (unless the input KafkaClient was given) with the following configuration properties:

KafkaProducer creates a new Sender as the sender with the following configuration properties:

KafkaProducer sets the transactionManager as configureTransactionState.

KafkaProducer uses configureRetries to determine the number of retries.

KafkaProducer uses configureInflightRequests to determine the maximum number of in-flight requests.

KafkaProducer uses configureAcks to determine acks.

KafkaProducer creates a new ProducerMetrics (with the metrics).

KafkaProducer starts the kafka-producer-network-thread daemon thread of execution for the sender.

KafkaProducer requests the ProducerConfig to logUnused.

KafkaProducer registers the AppInfo MBean (with kafka.producer JMX prefix, the clientId and the metrics).

In the end, KafkaProducer prints out the following DEBUG message to the logs:

Kafka producer started

In case of any errors, KafkaProducer closes itself (with a 0 ms timeout) and throws a KafkaException with the following message:

Failed to construct kafka producer
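A quick way to see this error handling in action is to make one of the plugged-in components fail during creation. In this sketch, the hypothetical BrokenPartitioner throws from configure, which runs while KafkaProducer is being created. Assuming Scala 2.13 and kafka-clients 2.x or later, the constructor should then rethrow the error wrapped in a KafkaException with the message above.

```scala
import java.util.{Map => JMap, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, Partitioner, ProducerConfig}
import org.apache.kafka.common.{Cluster, KafkaException}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical partitioner whose configure always fails, so KafkaProducer creation fails
class BrokenPartitioner extends Partitioner {
  override def configure(configs: JMap[String, _]): Unit =
    throw new IllegalStateException("misconfigured on purpose")
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte], cluster: Cluster): Int = 0
  override def close(): Unit = ()
}

object ConstructionFailureDemo {
  // Returns the KafkaException thrown by the KafkaProducer constructor, if any
  def tryCreate(): Option[KafkaException] = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, classOf[BrokenPartitioner])
    try {
      new KafkaProducer[String, String](props).close()
      None
    } catch {
      case e: KafkaException => Some(e)
    }
  }
}
```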

Computing Partition For ProducerRecord — partition Internal Method

int partition(
  ProducerRecord<K, V> record,
  byte[] serializedKey,
  byte[] serializedValue,
  Cluster cluster)

partition…FIXME

Note
partition is used exclusively when KafkaProducer is requested to send a record to topic asynchronously.
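The partition selection can be sketched as a pure function. The sketch assumes that a partition set explicitly on the ProducerRecord takes precedence, as in the Kafka 2.x sources, and that the Partitioner is consulted only otherwise. SimpleRecord and PartitionChoice are hypothetical names, and the sketch models the Partitioner as a plain function.

```scala
// Hypothetical simplified record: only the fields partition selection needs
final case class SimpleRecord(topic: String, partition: Option[Int], key: Array[Byte], value: Array[Byte])

object PartitionChoice {
  // An explicit partition on the record wins; otherwise the partitioner function decides
  def partition(record: SimpleRecord, partitioner: (String, Array[Byte], Array[Byte]) => Int): Int =
    record.partition.getOrElse(partitioner(record.topic, record.key, record.value))
}
```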
