RecordAccumulator

RecordAccumulator is created exclusively when KafkaProducer is created.

KafkaProducer uses producer configuration properties (among them batch.size) to create a RecordAccumulator.

Tip

Enable ALL logging level for org.apache.kafka.clients.producer.internals.RecordAccumulator logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.producer.internals.RecordAccumulator=ALL

Refer to Logging.
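
Below is a minimal Java sketch (not taken from the Kafka sources) of creating a KafkaProducer with a few of the batching-related producer configuration properties; the broker address, property values and class name are merely illustrative.

// RecordAccumulatorDemo.java (hypothetical file name)
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class RecordAccumulatorDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // Batching-related producer properties (they influence the internal RecordAccumulator)
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // batch.size
    props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // linger.ms
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // buffer.memory
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // compression.type

    // Creating the KafkaProducer is what creates the (internal) RecordAccumulator
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // send records here
    }
  }
}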

Creating RecordAccumulator Instance

RecordAccumulator takes a number of arguments to be created (among them the Metrics and the metric group name).

RecordAccumulator initializes the internal properties.

When created, RecordAccumulator registerMetrics (with the Metrics and the metric group name).

registerMetrics Method

void registerMetrics()

registerMetrics…​FIXME

Note
registerMetrics is used exclusively when RecordAccumulator is created.
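
As a rough illustration of the Metrics API that registerMetrics works against, the following sketch registers a single measurable metric; the metric name, group name and constant value below are placeholders rather than what registerMetrics actually registers.

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Metrics;

public class RegisterMetricsSketch {
  public static void main(String[] args) {
    Metrics metrics = new Metrics();
    String metricGrpName = "producer-metrics"; // assumed metric group name

    // A metric whose value is computed on read (the value source here is a placeholder)
    MetricName waitingThreads = metrics.metricName(
        "waiting-threads", metricGrpName,
        "The number of user threads blocked waiting for buffer memory");
    Measurable waitingThreadsValue = (config, now) -> 0.0;
    metrics.addMetric(waitingThreads, waitingThreadsValue);

    metrics.close();
  }
}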

close Method

void close()

close…​FIXME

Note
close is used when…​FIXME

append Method

RecordAppendResult append(
  TopicPartition tp,
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  long maxTimeToBlock) throws InterruptedException

append first tryAppend (to the partition's Deque<ProducerBatch>) and returns the RecordAppendResult if available (i.e. not null).

With no RecordAppendResult available, append prints out the following TRACE message to the logs (just before allocating a new buffer):

Allocating a new [size] byte message buffer for topic [topic] partition [partition]
Note
The size in the above TRACE message is controlled by the batch.size producer configuration property (default: 16384, i.e. 16 * 1024).
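
For reference, the property can be set explicitly in a producer configuration file (the value below simply restates the default):

batch.size=16384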

append once again tryAppend and returns the RecordAppendResult if available (which, as the comment in the Kafka source code puts it, "should not happen often").

append creates a new MemoryRecordsBuilder and a new ProducerBatch (for the given TopicPartition and the new MemoryRecordsBuilder).

append requests the ProducerBatch to tryAppend (that gives a FutureRecordMetadata).

append adds the ProducerBatch to the Deque<ProducerBatch> and the IncompleteBatches internal registry.

In the end, append creates a new RecordAppendResult (with the FutureRecordMetadata).
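
Putting the steps above together, the following self-contained Java sketch models the overall flow with heavily reduced stand-in types (a String key instead of TopicPartition, a toy Batch instead of ProducerBatch, a String instead of RecordAppendResult); it illustrates the two-pass tryAppend-then-allocate pattern only and is not Kafka's actual code.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SimpleAccumulator {

  static final class Batch {
    private int remaining;
    Batch(int capacity) { this.remaining = capacity; }

    // Returns null when there is no room left (mirrors ProducerBatch.tryAppend returning null)
    String tryAppend(String record) {
      if (record.length() > remaining) {
        return null;
      }
      remaining -= record.length();
      return "future-record-metadata";
    }
  }

  private final int batchSize = 16 * 1024; // batch.size default
  private final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

  String append(String topicPartition, String record) {
    Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
    synchronized (dq) {
      // 1. Optimistically try to append to the current (last) batch of the partition
      String result = tryAppend(record, dq);
      if (result != null) {
        return result;
      }
    }
    // 2. No room: size and "allocate" a new buffer (this is where the TRACE message is logged
    //    and where the real accumulator may block waiting for buffer memory)
    int size = Math.max(batchSize, record.length());
    synchronized (dq) {
      // 3. Try once more in case another thread created a batch meanwhile ("should not happen often")
      String result = tryAppend(record, dq);
      if (result != null) {
        return result;
      }
      // 4. Create a new batch, append the record to it and register the batch with the deque
      Batch batch = new Batch(size);
      result = batch.tryAppend(record);
      dq.addLast(batch);
      return result;
    }
  }

  private static String tryAppend(String record, Deque<Batch> dq) {
    Batch last = dq.peekLast();
    return last == null ? null : last.tryAppend(record);
  }

  public static void main(String[] args) {
    SimpleAccumulator accumulator = new SimpleAccumulator();
    System.out.println(accumulator.append("demo-0", "hello")); // future-record-metadata
  }
}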

Note
append is used exclusively when KafkaProducer is requested to send a ProducerRecord to a Kafka Cluster asynchronously.
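
Continuing the producer sketch from the beginning of this page, a send call like the one below (with an assumed topic name) is what ultimately triggers append:

import org.apache.kafka.clients.producer.ProducerRecord;

// Hands the record over to the RecordAccumulator (append) and returns immediately;
// the Sender thread sends the accumulated batches to the cluster later
producer.send(
    new ProducerRecord<>("demo-topic", "key", "value"),
    (metadata, exception) -> {
      if (exception != null) {
        exception.printStackTrace();
      } else {
        System.out.println("Record appended and sent: " + metadata);
      }
    });
producer.flush(); // optionally force the accumulated batches to be sent now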

tryAppend Internal Method

RecordAppendResult tryAppend(
  long timestamp,
  byte[] key,
  byte[] value,
  Header[] headers,
  Callback callback,
  Deque<ProducerBatch> deque)

tryAppend…​FIXME

Note
tryAppend is used exclusively when RecordAccumulator is requested to append.

Creating MemoryRecordsBuilder Instance (With TimestampType.CREATE_TIME) — recordsBuilder Internal Method

MemoryRecordsBuilder recordsBuilder(
  ByteBuffer buffer,
  byte maxUsableMagic)

recordsBuilder simply builds a new MemoryRecordsBuilder (with TimestampType.CREATE_TIME).

recordsBuilder throws an UnsupportedVersionException when the TransactionManager is defined and the maxUsableMagic magic number is lower than 2:

Attempting to use idempotence with a broker which does not support the required message format (v2). The broker must be version 0.11 or later.
Note
recordsBuilder is used exclusively when RecordAccumulator is requested to append.
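
The guard described above boils down to a simple magic-number check. In the reduced sketch below, a plain Object field stands in for the TransactionManager and the surrounding class is hypothetical; RecordBatch.MAGIC_VALUE_V2 and UnsupportedVersionException are the real Kafka classes.

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.record.RecordBatch;

final class RecordsBuilderSketch {

  private final Object transactionManager; // stand-in for the TransactionManager field

  RecordsBuilderSketch(Object transactionManager) {
    this.transactionManager = transactionManager;
  }

  // Mirrors the guard above: idempotent/transactional producers require message format v2
  void checkMagic(byte maxUsableMagic) {
    if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
      throw new UnsupportedVersionException("Attempting to use idempotence with a broker which "
          + "does not support the required message format (v2). The broker must be version 0.11 or later.");
    }
  }

  public static void main(String[] args) {
    new RecordsBuilderSketch(new Object()).checkMagic((byte) 1); // throws UnsupportedVersionException
  }
}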

Aborting Incomplete Batches — abortIncompleteBatches Method

void abortIncompleteBatches()

abortIncompleteBatches…​FIXME

Note
abortIncompleteBatches is used exclusively when Sender is requested to run (and forced to close while shutting down).

reenqueue Method

void reenqueue(
  ProducerBatch batch,
  long now)

reenqueue…​FIXME

Note
reenqueue is used exclusively when Sender is requested to reenqueueBatch.

splitAndReenqueue Method

int splitAndReenqueue(ProducerBatch bigBatch)

splitAndReenqueue…​FIXME

Note
splitAndReenqueue is used exclusively when Sender is requested to completeBatch.

Looking Up Or Creating New Deque Of ProducerBatches For TopicPartition — getOrCreateDeque Internal Method

Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp)

getOrCreateDeque…​FIXME

Note
getOrCreateDeque is used when RecordAccumulator is requested to append, reenqueue, and splitAndReenqueue.
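
The method name suggests a plain lookup-or-create over the batches registry; the following generic sketch shows that pattern with a ConcurrentMap and stand-in types (it is not Kafka's actual implementation):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class DequePerPartition {

  static final class Batch {}

  // ProducerBatches per partition, keyed by a "topic-partition" String for simplicity
  private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

  // Returns the existing deque for the partition or atomically creates an empty one
  Deque<Batch> getOrCreateDeque(String topicPartition) {
    return batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
  }

  public static void main(String[] args) {
    DequePerPartition d = new DequePerPartition();
    System.out.println(d.getOrCreateDeque("demo-0").isEmpty()); // true
  }
}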

expiredBatches Method

List<ProducerBatch> expiredBatches(long now)

expiredBatches…​FIXME

Note
expiredBatches is used exclusively when Sender is requested to sendProducerData.

ready Method

ReadyCheckResult ready(
  Cluster cluster,
  long nowMs)

ready…​FIXME

Note
ready is used exclusively when Sender is requested to sendProducerData (when requested to run a single iteration of sending).

drain Method

Map<Integer, List<ProducerBatch>> drain(
  Cluster cluster,
  Set<Node> nodes,
  int maxSize,
  long now)

drain…​FIXME

Note
drain is used exclusively when Sender is requested to sendProducerData (when requested to run a single iteration of sending).

drainBatchesForOneNode Internal Method

List<ProducerBatch> drainBatchesForOneNode(
  Cluster cluster,
  Node node,
  int maxSize,
  long now)

drainBatchesForOneNode…​FIXME

Note
drainBatchesForOneNode is used exclusively when RecordAccumulator is requested to drain.

Internal Properties

appendsInProgress

The number of appends in progress

  • Starts at 0 when RecordAccumulator is created

  • Incremented and decremented when RecordAccumulator is requested to append (just after append is requested and right before it finishes)

Used exclusively when RecordAccumulator is requested to abort incomplete batches

batches

ProducerBatches per TopicPartition (ConcurrentMap<TopicPartition, Deque<ProducerBatch>>)

incomplete

IncompleteBatches (the ProducerBatches that have not been completed yet)
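
Read together, the internal properties amount to a small piece of shared state. The following reduced Java sketch shows their shapes with stand-in types (a String key instead of TopicPartition, a toy Batch instead of ProducerBatch); it is an illustration, not Kafka's actual class.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class AccumulatorState {

  static final class Batch {}

  // appendsInProgress: starts at 0, goes up on entering append and down when append finishes
  private final AtomicInteger appendsInProgress = new AtomicInteger(0);

  // batches: ProducerBatches per partition (keyed by a "topic-partition" String for simplicity)
  private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

  // incomplete: the batches that have not been completed yet
  private final Set<Batch> incomplete = ConcurrentHashMap.newKeySet();

  void append(String topicPartition, Batch batch) {
    appendsInProgress.incrementAndGet();
    try {
      Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
      synchronized (dq) {
        dq.addLast(batch);
      }
      incomplete.add(batch);
    } finally {
      appendsInProgress.decrementAndGet();
    }
  }

  public static void main(String[] args) {
    new AccumulatorState().append("demo-0", new Batch());
  }
}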
