PartitionGroup is a collection of RecordQueues (one per partition assigned to a StreamTask).

PartitionGroup is created exclusively for a StreamTask.

Creating PartitionGroup Instance

PartitionGroup takes the following to be created:

PartitionGroup initializes the internal properties.

Retrieving Next Stamped Record (Record with Timestamp) and RecordQueue — nextRecord Method

StampedRecord nextRecord(final RecordInfo info)


nextRecord is used exclusively when StreamTask is requested to process a single record.

Adding Records to RecordQueue For Given Partition — addRawRecords Method

int addRawRecords(
  final TopicPartition partition,
  final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords)

addRawRecords looks up the RecordQueue for the input Kafka TopicPartition (in the partitionQueues).

addRawRecords requests the RecordQueue for its size (the old size) and then requests it to add the Kafka ConsumerRecords (as StampedRecords).

addRawRecords inserts the RecordQueue into the queuesByTime priority queue only if the RecordQueue was empty before.

addRawRecords adds the difference between the old and current sizes to totalBuffered.

In the end, addRawRecords returns the current size of the RecordQueue.

addRawRecords is used exclusively when StreamTask is requested to buffer new records.
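The bookkeeping described above can be sketched in plain Java. This is a minimal illustration, not the Kafka Streams source: SketchQueue and SketchGroup are hypothetical stand-ins, partitions are plain strings, and a record is reduced to its timestamp. The extra newSize > 0 guard (so that an empty batch does not enqueue an empty queue) is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for RecordQueue: it buffers record timestamps only.
class SketchQueue {
    final String partition;
    private final ArrayDeque<Long> timestamps = new ArrayDeque<>();

    SketchQueue(String partition) {
        this.partition = partition;
    }

    int size() {
        return timestamps.size();
    }

    // Timestamp of the head record; Long.MAX_VALUE when empty (sketch assumption).
    long headTimestamp() {
        return timestamps.isEmpty() ? Long.MAX_VALUE : timestamps.peekFirst();
    }

    // Appends the records and returns the new size of the queue.
    int addAll(List<Long> records) {
        timestamps.addAll(records);
        return timestamps.size();
    }
}

// Hypothetical stand-in for PartitionGroup.
class SketchGroup {
    final Map<String, SketchQueue> partitionQueues = new HashMap<>();
    final PriorityQueue<SketchQueue> queuesByTime =
        new PriorityQueue<>(Comparator.comparingLong(SketchQueue::headTimestamp));
    int totalBuffered = 0;

    SketchGroup(List<String> partitions) {
        for (String p : partitions) {
            partitionQueues.put(p, new SketchQueue(p));
        }
    }

    int addRawRecords(String partition, List<Long> rawTimestamps) {
        SketchQueue queue = partitionQueues.get(partition); // look up the queue
        int oldSize = queue.size();                         // size before adding
        int newSize = queue.addAll(rawTimestamps);          // size after adding
        if (oldSize == 0 && newSize > 0) {
            queuesByTime.offer(queue);                      // queue was empty before
        }
        totalBuffered += newSize - oldSize;                 // track the difference
        return newSize;
    }
}
```

Because a queue is offered to the priority queue only when it goes from empty to non-empty, its head timestamp (the ordering key) does not change while it sits in the priority queue as a result of later appends.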

clear Method

void clear()

clear removes all of the elements from the queuesByTime and requests every RecordQueue (in the internal partitionQueues registry) to clear itself.

clear is used exclusively when StreamTask is requested to closeTopology.
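A minimal sketch of the clear behavior described above, assuming a hypothetical ClearSketch class in which every queue is a plain ArrayDeque of record timestamps. Only the two steps from the description are modeled; whether any other internal state is reset is not covered here.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical stand-in for PartitionGroup; not the Kafka Streams class.
class ClearSketch {
    // Registry of per-partition queues (partition name -> buffered timestamps).
    final Map<String, ArrayDeque<Long>> partitionQueues = new HashMap<>();

    // Non-empty queues ordered by the timestamp of their head record.
    final PriorityQueue<ArrayDeque<Long>> queuesByTime =
        new PriorityQueue<ArrayDeque<Long>>(
            (a, b) -> Long.compare(a.peekFirst(), b.peekFirst()));

    void clear() {
        queuesByTime.clear();                    // drop all ordered references
        for (ArrayDeque<Long> queue : partitionQueues.values()) {
            queue.clear();                       // every queue clears itself
        }
    }
}
```

Note that the registry keeps its entries: the partitions stay registered, only their buffered records are dropped.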

numBuffered Method

int numBuffered(final TopicPartition partition)


numBuffered is used when…​FIXME

Minimum Partition Timestamp Across All Partitions — timestamp Method

long timestamp()

timestamp simply iterates over the TopicPartitions and requests every RecordQueue for the partition timestamp.

In the end, timestamp returns the oldest (smallest) partition timestamp.

timestamp is used exclusively when StreamTask is requested to maybePunctuateStreamTime.
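The minimum computation described above can be sketched as follows. This is an illustration under stated assumptions: MinTimestampSketch is a hypothetical helper, partitions are plain strings, each partition's timestamp is given directly in a map, and Long.MAX_VALUE is returned for an empty map (a choice made for this sketch only).

```java
import java.util.Map;

// Hypothetical helper; not part of Kafka Streams.
class MinTimestampSketch {
    // Returns the smallest partition timestamp across all partitions.
    static long timestamp(Map<String, Long> partitionTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : partitionTimestamps.values()) {
            if (ts < min) {
                min = ts; // keep the oldest timestamp seen so far
            }
        }
        return min;
    }
}
```

For example, with partition timestamps 30, 10 and 20, the helper yields 10, the timestamp of the partition that is furthest behind.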

Internal Properties

Name Description


Flag that indicates whether all the RecordQueues have at least one record buffered (true) or not (false)

Default: false

Enabled (true) when the size of the nonEmptyQueuesByTime is exactly the size of the partitionQueues (when requested to add records to the RecordQueue for a given partition)

Disabled (false) when one of the RecordQueues has been drained (when requested for the next record)

Used (as allPartitionsBuffered method) exclusively when StreamTask is requested to isProcessable


Java PriorityQueue (PriorityQueue<RecordQueue>) that orders RecordQueues by their tracked partition timestamp. The initial capacity is the number of partitions in the partitionQueues.

Used when PartitionGroup is requested for the following:

* Next record

* Add records to the RecordQueue for a given partition

Cleared when PartitionGroup is requested to clear


Unbounded Java PriorityQueue that orders its elements by partition timestamp.


Default: UNKNOWN


Number of…​FIXME

Default: 0
