KafkaOffsetRangeCalculator

KafkaOffsetRangeCalculator is created for KafkaMicroBatchReader to calculate offset ranges (when KafkaMicroBatchReader is requested to planInputPartitions).

KafkaOffsetRangeCalculator takes an optional minimum number of partitions per executor (minPartitions) to be created (that can either be undefined or greater than 0).

When created with a DataSourceOptions, KafkaOffsetRangeCalculator uses minPartitions option for the minimum number of partitions per executor.

Offset Ranges — getRanges Method

getRanges(
  fromOffsets: PartitionOffsetMap,
  untilOffsets: PartitionOffsetMap,
  executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]

getRanges finds the common TopicPartitions that are the keys that are used in the fromOffsets and untilOffsets collections (intersection).

For every common TopicPartition, getRanges creates a KafkaOffsetRange with the from and until offsets from the fromOffsets and untilOffsets collections (and the preferredLoc undefined). getRanges filters out the TopicPartitions that have no records to consume (i.e. the difference between until and from offsets is not greater than 0).

At this point, getRanges knows the TopicPartitions with records to consume.

getRanges branches off based on the defined minimum number of partitions per executor and the number of KafkaOffsetRanges (TopicPartitions with records to consume).

For the minimum number of partitions per executor undefined or smaller than the number of KafkaOffsetRanges (TopicPartitions to consume records from), getRanges updates every KafkaOffsetRange with the preferred executor based on the TopicPartition and the executorLocations).

Otherwise (with the minimum number of partitions per executor defined and greater than the number of KafkaOffsetRanges), getRanges splits KafkaOffsetRanges into smaller ones.

Note
getRanges is used exclusively when KafkaMicroBatchReader is requested to planInputPartitions.

KafkaOffsetRange — TopicPartition with From and Until Offsets and Optional Preferred Location

KafkaOffsetRange is a case class with the following attributes:

  • TopicPartition

  • fromOffset offset

  • untilOffset offset

  • Optional preferred location

KafkaOffsetRange knows the size, i.e. the number of records between the untilOffset and fromOffset offsets.

Selecting Preferred Executor for TopicPartition — getLocation Internal Method

getLocation(
  tp: TopicPartition,
  executorLocations: Seq[String]): Option[String]

getLocation…​FIXME

Note
getLocation is used exclusively when KafkaOffsetRangeCalculator is requested to calculate offset ranges.

results matching ""

    No results matching ""