KafkaOffsetRangeCalculator

KafkaOffsetRangeCalculator is created for KafkaMicroBatchReader to calculate offset ranges (when KafkaMicroBatchReader is requested to planInputPartitions).

KafkaOffsetRangeCalculator takes an optional minimum number of partitions to create (minPartitions), which must be either undefined or greater than 0.

When created with a DataSourceOptions, KafkaOffsetRangeCalculator uses the minPartitions option for the minimum number of partitions.

Offset Ranges — getRanges Method

    getRanges(
      fromOffsets: PartitionOffsetMap,
      untilOffsets: PartitionOffsetMap,
      executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]

getRanges finds the common TopicPartitions, i.e. the keys that are used in both the fromOffsets and untilOffsets collections (their intersection).
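A minimal Scala sketch of this first step. TopicPartition below is a hypothetical stand-in case class for Kafka's org.apache.kafka.common.TopicPartition (so that the snippet runs without the kafka-clients dependency), and commonPartitions is a made-up name used only for illustration.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)

// Same shape as the PartitionOffsetMap type in the getRanges signature
type PartitionOffsetMap = Map[TopicPartition, Long]

// TopicPartitions that appear as keys in both offset maps
def commonPartitions(
    fromOffsets: PartitionOffsetMap,
    untilOffsets: PartitionOffsetMap): Set[TopicPartition] =
  untilOffsets.keySet.intersect(fromOffsets.keySet)

val t0 = TopicPartition("t", 0)
val t1 = TopicPartition("t", 1)
val t2 = TopicPartition("t", 2)

// t0 has no until offset and t2 has no from offset, so only t1 is common
val common = commonPartitions(Map(t0 -> 0L, t1 -> 5L), Map(t1 -> 10L, t2 -> 3L))
println(common) // Set(TopicPartition(t,1))
```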
For every common TopicPartition, getRanges creates a KafkaOffsetRange with the from and until offsets from the fromOffsets and untilOffsets collections (and the preferredLoc undefined). getRanges then filters out the TopicPartitions that have no records to consume (i.e. the difference between the until and from offsets is not greater than 0).
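The range creation and filtering can be sketched as follows. This is a hedged illustration, not the actual implementation: TopicPartition is again a hypothetical stand-in for the Kafka class, nonEmptyRanges is a made-up helper name, and KafkaOffsetRange mirrors the attributes described later on this page.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)
type PartitionOffsetMap = Map[TopicPartition, Long]

final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  // Number of records in the range: untilOffset minus fromOffset
  lazy val size: Long = untilOffset - fromOffset
}

def nonEmptyRanges(
    fromOffsets: PartitionOffsetMap,
    untilOffsets: PartitionOffsetMap): Seq[KafkaOffsetRange] = {
  val common = untilOffsets.keySet.intersect(fromOffsets.keySet)
  common.toSeq
    // One range per common TopicPartition, with no preferred location yet
    .map(tp => KafkaOffsetRange(tp, fromOffsets(tp), untilOffsets(tp), preferredLoc = None))
    // Drop TopicPartitions with nothing to consume
    .filter(_.size > 0)
}

val t0 = TopicPartition("t", 0)
val t1 = TopicPartition("t", 1)
// t0 has from == until (no records), t1 has 100 records to consume
val ranges = nonEmptyRanges(Map(t0 -> 5L, t1 -> 0L), Map(t0 -> 5L, t1 -> 100L))
println(ranges)
```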
At this point, getRanges knows the TopicPartitions with records to consume.
getRanges then branches off based on the minimum number of partitions (minPartitions) and the number of KafkaOffsetRanges (i.e. TopicPartitions with records to consume).

When the minimum number of partitions is undefined or smaller than the number of KafkaOffsetRanges, getRanges updates every KafkaOffsetRange with a preferred executor (preferredLoc) selected for its TopicPartition from the executorLocations (using getLocation).
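The update in this branch amounts to copying every range with a new preferredLoc. In the hedged Scala sketch below, TopicPartition is a hypothetical stand-in for the Kafka class, withPreferredLocations is a made-up name, and the locate parameter is a hypothetical placeholder for getLocation (whose logic this page leaves unspecified); the toy locator used in the example is for illustration only.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)
final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])

// Copies every range with the location chosen for its TopicPartition.
// `locate` is a hypothetical parameter standing in for getLocation.
def withPreferredLocations(
    ranges: Seq[KafkaOffsetRange],
    executorLocations: Seq[String],
    locate: (TopicPartition, Seq[String]) => Option[String]): Seq[KafkaOffsetRange] =
  ranges.map(r => r.copy(preferredLoc = locate(r.topicPartition, executorLocations)))

// Toy locator for illustration only: always picks the first executor, if any
val firstExecutor: (TopicPartition, Seq[String]) => Option[String] =
  (_, locs) => locs.headOption

val ranges = Seq(
  KafkaOffsetRange(TopicPartition("t", 0), 0L, 10L, None),
  KafkaOffsetRange(TopicPartition("t", 1), 0L, 20L, None))
val located = withPreferredLocations(ranges, Seq("exec-a", "exec-b"), firstExecutor)
located.foreach(println)
```

Because only preferredLoc changes, the offsets of every range stay exactly as they were.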
Otherwise (when the minimum number of partitions is defined and greater than or equal to the number of KafkaOffsetRanges), getRanges splits the KafkaOffsetRanges into smaller ones.
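This page only says that the ranges are split into smaller ones. One plausible strategy, assumed here rather than taken from the source, is to aim for ranges of roughly totalSize / minPartitions records and cut each range into contiguous, non-overlapping pieces. TopicPartition is a hypothetical stand-in for the Kafka class and splitRanges is a made-up name; the sketch expects minPartitions > 0 and non-empty input ranges (as produced by the filtering step).

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)
final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  lazy val size: Long = untilOffset - fromOffset
}

// Assumed strategy: target pieces of about totalSize / minPartitions records
def splitRanges(ranges: Seq[KafkaOffsetRange], minPartitions: Int): Seq[KafkaOffsetRange] = {
  val totalSize = ranges.map(_.size).sum
  val idealSize = totalSize.toDouble / minPartitions
  ranges.flatMap { range =>
    // At least one piece per range so that no records are dropped
    val numSplits = math.max(1, math.round(range.size / idealSize).toInt)
    (0 until numSplits).map { i =>
      // Integer boundaries: piece i ends exactly where piece i + 1 starts
      val start = range.fromOffset + range.size * i / numSplits
      val end = range.fromOffset + range.size * (i + 1) / numSplits
      KafkaOffsetRange(range.topicPartition, start, end, preferredLoc = None)
    }
  }.filter(_.size > 0) // skip empty pieces of very small ranges
}

val big = TopicPartition("t", 0)
val small = TopicPartition("t", 1)
val split = splitRanges(
  Seq(KafkaOffsetRange(big, 0L, 100L, None), KafkaOffsetRange(small, 0L, 10L, None)),
  minPartitions = 4)
split.foreach(println)
```

With 110 records and minPartitions = 4, the 100-record range is cut into four 25-record pieces while the 10-record range stays whole, so the large partition is what gets spread across more Spark partitions.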
Note: getRanges is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions.
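KafkaOffsetRange — TopicPartition with From and Until Offsets and Optional Preferred Location

KafkaOffsetRange is a case class with the following attributes:

- topicPartition: TopicPartition
- fromOffset: Long
- untilOffset: Long
- preferredLoc: Option[String] (the optional preferred executor location)

KafkaOffsetRange knows its size, i.e. the number of records between the fromOffset and untilOffset offsets (untilOffset minus fromOffset).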
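A minimal Scala sketch of this data structure, mirroring the four attributes listed above. TopicPartition is a hypothetical stand-in for Kafka's class, and defining size as a lazy val is a choice of this sketch.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)

// Mirrors the attributes listed above
final case class KafkaOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String]) {
  // Number of records in the range: untilOffset minus fromOffset
  lazy val size: Long = untilOffset - fromOffset
}

val range = KafkaOffsetRange(
  TopicPartition("events", 0), fromOffset = 100L, untilOffset = 250L, preferredLoc = None)
println(range.size) // 150
```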
Selecting Preferred Executor for TopicPartition — getLocation Internal Method

    getLocation(
      tp: TopicPartition,
      executorLocations: Seq[String]): Option[String]

getLocation …FIXME
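The description of getLocation is still a FIXME, so the following is only a guess at the kind of logic a method with this signature could use, not its actual implementation. The sketch assumes a deterministic hash-modulo choice, so that for a fixed list of executors the same TopicPartition is always mapped to the same executor. TopicPartition is a hypothetical stand-in for the Kafka class and preferredExecutor is a made-up name.

```scala
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartition(topic: String, partition: Int)

// Assumed strategy: stable hash-modulo selection of an executor.
// Math.floorMod keeps the index non-negative even for negative hash codes.
def preferredExecutor(tp: TopicPartition, executorLocations: Seq[String]): Option[String] =
  if (executorLocations.isEmpty) None
  else Some(executorLocations(Math.floorMod(tp.hashCode, executorLocations.size)))

val executors = Seq("exec-a", "exec-b", "exec-c")
val tp = TopicPartition("events", 3)
println(preferredExecutor(tp, executors))
println(preferredExecutor(tp, Seq.empty)) // None
```

Returning None for an empty executor list matches the Option[String] result type and leaves the range without a preferred location.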
Note: getLocation is used exclusively when KafkaOffsetRangeCalculator is requested to calculate offset ranges (getRanges).