RecordQueue — FIFO Queue of Buffered Stamped Records

RecordQueue is a FIFO queue of StampedRecords from a Kafka partition (associated with a SourceNode).

RecordQueue is created exclusively along with a StreamTask (one RecordQueue for every assigned partition).

Figure 1. RecordQueue, StreamTask and TaskCreator

RecordQueue acts as a buffer of Kafka ConsumerRecords.
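To illustrate the buffering idea, here is a minimal, stdlib-only sketch. StampedValue and FifoRecordBuffer are hypothetical, simplified stand-ins (not Kafka Streams classes). Like RecordQueue's fifoQueue, the buffer is a java.util.ArrayDeque, so records leave in arrival order regardless of their timestamps.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for StampedRecord: a record value paired with its timestamp.
final class StampedValue<V> {
    final V value;
    final long timestamp;

    StampedValue(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical FIFO buffer in the spirit of RecordQueue (not the real class).
final class FifoRecordBuffer<V> {
    // Same container type as RecordQueue's fifoQueue.
    private final Deque<StampedValue<V>> fifoQueue = new ArrayDeque<>();

    // New records go to the tail of the queue.
    void add(V value, long timestamp) {
        fifoQueue.addLast(new StampedValue<>(value, timestamp));
    }

    // Removes and returns the oldest buffered record, or null when the buffer is empty.
    StampedValue<V> poll() {
        return fifoQueue.pollFirst();
    }

    // Number of buffered records.
    int size() {
        return fifoQueue.size();
    }
}
```

Adding ("a", 10) and then ("b", 5) hands back "a" first: ordering is by arrival, not by timestamp.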

Note
StreamTask uses a PartitionGroup to manage the RecordQueues per partition assigned.

RecordQueue defines the UNKNOWN constant as ConsumerRecord.NO_TIMESTAMP (i.e. -1).

Tip

Enable ALL logging level for org.apache.kafka.streams.processor.internals.RecordQueue logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.streams.processor.internals.RecordQueue=ALL

Creating RecordQueue Instance

RecordQueue takes the following to be created:

RecordQueue initializes the internal properties.

Adding Kafka ConsumerRecords (as StampedRecords) — addRawRecords Method

int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords)

For every Kafka ConsumerRecord in the input rawRecords, addRawRecords does the following:

  1. Requests the RecordDeserializer to deserialize the record (with the ProcessorContext)

  2. FIXME

  3. Creates a StampedRecord for the record and the record timestamp

  4. Inserts the StampedRecord at the end of the fifoQueue

  5. FIXME
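The steps above (leaving the FIXME steps aside) can be sketched in stdlib-only Java. RawRecord, Stamped and AddRawRecordsSketch are hypothetical; the deserializer function stands in for the RecordDeserializer. Since where "the record timestamp" comes from is not spelled out here, the sketch takes it from a caller-supplied timestampSource function, which is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;
import java.util.function.ToLongFunction;

// Hypothetical raw record: serialized value plus the timestamp it arrived with.
final class RawRecord {
    final byte[] value;
    final long timestamp;

    RawRecord(byte[] value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for StampedRecord.
final class Stamped<V> {
    final V value;
    final long timestamp;

    Stamped(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Sketch of the addRawRecords loop; not the actual RecordQueue code.
final class AddRawRecordsSketch<V> {
    private final Deque<Stamped<V>> fifoQueue = new ArrayDeque<>();
    private final Function<byte[], V> deserializer;          // stands in for RecordDeserializer
    private final ToLongFunction<RawRecord> timestampSource; // assumption: source of the record timestamp

    AddRawRecordsSketch(Function<byte[], V> deserializer, ToLongFunction<RawRecord> timestampSource) {
        this.deserializer = deserializer;
        this.timestampSource = timestampSource;
    }

    int addRawRecords(Iterable<RawRecord> rawRecords) {
        for (RawRecord raw : rawRecords) {
            V record = deserializer.apply(raw.value);            // step 1: deserialize
            long timestamp = timestampSource.applyAsLong(raw);   // record timestamp (assumed source)
            fifoQueue.addLast(new Stamped<>(record, timestamp)); // steps 3-4: stamp, append at the tail
        }
        // Size of the whole queue, including records buffered by earlier calls.
        return fifoQueue.size();
    }

    // Oldest buffered record, or null if the queue is empty.
    Stamped<V> head() {
        return fifoQueue.peekFirst();
    }
}
```

Calling addRawRecords twice (two records, then one) returns 2 and then 3, because the return value is the queue size rather than the batch size.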

While processing ConsumerRecords, addRawRecords prints out the following TRACE message to the logs:

Source node [name] extracted timestamp [timestamp] for record [record]

With all ConsumerRecords processed, addRawRecords…FIXME

In the end, addRawRecords returns the number of records in the queue (i.e. size).

Note

addRawRecords skips (drops) Kafka ConsumerRecords when either condition holds:

  • RecordDeserializer could not deserialize the record and the DeserializationExceptionHandler is not configured to fail on deserialization errors

  • Extracted timestamp is negative (and hence invalid)
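The two skip conditions can be illustrated with a hypothetical sketch. OnDeserializationError and SkippingQueueSketch are invented names; the enum only mimics the fail-or-continue choice of a DeserializationExceptionHandler, and a deserialization failure is modeled as a RuntimeException thrown by the deserializer function.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;

// Hypothetical stand-in for the two possible DeserializationExceptionHandler outcomes.
enum OnDeserializationError { FAIL, CONTINUE }

// Sketch of the two skip conditions only; not the actual RecordQueue code.
final class SkippingQueueSketch {
    private final Deque<String> fifoQueue = new ArrayDeque<>();
    private final Function<byte[], String> deserializer; // signals failure by throwing
    private final OnDeserializationError onError;
    private int dropped = 0;

    SkippingQueueSketch(Function<byte[], String> deserializer, OnDeserializationError onError) {
        this.deserializer = deserializer;
        this.onError = onError;
    }

    // Handles one raw record and returns the queue size afterwards.
    int add(byte[] rawValue, long extractedTimestamp) {
        String value;
        try {
            value = deserializer.apply(rawValue);
        } catch (RuntimeException e) {
            if (onError == OnDeserializationError.FAIL) {
                throw new IllegalStateException("Deserialization failed", e); // stop processing
            }
            dropped++; // condition 1: handler chose to continue, so the record is skipped
            return fifoQueue.size();
        }
        if (extractedTimestamp < 0) {
            dropped++; // condition 2: negative (invalid) timestamp, so the record is skipped
            return fifoQueue.size();
        }
        fifoQueue.addLast(value);
        return fifoQueue.size();
    }

    int size() {
        return fifoQueue.size();
    }

    int dropped() {
        return dropped;
    }
}
```

With CONTINUE, an undeserializable record and a record with timestamp -1 are both dropped while valid records are buffered; with FAIL, the deserialization error propagates instead.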

Note
addRawRecords is used exclusively when PartitionGroup is requested to add records to a RecordQueue for a Kafka partition.
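How a partition-keyed group might delegate to per-partition queues can be sketched as follows. PartitionGroupSketch is hypothetical and heavily simplified: partitions are plain "topic-partition" strings rather than Kafka's TopicPartition, records are plain strings, and returning the partition queue's size is a choice made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified PartitionGroup: one FIFO queue per assigned partition.
final class PartitionGroupSketch {
    private final Map<String, Deque<String>> partitionQueues = new HashMap<>();

    // Creates the queue for an assigned partition up front.
    void assign(String partition) {
        partitionQueues.put(partition, new ArrayDeque<>());
    }

    // Looks up the partition's queue, appends the records to its tail and returns its size.
    int addRawRecords(String partition, List<String> records) {
        Deque<String> queue = partitionQueues.get(partition);
        if (queue == null) {
            throw new IllegalArgumentException("Partition not assigned: " + partition);
        }
        queue.addAll(records); // appended in iteration order
        return queue.size();
    }

    // Total number of records buffered across all partitions.
    int totalBuffered() {
        int total = 0;
        for (Deque<String> queue : partitionQueues.values()) {
            total += queue.size();
        }
        return total;
    }
}
```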

Clearing (Resetting) RecordQueue — clear Method

void clear()

clear…FIXME

In the end, clear (re)sets the partitionTime to UNKNOWN.

Note
clear is used exclusively when PartitionGroup is requested to clear.
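A minimal sketch of clearing, assuming a hypothetical ClearableQueueSketch class that buffers only timestamps for brevity. UNKNOWN uses the same value as ConsumerRecord.NO_TIMESTAMP (-1). How partitionTime advances while records are added is not described in this section, so the sketch assumes it tracks the largest timestamp seen so far.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of clear() and the UNKNOWN sentinel; not the actual RecordQueue code.
final class ClearableQueueSketch {
    // Same value as ConsumerRecord.NO_TIMESTAMP (-1L).
    static final long UNKNOWN = -1L;

    private final Deque<Long> fifoQueue = new ArrayDeque<>(); // buffers timestamps only, for brevity
    private long partitionTime = UNKNOWN;

    void add(long timestamp) {
        fifoQueue.addLast(timestamp);
        // Assumption for this sketch: partitionTime is the largest timestamp seen so far.
        partitionTime = Math.max(partitionTime, timestamp);
    }

    void clear() {
        fifoQueue.clear();       // drop all buffered records
        partitionTime = UNKNOWN; // reset partitionTime to the sentinel
    }

    long partitionTime() {
        return partitionTime;
    }

    int size() {
        return fifoQueue.size();
    }
}
```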

Requesting Number of Records in Queue — size Method

int size()

size simply returns the number of records in the fifoQueue.

Note

size is used when:

Internal Properties

Table 1. RecordQueue’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

fifoQueue

Java’s java.util.ArrayDeque of StampedRecords (i.e. orderable Kafka ConsumerRecords with a timestamp)

Used when…FIXME

partitionTime

recordDeserializer

RecordDeserializer (for the SourceNode and DeserializationExceptionHandler)

Used when…FIXME
