HasOffsetRanges and OffsetRange

HasOffsetRanges

HasOffsetRanges represents an object that has a collection of OffsetRanges (i.e. a range of offsets from a single Kafka topic partition).

HasOffsetRanges is part of org.apache.spark.streaming.kafka010 package.

Note
KafkaRDD is a HasOffsetRanges object.

You can access HasOffsetRanges given a KafkaRDD as follows:

import org.apache.spark.streaming.kafka010.KafkaUtils
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  import org.apache.spark.streaming.kafka010.OffsetRange
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}

OffsetRange

OffsetRange represents a range of offsets from a single Kafka TopicPartition (i.e. a topic name and partition number).

OffsetRange holds a topic, partition number, fromOffset (inclusive) and untilOffset (exclusive) offsets.

You can create instances of OffsetRange using the factory methods from OffsetRange companion object. You can then count the number of records in a topic partition using count method.

// Start spark-shell with spark-streaming-kafka-0-10_2.11 dependency
// --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT
import org.apache.spark.streaming.kafka010.OffsetRange

scala> val offsets = OffsetRange(topic = "spark-logs", partition = 0, fromOffset = 2, untilOffset = 5)
offsets: org.apache.spark.streaming.kafka010.OffsetRange = OffsetRange(topic: 'spark-logs', partition: 0, range: [2 -> 5])

scala> offsets.count
res0: Long = 3

scala> offsets.topicPartition
res1: org.apache.kafka.common.TopicPartition = spark-logs-0

OffsetRange is part of org.apache.spark.streaming.kafka010 package.

Creating OffsetRange Instance

You can create instances of OffsetRange using the following factory methods (from OffsetRange companion object):

OffsetRange.create(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

OffsetRange.create(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

OffsetRange.apply(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

OffsetRange.apply(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange

Counting Records in Topic Partition — count method

count(): Long

count counts the number of records in a OffsetRange.

results matching ""

    No results matching ""