import org.apache.kafka.streams.processor.internals.StampedRecord
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecord.{NULL_CHECKSUM, NULL_SIZE}
import org.apache.kafka.common.record.TimestampType
val timestamp1 = 1L
val cr1 = new ConsumerRecord("topic", 0, 0L,
timestamp1,
TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, 0, 0)
.asInstanceOf[ConsumerRecord[Object, Object]] // wish there were a better way
val sr1 = new StampedRecord(cr1, timestamp1)
val timestamp2 = 2L
val cr2 = new ConsumerRecord("topic", 0, 0L,
timestamp2,
TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, 0, 0)
.asInstanceOf[ConsumerRecord[Object, Object]] // wish there were a better way
val sr2 = new StampedRecord(cr2, timestamp2)
val timestamp3 = 3L
val cr3 = new ConsumerRecord("topic", 0, 0L,
timestamp3,
TimestampType.NO_TIMESTAMP_TYPE, NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, 0, 0)
.asInstanceOf[ConsumerRecord[Object, Object]] // wish there were a better way
val sr3 = new StampedRecord(cr3, timestamp3)
// Adding the stampeds in a random order
// TreeSet is a concrete SortedSet
import java.util.TreeSet
import collection.JavaConverters._
val srs = new TreeSet(Seq(sr3, sr1, sr2).asJava)
assert(srs.asScala == Set(sr1, sr2, sr3))
StampedRecord — Orderable Kafka ConsumerRecords At Timestamp
StampedRecord
is a Stamped with Kafka ConsumerRecords (as values).
In other words, StampedRecord
represents a Kafka ConsumerRecord at a given timestamp and can be ordered in ascending order.
StampedRecord
is created when:
-
RecordQueue
is requested to add Kafka ConsumerRecords (as StampedRecords) -
StreamTask
is requested to punctuate
StampedRecord
takes the following when created:
-
Kafka ConsumerRecord
StampedRecord
gives access to the properties of a ConsumerRecord:
The text representation of a StampedRecord
is of the format value, timestamp = [timestamp]
.