getRanges(fromOffsets: PartitionOffsetMap, untilOffsets: PartitionOffsetMap, executorLocations: Seq[String] = Seq.empty): Seq[KafkaOffsetRange]
getRanges finds the common TopicPartitions, i.e. the keys present in both the fromOffsets and untilOffsets collections (their intersection).
For every common TopicPartition, getRanges creates a KafkaOffsetRange with the from and until offsets taken from the fromOffsets and untilOffsets collections (and the preferredLoc undefined).
getRanges filters out the TopicPartitions that have no records to consume (i.e. the difference between the until and from offsets is not greater than 0).
At this point, getRanges knows the TopicPartitions with records to consume.
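The three steps above (intersect, create, filter) can be sketched as follows. This is a minimal illustration only, not the Spark source: `TopicPartition` and `OffsetRange` are simplified stand-ins defined here so the sketch is self-contained, and `RangeSketch`, `nonEmptyRanges` and `size` are hypothetical names.

```scala
// Simplified stand-ins (assumptions for this sketch, not the Kafka or Spark classes).
final case class TopicPartition(topic: String, partition: Int)

final case class OffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String] = None) {
  // Number of records covered by this range.
  def size: Long = untilOffset - fromOffset
}

object RangeSketch {
  type PartitionOffsetMap = Map[TopicPartition, Long]

  def nonEmptyRanges(
      fromOffsets: PartitionOffsetMap,
      untilOffsets: PartitionOffsetMap): Seq[OffsetRange] = {
    // Step 1: keep only the TopicPartitions that are keys in both maps.
    val common = untilOffsets.keySet.intersect(fromOffsets.keySet)
    common.toSeq
      // Step 2: one range per common TopicPartition, preferred location left undefined.
      .map(tp => OffsetRange(tp, fromOffsets(tp), untilOffsets(tp)))
      // Step 3: drop ranges that have nothing to consume.
      .filter(_.size > 0)
  }
}
```

With from offsets for partitions 0 and 1 and until offsets for partitions 0, 1 and 2, only partitions 0 and 1 are candidates, and a partition whose until offset equals its from offset is dropped.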
getRanges branches off based on the defined minimum number of partitions per executor and the number of TopicPartitions with records to consume.
When the minimum number of partitions per executor is undefined or smaller than the number of TopicPartitions to consume records from, getRanges updates every KafkaOffsetRange with the preferred executor based on the TopicPartition and the given executorLocations.
Otherwise (with the minimum number of partitions per executor defined and greater than the number of TopicPartitions to consume records from), getRanges splits KafkaOffsetRanges into smaller ones.
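The branching described above might look roughly like the sketch below. Several parts are assumptions rather than facts from the text: the stand-in types `TopicPartition` and `OffsetRange`, the names `BranchSketch`, `assignOrSplit` and `pickLocation`, the hash-based choice of executor, and the proportional rule used to split a range. The text only states that a preferred executor is assigned in one branch and that ranges are split into smaller ones in the other.

```scala
// Simplified stand-ins (assumptions for this sketch, not the Kafka or Spark classes).
final case class TopicPartition(topic: String, partition: Int)

final case class OffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String] = None) {
  def size: Long = untilOffset - fromOffset
}

object BranchSketch {
  // Hypothetical placement rule: hash the TopicPartition onto the executor list.
  def pickLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] =
    if (executorLocations.isEmpty) None
    else Some(executorLocations(Math.floorMod(tp.hashCode, executorLocations.length)))

  // `ranges` is assumed to contain only non-empty ranges (size > 0).
  def assignOrSplit(
      ranges: Seq[OffsetRange],
      minPartitions: Option[Int],
      executorLocations: Seq[String]): Seq[OffsetRange] =
    if (minPartitions.forall(_ < ranges.size)) {
      // Minimum undefined or smaller than the number of ranges: keep ranges, set a location.
      ranges.map(r => r.copy(preferredLoc = pickLocation(r.topicPartition, executorLocations)))
    } else {
      // Minimum defined and not met: split ranges so each is close to an ideal size.
      val totalSize = ranges.map(_.size).sum
      val idealSize = totalSize.toDouble / minPartitions.get
      ranges.flatMap { r =>
        val splits = math.max(1, math.round(r.size / idealSize).toInt)
        (0 until splits).map { i =>
          val start = r.fromOffset + r.size * i / splits
          val end = r.fromOffset + r.size * (i + 1) / splits
          OffsetRange(r.topicPartition, start, end) // no preferred location for sub-ranges
        }
      }
    }
}
```

For a single range covering offsets 0 to 100 and a minimum of 4, this sketch yields four sub-ranges of 25 offsets each; with the minimum undefined, the range is returned unchanged except for its preferred location.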
KafkaOffsetRange is a case class with the following attributes:
getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String]