HasOffsetRanges and OffsetRange
HasOffsetRanges
HasOffsetRanges represents an object that has a collection of OffsetRanges (i.e. ranges of offsets, each from a single Kafka topic partition).
HasOffsetRanges is part of the org.apache.spark.streaming.kafka010 package.
Note: KafkaRDD is a HasOffsetRanges object.
You can access the offset ranges of a KafkaRDD by casting it to HasOffsetRanges as follows:
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
  // The cast only succeeds on the RDD that createDirectStream produces directly
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
}
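Casting with asInstanceOf throws a ClassCastException when the RDD does not implement HasOffsetRanges, for example after a transformation such as map. A minimal sketch of a safer lookup could look like this. offsetRangesOf is a hypothetical helper name, not part of Spark, and its parameter is typed Any only so that the sketch can be tried without a running SparkContext.

```scala
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

// Hypothetical helper (not part of Spark): returns the offset ranges
// when the given object carries them, and None otherwise.
def offsetRangesOf(source: Any): Option[Array[OffsetRange]] = source match {
  case ranged: HasOffsetRanges => Some(ranged.offsetRanges)
  case _                       => None
}

// A stand-in HasOffsetRanges object, used here in place of a real KafkaRDD
val standIn = new HasOffsetRanges {
  def offsetRanges: Array[OffsetRange] = Array(OffsetRange("spark-logs", 0, 2L, 5L))
}

// Prints the single range held by the stand-in; prints nothing for other objects
offsetRangesOf(standIn).foreach(_.foreach(println))
offsetRangesOf("not a KafkaRDD").foreach(_.foreach(println))
```

Inside foreachRDD the same helper could be applied to the rdd argument, so that a batch without offset information is skipped instead of failing.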
OffsetRange
OffsetRange represents a range of offsets from a single Kafka TopicPartition (i.e. a topic name and partition number).
OffsetRange holds a topic, a partition number, and the fromOffset (inclusive) and untilOffset (exclusive) offsets.
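The inclusive lower bound and exclusive upper bound make an OffsetRange a half-open interval, so the number of records it covers is untilOffset minus fromOffset. A small sketch, assuming the spark-streaming-kafka-0-10 dependency is on the classpath (for example in spark-shell); the topic name and offsets are illustrative only:

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Illustrative range over partition 0 of the spark-logs topic
val range = OffsetRange("spark-logs", 0, 2L, 5L)

// Offsets covered by the half-open interval [2, 5): 2, 3 and 4
val covered = (range.fromOffset until range.untilOffset).toList

// A range whose bounds coincide covers no records
val empty = OffsetRange("spark-logs", 0, 5L, 5L)

println(s"covered offsets: $covered, count: ${range.count()}")
println(s"empty range count: ${empty.count()}")
```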
You can create instances of OffsetRange using the factory methods of the OffsetRange companion object. You can then count the number of records in the range using the count method.
// Start spark-shell with spark-streaming-kafka-0-10_2.11 dependency
// --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT
import org.apache.spark.streaming.kafka010.OffsetRange
scala> val offsets = OffsetRange(topic = "spark-logs", partition = 0, fromOffset = 2, untilOffset = 5)
offsets: org.apache.spark.streaming.kafka010.OffsetRange = OffsetRange(topic: 'spark-logs', partition: 0, range: [2 -> 5])
scala> offsets.count
res0: Long = 3
scala> offsets.topicPartition
res1: org.apache.kafka.common.TopicPartition = spark-logs-0
OffsetRange is part of the org.apache.spark.streaming.kafka010 package.
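Since a batch usually spans several topic partitions, the per-range counts can be combined to size the whole batch. The following is a sketch under the assumption that the ranges were obtained from a HasOffsetRanges object; here they are built by hand, and the events topic is a made-up name for illustration.

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Hand-built ranges standing in for the offsetRanges of one batch
val ranges = Array(
  OffsetRange("spark-logs", 0, 2L, 5L),   // 3 records
  OffsetRange("spark-logs", 1, 10L, 14L), // 4 records
  OffsetRange("events", 0, 0L, 7L))       // 7 records (hypothetical topic)

// Total number of records across all ranges
val total: Long = ranges.map(_.count()).sum

// Number of records per topic
val perTopic: Map[String, Long] =
  ranges.groupBy(_.topic).map { case (topic, rs) => topic -> rs.map(_.count()).sum }

println(s"total: $total, per topic: $perTopic")
```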
Creating OffsetRange Instance
You can create instances of OffsetRange using the following factory methods (from the OffsetRange companion object):
OffsetRange.create(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange
OffsetRange.create(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange
OffsetRange.apply(
  topic: String,
  partition: Int,
  fromOffset: Long,
  untilOffset: Long): OffsetRange
OffsetRange.apply(
  topicPartition: TopicPartition,
  fromOffset: Long,
  untilOffset: Long): OffsetRange
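The four factory methods describe the same data in different shapes, so ranges built from equal inputs should compare equal. A short sketch, assuming the spark-streaming-kafka-0-10 and kafka-clients libraries are available; the topic name and offsets are illustrative:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

val tp = new TopicPartition("spark-logs", 0)

// create variants (convenient when calling from Java)
val viaCreate   = OffsetRange.create("spark-logs", 0, 2L, 5L)
val viaCreateTp = OffsetRange.create(tp, 2L, 5L)

// apply variants (idiomatic Scala: the method name can be omitted)
val viaApply   = OffsetRange("spark-logs", 0, 2L, 5L)
val viaApplyTp = OffsetRange(tp, 2L, 5L)

// All four describe offsets [2, 5) of partition 0 of spark-logs
val all = Seq(viaCreate, viaCreateTp, viaApply, viaApplyTp)
println(all.distinct.size)
```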