StampedRecord — Orderable Kafka ConsumerRecords At Timestamp

StampedRecord is a Stamped with Kafka ConsumerRecords (as values).

In other words, StampedRecord represents a Kafka ConsumerRecord at a given timestamp and can be ordered in ascending order.

StampedRecord is created when:

StampedRecord takes the following when created:

StampedRecord gives access to the properties of a ConsumerRecord:

  • topic

  • partition

  • key

  • value

  • offset

The text representation of a StampedRecord is of the format value, timestamp = [timestamp].

import org.apache.kafka.streams.processor.internals.StampedRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecord.{NULL_CHECKSUM, NULL_SIZE}
import org.apache.kafka.common.record.TimestampType

val timestamp1 = 1L
val cr1 = new ConsumerRecord("topic", 0, 0L,
  timestamp1,
  TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, 0, 0)
  .asInstanceOf[ConsumerRecord[Object, Object]] // wish there were a better way
val sr1 = new StampedRecord(cr1, timestamp1)

val timestamp2 = 2L
val cr2 = new ConsumerRecord("topic", 0, 0L,
  timestamp2,
  TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, 0, 0)
  .asInstanceOf[ConsumerRecord[Object, Object]] // wish there were a better way
val sr2 = new StampedRecord(cr2, timestamp2)

val timestamp3 = 3L
val cr3 = new ConsumerRecord("topic", 0, 0L,
  timestamp3,
  TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, 0, 0)
  .asInstanceOf[ConsumerRecord[Object, Object]] // wish there were a better way
val sr3 = new StampedRecord(cr3, timestamp3)

// Adding the stampeds in a random order
// TreeSet is a concrete SortedSet
import java.util.TreeSet
import collection.JavaConverters._
val srs = new TreeSet(Seq(sr3, sr1, sr2).asJava)

assert(srs.asScala == Set(sr1, sr2, sr3))

results matching ""

    No results matching ""