KafkaRDD

KafkaRDD is an RDD of Kafka’s ConsumerRecords from topics in Apache Kafka, with support for HasOffsetRanges.

Note
Kafka’s ConsumerRecord holds a topic name, a partition number, the offset of the record in the Kafka partition and the record itself (as a key-value pair).
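For illustration, a minimal Scala snippet (assuming the Kafka clients library is on the classpath; describe is a hypothetical helper) that reads those fields off a ConsumerRecord:

import org.apache.kafka.clients.consumer.ConsumerRecord

// Reads the record's coordinates in Kafka plus the key-value payload
def describe(record: ConsumerRecord[String, String]): String =
  s"topic=${record.topic()} partition=${record.partition()} offset=${record.offset()} " +
    s"key=${record.key()} value=${record.value()}"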

KafkaRDD uses KafkaRDDPartition for its partitions and defines their preferred locations (the host of the corresponding Kafka topic partition).

Note
Defining placement preference (aka location preference) maps a KafkaRDD partition to the host of its Kafka topic partition, so tasks can run as close as possible to the Kafka broker that holds the data.

KafkaRDD is created when:

  • KafkaUtils is requested to create an RDD of Kafka’s ConsumerRecords (KafkaUtils.createRDD)

  • DirectKafkaInputDStream is requested to compute an RDD for a batch

KafkaRDD is also created when a DirectKafkaInputDStream restores KafkaRDDs from a checkpoint.
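A minimal sketch of the KafkaUtils.createRDD path, assuming an active SparkContext sc (e.g. in spark-shell), a broker at localhost:9092 and a hypothetical demo-topic topic:

import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}

// Kafka consumer configuration (values are illustrative)
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "kafka-rdd-demo")

// One OffsetRange per Kafka partition to read: topic, partition, fromOffset (inclusive), untilOffset (exclusive)
val offsetRanges = Array(OffsetRange("demo-topic", 0, 0L, 100L))

// createRDD gives back a KafkaRDD (typed as RDD[ConsumerRecord[K, V]])
val rdd: RDD[ConsumerRecord[String, String]] =
  KafkaUtils.createRDD[String, String](sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)

// HasOffsetRanges support: read back the offset ranges the RDD was created with
val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges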

Note
KafkaRDD is a private[spark] class.
Tip

Enable INFO logging level for org.apache.spark.streaming.kafka010.KafkaRDD logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka010.KafkaRDD=INFO

Refer to Logging.

getPartitions Method

Caution
FIXME

Creating KafkaRDD Instance

KafkaRDD takes the following when created:

  • SparkContext

  • Collection of Kafka parameters with their values

  • Collection of OffsetRanges

  • Kafka’s TopicPartitions and their preferred hosts (preferredHosts)

  • Flag to control whether to use consumer cache

Caution
FIXME Are the hosts in preferredHosts Kafka brokers?

KafkaRDD initializes the internal registries and counters.
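Since KafkaRDD is a private[spark] class, the TopicPartition-to-host mapping (preferredHosts) is supplied indirectly, through a LocationStrategy. A minimal sketch, assuming hypothetical broker host names and a demo-topic topic:

import java.{util => ju}

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Hypothetical mapping of Kafka topic partitions to preferred hosts
val hostMap = new ju.HashMap[TopicPartition, String]()
hostMap.put(new TopicPartition("demo-topic", 0), "broker-1.example.com")
hostMap.put(new TopicPartition("demo-topic", 1), "broker-2.example.com")

// PreferFixed hands the mapping over explicitly; PreferConsistent and PreferBrokers build it differently
val locationStrategy = LocationStrategies.PreferFixed(hostMap)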

Computing KafkaRDDPartition (in TaskContext) — compute Method

compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]]
Note
compute is a part of the RDD Contract.

compute assumes that it works with thePart as KafkaRDDPartition only. It asserts that the offsets are correct, i.e. fromOffset is at most untilOffset.

If the beginning and ending offsets are the same, you should see the following INFO message in the logs and compute returns an empty collection.

INFO KafkaRDD: Beginning offset [fromOffset] is the same as ending offset skipping [topic] [partition]

Otherwise, when the beginning and ending offsets are different, a KafkaRDDIterator is created (for the partition and the input TaskContext) and returned.
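The control flow can be sketched as follows (not the actual implementation; KafkaRDDPartition and KafkaRDDIterator are private, so the sketch uses simplified stand-ins):

// Simplified stand-in for KafkaRDDPartition
case class PartitionSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

def computeSketch[R](part: PartitionSketch, buildIterator: PartitionSketch => Iterator[R]): Iterator[R] = {
  // fromOffset must never be after untilOffset
  assert(part.fromOffset <= part.untilOffset,
    s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset} " +
      s"for topic ${part.topic} partition ${part.partition}")
  if (part.fromOffset == part.untilOffset) {
    // Matches the INFO message above: nothing to read, return an empty iterator
    Iterator.empty
  } else {
    // The real code returns a KafkaRDDIterator over ConsumerRecords here
    buildIterator(part)
  }
}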

Getting Placement Preferences of Partition — getPreferredLocations Method

getPreferredLocations(thePart: Partition): Seq[String]
Note
getPreferredLocations is a part of RDD contract to define the placement preferences (aka preferred locations) of a partition.

getPreferredLocations casts thePart to KafkaRDDPartition.

getPreferredLocations finds all executors.

Caution
FIXME Use proper name for executors.

getPreferredLocations requests KafkaRDDPartition for the Kafka TopicPartition and finds the preferred hosts for the partition.

Note
getPreferredLocations uses preferredHosts that was given when KafkaRDD was created.

If no preferred host is defined for the partition (in preferredHosts), getPreferredLocations considers all executors. Otherwise, it considers only the executors on the preferred host.

If that leaves no executors (i.e. none runs on the preferred host), getPreferredLocations falls back to all executors.

In the end, getPreferredLocations returns exactly one executor (chosen consistently for the TopicPartition) or an empty collection when no executors are available.
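Put together, the logic can be sketched as follows (not the actual implementation; ExecutorCacheTaskLocation is private[spark], so a simplified stand-in is used):

// Simplified stand-in for ExecutorCacheTaskLocation
case class ExecutorLocation(host: String, executorId: String)

def preferredLocationsSketch(
    allExecutors: Seq[ExecutorLocation],
    preferredHost: Option[String],   // from preferredHosts, for the partition's TopicPartition
    topicPartitionHash: Int): Seq[String] = {
  // Keep only the executors on the preferred host, if one is known for the partition
  val onPreferredHost = preferredHost.fold(allExecutors)(host => allExecutors.filter(_.host == host))
  // Fall back to all executors when none runs on the preferred host
  val candidates = if (onPreferredHost.isEmpty) allExecutors else onPreferredHost
  if (candidates.isEmpty) {
    Seq.empty
  } else {
    // Pick one executor consistently for the same TopicPartition
    val chosen = candidates(Math.floorMod(topicPartitionHash, candidates.length))
    Seq(chosen.toString)
  }
}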

Creating ExecutorCacheTaskLocations for All Executors in Cluster — executors Internal Method

executors(): Array[ExecutorCacheTaskLocation]

executors requests BlockManagerMaster for all BlockManager nodes (peers) in a cluster (that represent all the executors available).

Note
executors uses KafkaRDD's SparkContext to access the current BlockManager and in turn BlockManagerMaster.

executors creates ExecutorCacheTaskLocations using the peers' hosts and executor ids.

Note
executors are sorted by their host names and executor ids.
Caution
FIXME Image for sorted ExecutorCacheTaskLocations.
Note
executors is used exclusively when KafkaRDD is requested for its placement preferences (aka preferred locations).
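The ordering can be illustrated with a simplified stand-in for ExecutorCacheTaskLocation (what matters is that the ordering is consistent, so the same Kafka partition keeps mapping to the same executor and cached Kafka consumers stay effective):

// Simplified stand-in for ExecutorCacheTaskLocation
case class ExecutorLocation(host: String, executorId: String)

val peers = Seq(
  ExecutorLocation("worker-2", "5"),
  ExecutorLocation("worker-1", "3"),
  ExecutorLocation("worker-1", "1"))

// Order by host name first, then by executor id
val sorted = peers.sortBy(location => (location.host, location.executorId))
// ExecutorLocation(worker-1,1), ExecutorLocation(worker-1,3), ExecutorLocation(worker-2,5)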

KafkaRDDPartition

KafkaRDDPartition is…​FIXME

topicPartition

Caution
FIXME
