RecordAccumulator

KafkaProducer uses the following configuration properties to create a RecordAccumulator:

- batch.size for the batchSize
- retry.backoff.ms for the retryBackoffMs

Tip
|
Enable ALL logging level for the org.apache.kafka.clients.producer.internals.RecordAccumulator logger to see what happens inside. Add the following line to log4j.properties: log4j.logger.org.apache.kafka.clients.producer.internals.RecordAccumulator=ALL Refer to Logging.
|
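For illustration, the two configuration properties above can be set on a plain java.util.Properties object of the kind passed to a KafkaProducer constructor. This is a minimal sketch: the values and the bootstrap address are arbitrary examples, not recommendations.

```java
import java.util.Properties;

public class RecordAccumulatorConfig {
    public static void main(String[] args) {
        // Producer properties that KafkaProducer passes on to its RecordAccumulator.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("batch.size", "16384");     // becomes the RecordAccumulator's batchSize (default: 16384)
        props.put("retry.backoff.ms", "100"); // becomes the RecordAccumulator's retryBackoffMs (default: 100)
        System.out.println(props.getProperty("batch.size"));
    }
}
```

A larger batch.size lets more records share one buffer (and one request) per partition, at the cost of more producer memory.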
Creating RecordAccumulator Instance

RecordAccumulator takes the following to be created:

- Metric group name (e.g. producer-metrics)

RecordAccumulator initializes the internal properties.

When created, RecordAccumulator calls registerMetrics (with the Metrics and the metric group name).
registerMetrics Method

void registerMetrics()

registerMetrics …FIXME
Note
|
registerMetrics is used exclusively when RecordAccumulator is created.
|
append Method

RecordAppendResult append(
  TopicPartition tp,
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  long maxTimeToBlock) throws InterruptedException

append first tryAppends and returns the RecordAppendResult if available (not null).
append prints out the following TRACE message to the logs:

Allocating a new [size] byte message buffer for topic [topic] partition [partition]

Note
|
The size in the above TRACE message is controlled by the batch.size producer configuration property (default: 16384, i.e. 16 * 1024).
|

append then tryAppends once again and returns the RecordAppendResult if available (a case that, as a source-code comment notes, "should not happen often").
append creates a new MemoryRecordsBuilder and a new ProducerBatch (for the given TopicPartition and the new MemoryRecordsBuilder).

append requests the ProducerBatch to tryAppend (which gives a FutureRecordMetadata).

append adds the ProducerBatch to the Deque<ProducerBatch> and the IncompleteBatches internal registry.

In the end, append creates a new RecordAppendResult (with the FutureRecordMetadata).
Note
|
append is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka Cluster asynchronously.
|
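The two-pass flow above (try the existing batch first, allocate a new buffer and batch only when needed) can be sketched with a simplified, self-contained accumulator. Batch, BATCH_SIZE and the integer "record size" below are stand-ins for the real ProducerBatch, batch.size and serialized record, and the real append additionally synchronizes on the deque and re-checks it after allocating, which this sketch omits.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified sketch of RecordAccumulator.append's control flow,
// not the actual implementation.
public class AppendSketch {
    static final int BATCH_SIZE = 16384; // stand-in for batch.size (default 16 * 1024)

    static class Batch {
        int bytesUsed;
        // Returns true when the record fits in this batch (hypothetical logic).
        boolean tryAppend(int recordSize) {
            if (bytesUsed + recordSize > BATCH_SIZE) return false;
            bytesUsed += recordSize;
            return true;
        }
    }

    final Deque<Batch> deque = new ArrayDeque<>();

    // Mirrors the two tryAppend attempts described above.
    Batch append(int recordSize) {
        Batch last = deque.peekLast();
        if (last != null && last.tryAppend(recordSize)) {
            return last; // appended to an existing batch
        }
        // "Allocating a new [size] byte message buffer ..." happens here
        Batch fresh = new Batch();
        if (!fresh.tryAppend(recordSize)) {
            throw new IllegalArgumentException("record larger than batch size");
        }
        deque.addLast(fresh); // the new batch is now available for draining
        return fresh;
    }

    public static void main(String[] args) {
        AppendSketch acc = new AppendSketch();
        acc.append(10000);
        acc.append(10000); // does not fit in the first batch -> a second batch
        System.out.println(acc.deque.size()); // prints 2
    }
}
```

The second tryAppend attempt matters in the real code because another thread may have created a fresh batch while this one was allocating a buffer.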
tryAppend Internal Method

RecordAppendResult tryAppend(
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  Deque<ProducerBatch> deque)

tryAppend …FIXME
Note
|
tryAppend is used exclusively when RecordAccumulator is requested to append.
|
Creating MemoryRecordsBuilder Instance (With TimestampType.CREATE_TIME) — recordsBuilder Internal Method

MemoryRecordsBuilder recordsBuilder(
  ByteBuffer buffer,
  byte maxUsableMagic)

recordsBuilder simply builds a new MemoryRecordsBuilder (with TimestampType.CREATE_TIME).

recordsBuilder throws an UnsupportedVersionException when the TransactionManager is defined and the maxUsableMagic magic number is lower than 2:

Attempting to use idempotence with a broker which does not support the required message format (v2). The broker must be version 0.11 or later.
Note
|
recordsBuilder is used exclusively when RecordAccumulator is requested to append.
|
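The magic-number guard described above can be illustrated with a minimal sketch. requireIdempotenceSupport is a hypothetical helper, and it throws UnsupportedOperationException where the real code throws Kafka's UnsupportedVersionException.

```java
// Sketch of the message-format guard in recordsBuilder (simplified):
// idempotent/transactional producers require message format v2 (magic >= 2).
public class MagicCheck {
    // Hypothetical helper mirroring the check described in the text.
    static void requireIdempotenceSupport(boolean transactionManagerDefined, byte maxUsableMagic) {
        if (transactionManagerDefined && maxUsableMagic < 2) {
            // The real code throws org.apache.kafka.common.errors.UnsupportedVersionException.
            throw new UnsupportedOperationException(
                "Attempting to use idempotence with a broker which does not support "
                + "the required message format (v2). The broker must be version 0.11 or later.");
        }
    }

    public static void main(String[] args) {
        try {
            requireIdempotenceSupport(true, (byte) 1); // old message format -> rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected");
        }
        requireIdempotenceSupport(true, (byte) 2); // v2 format -> fine
        System.out.println("ok");
    }
}
```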
Aborting Incomplete Batches — abortIncompleteBatches Method

void abortIncompleteBatches()

abortIncompleteBatches …FIXME
Note
|
abortIncompleteBatches is used exclusively when Sender is requested to run (and forced to close while shutting down).
|
reenqueue Method

void reenqueue(
  ProducerBatch batch,
  long now)

reenqueue …FIXME
Note
|
reenqueue is used exclusively when Sender is requested to reenqueueBatch.
|
splitAndReenqueue Method

int splitAndReenqueue(ProducerBatch bigBatch)

splitAndReenqueue …FIXME
Note
|
splitAndReenqueue is used exclusively when Sender is requested to completeBatch.
|
Looking Up Or Creating New Deque Of ProducerBatches For TopicPartition — getOrCreateDeque Internal Method

Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp)

getOrCreateDeque …FIXME
Note
|
getOrCreateDeque is used when RecordAccumulator is requested to append, reenqueue, and splitAndReenqueue.
|
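Although the body is left as FIXME here, the section title describes a lookup-or-create pattern over the per-partition map of batch deques. A minimal sketch of that pattern, using String keys in place of TopicPartition and computeIfAbsent for brevity (the actual implementation may differ, e.g. by using putIfAbsent on a copy-on-write map):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the lookup-or-create pattern behind getOrCreateDeque.
public class DequeRegistry {
    // Stand-in for the "ProducerBatches per TopicPartition" internal registry.
    final ConcurrentMap<String, Deque<Integer>> batches = new ConcurrentHashMap<>();

    // Returns the existing deque for the partition, creating it on first use.
    Deque<Integer> getOrCreateDeque(String tp) {
        return batches.computeIfAbsent(tp, k -> new ArrayDeque<>());
    }

    public static void main(String[] args) {
        DequeRegistry r = new DequeRegistry();
        Deque<Integer> d1 = r.getOrCreateDeque("topic-0");
        Deque<Integer> d2 = r.getOrCreateDeque("topic-0");
        System.out.println(d1 == d2); // the same deque is reused for a partition
    }
}
```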
expiredBatches Method

List<ProducerBatch> expiredBatches(long now)

expiredBatches …FIXME
Note
|
expiredBatches is used exclusively when Sender is requested to sendProducerData.
|
ready Method

ReadyCheckResult ready(
  Cluster cluster,
  long nowMs)

ready …FIXME
Note
|
ready is used exclusively when Sender is requested to sendProducerData (when requested to run a single iteration of sending).
|
drain Method

Map<Integer, List<ProducerBatch>> drain(
  Cluster cluster,
  Set<Node> nodes,
  int maxSize,
  long now)

drain …FIXME
Note
|
drain is used exclusively when Sender is requested to sendProducerData (when requested to run a single iteration of sending).
|
drainBatchesForOneNode Internal Method

List<ProducerBatch> drainBatchesForOneNode(
  Cluster cluster,
  Node node,
  int maxSize,
  long now)

drainBatchesForOneNode …FIXME
Note
|
drainBatchesForOneNode is used exclusively when RecordAccumulator is requested to drain.
|
Internal Properties

Name | Description |
---|---|
appendsInProgress | java.util.concurrent.atomic.AtomicInteger to keep track of the number of appending threads. Used exclusively when … |
batches | ProducerBatches per TopicPartition |