Sender — Kafka Producer Network Thread
Sender is a Kafka producer network thread (of execution) that runs indefinitely and sends ProduceRequests to a Kafka cluster.
Sender is created when KafkaProducer is created and immediately started under the name kafka-producer-network-thread | [clientId] (as a daemon thread).
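The naming and daemon status described above can be illustrated with plain java.lang.Thread. This is a minimal sketch rather than the KafkaProducer code: NetworkThreadSketch and newIoThread are hypothetical names, and the Runnable merely stands in for Sender.

```java
final class NetworkThreadSketch {
    // Prefix of the thread name, as given in the text above.
    static final String PREFIX = "kafka-producer-network-thread";

    // Hypothetical helper: wraps a Runnable (standing in for Sender)
    // in a named daemon thread. The thread is not started here.
    static Thread newIoThread(String clientId, Runnable sender) {
        Thread t = new Thread(sender, PREFIX + " | " + clientId);
        t.setDaemon(true); // a daemon thread does not keep the JVM alive
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = newIoThread("producer-1",
            () -> System.out.println("running in " + Thread.currentThread().getName()));
        t.start();
        t.join();
    }
}
```

Because the thread is a daemon, a JVM whose other threads have finished can exit without waiting for it.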
Every runOnce, Sender drains batches of up to max.request.size bytes per broker from the record accumulator (that batches records) and eventually sends ProduceRequests (sendProduceRequests) to every active Kafka broker for which there is at least one ProducerBatch.
KafkaProducer uses the following configuration properties to create a Sender:
- max.in.flight.requests.per.connection for the guaranteeMessageOrder
- max.request.size for the maxRequestSize
- request.timeout.ms for the requestTimeoutMs
- retry.backoff.ms for the retryBackoffMs
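As an illustration, the four properties can be set through java.util.Properties before a producer is built. The values below are illustrative only, and the guaranteeMessageOrder helper encodes an assumption (ordering is guaranteed only when a single request may be in flight per connection) that should be checked against the Kafka version in use. SenderConfigSketch is a hypothetical name.

```java
import java.util.Properties;

final class SenderConfigSketch {
    // Builds the producer properties that end up configuring Sender.
    // Values are illustrative, not recommendations.
    static Properties senderRelatedProps() {
        Properties props = new Properties();
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("max.request.size", "1048576"); // bytes
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("retry.backoff.ms", "100");
        return props;
    }

    // Assumption: message order is guaranteed only with exactly one
    // in-flight request per connection.
    static boolean guaranteeMessageOrder(Properties props) {
        String value = props.getProperty("max.in.flight.requests.per.connection");
        return Integer.parseInt(value) == 1;
    }

    public static void main(String[] args) {
        Properties props = senderRelatedProps();
        System.out.println("guaranteeMessageOrder = " + guaranteeMessageOrder(props));
    }
}
```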
Tip: Enable logging for Sender to follow the DEBUG and TRACE messages described below. Refer to Logging.
Creating Sender Instance
Sender takes the following to be created:
Sender initializes the internal properties.
Starting Thread of Execution — run Method
void run()
Note: run is a part of java.lang.Runnable that is executed when the thread is started.
run first prints out the following DEBUG message to the logs:
Starting Kafka producer I/O thread.
Once the running internal flag is off, run prints out the following DEBUG message to the logs:
Beginning shutdown of Kafka producer I/O thread, sending remaining records.
run may wait until…FIXME ((!forceClose…)
In the end, run requests the KafkaClient to close and prints out the following DEBUG message to the logs:
Shutdown of Kafka producer I/O thread has completed.
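The overall shape of run (loop while the running flag is on, then shut down) can be sketched as follows. This is a simplified model, not the Sender source: RunLoopSketch, stopAfter and the in-memory log are hypothetical, and the draining of remaining records and closing of the KafkaClient are reduced to comments.

```java
import java.util.ArrayList;
import java.util.List;

final class RunLoopSketch implements Runnable {
    private volatile boolean running = true; // the "running" internal flag
    final List<String> log = new ArrayList<>();
    final int stopAfter; // hypothetical: iterations before closing
    int iterations = 0;

    RunLoopSketch(int stopAfter) {
        this.stopAfter = stopAfter;
    }

    @Override
    public void run() {
        log.add("Starting Kafka producer I/O thread.");
        while (running) {
            runOnce();
        }
        log.add("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        // A real implementation would keep sending remaining records here
        // and then close the network client.
        log.add("Shutdown of Kafka producer I/O thread has completed.");
    }

    // Stand-in for a single iteration of sending.
    void runOnce() {
        iterations++;
        if (iterations >= stopAfter) {
            initiateClose();
        }
    }

    // Turns the running flag off so that the loop in run exits.
    void initiateClose() {
        running = false;
    }

    public static void main(String[] args) {
        RunLoopSketch sender = new RunLoopSketch(3);
        sender.run();
        sender.log.forEach(System.out::println);
    }
}
```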
Running Single Iteration of Sending — runOnce Method
void runOnce()
runOnce simply calls sendProducerData and then requests the KafkaClient to poll.
With a transactionManager assigned, runOnce…FIXME
Note: runOnce is used exclusively when Sender is requested to run (as a thread of execution).
sendProducerData Internal Method
long sendProducerData(long now)
sendProducerData requests the ProducerMetadata to get the current cluster metadata (without blocking).
sendProducerData requests the RecordAccumulator for the brokers that are ready to receive data (given the cluster metadata).
sendProducerData requests the RecordAccumulator to drain (collect ProducerBatches per node ID of the ready brokers).
sendProducerData adds the batches to inflight batches.
sendProducerData…FIXME (guaranteeMessageOrder)
sendProducerData requests the SenderMetrics to updateProduceRequestMetrics.
sendProducerData…FIXME
In the end, sendProducerData calls sendProduceRequests (for the batches).
Note: sendProducerData is used exclusively when Sender is requested to run a single iteration of sending.
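The drain step of sendProducerData can be pictured as grouping queued batches by the broker that should receive them, limited to ready brokers and to a byte budget per broker. The sketch below is a deliberately simplified model under those assumptions; DrainSketch, Batch and their fields are hypothetical and do not mirror RecordAccumulator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class DrainSketch {
    // Hypothetical stand-in for a ProducerBatch.
    static final class Batch {
        final String topicPartition;
        final int leaderNode;
        final int sizeInBytes;

        Batch(String topicPartition, int leaderNode, int sizeInBytes) {
            this.topicPartition = topicPartition;
            this.leaderNode = leaderNode;
            this.sizeInBytes = sizeInBytes;
        }
    }

    // Collects batches per ready node. A node always gets at least one batch;
    // once a further batch would exceed maxRequestSize, the node is full.
    static Map<Integer, List<Batch>> drain(List<Batch> queued,
                                           Set<Integer> readyNodes,
                                           int maxRequestSize) {
        Map<Integer, List<Batch>> collated = new TreeMap<>();
        Map<Integer, Integer> bytesPerNode = new HashMap<>();
        Set<Integer> fullNodes = new HashSet<>();
        for (Batch b : queued) {
            int node = b.leaderNode;
            if (!readyNodes.contains(node) || fullNodes.contains(node)) {
                continue;
            }
            int used = bytesPerNode.getOrDefault(node, 0);
            if (used > 0 && used + b.sizeInBytes > maxRequestSize) {
                fullNodes.add(node);
                continue;
            }
            collated.computeIfAbsent(node, k -> new ArrayList<>()).add(b);
            bytesPerNode.put(node, used + b.sizeInBytes);
        }
        return collated;
    }
}
```

The resulting map (node ID to batches) has the same shape as the collated argument that sendProduceRequests accepts.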
Sending ProduceRequests — sendProduceRequests Internal Method
void sendProduceRequests(
Map<Integer, List<ProducerBatch>> collated,
long now)
sendProduceRequests simply goes over the given collated collection (of ProducerBatches per broker node ID) and calls sendProduceRequest for every pair.
Note: sendProduceRequests is used exclusively when Sender is requested to sendProducerData.
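The fan-out over the collated map amounts to one call per map entry. A minimal sketch, assuming batches are represented by plain strings and the per-broker send is passed in as a callback (both are hypothetical simplifications):

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

final class SendProduceRequestsSketch {
    // Invokes the given per-broker send once for every (node ID, batches) pair.
    static void sendProduceRequests(Map<Integer, List<String>> collated,
                                    BiConsumer<Integer, List<String>> sendProduceRequest) {
        for (Map.Entry<Integer, List<String>> entry : collated.entrySet()) {
            sendProduceRequest.accept(entry.getKey(), entry.getValue());
        }
    }
}
```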
Sending ProduceRequest to Broker — sendProduceRequest Internal Method
void sendProduceRequest(
long now,
int destination,
short acks,
int timeout,
List<ProducerBatch> batches)
sendProduceRequest…FIXME
sendProduceRequest requests the ProduceRequest.Builder to create a ProduceRequest.Builder instance (for a given magic number).
sendProduceRequest creates a new RequestCompletionHandler that calls handleProduceResponse when a request is complete.
sendProduceRequest requests the KafkaClient to create a new ClientRequest (for the ProduceRequest.Builder and RequestCompletionHandler).
Note: sendProduceRequest creates a new ClientRequest with the expectResponse flag on when acks argument is non-0.
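The rule for the expectResponse flag is small enough to state as code. A sketch only: ExpectResponseSketch is a hypothetical name, and the acks values used in main (0, 1 and -1) are the commonly used producer settings.

```java
final class ExpectResponseSketch {
    // A response is expected whenever the broker must acknowledge the write,
    // that is for any acks value other than 0.
    static boolean expectResponse(short acks) {
        return acks != 0;
    }

    public static void main(String[] args) {
        short[] settings = {0, 1, -1};
        for (short acks : settings) {
            System.out.println("acks=" + acks + " expectResponse=" + expectResponse(acks));
        }
    }
}
```

With acks set to 0 the producer does not wait for the broker, so no response is expected for the request.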
sendProduceRequest requests the KafkaClient to send the ClientRequest.
In the end, sendProduceRequest prints out the following TRACE message to the logs:
Sent produce request to [nodeId]: [requestBuilder]
Note: sendProduceRequest is used exclusively when Sender is requested to sendProduceRequests.
Handling ProduceResponse — handleProduceResponse Internal Method
void handleProduceResponse(
ClientResponse response,
Map<TopicPartition, ProducerBatch> batches,
long now)
handleProduceResponse…FIXME
Note: handleProduceResponse is used exclusively when Sender is requested to send a ProduceRequest.
completeBatch Internal Method
void completeBatch(
ProducerBatch batch,
ProduceResponse.PartitionResponse response,
long correlationId,
long now,
long throttleUntilTimeMs)
completeBatch…FIXME
Note: completeBatch is used exclusively when Sender is requested to handle a ProduceResponse.
maybeSendAndPollTransactionalRequest Internal Method
boolean maybeSendAndPollTransactionalRequest()
maybeSendAndPollTransactionalRequest…FIXME
Note: maybeSendAndPollTransactionalRequest is used when Sender is requested to run a single iteration of sending.
addToInflightBatches Method
void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches)
addToInflightBatches…FIXME
Note: addToInflightBatches is used exclusively when Sender is requested to sendProducerData.
maybeSendTransactionalRequest Internal Method
boolean maybeSendTransactionalRequest(long now)
maybeSendTransactionalRequest…FIXME
Note: maybeSendTransactionalRequest is used exclusively when Sender is running.
maybeWaitForProducerId Internal Method
void maybeWaitForProducerId()
maybeWaitForProducerId…FIXME
Note: maybeWaitForProducerId is used exclusively when Sender is running.
awaitLeastLoadedNodeReady Internal Method
Node awaitLeastLoadedNodeReady(long remainingTimeMs)
awaitLeastLoadedNodeReady…FIXME
Note: awaitLeastLoadedNodeReady is used when Sender is requested to maybeSendTransactionalRequest and maybeWaitForProducerId.
initiateClose Method
void initiateClose()
initiateClose requests the RecordAccumulator to close.
In the end, initiateClose turns the running internal flag off followed by waking up the Kafka client.
wakeup Method
void wakeup()
wakeup merely requests the KafkaClient to wakeup.
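Why a wakeup matters can be shown with java.nio.channels.Selector, whose wakeup method makes the next blocking select return immediately. This is only an illustration of the mechanism, under the assumption that the KafkaClient implementation blocks on a selector while polling; WakeupSketch is a hypothetical name and no Kafka classes are involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

final class WakeupSketch {
    // Returns how many milliseconds select(timeoutMs) blocked
    // when wakeup() had been called beforehand.
    static long blockedMillisAfterWakeup(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            selector.wakeup(); // the next select call returns immediately
            long start = System.nanoTime();
            selector.select(timeoutMs);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("blocked for ~" + blockedMillisAfterWakeup(5000) + " ms");
    }
}
```

Without the wakeup call, the same select would block for up to the full timeout, which is why closing a producer wakes the client up rather than waiting for the poll to time out.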
forceClose Method
void forceClose()
forceClose…FIXME
Note: forceClose is used exclusively when KafkaProducer is requested to close.
reenqueueBatch Internal Method
void reenqueueBatch(
ProducerBatch batch,
long currentTimeMs)
reenqueueBatch…FIXME
Note: reenqueueBatch is used when…FIXME
Internal Properties
| Name | Description |
|---|---|
| running | Flag that controls whether run should stop (when turned off) |