KafkaRelation
KafkaRelation is a BaseRelation with a TableScan.
KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).
KafkaRelation uses the following fixed schema:

| Field Name | Data Type |
|---|---|
| key | BinaryType |
| value | BinaryType |
| topic | StringType |
| partition | IntegerType |
| offset | LongType |
| timestamp | TimestampType |
| timestampType | IntegerType |
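A quick way to see the fixed schema is to print it from a batch Kafka query. A minimal sketch (the broker address and topic name below are placeholders):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("kafka-relation-schema")
  .getOrCreate()

// A batch (non-streaming) Kafka query goes through KafkaSourceProvider,
// which creates a KafkaRelation under the covers.
val records = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
  .option("subscribe", "demo-topic")                   // placeholder topic name
  .load()

// The schema is fixed, regardless of what the topic contains.
records.printSchema()
```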
KafkaRelation uses the following human-readable text representation:

KafkaRelation(strategy=[strategy], start=[startingOffsets], end=[endingOffsets])

KafkaRelation uses the following internal properties:

| Name | Description |
|---|---|
| pollTimeoutMs | Timeout (in milliseconds) to poll data from Kafka (the pollTimeoutMs of the KafkaSourceRDD). Initialized with the value of the following configuration properties (in the order, until one is found): the kafkaConsumer.pollTimeoutMs source option, then the spark.network.timeout configuration property. If neither is set, defaults to 120s. Used exclusively when KafkaRelation is requested to build a distributed data scan (buildScan). |
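The resolution order of pollTimeoutMs can be sketched as follows (a simplified stand-in: the real code reads spark.network.timeout from the SparkConf, and resolvePollTimeoutMs is a hypothetical helper):

```scala
// Hypothetical helper mirroring the resolution order described above.
def resolvePollTimeoutMs(
    sourceOptions: Map[String, String],
    networkTimeoutSeconds: Long = 120L): Long =
  sourceOptions
    .get("kafkaConsumer.pollTimeoutMs")
    .map(_.toLong)
    .getOrElse(networkTimeoutSeconds * 1000L) // fall back to spark.network.timeout (in ms)

assert(resolvePollTimeoutMs(Map("kafkaConsumer.pollTimeoutMs" -> "512")) == 512L)
assert(resolvePollTimeoutMs(Map.empty) == 120000L) // the 120s default, in milliseconds
```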
Tip: Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=ALL

Refer to Logging.
Creating KafkaRelation Instance
KafkaRelation takes the following when created:

- SQLContext
- ConsumerStrategy
- Source options (as Map[String, String]) that directly correspond to the options of DataFrameReader
- User-defined Kafka parameters (as Map[String, String])
- failOnDataLoss flag
- Starting offsets (as KafkaOffsetRangeLimit)
- Ending offsets (as KafkaOffsetRangeLimit)
KafkaRelation initializes the internal registries and counters.
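The two KafkaOffsetRangeLimit arguments come from the startingOffsets and endingOffsets source options of a batch query. A sketch, reusing the spark session from the earlier example (broker address and topic name are placeholders):

```scala
val bounded = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "demo-topic")
  .option("startingOffsets", "earliest") // EarliestOffsetRangeLimit
  .option("endingOffsets", "latest")     // LatestOffsetRangeLimit
  .load()

// Per-partition offsets give a SpecificOffsetRangeLimit
// (-2 stands for earliest and -1 for latest):
// .option("startingOffsets", """{"demo-topic":{"0":23,"1":-2}}""")
```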
Building Distributed Data Scan with Column Pruning (as TableScan) — buildScan Method
buildScan(): RDD[Row]
Note: buildScan is part of the TableScan contract to build a distributed data scan with column pruning.
buildScan creates the Kafka parameters for the driver (kafkaParamsForDriver) from the user-defined Kafka parameters and uses them to create a KafkaOffsetReader (together with the ConsumerStrategy, the source options and a unique group ID of the format spark-kafka-relation-[randomUUID]-driver).
buildScan then uses the KafkaOffsetReader to getPartitionOffsets for the starting and ending offsets and closes it right after.
buildScan creates a KafkaSourceRDDOffsetRange for every pair of the starting and ending offsets.
buildScan prints out the following INFO message to the logs:
GetBatch generating RDD of offset range: [comma-separated offsetRanges]
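The pairing of starting and ending offsets can be sketched as follows (OffsetRange and offsetRanges are hypothetical, simplified stand-ins for KafkaSourceRDDOffsetRange and the pairing logic inside buildScan):

```scala
import org.apache.kafka.common.TopicPartition

// One offset range per topic partition, built by pairing the starting
// and ending offsets (both maps are assumed to cover the same partitions).
final case class OffsetRange(topicPartition: TopicPartition, fromOffset: Long, untilOffset: Long)

def offsetRanges(
    fromOffsets: Map[TopicPartition, Long],
    untilOffsets: Map[TopicPartition, Long]): Seq[OffsetRange] =
  fromOffsets.toSeq.map { case (tp, from) =>
    OffsetRange(tp, from, untilOffsets(tp))
  }

val tp0 = new TopicPartition("demo-topic", 0)
val tp1 = new TopicPartition("demo-topic", 1)
offsetRanges(Map(tp0 -> 0L, tp1 -> 5L), Map(tp0 -> 100L, tp1 -> 50L))
// Seq(OffsetRange(demo-topic-0, 0, 100), OffsetRange(demo-topic-1, 5, 50))
```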
buildScan then creates the Kafka parameters for the executors (kafkaParamsForExecutors) and uses them to create a KafkaSourceRDD (with the pollTimeoutMs) and maps over all the elements (using the RDD.map operator that creates a MapPartitionsRDD).
Tip: Use RDD.toDebugString to see the two RDDs, i.e. KafkaSourceRDD and MapPartitionsRDD, in the RDD lineage.
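For example, on the DataFrame of the batch Kafka query from the earlier sketches:

```scala
// Prints the RDD lineage; expect to see MapPartitionsRDD entries
// on top of the underlying KafkaSourceRDD.
println(records.rdd.toDebugString)
```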
In the end, buildScan requests the SQLContext to create a DataFrame from the KafkaSourceRDD and the schema.
buildScan throws an IllegalStateException when the topics of the starting offsets are different from the topics of the ending offsets:
different topic partitions for starting offsets topics[[fromTopics]] and ending offsets topics[[untilTopics]]
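A sketch of that consistency check (validateTopics is a hypothetical helper; in the real code the check is inlined in buildScan):

```scala
def validateTopics(fromTopics: Set[String], untilTopics: Set[String]): Unit =
  if (fromTopics != untilTopics) {
    throw new IllegalStateException(
      s"different topic partitions for starting offsets topics[$fromTopics] " +
        s"and ending offsets topics[$untilTopics]")
  }
```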
getPartitionOffsets Internal Method
getPartitionOffsets(
kafkaReader: KafkaOffsetReader,
kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]
getPartitionOffsets requests the input KafkaOffsetReader to fetchTopicPartitions.
getPartitionOffsets uses the input KafkaOffsetRangeLimit to return the mapping of offsets per Kafka TopicPartition fetched:

- For EarliestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -2L (as the offset)
- For LatestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -1L (as the offset)
- For SpecificOffsetRangeLimit, getPartitionOffsets returns a map from validateTopicPartitions
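The three cases can be sketched as follows (the KafkaOffsetRangeLimit hierarchy below is a simplified stand-in; the real SpecificOffsetRangeLimit case goes through validateTopicPartitions):

```scala
import org.apache.kafka.common.TopicPartition

// Simplified stand-ins for the KafkaOffsetRangeLimit family.
sealed trait KafkaOffsetRangeLimit
case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
final case class SpecificOffsetRangeLimit(
    partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

def getPartitionOffsets(
    partitions: Set[TopicPartition], // as fetched with fetchTopicPartitions
    limit: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = limit match {
  case EarliestOffsetRangeLimit => partitions.map(_ -> -2L).toMap
  case LatestOffsetRangeLimit   => partitions.map(_ -> -1L).toMap
  case SpecificOffsetRangeLimit(offsets) => offsets // validated in the real code
}
```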
Note: getPartitionOffsets is used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (as a TableScan).
Validating TopicPartitions (Against Partition Offsets) — validateTopicPartitions Inner Method
validateTopicPartitions(
partitions: Set[TopicPartition],
partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
Note: validateTopicPartitions is a Scala inner method of getPartitionOffsets, i.e. it is defined within the body of getPartitionOffsets and is therefore visible to, and can only be used in, getPartitionOffsets.
validateTopicPartitions asserts that the input set of Kafka TopicPartitions is exactly the set of the keys in the input partitionOffsets.
validateTopicPartitions prints out the following DEBUG message to the logs:
Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]
In the end, validateTopicPartitions returns the input partitionOffsets.
If the input set of Kafka TopicPartitions is not the set of the keys in the input partitionOffsets, validateTopicPartitions throws an AssertionError:
assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions.
Use -1 for latest, -2 for earliest, if you don't care.
Specified: [partitionOffsets] Assigned: [partitions]
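A sketch of the inner method, following the description above (the real code logs the message at DEBUG rather than printing it):

```scala
import org.apache.kafka.common.TopicPartition

def validateTopicPartitions(
    partitions: Set[TopicPartition],
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  // The fetched partitions must be exactly the keys of the specified offsets.
  assert(partitions == partitionOffsets.keySet,
    "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
      "Use -1 for latest, -2 for earliest, if you don't care.\n" +
      s"Specified: ${partitionOffsets.keySet} Assigned: $partitions")
  println(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
  partitionOffsets
}
```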