log4j.logger.org.apache.kafka.clients.producer.internals.RecordAccumulator=ALL
RecordAccumulator
KafkaProducer uses the following configuration properties to create a RecordAccumulator:
-
batch.size for the batchSize
-
retry.backoff.ms for the retryBackoffMs
|
Tip
|
Enable Add the following line to Refer to Logging. |
Creating RecordAccumulator Instance
RecordAccumulator takes the following to be created:
-
Metric group name (e.g. producer-metrics)
RecordAccumulator initializes the internal properties.
When created, RecordAccumulator registerMetrics (with the Metrics and the metric group name).
registerMetrics Method
void registerMetrics()
registerMetrics…FIXME
|
Note
|
registerMetrics is used exclusively when RecordAccumulator is created.
|
append Method
RecordAppendResult append(
TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException
append tryAppend and returns the RecordAppendResult if available (not null).
append prints out the following TRACE message to the logs:
Allocating a new [size] byte message buffer for topic [topic] partition [partition]
|
Note
|
The size in the above TRACE message is controlled by batch.size producer configuration property (default: 16384 which is 16 * 1024).
|
append once again tryAppend and returns the RecordAppendResult if available (which they say "should not happen often").
append creates a new MemoryRecordsBuilder and a new ProducerBatch (for the given TopicPartition and the new MemoryRecordsBuilder).
append requests the ProducerBatch to tryAppend (that gives a FutureRecordMetadata).
append adds the ProducerBatch to the Deque<ProducerBatch> and the IncompleteBatches internal registry.
In the end, append creates a new RecordAppendResult (with the FutureRecordMetadata).
|
Note
|
append is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka Cluster asynchronously.
|
tryAppend Internal Method
RecordAppendResult tryAppend(
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
Deque<ProducerBatch> deque)
tryAppend…FIXME
|
Note
|
tryAppend is used exclusively when RecordAccumulator is requested to append.
|
Creating MemoryRecordsBuilder Instance (With TimestampType.CREATE_TIME) — recordsBuilder Internal Method
MemoryRecordsBuilder recordsBuilder(
ByteBuffer buffer,
byte maxUsableMagic)
recordsBuilder simply builds a new MemoryRecordsBuilder (with TimestampType.CREATE_TIME).
recordsBuilder throws an UnsupportedVersionException when the TransactionManager is defined and the maxUsableMagic magic number is lower than 2:
Attempting to use idempotence with a broker which does not support the required message format (v2). The broker must be version 0.11 or later.
|
Note
|
recordsBuilder is used exclusively when RecordAccumulator is requested to append.
|
Aborting Incomplete Batches — abortIncompleteBatches Method
void abortIncompleteBatches()
abortIncompleteBatches…FIXME
|
Note
|
abortIncompleteBatches is used exclusively when Sender is requested to run (and forced to close while shutting down).
|
reenqueue Method
void reenqueue(
ProducerBatch batch,
long now)
reenqueue…FIXME
|
Note
|
reenqueue is used exclusively when Sender is requested to reenqueueBatch.
|
splitAndReenqueue Method
int splitAndReenqueue(ProducerBatch bigBatch)
splitAndReenqueue…FIXME
|
Note
|
splitAndReenqueue is used exclusively when Sender is requested to completeBatch.
|
Looking Up Or Creating New Deque Of ProducerBatches For TopicPartition — getOrCreateDeque Internal Method
Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp)
getOrCreateDeque…FIXME
|
Note
|
getOrCreateDeque is used when RecordAccumulator is requested to append, reenqueue, and splitAndReenqueue.
|
expiredBatches Method
List<ProducerBatch> expiredBatches(long now)
expiredBatches…FIXME
|
Note
|
expiredBatches is used exclusively when Sender is requested to sendProducerData.
|
ready Method
ReadyCheckResult ready(
Cluster cluster,
long nowMs)
ready…FIXME
|
Note
|
ready is used exclusively when Sender is requested to sendProducerData (when requested to run a single iteration of sending).
|
drain Method
Map<Integer, List<ProducerBatch>> drain(
Cluster cluster,
Set<Node> nodes,
int maxSize,
long now)
drain…FIXME
|
Note
|
drain is used exclusively when Sender is requested to sendProducerData (when requested to run a single iteration of sending).
|
drainBatchesForOneNode Internal Method
List<ProducerBatch> drainBatchesForOneNode(
Cluster cluster,
Node node,
int maxSize,
long now)
drainBatchesForOneNode…FIXME
|
Note
|
drainBatchesForOneNode is used exclusively when RecordAccumulator is requested to drain.
|
Internal Properties
| Name | Description |
|---|---|
|
java.util.concurrent.atomic.AtomicInteger to keep track of the number of appending threads Used exclusively when |
|
ProducerBatches per |
|