Sender — Kafka Producer Network Thread

Sender is a Kafka producer network thread (of execution) that runs indefinitely and sends ProduceRequests to a Kafka cluster.

Figure 1. Sender and KafkaProducer

Sender is created when KafkaProducer is created and immediately started under the name kafka-producer-network-thread | [clientId] (as a daemon thread).

Every runOnce, Sender takes (drains) up to max.request.size messages from the record accumulator (that batches records) and eventually sendProduceRequests to every active Kafka broker for which there is at least one ProducerBatch.

KafkaProducer uses the following configuration properties to create a Sender:


Enable ALL logging level for org.apache.kafka.clients.producer.internals.Sender logger to see what happens inside.

Add the following line to config/

Refer to Logging.

Creating Sender Instance

Sender takes the following to be created:

Sender initializes the internal properties.

Starting Thread of Execution — run Method

void run()
run is a part of java.lang.Runnable that is executed when the thread is started.

run first prints out the following DEBUG message to the logs:

Starting Kafka producer I/O thread.

run keeps running until the running internal flag is off.

Once the running internal flag is off, run prints out the following DEBUG message to the logs:

Beginning shutdown of Kafka producer I/O thread, sending remaining records.

run may wait until…​FIXME ((!forceClose…​)

In the end, run requests the KafkaClient to close and prints out the following DEBUG message to the logs:

Shutdown of Kafka producer I/O thread has completed.

Running Single Iteration of Sending — runOnce Method

void runOnce()

runOnce simply sendProducerData followed by requesting the KafkaClient to poll.

With a transactionManager assigned, runOnce…​FIXME

run is used exclusively when Sender is requested to run (as a thread of execution).

sendProducerData Internal Method

long sendProducerData(long now)

sendProducerData…​FIXME (guaranteeMessageOrder)

sendProducerData requests the SenderMetrics to updateProduceRequestMetrics.


In the end, sendProducerData sendProduceRequests (for the batches).

sendProducerData is used exclusively when Sender is requested to run a single iteration of sending.

Sending ProduceRequests — sendProduceRequests Internal Method

void sendProduceRequests(
  Map<Integer, List<ProducerBatch>> collated,
  long now)

sendProduceRequests simply goes over the given collated collection (of ProducerBatches per partition ID) and sendProduceRequest for every pair.

sendProduceRequests is used exclusively when Sender is requested to sendProducerData.

Sending ProduceRequest to Broker — sendProduceRequest Internal Method

void sendProduceRequest(
  long now,
  int destination,
  short acks,
  int timeout,
  List<ProducerBatch> batches)


sendProduceRequest requests the ProduceRequest.Builder to create a ProduceRequest.Builder instance (for a given magic number).

sendProduceRequest creates a new RequestCompletionHandler that handleProduceResponse when a request is complete.

sendProduceRequest creates a new ClientRequest with the expectResponse flag on when acks argument is non-0.

sendProduceRequest requests the KafkaClient to send the ClientRequest.

In the end, sendProduceRequest prints out the following TRACE message to the logs:

Sent produce request to [nodeId]: [requestBuilder]
sendProduceRequest is used exclusively when Sender is requested to sendProduceRequests.

Handling ProduceResponse — handleProduceResponse Internal Method

void handleProduceResponse(
  ClientResponse response,
  Map<TopicPartition, ProducerBatch> batches,
  long now)


handleProduceResponse is used exclusively when Sender is requested to send a ProduceRequest.

completeBatch Internal Method

void completeBatch(
  ProducerBatch batch,
  ProduceResponse.PartitionResponse response,
  long correlationId,
  long now,
  long throttleUntilTimeMs)


completeBatch is used exclusively when Sender is requested to handle a ProduceResponse.

maybeSendAndPollTransactionalRequest Internal Method

boolean maybeSendAndPollTransactionalRequest()


maybeSendAndPollTransactionalRequest is used when Sender is requested to run a single iteration of sending.

addToInflightBatches Method

void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches)


addToInflightBatches is used exclusively when Sender is requested to sendProducerData.

maybeSendTransactionalRequest Internal Method

boolean maybeSendTransactionalRequest(long now)


maybeSendTransactionalRequest is used exclusively when Sender is running.

maybeWaitForProducerId Internal Method

void maybeWaitForProducerId()


maybeWaitForProducerId is used exclusively when Sender is running.

awaitLeastLoadedNodeReady Internal Method

Node awaitLeastLoadedNodeReady(long remainingTimeMs)


awaitLeastLoadedNodeReady is used when Sender is requsted to maybeSendTransactionalRequest and maybeWaitForProducerId.

initiateClose Method

void initiateClose()

initiateClose requests the RecordAccumulator to close.

In the end, initiateClose turns the running internal flag off followed by waking up the Kafka client.


initiateClose is used when:

wakeup Method

void wakeup()

wakeup merely requests the KafkaClient to wakeup.


wakeup is used when:

forceClose Method

void forceClose()


forceClose is used exclusively when KafkaProducer is requested to close.

reenqueueBatch Internal Method

void reenqueueBatch(
  ProducerBatch batch,
  long currentTimeMs)


reenqueueBatch is used when…​FIXME

Internal Properties

Name Description


Flag that controls whether run should stop (false) or not (true)

  • Enabled (true) by default when Sender is created

  • Disabled (false) when Sender is requested to initiateClose

results matching ""

    No results matching ""