// Necessary imports
import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

// Creating a KafkaProducer
val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val producer = new KafkaProducer[String, String](props)

// Creating a record to be sent (to a topic named "0")
val r = new ProducerRecord[String, String]("0", "this is a message")

// Sending the record (with no Callback)
val metadataF: Future[RecordMetadata] = producer.send(r)
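A second send variant accepts a Callback that is invoked when the send completes. A minimal sketch, assuming the producer and the record r from above:

```scala
import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// onCompletion receives either a RecordMetadata (success) or an Exception (failure)
producer.send(r, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) exception.printStackTrace()
    else println(s"Sent to ${metadata.topic}, partition ${metadata.partition}, offset ${metadata.offset}")
})
```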
KafkaProducer — Main Class For Kafka Producers
KafkaProducer is the default Producer client in Apache Kafka and the class that a Kafka developer uses to send records to a Kafka cluster.
KafkaProducer uses the ProducerConfig.INTERCEPTOR_CLASSES_CONFIG configuration property to define the ProducerInterceptors that are notified (about successes and failures) every time KafkaProducer is requested to send a record to a topic.
KafkaProducer uses the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property to define the Partitioner that computes the partition for a record when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously.
Internally, KafkaProducer uses a Kafka producer I/O thread (the kafka-producer-network-thread daemon thread of execution) that is responsible for sending produce requests to a Kafka cluster. The thread is started right when KafkaProducer is created.
KafkaProducer uses producer-metrics for…FIXME
Tip: Enable ALL logging level for the org.apache.kafka.clients.producer.KafkaProducer logger to see what happens inside. Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.KafkaProducer=ALL

Refer to Logging.
Performance Metrics
KafkaProducer exposes performance metrics.
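The registered metrics can be inspected at runtime through the producer's metrics() method. A sketch, assuming the producer from the earlier example and Scala 2.13:

```scala
import scala.jdk.CollectionConverters._

// metrics() returns a read-only java.util.Map[MetricName, _ <: Metric]
producer.metrics().asScala.foreach { case (name, metric) =>
  println(s"${name.group}/${name.name} = ${metric.metricValue}")
}
```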
Sending ProducerRecord to Kafka Cluster Asynchronously With Producer Interceptors — send Method
Future<RecordMetadata> send(
ProducerRecord<K, V> record) (1)
Future<RecordMetadata> send(
ProducerRecord<K, V> record,
Callback callback)
1. Uses no callback (null)
Note: send is a part of the Producer Contract to send a ProducerRecord to a Kafka cluster asynchronously.
send requests the ProducerInterceptors to pass the ProducerRecord through a chain of registered producer interceptors.
In the end, send sends a ProducerRecord to a Kafka cluster asynchronously (with the intercepted ProducerRecord and the Callback).
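A producer interceptor is a class implementing the ProducerInterceptor interface, registered through ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. A minimal logging sketch (the class name is made up for illustration):

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}

class LoggingInterceptor extends ProducerInterceptor[String, String] {
  // Called on send, before serialization; may return a modified record
  override def onSend(record: ProducerRecord[String, String]): ProducerRecord[String, String] = {
    println(s"About to send a record to topic ${record.topic}")
    record
  }
  // Called when the send is acknowledged or has failed
  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception != null) println(s"Send failed: ${exception.getMessage}")
  override def close(): Unit = ()
  override def configure(configs: JMap[String, _]): Unit = ()
}
```

It would be registered with props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, classOf[LoggingInterceptor].getName).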
Sending ProducerRecord to Kafka Cluster Asynchronously — doSend Internal Method
Future<RecordMetadata> doSend(
ProducerRecord<K, V> record,
Callback callback)
doSend…FIXME
doSend requests the RecordAccumulator to append the serialized key and value.
doSend…FIXME
Note: doSend is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously with producer interceptors.
Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method
ClusterResourceListeners configureClusterResourceListeners(
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
List<?>... candidateLists)
configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keySerializer and valueSerializer.
Requesting Partitions for Topic — partitionsFor Method
List<PartitionInfo> partitionsFor(String topic)
Note: partitionsFor is a part of the Producer Contract.
partitionsFor waits for the cluster metadata with the partitions for the input topic (for up to max.block.ms). Once retrieved, partitionsFor requests the Cluster for the partitions.
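For example (assuming the producer from the earlier example and a hypothetical topic name):

```scala
import scala.jdk.CollectionConverters._

// Blocks for up to max.block.ms while the metadata is fetched
val partitions = producer.partitionsFor("my-topic").asScala
partitions.foreach { p =>
  println(s"partition ${p.partition}, leader ${p.leader}, replicas ${p.replicas.length}")
}
```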
Waiting for Cluster Metadata (with Partitions for Topic) — waitOnMetadata Internal Recursive Method
ClusterAndWaitTime waitOnMetadata(
String topic,
Integer partition,
long maxWaitMs) throws InterruptedException
waitOnMetadata first checks whether the available cluster metadata is current enough.
waitOnMetadata requests Metadata for the current cluster information and then requests the cluster for the number of partitions of the input topic.
If the cluster metadata is not current enough (i.e. the number of partitions is unavailable or the partition is above the current count), waitOnMetadata prints out the following TRACE message to the logs:
Requesting metadata update for topic [topic].
waitOnMetadata then requests Metadata to wait for a metadata update and then Metadata for the current cluster information.
waitOnMetadata keeps doing it until the number of partitions of the input topic is available.
waitOnMetadata reports a TimeoutException when maxWaitMs has elapsed.
Failed to update metadata after [maxWaitMs] ms.
waitOnMetadata reports a TopicAuthorizationException when the access to the topic is unauthorized.
waitOnMetadata reports a KafkaException when the partition is above the number of available partitions.
Invalid partition given with record: [partition] is not in the range [0...[partitionsCount]).
Note: waitOnMetadata is used when KafkaProducer is requested for the partitions of a topic and to send a ProducerRecord to a Kafka cluster asynchronously.
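The waiting time is bounded by the max.block.ms configuration property; a sketch of lowering it so metadata fetches fail fast:

```scala
import org.apache.kafka.clients.producer.ProducerConfig

// Throw a TimeoutException if metadata cannot be fetched within 5 seconds
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000")
```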
Creating KafkaProducer Instance
KafkaProducer takes the following when created:

- Serializer for keys
- Serializer for values
KafkaProducer initializes the internal registries and counters.
While being created, KafkaProducer saves the ProducerConfig in the producerConfig internal registry and the time becomes Time.SYSTEM.
KafkaProducer sets the clientId as the ProducerConfig.CLIENT_ID_CONFIG or uses producer-[id].
KafkaProducer prints out the following TRACE message to the logs:
Starting the Kafka producer
KafkaProducer creates a MetricConfig with the following:

- Number of samples as ProducerConfig.METRICS_NUM_SAMPLES_CONFIG
- Time window of ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG milliseconds
- Recording level as ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG
- Metrics tags with a single pair of client-id and the clientId
KafkaProducer uses the ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG as the MetricsReporters and adds the JmxReporter (with kafka.producer prefix).
KafkaProducer sets the metrics as a new Metrics (with the MetricConfig, the MetricsReporters and the time).
KafkaProducer sets the partitioner as ProducerConfig.PARTITIONER_CLASS_CONFIG.
KafkaProducer sets the keySerializer as follows:

- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG when the input keySerializer was not defined (and immediately requests the Serializer to configure itself)
- The input keySerializer otherwise

KafkaProducer sets the valueSerializer as follows:

- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG when the input valueSerializer was not defined (and immediately requests the Serializer to configure itself)
- The input valueSerializer otherwise
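When the Serializer instances are given directly, the serializer configuration properties may be left out. A sketch, assuming the props from the first example:

```scala
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer

// Explicit serializer instances; KafkaProducer will not look up
// key.serializer / value.serializer in the configuration
val p = new KafkaProducer[String, String](
  props, new StringSerializer, new StringSerializer)
```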
KafkaProducer sets the interceptors as ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.
KafkaProducer sets the following:
KafkaProducer creates a new ApiVersions for the apiVersions.
KafkaProducer creates a new RecordAccumulator for the accumulator with the following configuration properties:
KafkaProducer sets the metadata as follows:

- Creates a new Metadata (with ProducerConfig.RETRY_BACKOFF_MS_CONFIG, ProducerConfig.METADATA_MAX_AGE_CONFIG and configureClusterResourceListeners) and immediately requests the Metadata to update (with ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
- The input metadata if given
KafkaProducer creates a new NetworkClient (unless the input KafkaClient was given) with the following configuration properties:
KafkaProducer sets the transactionManager as configureTransactionState.
KafkaProducer uses configureRetries to configure the number of retries.
KafkaProducer uses configureInflightRequests to configure the maximum number of in-flight requests.
KafkaProducer uses configureAcks to configure acks.
KafkaProducer creates a new ProducerMetrics (with the metrics).
KafkaProducer starts the kafka-producer-network-thread daemon thread of execution for the sender.
KafkaProducer requests the ProducerConfig to logUnused.
KafkaProducer registers the AppInfo MBean (with kafka.producer JMX prefix, the clientId and the metrics).
In the end, KafkaProducer prints out the following DEBUG message to the logs:
Kafka producer started
In case of any errors, KafkaProducer closes itself with 0 millis timeout and throws a KafkaException:
Failed to construct kafka producer
Computing Partition For ProducerRecord — partition Internal Method
int partition(
ProducerRecord<K, V> record,
byte[] serializedKey,
byte[] serializedValue,
Cluster cluster)
partition requests the ProducerRecord for the partition and returns it when specified. Otherwise, partition requests the Partitioner for the partition.
Note: partition is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously.
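In other words, a partition set explicitly on the record wins over the Partitioner. A sketch (topic name and partition are for illustration only):

```scala
import org.apache.kafka.clients.producer.ProducerRecord

// Partition 0 is used as-is; the configured Partitioner is never consulted
val pinned = new ProducerRecord[String, String]("my-topic", 0, "key", "value")

// No partition given; the Partitioner computes one (from the key, by default)
val computed = new ProducerRecord[String, String]("my-topic", "key", "value")
```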
beginTransaction Method
void beginTransaction() throws ProducerFencedException
Note: beginTransaction is a part of the Producer Contract to…FIXME
beginTransaction simply requests the internal TransactionManager to beginTransaction.
beginTransaction throws an IllegalStateException when the TransactionManager is undefined (null).
Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property
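A minimal transactional flow, assuming props as in the first example plus a transactional.id (error handling simplified; fatal errors such as ProducerFencedException should close the producer instead):

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id")
val txProducer = new KafkaProducer[String, String](props)
txProducer.initTransactions()

txProducer.beginTransaction()
try {
  txProducer.send(new ProducerRecord("my-topic", "sent within a transaction"))
  txProducer.commitTransaction()
} catch {
  case e: Exception =>
    txProducer.abortTransaction()
    throw e
}
```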
configureTransactionState Static Internal Method
TransactionManager configureTransactionState(
ProducerConfig config,
LogContext logContext,
Logger log)
configureTransactionState…FIXME
Note: configureTransactionState is used exclusively when KafkaProducer is created (and initializes a TransactionManager).
Closing Kafka Producer — close Method
void close() (1)
void close(Duration timeout) (2)
// private API
void close(Duration timeout, boolean swallowException)
1. Uses the maximum timeout (Long.MAX_VALUE)
2. Disables the swallowException flag (false)
Note: close is a part of the Producer Contract.
close…FIXME
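A bounded close, assuming the producer from the first example:

```scala
import java.time.Duration

// Wait up to 10 seconds for in-flight sends to complete, then close
producer.close(Duration.ofSeconds(10))
```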
Flushing Accumulated Records — flush Method
void flush()
Note: flush is a part of the Producer Contract.
flush…FIXME
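Since send is asynchronous, flush is typically used to make sure buffered records have actually been sent, e.g.:

```scala
import org.apache.kafka.clients.producer.ProducerRecord

producer.send(new ProducerRecord("my-topic", "buffered message"))
// Blocks until every previously sent record has completed (or failed)
producer.flush()
```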
initTransactions Method
void initTransactions()
Note: initTransactions is a part of the Producer Contract.
initTransactions…FIXME
sendOffsetsToTransaction Method
void sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId)
throws ProducerFencedException
Note: sendOffsetsToTransaction is a part of the Producer Contract.
sendOffsetsToTransaction…FIXME
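A sketch of committing a consumer offset atomically with the produced records (assumes a transactional producer named producer with an open transaction; topic, partition, offset and group id are for illustration only):

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

// The offset to commit is that of the next record to consume, hence lastOffset + 1
val offsets = Collections.singletonMap(
  new TopicPartition("input-topic", 0), new OffsetAndMetadata(43L))
producer.sendOffsetsToTransaction(offsets, "my-consumer-group")
```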
commitTransaction Method
void commitTransaction()
throws ProducerFencedException
Note: commitTransaction is a part of the Producer Contract.
commitTransaction…FIXME
abortTransaction Method
void abortTransaction()
throws ProducerFencedException
Note: abortTransaction is a part of the Producer Contract.
abortTransaction…FIXME
newSender Method
Sender newSender(
LogContext logContext,
KafkaClient kafkaClient,
ProducerMetadata metadata)
newSender…FIXME
Note: newSender is used when KafkaProducer is created.
Internal Properties
| Name | Description |
|---|---|
| clientId | Client ID per ProducerConfig.CLIENT_ID_CONFIG (if defined) or producer-[id] |
| interceptors | ProducerInterceptors that are notified (about successes and failures) when KafkaProducer is requested to send a record to a topic |
| ioThread | kafka-producer-network-thread daemon thread of execution for the sender |
| partitioner | Partitioner that is used (to compute the partition for a record) when KafkaProducer is requested to partition |
| sender | Kafka producer I/O thread |
| time | Time |