Sender — Kafka Producer Network Thread
Sender is a Kafka producer network thread (of execution) that runs indefinitely and sends ProduceRequests to a Kafka cluster.
Sender is created when KafkaProducer is created and is immediately started as a daemon thread under the name kafka-producer-network-thread | [clientId].
On every runOnce, Sender drains batches of up to max.request.size bytes per broker from the record accumulator (that batches records) and eventually sends ProduceRequests (sendProduceRequests) to every active Kafka broker for which there is at least one ProducerBatch.
KafkaProducer uses the following configuration properties to create a Sender:
- max.in.flight.requests.per.connection for the guaranteeMessageOrder
- max.request.size for the maxRequestSize
- request.timeout.ms for the requestTimeoutMs
- retry.backoff.ms for the retryBackoffMs
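The four properties above can be set like any other producer configuration. The following is a minimal sketch that only builds a java.util.Properties object. The class name SenderRelatedConfig and the values are illustrative assumptions, not recommendations, and a real producer would also need settings such as bootstrap.servers and the serializers.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka) that collects the Sender-related settings.
class SenderRelatedConfig {

    // Builds producer properties with the four settings KafkaProducer reads for the Sender.
    static Properties senderProperties() {
        Properties props = new Properties();
        // Illustrative values only.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("max.request.size", "1048576");
        props.setProperty("request.timeout.ms", "30000");
        props.setProperty("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        // Prints every key=value pair (iteration order is not guaranteed).
        senderProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object could then be merged with the remaining producer settings before a KafkaProducer is created.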
Tip
|
Enable logging for Sender to trace its activity.
Refer to Logging. |
Creating Sender Instance
Sender
takes the following to be created:
Sender
initializes the internal properties.
Starting Thread of Execution — run
Method
void run()
Note
|
run is a part of java.lang.Runnable that is executed when the thread is started.
|
run first prints out the following DEBUG message to the logs:
Starting Kafka producer I/O thread.
Once the running internal flag is off, run prints out the following DEBUG message to the logs:
Beginning shutdown of Kafka producer I/O thread, sending remaining records.
run may wait until…FIXME ((!forceClose…)
In the end, run requests the KafkaClient to close and prints out the following DEBUG message to the logs:
Shutdown of Kafka producer I/O thread has completed.
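The lifecycle described above (loop while the running flag is on, then finish outstanding work, then shut down) can be sketched as follows. This is a simplified model, not Kafka's implementation: the class SenderLoopSketch, the pending counter (an assumed stand-in for whatever run waits on during shutdown), and the in-memory log list are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the Sender thread lifecycle (not Kafka's code).
class SenderLoopSketch implements Runnable {
    // Plays the role of the "running" internal flag; volatile so another thread can turn it off.
    private volatile boolean running = true;
    // Assumed stand-in for the work that is still outstanding at shutdown.
    private int pending;
    // Collects the DEBUG messages instead of writing them to a logger.
    final List<String> log = new ArrayList<>();
    long iterations = 0;

    SenderLoopSketch(int pending) {
        this.pending = pending;
    }

    // Mirrors initiateClose in spirit: turns the running flag off.
    void initiateClose() {
        running = false;
    }

    // One iteration; here it only "sends" one pending item, if any.
    void runOnce() {
        if (pending > 0) {
            pending--;
        }
        iterations++;
    }

    @Override
    public void run() {
        log.add("Starting Kafka producer I/O thread.");
        while (running) {
            runOnce();
        }
        log.add("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        // Keep iterating until the outstanding work is done.
        while (pending > 0) {
            runOnce();
        }
        log.add("Shutdown of Kafka producer I/O thread has completed.");
    }

    public static void main(String[] args) throws InterruptedException {
        SenderLoopSketch sender = new SenderLoopSketch(3);
        Thread thread = new Thread(sender, "kafka-producer-network-thread | demo");
        thread.setDaemon(true);
        thread.start();
        sender.initiateClose();
        thread.join();
        sender.log.forEach(System.out::println);
    }
}
```

The flag is volatile because it is written by the closing thread and read by the loop thread. The sketch leaves out the KafkaClient entirely.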
Running Single Iteration of Sending — runOnce
Method
void runOnce()
runOnce simply calls sendProducerData and then requests the KafkaClient to poll.
With a transactionManager assigned, runOnce …FIXME
Note
|
runOnce is used exclusively when Sender is requested to run (as a thread of execution).
|
sendProducerData
Internal Method
long sendProducerData(long now)
sendProducerData requests the ProducerMetadata to get the current cluster metadata (without blocking).
sendProducerData requests the RecordAccumulator for the brokers that have data ready to send.
sendProducerData requests the RecordAccumulator to drain (create ProducerBatches per TopicPartition for ready brokers).
sendProducerData adds the batches to inflight batches.
sendProducerData …FIXME (guaranteeMessageOrder)
sendProducerData requests the SenderMetrics to updateProduceRequestMetrics.
sendProducerData …FIXME
In the end, sendProducerData sends ProduceRequests (sendProduceRequests) for the batches.
Note
|
sendProducerData is used exclusively when Sender is requested to run a single iteration of sending.
|
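To illustrate the size-bounded drain step, here is a toy sketch that takes batches from the head of a single broker's queue while the running total stays within a byte limit. It is not the RecordAccumulator's code: the class DrainSketch, the use of plain integers as batch sizes, and the rule that at least one batch is always taken are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical toy model of a size-bounded drain (not Kafka's RecordAccumulator).
class DrainSketch {

    // Takes batch sizes (in bytes) from the head of the queue while the total stays
    // within maxRequestSize. Assumption of this sketch: the first batch is always
    // taken, so a single oversized batch cannot block the queue forever.
    static List<Integer> drain(Deque<Integer> queuedBatchSizes, int maxRequestSize) {
        List<Integer> drained = new ArrayList<>();
        int total = 0;
        while (!queuedBatchSizes.isEmpty()) {
            int next = queuedBatchSizes.peekFirst();
            if (total + next > maxRequestSize && !drained.isEmpty()) {
                break;
            }
            drained.add(queuedBatchSizes.pollFirst());
            total += next;
        }
        return drained;
    }

    public static void main(String[] args) {
        Deque<Integer> queue = new ArrayDeque<>(Arrays.asList(400, 400, 400));
        System.out.println("drained: " + drain(queue, 1000));
        System.out.println("left in queue: " + queue);
    }
}
```

Batches that do not fit stay in the queue and are picked up by a later iteration.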
Sending ProduceRequests — sendProduceRequests
Internal Method
void sendProduceRequests(
Map<Integer, List<ProducerBatch>> collated,
long now)
sendProduceRequests simply goes over the given collated collection (of ProducerBatches per broker node ID) and calls sendProduceRequest for every pair.
Note
|
sendProduceRequests is used exclusively when Sender is requested to sendProducerData.
|
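The fan-out over the collated map can be sketched as below. Strings stand in for ProducerBatch, the class CollatedSendSketch is hypothetical, and sendProduceRequest is reduced to recording what it would send (the real method also takes acks and a timeout).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of iterating a collated map (not Kafka's Sender).
class CollatedSendSketch {
    // Records one line per simulated request.
    final List<String> sent = new ArrayList<>();

    // Calls sendProduceRequest once per (node ID, batches) pair.
    void sendProduceRequests(Map<Integer, List<String>> collated, long now) {
        for (Map.Entry<Integer, List<String>> entry : collated.entrySet()) {
            sendProduceRequest(now, entry.getKey(), entry.getValue());
        }
    }

    // Simplified: only records the destination and the number of batches.
    void sendProduceRequest(long now, int destination, List<String> batches) {
        sent.add("node " + destination + ": " + batches.size() + " batch(es)");
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> collated = new LinkedHashMap<>();
        collated.put(1, Arrays.asList("batch-a", "batch-b"));
        collated.put(2, Arrays.asList("batch-c"));
        CollatedSendSketch sketch = new CollatedSendSketch();
        sketch.sendProduceRequests(collated, System.currentTimeMillis());
        sketch.sent.forEach(System.out::println);
    }
}
```

The point of the sketch is that there is one request per destination, however many batches that destination has.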
Sending ProduceRequest to Broker — sendProduceRequest
Internal Method
void sendProduceRequest(
long now,
int destination,
short acks,
int timeout,
List<ProducerBatch> batches)
sendProduceRequest …FIXME
sendProduceRequest requests the ProduceRequest.Builder to create a ProduceRequest.Builder instance (for a given magic number).
sendProduceRequest creates a new RequestCompletionHandler that calls handleProduceResponse when a request is complete.
sendProduceRequest requests the KafkaClient to create a new ClientRequest (for the ProduceRequest.Builder and RequestCompletionHandler).
Note
|
sendProduceRequest creates a new ClientRequest with the expectResponse flag on when the acks argument is non-zero.
|
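The rule in the note reduces to a single comparison. The helper below is a hypothetical illustration (the class AcksSketch is not part of Kafka). It uses the numeric acks values 0, 1 and -1, where -1 is the numeric form of acks=all.

```java
// Hypothetical helper illustrating the expectResponse rule (not Kafka's code).
class AcksSketch {

    // A response is expected unless the producer asked for no acknowledgement (acks = 0).
    static boolean expectResponse(short acks) {
        return acks != 0;
    }

    public static void main(String[] args) {
        short[] values = {0, 1, -1};
        for (short acks : values) {
            System.out.println("acks=" + acks + " expectResponse=" + expectResponse(acks));
        }
    }
}
```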
sendProduceRequest requests the KafkaClient to send the ClientRequest.
In the end, sendProduceRequest prints out the following TRACE message to the logs:
Sent produce request to [nodeId]: [requestBuilder]
Note
|
sendProduceRequest is used exclusively when Sender is requested to sendProduceRequests.
|
Handling ProduceResponse — handleProduceResponse
Internal Method
void handleProduceResponse(
ClientResponse response,
Map<TopicPartition, ProducerBatch> batches,
long now)
handleProduceResponse
…FIXME
Note
|
handleProduceResponse is used exclusively when Sender is requested to send a ProduceRequest.
|
completeBatch
Internal Method
void completeBatch(
ProducerBatch batch,
ProduceResponse.PartitionResponse response,
long correlationId,
long now,
long throttleUntilTimeMs)
completeBatch
…FIXME
Note
|
completeBatch is used exclusively when Sender is requested to handle a ProduceResponse.
|
maybeSendAndPollTransactionalRequest
Internal Method
boolean maybeSendAndPollTransactionalRequest()
maybeSendAndPollTransactionalRequest
…FIXME
Note
|
maybeSendAndPollTransactionalRequest is used when Sender is requested to run a single iteration of sending.
|
addToInflightBatches
Method
void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches)
addToInflightBatches
…FIXME
Note
|
addToInflightBatches is used exclusively when Sender is requested to sendProducerData.
|
maybeSendTransactionalRequest
Internal Method
boolean maybeSendTransactionalRequest(long now)
maybeSendTransactionalRequest
…FIXME
Note
|
maybeSendTransactionalRequest is used exclusively when Sender is running.
|
maybeWaitForProducerId
Internal Method
void maybeWaitForProducerId()
maybeWaitForProducerId
…FIXME
Note
|
maybeWaitForProducerId is used exclusively when Sender is running.
|
awaitLeastLoadedNodeReady
Internal Method
Node awaitLeastLoadedNodeReady(long remainingTimeMs)
awaitLeastLoadedNodeReady
…FIXME
Note
|
awaitLeastLoadedNodeReady is used when Sender is requested to maybeSendTransactionalRequest and maybeWaitForProducerId.
|
initiateClose
Method
void initiateClose()
initiateClose requests the RecordAccumulator to close.
In the end, initiateClose turns the running internal flag off followed by waking up the Kafka client.
wakeup
Method
void wakeup()
wakeup merely requests the KafkaClient to wakeup.
forceClose
Method
void forceClose()
forceClose
…FIXME
Note
|
forceClose is used exclusively when KafkaProducer is requested to close.
|
reenqueueBatch
Internal Method
void reenqueueBatch(
ProducerBatch batch,
long currentTimeMs)
reenqueueBatch
…FIXME
Note
|
reenqueueBatch is used when…FIXME
|
Internal Properties
Name | Description |
---|---|
running | Flag that controls whether run should stop |