Sender — Kafka Producer I/O Thread

Sender is a Kafka producer I/O thread (of execution) that runs indefinitely and sends ProduceRequests to a Kafka cluster.

Figure 1. Sender and KafkaProducer

Sender is created exclusively when KafkaProducer is created and is immediately started under the name kafka-producer-network-thread | [clientId] (as a daemon thread).
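The naming and daemon behavior described above can be sketched with plain java.lang.Thread. This is a minimal illustration, not the producer's actual code: the class IoThreadSketch, the method newIoThread, and the constant name NETWORK_THREAD_PREFIX are assumptions made for the example, and any Runnable stands in for Sender.

```java
class IoThreadSketch {
    // Assumed constant name; only the resulting thread name comes from the text above.
    static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread";

    // Builds (but does not start) a daemon thread named after the client id.
    static Thread newIoThread(String clientId, Runnable sender) {
        Thread thread = new Thread(sender, NETWORK_THREAD_PREFIX + " | " + clientId);
        thread.setDaemon(true); // a daemon thread does not keep the JVM alive on its own
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical stand-in for Sender: it only reports the thread it runs in.
        Runnable sender = () ->
            System.out.println("running in: " + Thread.currentThread().getName());
        Thread ioThread = newIoThread("producer-1", sender);
        ioThread.start();
        ioThread.join();
    }
}
```

Because the thread is a daemon, a producer that is never closed does not by itself prevent the JVM from exiting.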

On every runOnce, Sender drains record batches from the record accumulator (that batches records), up to max.request.size bytes per broker, and eventually sends ProduceRequests (sendProduceRequests) to every active Kafka broker for which there is at least one ProducerBatch.

KafkaProducer uses the following configuration properties to create a Sender:

Tip

Enable ALL logging level for org.apache.kafka.clients.producer.internals.Sender logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.internals.Sender=ALL

Refer to Logging.

Creating Sender Instance

Sender takes the following to be created:

Sender initializes the internal properties.

Starting Thread of Execution — run Method (of Java’s Runnable)

void run()
Note
run is a part of java.lang.Runnable that is executed when the thread is started.

run first prints out the following DEBUG message to the logs:

Starting Kafka producer I/O thread.

run keeps running until the running internal flag is off.

Once the running internal flag is off, run prints out the following DEBUG message to the logs:

Beginning shutdown of Kafka producer I/O thread, sending remaining records.

run may wait until…​FIXME ((!forceClose…​)

In the end, run requests the KafkaClient to close and prints out the following DEBUG message to the logs:

Shutdown of Kafka producer I/O thread has completed.
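The lifecycle of run can be sketched as a loop guarded by a flag. The following is a simplified model under stated assumptions, not the Sender source: RunLoopSketch, its log list (standing in for the DEBUG logger), and the self-stopping runOnce are hypothetical, and the part of the shutdown that may keep sending remaining records is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

class RunLoopSketch implements Runnable {
    final List<String> log = new ArrayList<>(); // stands in for the DEBUG logger
    int iterations = 0;
    private volatile boolean running = true;    // mirrors the running internal flag

    @Override
    public void run() {
        log.add("Starting Kafka producer I/O thread.");
        while (running) {
            runOnce();
        }
        log.add("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        // The real thread may still send remaining records here; that is not modeled.
        log.add("Shutdown of Kafka producer I/O thread has completed.");
    }

    // Hypothetical single iteration: turns the flag off after three passes
    // so that the example terminates on its own.
    private void runOnce() {
        iterations++;
        if (iterations == 3) {
            running = false;
        }
    }

    public static void main(String[] args) {
        RunLoopSketch sketch = new RunLoopSketch();
        sketch.run(); // executed on the calling thread to keep the example deterministic
        sketch.log.forEach(System.out::println);
    }
}
```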

Running Single Iteration of Sending — runOnce Method

void runOnce()

runOnce simply calls sendProducerData and then requests the KafkaClient to poll.

With a transactionManager assigned, runOnce…​FIXME

Note
runOnce is used exclusively when Sender is requested to run (as a thread of execution).

sendProducerData Internal Method

long sendProducerData(long now)

sendProducerData…​FIXME (guaranteeMessageOrder)

sendProducerData requests the SenderMetrics to updateProduceRequestMetrics.

sendProducerData…​FIXME

In the end, sendProducerData calls sendProduceRequests (for the batches).

Note
sendProducerData is used exclusively when Sender is requested to run a single iteration of sending.

Sending ProduceRequests — sendProduceRequests Internal Method

void sendProduceRequests(
  Map<Integer, List<ProducerBatch>> collated,
  long now)

sendProduceRequests simply goes over the given collated collection (of ProducerBatches per destination broker node ID) and calls sendProduceRequest for every pair.

Note
sendProduceRequests is used exclusively when Sender is requested to sendProducerData.
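The iteration over the collated map can be sketched as follows. This is an illustration under assumptions: the integer key is treated as the destination broker's node ID (matching the destination parameter of sendProduceRequest), plain strings stand in for ProducerBatch, and the hypothetical CollatedSketch returns descriptions of what would be sent instead of doing any network I/O.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CollatedSketch {
    // One "request" per map entry, that is, per destination.
    static List<String> sendProduceRequests(Map<Integer, List<String>> collated) {
        List<String> sent = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : collated.entrySet()) {
            sent.add(sendProduceRequest(entry.getKey(), entry.getValue()));
        }
        return sent;
    }

    // Stand-in for the real method: describes the request rather than sending it.
    static String sendProduceRequest(int destination, List<String> batches) {
        return "node " + destination + " <- " + batches.size() + " batch(es)";
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> collated = new LinkedHashMap<>();
        collated.put(0, Arrays.asList("topicA-0", "topicB-1")); // two batches for node 0
        collated.put(2, Arrays.asList("topicA-1"));             // one batch for node 2
        sendProduceRequests(collated).forEach(System.out::println);
    }
}
```

All batches for one destination travel together, so the number of requests per iteration is bounded by the number of destinations, not by the number of batches.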

Sending ProduceRequest to Broker — sendProduceRequest Internal Method

void sendProduceRequest(
  long now,
  int destination,
  short acks,
  int timeout,
  List<ProducerBatch> batches)

sendProduceRequest…​FIXME

sendProduceRequest requests the ProduceRequest.Builder to create a ProduceRequest.Builder instance (for a given magic number).

sendProduceRequest creates a new RequestCompletionHandler that calls handleProduceResponse when the request is complete.

Note
sendProduceRequest creates a new ClientRequest with the expectResponse flag on when acks argument is non-0.

sendProduceRequest requests the KafkaClient to send the ClientRequest.

In the end, sendProduceRequest prints out the following TRACE message to the logs:

Sent produce request to [nodeId]: [requestBuilder]
Note
sendProduceRequest is used exclusively when Sender is requested to sendProduceRequests.
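The rule for the expectResponse flag is small enough to state as code. A minimal sketch, assuming a hypothetical helper AcksSketch.expectResponse that is not part of Kafka; it only restates the "non-0" condition from the note above.

```java
class AcksSketch {
    // A response is expected for every acks value except 0 (fire-and-forget).
    static boolean expectResponse(short acks) {
        return acks != 0;
    }

    public static void main(String[] args) {
        // 0 = no acknowledgment, 1 = leader only, -1 = all in-sync replicas
        for (short acks : new short[] {0, 1, -1}) {
            System.out.println("acks=" + acks + " expectResponse=" + expectResponse(acks));
        }
    }
}
```

Note that acks=-1 (the numeric form of acks=all) also counts as non-0, so only acks=0 skips waiting for a response.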

Handling ProduceResponse — handleProduceResponse Internal Method

void handleProduceResponse(
  ClientResponse response,
  Map<TopicPartition, ProducerBatch> batches,
  long now)

handleProduceResponse…​FIXME

Note
handleProduceResponse is used exclusively when Sender is requested to send a ProduceRequest.

completeBatch Internal Method

void completeBatch(
  ProducerBatch batch,
  ProduceResponse.PartitionResponse response,
  long correlationId,
  long now,
  long throttleUntilTimeMs)

completeBatch…​FIXME

Note
completeBatch is used exclusively when Sender is requested to handle a ProduceResponse.

addToInflightBatches Method

void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches)

addToInflightBatches…​FIXME

Note
addToInflightBatches is used exclusively when Sender is requested to sendProducerData.

maybeSendTransactionalRequest Internal Method

boolean maybeSendTransactionalRequest(long now)

maybeSendTransactionalRequest…​FIXME

Note
maybeSendTransactionalRequest is used exclusively when Sender is running.

maybeWaitForProducerId Internal Method

void maybeWaitForProducerId()

maybeWaitForProducerId…​FIXME

Note
maybeWaitForProducerId is used exclusively when Sender is running.

awaitLeastLoadedNodeReady Internal Method

Node awaitLeastLoadedNodeReady(long remainingTimeMs)

awaitLeastLoadedNodeReady…​FIXME

Note
awaitLeastLoadedNodeReady is used when Sender is requested to maybeSendTransactionalRequest and maybeWaitForProducerId.

initiateClose Method

void initiateClose()

initiateClose requests the RecordAccumulator to close.

In the end, initiateClose turns the running internal flag off followed by waking up the Kafka client.
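Why the wakeup follows the flag change can be illustrated with a java.nio Selector standing in for the KafkaClient. This is a sketch under assumptions, not Sender's implementation: CloseSketch is hypothetical, the blocking select call plays the role of a long poll, and closing the RecordAccumulator is not modeled.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class CloseSketch implements Runnable {
    private final Selector selector;         // stands in for the KafkaClient
    private volatile boolean running = true; // mirrors the running internal flag

    CloseSketch() throws IOException {
        this.selector = Selector.open();
    }

    @Override
    public void run() {
        try {
            while (running) {
                selector.select(10_000); // blocks for up to 10 seconds, like a long poll
            }
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void initiateClose() {
        running = false;   // 1. turn the flag off
        selector.wakeup(); // 2. interrupt the blocked (or next) select so the loop re-checks the flag
    }

    public static void main(String[] args) throws Exception {
        CloseSketch sketch = new CloseSketch();
        Thread ioThread = new Thread(sketch, "io-thread-sketch");
        ioThread.setDaemon(true);
        ioThread.start();
        Thread.sleep(100);       // give the loop a chance to block in select
        sketch.initiateClose();
        ioThread.join(2_000);    // returns well before the 10-second select timeout
        System.out.println("exited: " + !ioThread.isAlive());
    }
}
```

Setting the flag before calling wakeup matters in this sketch: if the wakeup arrives between the flag check and the select call, the pending wakeup makes that select return immediately, and the loop then sees the flag turned off.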

Note

initiateClose is used when:

wakeup Method

void wakeup()

wakeup merely requests the KafkaClient to wakeup.

Note

wakeup is used when:

forceClose Method

void forceClose()

forceClose…​FIXME

Note
forceClose is used exclusively when KafkaProducer is requested to close.

reenqueueBatch Internal Method

void reenqueueBatch(
  ProducerBatch batch,
  long currentTimeMs)

reenqueueBatch…​FIXME

Note
reenqueueBatch is used when…​FIXME

Internal Properties

Name Description

running

Flag that controls whether run should stop (false) or not (true)

  • Enabled (true) by default when Sender is created

  • Disabled (false) when Sender is requested to initiateClose
