// Necessary imports
import java.util.Properties
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer
// Creating a KafkaProducer
val props = new Properties()
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
// Assumes a broker listening on localhost:9092
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val producer = new KafkaProducer[String, String](props)
// Creating a record to be sent (topic "0", no key, value "this is a message")
val r = new ProducerRecord[String, String]("0", "this is a message")
// Sending the record (with no Callback); send returns immediately with a Future
val metadataF: Future[RecordMetadata] = producer.send(r)
// Block until the record is acknowledged, then release the producer's resources
val metadata = metadataF.get()
println(s"Sent to ${metadata.topic}-${metadata.partition} at offset ${metadata.offset}")
producer.close()
KafkaProducer — Main Class For Kafka Producers
KafkaProducer is the default Producer client in Apache Kafka, i.e. the class that a Kafka developer uses to send messages to a Kafka cluster.
KafkaProducer uses the ProducerConfig.INTERCEPTOR_CLASSES_CONFIG configuration property (interceptor.classes) to define the ProducerInterceptors. These are notified (about successes and failures) every time KafkaProducer is requested to send a record to a topic.
KafkaProducer uses the ProducerConfig.PARTITIONER_CLASS_CONFIG configuration property (partitioner.class) to define the Partitioner. The Partitioner computes the partition for a record when KafkaProducer is requested to partition (while sending a ProducerRecord to a Kafka cluster asynchronously).
Internally, KafkaProducer uses the Kafka producer I/O thread. This thread is responsible for sending produce requests to a Kafka cluster and runs as the kafka-producer-network-thread daemon thread of execution. The thread is started as soon as KafkaProducer is created.
KafkaProducer uses producer-metrics for…FIXME
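The split between the caller's thread and the I/O thread can be sketched as a small, self-contained Java model. Everything here is hypothetical: MiniProducer, its queue and its list of sent records are not Kafka APIs. The thread-name format is an assumption modelled on the kafka-producer-network-thread name above. The sketch only shows the pattern: send merely enqueues a record, while a daemon thread started in the constructor does the actual sending.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model: application threads only enqueue records, while a single
// daemon I/O thread (started in the constructor) drains and "sends" them.
final class MiniProducer implements AutoCloseable {
    private final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
    private final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean running = true;
    private final Thread ioThread;

    MiniProducer(String clientId) {
        Runnable sender = () -> {
            // Keep draining until closed and the queue is empty
            while (running || !accumulator.isEmpty()) {
                try {
                    String record = accumulator.poll(10, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        sent.add(record); // stands in for a produce request
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        // Assumed naming, modelled on kafka-producer-network-thread
        ioThread = new Thread(sender, "kafka-producer-network-thread | " + clientId);
        ioThread.setDaemon(true); // does not keep the JVM alive on its own
        ioThread.start();
    }

    // Runs on the application thread: returns as soon as the record is queued
    void send(String record) {
        accumulator.add(record);
    }

    Thread ioThread() {
        return ioThread;
    }

    List<String> sent() {
        synchronized (sent) {
            return new ArrayList<>(sent);
        }
    }

    // Stops the loop and waits for the I/O thread to drain what is queued
    @Override
    public void close() {
        running = false;
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In this model the I/O thread is a daemon, so it does not by itself keep the JVM running. Closing the producer explicitly is what guarantees that queued records are drained.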
Tip: Enable logging for the org.apache.kafka.clients.producer.KafkaProducer logger (e.g. at TRACE level) to see what happens inside. Refer to Logging.
Performance Metrics
KafkaProducer exposes performance metrics.
Sending ProducerRecord to Kafka Cluster Asynchronously With Producer Interceptors — send Method
Future<RecordMetadata> send(
ProducerRecord<K, V> record) (1)
Future<RecordMetadata> send(
ProducerRecord<K, V> record,
Callback callback)
(1) Uses no callback (null)
Note: send is a part of the Producer Contract to send a ProducerRecord to a Kafka cluster asynchronously.
send requests the ProducerInterceptors to pass the ProducerRecord through the chain of registered producer interceptors.
In the end, send uses doSend to send the intercepted ProducerRecord (with the Callback) to a Kafka cluster asynchronously.
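The following is a minimal sketch of how a record passes through the interceptor chain. It assumes records are plain Strings and each interceptor is a java.util.function.UnaryOperator. InterceptorChain is a hypothetical name, not Kafka's ProducerInterceptors class.

Each interceptor receives the output of the previous one. As in Kafka's ProducerInterceptors, an exception thrown by one interceptor is logged and skipped rather than failing the send.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of the interceptor chain applied by send:
// records are plain Strings and interceptors are UnaryOperators.
final class InterceptorChain {
    private final List<UnaryOperator<String>> interceptors;

    InterceptorChain(List<UnaryOperator<String>> interceptors) {
        this.interceptors = interceptors;
    }

    String onSend(String record) {
        String intercepted = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                // Each interceptor sees the output of the previous one
                intercepted = interceptor.apply(intercepted);
            } catch (RuntimeException e) {
                // A failing interceptor is skipped; the chain continues with the last good record
                System.err.println("Interceptor failed: " + e.getMessage());
            }
        }
        return intercepted;
    }
}
```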
Sending ProducerRecord to Kafka Cluster Asynchronously — doSend Internal Method
Future<RecordMetadata> doSend(
ProducerRecord<K, V> record,
Callback callback)
doSend …FIXME
doSend requests the RecordAccumulator to append the serialized key and value.
doSend …FIXME
Note: doSend is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously with producer interceptors.
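doSend hands the serialized key and value to the RecordAccumulator. The hypothetical MiniAccumulator below is a rough sketch of the batching idea only. It assumes one deque of batches per partition and a size limit standing in for the batch.size producer property. It ignores record overhead, compression, memory pooling and linger time, so it is not Kafka's actual algorithm.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of appending to a RecordAccumulator:
// one deque of batches per partition, and a new batch whenever the last one
// would overflow batchSize. Sizes count only key and value bytes.
final class MiniAccumulator {
    static final class Batch {
        int records;
        int sizeInBytes;
    }

    private final int batchSize;
    private final Map<Integer, Deque<Batch>> batches = new HashMap<>();

    MiniAccumulator(int batchSize) {
        this.batchSize = batchSize;
    }

    // Appends a serialized record; returns true when it started a new batch
    boolean append(int partition, byte[] serializedKey, byte[] serializedValue) {
        int size = (serializedKey == null ? 0 : serializedKey.length)
                + (serializedValue == null ? 0 : serializedValue.length);
        Deque<Batch> deque = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        boolean newBatch = last == null || last.sizeInBytes + size > batchSize;
        if (newBatch) {
            last = new Batch(); // an oversized record still gets a batch of its own
            deque.addLast(last);
        }
        last.records++;
        last.sizeInBytes += size;
        return newBatch;
    }

    int batchCount(int partition) {
        Deque<Batch> deque = batches.get(partition);
        return deque == null ? 0 : deque.size();
    }
}
```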
Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method
ClusterResourceListeners configureClusterResourceListeners(
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
List<?>... candidateLists)
configureClusterResourceListeners creates a ClusterResourceListeners and registers the ClusterResourceListener instances found among the input candidateLists, keySerializer and valueSerializer.
Note: configureClusterResourceListeners is used when KafkaProducer is created (to create the Metadata).
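The registration rule keeps only candidates that actually implement the listener interface. It can be sketched as follows. ClusterListener and ClusterListenersModel are hypothetical stand-ins for ClusterResourceListener and ClusterResourceListeners. In this sketch, the candidate lists are registered first, followed by the key and value serializers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ClusterResourceListener
interface ClusterListener {
    void onUpdate(String clusterId);
}

// Simplified model of configureClusterResourceListeners: every candidate is
// offered, but only those implementing ClusterListener are registered.
final class ClusterListenersModel {
    final List<ClusterListener> listeners = new ArrayList<>();

    void maybeAdd(Object candidate) {
        if (candidate instanceof ClusterListener) {
            listeners.add((ClusterListener) candidate);
        }
    }

    static ClusterListenersModel configure(Object keySerializer,
                                           Object valueSerializer,
                                           List<?>... candidateLists) {
        ClusterListenersModel result = new ClusterListenersModel();
        for (List<?> candidates : candidateLists) {
            for (Object candidate : candidates) {
                result.maybeAdd(candidate);
            }
        }
        result.maybeAdd(keySerializer);
        result.maybeAdd(valueSerializer);
        return result;
    }

    // Notifies every registered listener about new cluster metadata
    void onUpdate(String clusterId) {
        for (ClusterListener listener : listeners) {
            listener.onUpdate(clusterId);
        }
    }
}
```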
Requesting Partitions for Topic — partitionsFor Method
List<PartitionInfo> partitionsFor(String topic)
Note: partitionsFor is a part of the Producer Contract.
partitionsFor waits on cluster metadata for the input topic (for up to max.block.ms). Once the metadata is available, partitionsFor requests the Cluster for the partitions of the topic.
Waiting for Cluster Metadata (with Partitions for Topic) — waitOnMetadata Internal Recursive Method
ClusterAndWaitTime waitOnMetadata(
String topic,
Integer partition,
long maxWaitMs) throws InterruptedException
waitOnMetadata first checks whether the available cluster metadata is current enough. It requests Metadata for the current cluster information and then requests the cluster for the number of partitions of the input topic.
The cluster metadata is not current enough when the number of partitions is unavailable or the input partition is not lower than the current count. In that case, waitOnMetadata prints out the following TRACE message to the logs:
Requesting metadata update for topic [topic].
waitOnMetadata then requests Metadata to wait for a metadata update and requests Metadata for the current cluster information again. waitOnMetadata keeps doing so until the number of partitions of the input topic is available.
waitOnMetadata reports a TimeoutException when maxWaitMs has elapsed:
Failed to update metadata after [maxWaitMs] ms.
waitOnMetadata reports a TopicAuthorizationException when access to the topic is unauthorized.
waitOnMetadata reports a KafkaException when the partition is greater than or equal to the number of available partitions:
Invalid partition given with record: [partition] is not in the range [0...[partitionsCount]).
Note: waitOnMetadata is used when KafkaProducer is requested for the partitions of a topic and to send a ProducerRecord to a Kafka cluster asynchronously.
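The waiting loop described above can be sketched as a simplified model. Partition counts stand in for the Cluster metadata. The names MetadataWaiter, cachedCount and awaitUpdate are hypothetical. IllegalStateException and IllegalArgumentException stand in for Kafka's TimeoutException and KafkaException, and the TopicAuthorizationException check is left out. The clock is injected so that the timeout logic can be tested deterministically.

```java
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of waitOnMetadata:
//  - cachedCount: partition count known by the current metadata (null if unknown)
//  - awaitUpdate: "request a metadata update and wait for it" (topic, remainingMs)
//  - clockMs:     time source in milliseconds
final class MetadataWaiter {
    private final Function<String, Integer> cachedCount;
    private final BiFunction<String, Long, Integer> awaitUpdate;
    private final LongSupplier clockMs;

    MetadataWaiter(Function<String, Integer> cachedCount,
                   BiFunction<String, Long, Integer> awaitUpdate,
                   LongSupplier clockMs) {
        this.cachedCount = cachedCount;
        this.awaitUpdate = awaitUpdate;
        this.clockMs = clockMs;
    }

    int waitOnMetadata(String topic, Integer partition, long maxWaitMs) {
        Integer count = cachedCount.apply(topic);
        // Current enough: the count is known and the partition (if any) is below it
        if (count != null && (partition == null || partition < count)) {
            return count;
        }
        long begin = clockMs.getAsLong();
        long remainingMs = maxWaitMs;
        do {
            System.out.println("Requesting metadata update for topic " + topic + ".");
            count = awaitUpdate.apply(topic, remainingMs);
            long elapsed = clockMs.getAsLong() - begin;
            if (elapsed >= maxWaitMs) {
                // Stands in for Kafka's TimeoutException
                throw new IllegalStateException("Failed to update metadata after " + maxWaitMs + " ms.");
            }
            remainingMs = maxWaitMs - elapsed;
        } while (count == null);
        if (partition != null && partition >= count) {
            // Stands in for KafkaException
            throw new IllegalArgumentException("Invalid partition given with record: " + partition
                    + " is not in the range [0..." + count + ").");
        }
        return count;
    }
}
```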
Creating KafkaProducer Instance
KafkaProducer takes the following when created:
- ProducerConfig
- Serializer for keys
- Serializer for values
- Metadata (optional)
- KafkaClient (optional)
KafkaProducer initializes the internal registries and counters.
While being created, KafkaProducer saves the ProducerConfig in the producerConfig internal registry and sets the time to Time.SYSTEM.
KafkaProducer sets the clientId to ProducerConfig.CLIENT_ID_CONFIG (if defined) or producer-[id] otherwise.
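The clientId fallback can be sketched as follows. The sketch assumes that the [id] part comes from a JVM-wide counter starting at 1, which yields producer-1, producer-2, and so on. ClientIds is a hypothetical helper, and the configuration is modelled as a plain map keyed by client.id.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the clientId fallback. Assumption: a JVM-wide
// counter starting at 1 numbers the generated producer-[id] client IDs.
final class ClientIds {
    private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);

    static String clientId(Map<String, String> configs) {
        String configured = configs.getOrDefault("client.id", "");
        if (!configured.isEmpty()) {
            return configured; // an explicit client.id always wins
        }
        return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
    }
}
```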
KafkaProducer prints out the following TRACE message to the logs:
Starting the Kafka producer
KafkaProducer creates a MetricConfig with the following:
- Number of samples as ProducerConfig.METRICS_NUM_SAMPLES_CONFIG
- Time window of ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG milliseconds
- Recording level as ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG
- Metrics tags with a single pair of client-id and the clientId
KafkaProducer uses ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG for the MetricsReporters and adds a JmxReporter (with the kafka.producer prefix).
KafkaProducer sets the metrics to a new Metrics (with the MetricConfig, the MetricsReporters and the time).
KafkaProducer sets the partitioner to an instance of the class configured by ProducerConfig.PARTITIONER_CLASS_CONFIG.
KafkaProducer sets the keySerializer as follows:
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG when the input keySerializer was not defined (and immediately requests the Serializer to configure itself)
- The input keySerializer otherwise
KafkaProducer sets the valueSerializer as follows:
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG when the input valueSerializer was not defined (and immediately requests the Serializer to configure itself)
- The input valueSerializer otherwise
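The selection rule for both serializers can be sketched with hypothetical stand-ins: MiniSerializer, Utf8Serializer and SerializerSelection are not Kafka classes. A factory plays the role of instantiating the class named by KEY_SERIALIZER_CLASS_CONFIG or VALUE_SERIALIZER_CLASS_CONFIG. Only a serializer created from the configuration gets configured; an input serializer is used as-is.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's Serializer interface
interface MiniSerializer<T> {
    default void configure(Map<String, ?> configs, boolean isKey) {}

    byte[] serialize(String topic, T data);
}

final class Utf8Serializer implements MiniSerializer<String> {
    boolean configured;
    boolean configuredAsKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configured = true;
        configuredAsKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }
}

final class SerializerSelection {
    // A given serializer is used as-is. Otherwise one is created from the
    // "configuration" (the factory) and configured right away; isKey tells it
    // whether it serializes keys or values.
    static <T> MiniSerializer<T> select(MiniSerializer<T> given,
                                        Supplier<MiniSerializer<T>> fromConfig,
                                        Map<String, ?> configs,
                                        boolean isKey) {
        if (given != null) {
            return given;
        }
        MiniSerializer<T> created = fromConfig.get();
        created.configure(configs, isKey);
        return created;
    }
}
```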
KafkaProducer sets the interceptors to the ProducerInterceptors configured with ProducerConfig.INTERCEPTOR_CLASSES_CONFIG.
KafkaProducer sets the following:
KafkaProducer creates a new ApiVersions for the apiVersions.
KafkaProducer creates a new RecordAccumulator for the accumulator with the following configuration properties:
KafkaProducer sets the metadata as follows:
- A new Metadata (with ProducerConfig.RETRY_BACKOFF_MS_CONFIG, ProducerConfig.METADATA_MAX_AGE_CONFIG and configureClusterResourceListeners) when no input metadata was given. The new Metadata is immediately requested to update (with ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).
- The input metadata if given
KafkaProducer creates a new NetworkClient (unless the input KafkaClient was given) with the following configuration properties:
KafkaProducer sets the transactionManager using configureTransactionState.
KafkaProducer uses configureRetries for the number of retries.
KafkaProducer uses configureInflightRequests for the maximum number of in-flight requests.
KafkaProducer uses configureAcks for acks.
KafkaProducer creates a new ProducerMetrics (with the metrics).
KafkaProducer starts the kafka-producer-network-thread daemon thread of execution for the sender.
KafkaProducer requests the ProducerConfig to logUnused.
KafkaProducer registers the AppInfo MBean (with the kafka.producer JMX prefix, the clientId and the metrics).
In the end, KafkaProducer prints out the following DEBUG message to the logs:
Kafka producer started
In case of any errors, KafkaProducer closes itself with a 0 millis timeout and throws a KafkaException:
Failed to construct kafka producer
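The close-on-failure pattern can be sketched as follows. GuardedProducer is hypothetical. It registers each resource as soon as the resource is created, so a failure halfway through construction can release whatever already exists. RuntimeException stands in for KafkaException. The reverse closing order is a choice of this sketch, not something stated above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the constructor's error handling. On any failure,
// the resources created so far are closed (exceptions swallowed) before the
// error is wrapped and rethrown.
final class GuardedProducer {
    private final List<AutoCloseable> created = new ArrayList<>();

    GuardedProducer(List<String> closeLog, boolean failMidway) {
        try {
            created.add(() -> closeLog.add("metrics"));
            if (failMidway) {
                throw new IllegalArgumentException("invalid configuration");
            }
            created.add(() -> closeLog.add("network client"));
        } catch (Throwable t) {
            closeQuietly();
            // RuntimeException stands in for KafkaException
            throw new RuntimeException("Failed to construct kafka producer", t);
        }
    }

    // This sketch closes in reverse creation order and swallows close failures
    private void closeQuietly() {
        for (int i = created.size() - 1; i >= 0; i--) {
            try {
                created.get(i).close();
            } catch (Exception ignored) {
                // swallowed, in the spirit of close(..., swallowException = true)
            }
        }
    }
}
```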
Computing Partition For ProducerRecord — partition Internal Method
int partition(
ProducerRecord<K, V> record,
byte[] serializedKey,
byte[] serializedValue,
Cluster cluster)
partition requests the ProducerRecord for the partition and returns it when specified. Otherwise, partition requests the Partitioner for the partition.
Note: partition is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka cluster asynchronously.
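The partition choice reduces to a null check. It is sketched here with a hypothetical MiniPartitioner interface standing in for Partitioner. The HASHING partitioner is illustrative only. Kafka's default partitioner hashes keys with murmur2 and treats keyless records differently, whereas this sketch would send all keyless records to the same partition.

```java
import java.util.Arrays;

// Hypothetical stand-in for Kafka's Partitioner
interface MiniPartitioner {
    int partition(String topic, byte[] keyBytes, int numPartitions);
}

final class PartitionChooser {
    // Illustrative only: not Kafka's default (murmur2-based) partitioner
    static final MiniPartitioner HASHING =
            (topic, key, numPartitions) -> (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;

    private final MiniPartitioner partitioner;

    PartitionChooser(MiniPartitioner partitioner) {
        this.partitioner = partitioner;
    }

    // An explicit ProducerRecord partition wins; otherwise the partitioner decides
    int partition(String topic, Integer recordPartition, byte[] serializedKey, int numPartitions) {
        return recordPartition != null
                ? recordPartition
                : partitioner.partition(topic, serializedKey, numPartitions);
    }
}
```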
beginTransaction Method
void beginTransaction() throws ProducerFencedException
Note: beginTransaction is part of the Producer Contract to…FIXME.
beginTransaction simply requests the internal TransactionManager to beginTransaction.
beginTransaction throws an IllegalStateException when the TransactionManager is undefined (null):
Cannot use transactional methods without enabling transactions by setting the transactional.id configuration property
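The guard can be sketched under a simplifying assumption: a TransactionManager exists only when transactional.id is set. TransactionalGuard and MiniTransactionManager are hypothetical. A transactional call first checks for the manager and fails with the IllegalStateException message quoted above.

```java
import java.util.Properties;

// Hypothetical sketch: in this model a TransactionManager is created only when
// transactional.id is configured, and transactional calls check for it first.
final class TransactionalGuard {
    static final class MiniTransactionManager {
        boolean inTransaction;

        void beginTransaction() {
            inTransaction = true;
        }
    }

    final MiniTransactionManager transactionManager;

    TransactionalGuard(Properties props) {
        transactionManager = props.getProperty("transactional.id") != null
                ? new MiniTransactionManager()
                : null;
    }

    void beginTransaction() {
        throwIfNoTransactionManager();
        transactionManager.beginTransaction();
    }

    // Hypothetical helper holding the null check
    private void throwIfNoTransactionManager() {
        if (transactionManager == null) {
            throw new IllegalStateException("Cannot use transactional methods without enabling transactions "
                    + "by setting the transactional.id configuration property");
        }
    }
}
```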
configureTransactionState Static Internal Method
TransactionManager configureTransactionState(
ProducerConfig config,
LogContext logContext,
Logger log)
configureTransactionState …FIXME
Note: configureTransactionState is used exclusively when KafkaProducer is created (and initializes a TransactionManager).
Closing Kafka Producer — close Method
void close() (1)
void close(Duration timeout) (2)
// private API
void close(Duration timeout, boolean swallowException)
(1) Uses the maximum timeout (Long.MAX_VALUE)
(2) Disables the swallowException flag (false)
Note: close is a part of the Producer Contract.
close …FIXME
Flushing Accumulated Records — flush Method
void flush()
Note: flush is a part of the Producer Contract.
flush …FIXME
initTransactions Method
void initTransactions()
Note: initTransactions is a part of the Producer Contract.
initTransactions …FIXME
sendOffsetsToTransaction Method
void sendOffsetsToTransaction(
Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId)
throws ProducerFencedException
Note: sendOffsetsToTransaction is a part of the Producer Contract.
sendOffsetsToTransaction …FIXME
commitTransaction Method
void commitTransaction()
throws ProducerFencedException
Note: commitTransaction is a part of the Producer Contract.
commitTransaction …FIXME
abortTransaction Method
void abortTransaction()
throws ProducerFencedException
Note: abortTransaction is a part of the Producer Contract.
abortTransaction …FIXME
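The order in which the transactional methods are expected to be called can be sketched as a small state machine. TxLifecycle is a hypothetical, heavily simplified model. Kafka's TransactionManager has more states, including committing, aborting and error states. Offsets are keyed here by a plain topic-partition string rather than TopicPartition and OffsetAndMetadata, and the consumer group is left out. The model shows that records and consumed offsets sent within a transaction are committed together or not at all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the transactional call order:
// initTransactions, then beginTransaction ... commitTransaction or abortTransaction
final class TxLifecycle {
    enum State { UNINITIALIZED, READY, IN_TRANSACTION }

    private State state = State.UNINITIALIZED;
    private final List<String> pendingRecords = new ArrayList<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();
    private final List<String> committedRecords = new ArrayList<>();
    private final Map<String, Long> committedOffsets = new HashMap<>();

    void initTransactions() { transition(State.UNINITIALIZED, State.READY, "initTransactions"); }

    void beginTransaction() { transition(State.READY, State.IN_TRANSACTION, "beginTransaction"); }

    void send(String record) {
        require(State.IN_TRANSACTION, "send");
        pendingRecords.add(record);
    }

    // Consumed offsets (keyed by a "topic-partition" string) join the transaction
    void sendOffsetsToTransaction(Map<String, Long> offsets) {
        require(State.IN_TRANSACTION, "sendOffsetsToTransaction");
        pendingOffsets.putAll(offsets);
    }

    void commitTransaction() {
        transition(State.IN_TRANSACTION, State.READY, "commitTransaction");
        committedRecords.addAll(pendingRecords); // records and offsets commit together
        committedOffsets.putAll(pendingOffsets);
        clearPending();
    }

    void abortTransaction() {
        transition(State.IN_TRANSACTION, State.READY, "abortTransaction");
        clearPending(); // nothing from the transaction is committed
    }

    State state() { return state; }

    List<String> committedRecords() { return new ArrayList<>(committedRecords); }

    Map<String, Long> committedOffsets() { return new HashMap<>(committedOffsets); }

    private void clearPending() {
        pendingRecords.clear();
        pendingOffsets.clear();
    }

    private void require(State expected, String operation) {
        if (state != expected) {
            throw new IllegalStateException(operation + " is not allowed in state " + state);
        }
    }

    private void transition(State from, State to, String operation) {
        require(from, operation);
        state = to;
    }
}
```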
newSender Method
Sender newSender(
LogContext logContext,
KafkaClient kafkaClient,
ProducerMetadata metadata)
newSender …FIXME
Note: newSender is used when KafkaProducer is created.
Internal Properties
Name | Description |
---|---|
clientId | Client ID per ProducerConfig.CLIENT_ID_CONFIG (if defined) or producer-[id] |
interceptors | ProducerInterceptors that are notified (about successes and failures) when KafkaProducer is requested to send a record to a topic |
ioThread | kafka-producer-network-thread daemon thread of execution for the sender |
time | Time |
partitioner | Partitioner that is used (to compute the partition for a record) when KafkaProducer is requested to partition |
sender | Kafka producer I/O thread |