log4j.logger.org.apache.spark.streaming.kafka010.KafkaRDD=INFO
KafkaRDD
KafkaRDD is an RDD of Kafka’s ConsumerRecords from topics in Apache Kafka with support for HasOffsetRanges.
|
Note
|
Kafka’s ConsumerRecord holds a topic name, a partition number, the offset of the record in the Kafka partition and the record itself (as a key-value pair). |
KafkaRDD uses KafkaRDDPartition for its partitions, which determine their preferred locations (as the host of the topic partition).
|
Note
|
Placement preferences (aka location preferences) map a KafkaRDD partition to a Kafka topic partition on the host closest to Kafka.
|
KafkaRDD is created:
- On demand using KafkaUtils.createRDD
- In batches using KafkaUtils.createDirectStream
KafkaRDD is also created when a DirectKafkaInputDStream restores KafkaRDDs from checkpoint.
|
Note
|
KafkaRDD is a private[spark] class.
|
|
Tip
|
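Enable INFO logging level for the org.apache.spark.streaming.kafka010.KafkaRDD logger to see what happens inside.
Add the following line to your log4j configuration file:
log4j.logger.org.apache.spark.streaming.kafka010.KafkaRDD=INFO
Refer to Logging. |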
Creating KafkaRDD Instance
KafkaRDD takes the following when created:
- Collection of OffsetRanges
|
Caution
|
FIXME Are the hosts in preferredHosts Kafka brokers?
|
KafkaRDD initializes the internal registries and counters.
Computing KafkaRDDPartition (in TaskContext) — compute Method
compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]]
|
Note
|
compute is a part of the RDD Contract.
|
compute assumes that thePart is a KafkaRDDPartition and casts it accordingly. It asserts that the offsets are valid, i.e. fromOffset is at most untilOffset.
If the beginning and ending offsets are the same, compute prints out the following INFO message to the logs and returns an empty iterator.
INFO KafkaRDD: Beginning offset [fromOffset] is the same as ending offset skipping [topic] [partition]
Otherwise, when the beginning and ending offsets differ, compute creates a KafkaRDDIterator (for the partition and the input TaskContext) and returns it.
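The decision compute makes can be sketched without Spark or Kafka on the classpath. The following is a minimal model, not the actual implementation: PartitionSketch is a hypothetical stand-in for KafkaRDDPartition, each "record" is represented only by its offset, require stands in for the assertion, and untilOffset is assumed to be exclusive.

```scala
object ComputeSketch {
  // Hypothetical stand-in for KafkaRDDPartition with only the fields the text mentions.
  final case class PartitionSketch(
      topic: String,
      partition: Int,
      fromOffset: Long,
      untilOffset: Long)

  // Returns the offsets that would be read; a real iterator would fetch ConsumerRecords.
  def compute(part: PartitionSketch): Iterator[Long] = {
    // The offsets must be valid: fromOffset is at most untilOffset.
    require(
      part.fromOffset <= part.untilOffset,
      s"Beginning offset ${part.fromOffset} is after the ending offset ${part.untilOffset}")

    if (part.fromOffset == part.untilOffset) {
      // Nothing to read: report it and return an empty iterator.
      println(
        s"INFO KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset " +
          s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      // Assumption of this sketch: untilOffset is exclusive.
      (part.fromOffset until part.untilOffset).iterator
    }
  }
}
```

For example, ComputeSketch.compute(ComputeSketch.PartitionSketch("events", 0, 5L, 8L)) yields the offsets 5, 6 and 7, while equal offsets yield an empty iterator.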
Getting Placement Preferences of Partition — getPreferredLocations Method
getPreferredLocations(thePart: Partition): Seq[String]
|
Note
|
getPreferredLocations is a part of the RDD Contract to define the placement preferences (aka preferred locations) of a partition.
|
getPreferredLocations casts thePart to KafkaRDDPartition.
getPreferredLocations finds all executors.
|
Caution
|
FIXME Use proper name for executors. |
getPreferredLocations requests KafkaRDDPartition for the Kafka TopicPartition and finds the preferred hosts for the partition.
|
Note
|
getPreferredLocations uses preferredHosts that was given when KafkaRDD was created.
|
If getPreferredLocations does not find a preferred host for the partition, all executors are candidates. Otherwise, only the executors on the preferred host are candidates.
If no executors run on the preferred host, getPreferredLocations falls back to all executors.
getPreferredLocations returns one matching executor (for the TopicPartition), or an empty collection when there are no executors at all.
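The selection steps above can be sketched as a small, dependency-free model. TopicPartitionSketch and ExecutorSketch are hypothetical stand-ins for Kafka's TopicPartition and Spark's ExecutorCacheTaskLocation, the returned string format is illustrative only, and picking the executor by the partition's hash code is an assumption of this sketch about how "one matching executor" could be chosen consistently.

```scala
object PreferredLocationSketch {
  // Hypothetical stand-ins for TopicPartition and ExecutorCacheTaskLocation.
  final case class TopicPartitionSketch(topic: String, partition: Int)
  final case class ExecutorSketch(host: String, executorId: String)

  def preferredLocations(
      tp: TopicPartitionSketch,
      allExecs: Vector[ExecutorSketch], // assumed to be sorted already
      preferredHosts: Map[TopicPartitionSketch, String]): Seq[String] = {
    // No preferred host for the partition: every executor is a candidate.
    // Otherwise only executors on the preferred host are candidates.
    val prefExecs = preferredHosts.get(tp) match {
      case Some(host) => allExecs.filter(_.host == host)
      case None       => allExecs
    }
    // A preferred host with no executors on it: fall back to all executors.
    val execs = if (prefExecs.isEmpty) allExecs else prefExecs

    if (execs.isEmpty) {
      Seq.empty
    } else {
      // Assumption: index by hash code so the same partition keeps
      // mapping to the same executor while the executor list is stable.
      val index = Math.floorMod(tp.hashCode, execs.length)
      val chosen = execs(index)
      Seq(s"${chosen.host}:${chosen.executorId}") // illustrative format
    }
  }
}
```

With a stable, sorted executor list, repeated calls for the same partition return the same single location, which is what makes per-executor caching of Kafka consumers worthwhile.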
Creating ExecutorCacheTaskLocations for All Executors in Cluster — executors Internal Method
executors(): Array[ExecutorCacheTaskLocation]
executors requests BlockManagerMaster for all BlockManager nodes (peers) in a cluster (that represent all the executors available).
|
Note
|
executors uses KafkaRDD's SparkContext to access the current BlockManager and in turn BlockManagerMaster.
|
executors creates ExecutorCacheTaskLocations using the peers' hosts and executor ids.
|
Note
|
The ExecutorCacheTaskLocations are sorted by their host names and executor ids.
|
|
Caution
|
FIXME Image for sorted ExecutorCacheTaskLocations. |
|
Note
|
executors is used exclusively when KafkaRDD is requested for its placement preferences (aka preferred locations).
|
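How executors turns block manager peers into a sorted array of locations can be sketched as follows. This is a minimal model under stated assumptions: PeerSketch and LocationSketch are hypothetical stand-ins for a block manager peer and ExecutorCacheTaskLocation, and the ascending order is a choice of this sketch (the text only says the locations are sorted by host name and executor id).

```scala
object ExecutorsSketch {
  // Hypothetical stand-in for a block manager peer (one per executor).
  final case class PeerSketch(host: String, executorId: String, port: Int)
  // Hypothetical stand-in for ExecutorCacheTaskLocation.
  final case class LocationSketch(host: String, executorId: String)

  def executors(peers: Seq[PeerSketch]): Array[LocationSketch] =
    peers
      // Keep only what a location needs: the host and the executor id.
      .map(p => LocationSketch(p.host, p.executorId))
      // Sort by host first, then by executor id (ascending here by assumption).
      .sortBy(l => (l.host, l.executorId))
      .toArray
}
```

A deterministic order matters more than its direction: as long as every call sees the executors in the same order, an index into the array refers to the same executor each time.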