KafkaRDD
KafkaRDD is an RDD of Kafka’s ConsumerRecords from topics in Apache Kafka with support for HasOffsetRanges.
Note: Kafka’s ConsumerRecord holds a topic name, a partition number, the offset of the record in the Kafka partition and the record itself (as a key-value pair).
KafkaRDD uses KafkaRDDPartition as its partitions, which define their preferred locations (the host of the corresponding Kafka topic partition).
Note: Defining placement preferences (aka location preferences) lets each KafkaRDD partition map to its Kafka topic partition and be scheduled on a host close to Kafka.
KafkaRDD is created:
- On demand using KafkaUtils.createRDD
- In batches using KafkaUtils.createDirectStream
KafkaRDD is also created when a DirectKafkaInputDStream restores KafkaRDDs from checkpoint.
Note: KafkaRDD is a private[spark] class.
Tip: Enable INFO logging level for the org.apache.spark.streaming.kafka010.KafkaRDD logger to see what happens inside. Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.streaming.kafka010.KafkaRDD=INFO
Refer to Logging.
Creating KafkaRDD Instance
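KafkaRDD takes the following when created:
- SparkContext
- Kafka parameters
- Collection of OffsetRanges
- preferredHosts, i.e. the preferred host for each Kafka TopicPartition
- useConsumerCache flag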
Caution: FIXME Are the hosts in preferredHosts Kafka brokers?
KafkaRDD initializes the internal registries and counters.
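KafkaRDD turns the collection of OffsetRanges it is given into its partitions, one KafkaRDDPartition per OffsetRange (that is how we read the Spark 2.x source; treat it as an assumption). The sketch below models that mapping with plain, hypothetical case classes (OffsetRangeSketch, KafkaPartitionSketch) so it runs without Spark. It also derives a record count from the offsets alone, which assumes consecutive offsets (no log compaction).

```scala
// Hypothetical stand-in for Spark's OffsetRange (not the real class).
final case class OffsetRangeSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  // Number of records in the half-open range [fromOffset, untilOffset),
  // assuming offsets are consecutive.
  def count: Long = untilOffset - fromOffset
}

// Hypothetical stand-in for KafkaRDDPartition: an RDD partition index plus offset bounds.
final case class KafkaPartitionSketch(index: Int, topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object KafkaRDDPartitionsSketch {
  // One RDD partition per offset range, indexed by position in the input collection.
  def partitions(ranges: Seq[OffsetRangeSketch]): Seq[KafkaPartitionSketch] =
    ranges.zipWithIndex.map { case (r, i) =>
      KafkaPartitionSketch(i, r.topic, r.partition, r.fromOffset, r.untilOffset)
    }

  // Total number of records, computed from offsets without reading any data.
  def count(ranges: Seq[OffsetRangeSketch]): Long = ranges.map(_.count).sum
}
```

For example, two ranges over partitions 0 and 1 of a topic yield an RDD with two partitions, even if one of the ranges is empty.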
Computing KafkaRDDPartition (in TaskContext): compute Method
compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]]
Note: compute is part of the RDD Contract.
compute assumes that thePart is a KafkaRDDPartition. It asserts that the offsets are valid, i.e. that fromOffset is at most untilOffset.
If the beginning and ending offsets are the same, compute prints the following INFO message to the logs and returns an empty iterator:
INFO KafkaRDD: Beginning offset [fromOffset] is the same as ending offset skipping [topic] [partition]
Otherwise, when the beginning and ending offsets differ, compute creates a KafkaRDDIterator for the partition and the input TaskContext, and returns it.
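The branching in compute can be sketched without Spark as follows. ComputePartSketch and the fetch function are hypothetical: fetch stands in for KafkaRDDIterator (reading the record at a given offset), and the assertion message is made up for illustration, not Spark's actual error text.

```scala
// Hypothetical, Spark-free model of a KafkaRDDPartition's offset bounds.
final case class ComputePartSketch(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object KafkaComputeSketch {
  // `fetch` is a hypothetical stand-in for KafkaRDDIterator:
  // it returns the record stored at a given offset.
  def compute[R](part: ComputePartSketch, fetch: Long => R): Iterator[R] = {
    // Precondition from the text: fromOffset must not exceed untilOffset.
    assert(part.fromOffset <= part.untilOffset,
      s"Beginning offset ${part.fromOffset} is after ending offset ${part.untilOffset}")
    if (part.fromOffset == part.untilOffset) {
      // Empty range: print the INFO message and return an empty iterator.
      println(s"INFO KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset " +
        s"skipping ${part.topic} ${part.partition}")
      Iterator.empty
    } else {
      // Lazily read every offset in the half-open range [fromOffset, untilOffset).
      (part.fromOffset until part.untilOffset).iterator.map(fetch)
    }
  }
}
```

Returning a lazy iterator (rather than a materialized collection) mirrors how an RDD partition is consumed: records are only fetched as the task pulls them.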
Getting Placement Preferences of Partition: getPreferredLocations Method
getPreferredLocations(thePart: Partition): Seq[String]
Note: getPreferredLocations is part of the RDD Contract to define the placement preferences (aka preferred locations) of a partition.
getPreferredLocations casts thePart to KafkaRDDPartition and finds all executors.
Caution: FIXME Use proper name for executors.
getPreferredLocations requests KafkaRDDPartition for the Kafka TopicPartition and finds the preferred host for that partition.
Note: getPreferredLocations uses the preferredHosts given when KafkaRDD was created.
If getPreferredLocations finds no preferred host for the partition, all executors are candidates. Otherwise, only the executors on the preferred host are candidates. If no executor runs on the preferred host, getPreferredLocations falls back to all executors.
getPreferredLocations returns one of the candidate executors for the TopicPartition, or an empty collection when there are no executors at all.
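The selection rules above can be sketched as follows. ExecLocSketch is a hypothetical stand-in for ExecutorCacheTaskLocation, and a topic partition is modelled as a (topic, partition) pair. Two details are assumptions on our part (to the best of our reading of the Spark 2.x source): a location renders as "executor_<host>_<executorId>", and the single executor is picked as the topic partition's hash modulo the number of candidates, so the same TopicPartition keeps landing on the same executor.

```scala
// Hypothetical stand-in for ExecutorCacheTaskLocation (rendering format assumed).
final case class ExecLocSketch(host: String, executorId: String) {
  override def toString: String = s"executor_${host}_$executorId"
}

object PreferredLocationsSketch {
  // `tp` is a (topic, partition) pair; `allExecs` is assumed to be in a stable order.
  def preferredLocations(
      tp: (String, Int),
      preferredHosts: Map[(String, Int), String],
      allExecs: Seq[ExecLocSketch]): Seq[String] = {
    // Executors on the preferred host, or all executors when no host is preferred.
    val prefExecs = preferredHosts.get(tp) match {
      case Some(host) => allExecs.filter(_.host == host)
      case None       => allExecs
    }
    // No executor on the preferred host: fall back to all executors.
    val execs = if (prefExecs.isEmpty) allExecs else prefExecs
    if (execs.isEmpty) Seq.empty
    else {
      // Deterministic pick: hash of the topic partition modulo the candidate count.
      val index = Math.floorMod(tp.hashCode, execs.length)
      Seq(execs(index).toString)
    }
  }
}
```

Math.floorMod keeps the index non-negative even when hashCode is negative, which a plain % would not.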
Creating ExecutorCacheTaskLocations for All Executors in Cluster: executors Internal Method
executors(): Array[ExecutorCacheTaskLocation]
executors requests BlockManagerMaster for all BlockManager nodes (peers) in the cluster, which represent all the available executors.
Note: executors uses KafkaRDD's SparkContext to access the current BlockManager and, in turn, BlockManagerMaster.
executors creates ExecutorCacheTaskLocations using the peers' hosts and executor ids.
Note: The ExecutorCacheTaskLocations are sorted by host name and executor id.
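A stable order matters because getPreferredLocations picks an executor by index, so every call must see the candidates in the same order. The sketch below models that with a hypothetical PeerSketch (host plus executor id) and sorts ascending by (host, executorId); the exact sort direction Spark uses is not stated here and is an assumption, since only determinism is essential.

```scala
// Hypothetical, Spark-free model of a BlockManager peer (host + executor id).
final case class PeerSketch(host: String, executorId: String)

object ExecutorsSketch {
  // Turn peers into "executor_<host>_<id>" locations in a stable order:
  // first by host name, then by executor id (compared as strings).
  def executors(peers: Seq[PeerSketch]): Seq[String] =
    peers
      .sortBy(p => (p.host, p.executorId))
      .map(p => s"executor_${p.host}_${p.executorId}")
}
```

Whatever order the peers arrive in, the result is the same, which keeps the hash-based executor choice consistent across calls.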
Caution: FIXME Image for sorted ExecutorCacheTaskLocations.
Note: executors is used exclusively when KafkaRDD is requested for its placement preferences (aka preferred locations).