KafkaRelation

KafkaRelation represents a collection of rows with a predefined schema (BaseRelation) that supports column pruning (TableScan).
Tip: Read up on BaseRelation and TableScan in The Internals of Spark SQL online book.
KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation.
Tip: Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=ALL

Refer to Logging.
getPartitionOffsets Internal Method
getPartitionOffsets(
kafkaReader: KafkaOffsetReader,
kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]
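The body is left as FIXME below, but conceptually getPartitionOffsets resolves a KafkaOffsetRangeLimit into a concrete offset per topic partition. A minimal sketch with stand-in types (the real method fetches partitions through the given KafkaOffsetReader; the sentinel values -2 and -1 are the ones the Kafka source uses for "earliest" and "latest", everything else below is simplified):

```scala
// Stand-ins for the real Kafka and Spark types (sketch only).
case class TopicPartition(topic: String, partition: Int)

sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
case class SpecificOffsetRangeLimit(offsets: Map[TopicPartition, Long])
  extends KafkaOffsetRangeLimit

object GetPartitionOffsetsSketch {
  // Sentinels used by the Kafka source: -2 is "earliest", -1 is "latest".
  val EARLIEST = -2L
  val LATEST = -1L

  // partitions would come from the KafkaOffsetReader in the real code
  def getPartitionOffsets(
      partitions: Set[TopicPartition],
      limit: KafkaOffsetRangeLimit): Map[TopicPartition, Long] =
    limit match {
      case EarliestOffsetRangeLimit => partitions.map(_ -> EARLIEST).toMap
      case LatestOffsetRangeLimit   => partitions.map(_ -> LATEST).toMap
      case SpecificOffsetRangeLimit(offsets) =>
        // every fetched partition must have an explicit offset
        require(offsets.keySet == partitions,
          s"offsets ${offsets.keySet} do not match fetched partitions $partitions")
        offsets
    }
}
```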
Caution: FIXME
Note: getPartitionOffsets is used exclusively when KafkaRelation builds RDD of rows (from the tuples).
Building Distributed Data Scan with Column Pruning — buildScan Method
buildScan(): RDD[Row]
Note: buildScan is part of the TableScan contract to build a distributed data scan with column pruning.
buildScan generates a unique group ID of the format spark-kafka-relation-[randomUUID] (to make sure that a streaming query creates a new consumer group).
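The group ID generation can be sketched in plain Scala (names below are illustrative, not the actual field names in the relation):

```scala
import java.util.UUID

object GroupIdSketch {
  // A fresh UUID per scan guarantees the relation never reuses a consumer group,
  // so it cannot interfere with offsets committed by other consumers.
  val uniqueGroupId: String = s"spark-kafka-relation-${UUID.randomUUID}"

  // The driver-side KafkaOffsetReader is given this value as its group ID prefix.
  val driverGroupIdPrefix: String = s"$uniqueGroupId-driver"
}
```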
buildScan creates a KafkaOffsetReader with the following:

- The given ConsumerStrategy and the source options
- Kafka parameters for the driver based on the given specifiedKafkaParams
- spark-kafka-relation-[randomUUID]-driver for the driverGroupIdPrefix
buildScan uses the KafkaOffsetReader to getPartitionOffsets for the starting and ending offsets (based on the given starting and ending KafkaOffsetRangeLimits, respectively). buildScan requests the KafkaOffsetReader to close afterwards.
buildScan creates offset ranges (a collection of KafkaSourceRDDOffsetRanges with a Kafka TopicPartition, beginning and ending offsets, and an undefined preferred location).
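The pairing of starting and ending offsets into ranges can be sketched with stand-in types (KafkaSourceRDDOffsetRange is modeled here as a plain case class; the real class lives in the Kafka source package):

```scala
// Stand-ins for the real types (sketch only): KafkaSourceRDDOffsetRange pairs a
// Kafka TopicPartition with its [fromOffset, untilOffset) range and a preferred location.
case class TopicPartition(topic: String, partition: Int)
case class KafkaSourceRDDOffsetRange(
    topicPartition: TopicPartition,
    fromOffset: Long,
    untilOffset: Long,
    preferredLoc: Option[String])

object OffsetRangesSketch {
  // Sketch of pairing the starting and ending offsets per partition.
  def offsetRanges(
      from: Map[TopicPartition, Long],
      until: Map[TopicPartition, Long]): Seq[KafkaSourceRDDOffsetRange] =
    until.keys.toSeq.map { tp =>
      val fromOffset = from.getOrElse(tp,
        throw new IllegalStateException(s"$tp doesn't have a from offset"))
      // the preferred location is undefined for the batch relation
      KafkaSourceRDDOffsetRange(tp, fromOffset, until(tp), preferredLoc = None)
    }
}
```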
buildScan prints out the following INFO message to the logs:

Generating RDD of offset ranges: [offsetRanges]
buildScan creates a KafkaSourceRDD with the following:

- Kafka parameters for executors based on the given specifiedKafkaParams and the unique group ID (spark-kafka-relation-[randomUUID])
- The offset ranges created
- pollTimeoutMs configuration
- The given failOnDataLoss flag
- reuseKafkaConsumer flag off (false)
buildScan requests the KafkaSourceRDD to map Kafka ConsumerRecords to InternalRows.
In the end, buildScan requests the SQLContext to create a DataFrame (with the name kafka and the predefined schema) that is immediately converted to an RDD[InternalRow].
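The predefined schema mentioned above is the fixed set of columns the Kafka data source exposes. For reference, a plain-Scala listing of the field names and Spark SQL types (no Spark dependency; this just records the known shape of the kafka DataFrame):

```scala
object KafkaSchemaSketch {
  // Field names and Spark SQL types of the predefined "kafka" schema,
  // in the order the data source exposes them.
  val schema: Seq[(String, String)] = Seq(
    "key" -> "binary",
    "value" -> "binary",
    "topic" -> "string",
    "partition" -> "int",
    "offset" -> "long",
    "timestamp" -> "timestamp",
    "timestampType" -> "int")
}
```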
buildScan throws an IllegalStateException when…FIXME

different topic partitions for starting offsets topics [fromTopics] and ending offsets topics [untilTopics]
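Judging from the exception message, the check presumably compares the partition sets of the resolved starting and ending offsets. A sketch with stand-in types (the exact condition is the FIXME above; the method name here is illustrative):

```scala
case class TopicPartition(topic: String, partition: Int)

object ValidationSketch {
  // Sketch: the resolved starting and ending offsets must cover exactly
  // the same set of topic partitions, otherwise the scan is ill-defined.
  def validateTopicPartitions(
      from: Map[TopicPartition, Long],
      until: Map[TopicPartition, Long]): Unit =
    if (from.keySet != until.keySet)
      throw new IllegalStateException(
        s"different topic partitions for starting offsets topics[${from.keySet}] " +
          s"and ending offsets topics[${until.keySet}]")
}
```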
buildScan throws an IllegalStateException when…FIXME
[tp] doesn't have a from offset