KafkaRelation

KafkaRelation is a BaseRelation that supports the TableScan contract (i.e. full scans over Kafka records).

KafkaRelation is created exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).
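
For example, the following batch query (the broker address and topic name are mere placeholders) makes KafkaSourceProvider create a KafkaRelation under the covers:

val records = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic1")                       // placeholder
  .load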

KafkaRelation uses the following fixed schema.

Table 1. KafkaRelation’s Schema (in the positional order)
Field Name      Data Type
key             BinaryType
value           BinaryType
topic           StringType
partition       IntegerType
offset          LongType
timestamp       TimestampType
timestampType   IntegerType
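
With the records DataFrame from the example above, the schema shows up as follows:

scala> records.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)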

KafkaRelation uses the following human-readable text representation:

KafkaRelation(strategy=[strategy], start=[startingOffsets], end=[endingOffsets])

Table 2. KafkaRelation’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

pollTimeoutMs

Timeout (in milliseconds) to poll data from Kafka (pollTimeoutMs for KafkaSourceRDD)

Initialized with the value of the first of the following configuration properties that is found:

  1. kafkaConsumer.pollTimeoutMs in the source options

  2. spark.network.timeout in the SparkConf

If neither is set, pollTimeoutMs defaults to 120 seconds (the default value of spark.network.timeout).

Used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (and creates a KafkaSourceRDD).
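
For example, to lower the poll timeout for a single batch query (the value below is arbitrary):

spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("kafkaConsumer.pollTimeoutMs", "30000") // 30 seconds instead of the 120s default
  .load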

Tip

Enable INFO or DEBUG logging level for org.apache.spark.sql.kafka010.KafkaRelation logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaRelation=DEBUG

Refer to Logging.

Creating KafkaRelation Instance

KafkaRelation takes the following when created:

  • SQLContext

  • ConsumerStrategy

  • Source options (as Map[String, String]) that directly correspond to the options of DataFrameReader

  • User-defined Kafka parameters (as Map[String, String])

  • failOnDataLoss flag

  • Starting offsets (as KafkaOffsetRangeLimit)

  • Ending offsets (as KafkaOffsetRangeLimit)

KafkaRelation initializes the internal registries and counters.
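
For reference, a sketch of the constructor signature (access modifiers and mixins simplified; the parameters follow the list above):

class KafkaRelation(
    override val sqlContext: SQLContext,
    strategy: ConsumerStrategy,
    sourceOptions: Map[String, String],
    specifiedKafkaParams: Map[String, String],
    failOnDataLoss: Boolean,
    startingOffsets: KafkaOffsetRangeLimit,
    endingOffsets: KafkaOffsetRangeLimit)
  extends BaseRelation with TableScan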

Building Distributed Data Scan with Column Pruning (as TableScan) — buildScan Method

buildScan(): RDD[Row]
Note
buildScan is part of TableScan Contract to build a distributed data scan with column pruning.

buildScan builds the Kafka parameters for the driver (kafkaParamsForDriver) from the user-defined Kafka parameters and uses them to create a KafkaOffsetReader (together with the ConsumerStrategy, the source options and a unique group ID of the format spark-kafka-relation-[randomUUID]-driver).

buildScan then uses the KafkaOffsetReader to getPartitionOffsets for the starting and ending offsets, and closes the reader right after.

buildScan creates a KafkaSourceRDDOffsetRange for every topic partition, pairing the partition's starting and ending offsets.
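
A simplified sketch of that pairing (assuming fromPartitionOffsets and untilPartitionOffsets are the Map[TopicPartition, Long] results of getPartitionOffsets):

// One KafkaSourceRDDOffsetRange per topic partition
val offsetRanges = untilPartitionOffsets.keySet.toSeq.map { tp =>
  KafkaSourceRDDOffsetRange(
    topicPartition = tp,
    fromOffset = fromPartitionOffsets(tp),
    untilOffset = untilPartitionOffsets(tp),
    preferredLoc = None)
}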

buildScan prints out the following INFO message to the logs:

GetBatch generating RDD of offset range: [comma-separated offsetRanges]

buildScan then builds the Kafka parameters for executors (kafkaParamsForExecutors) and uses them to create a KafkaSourceRDD (with the pollTimeoutMs) and maps over all the elements (using the RDD.map operator, which creates a MapPartitionsRDD).

Tip
Use RDD.toDebugString to see the two RDDs, i.e. KafkaSourceRDD and MapPartitionsRDD, in the RDD lineage.
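
For example (the exact RDD ids and the number of intermediate RDDs depend on the environment):

// The lineage should end with the KafkaSourceRDD
println(records.rdd.toDebugString)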

In the end, buildScan requests the SQLContext to create a DataFrame from the KafkaSourceRDD and the schema.

buildScan throws an IllegalStateException when the topic partitions of the starting offsets differ from those of the ending offsets:

different topic partitions for starting offsets topics[[fromTopics]] and ending offsets topics[[untilTopics]]

getPartitionOffsets Internal Method

getPartitionOffsets(
  kafkaReader: KafkaOffsetReader,
  kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long]

getPartitionOffsets requests the input KafkaOffsetReader to fetchTopicPartitions.

getPartitionOffsets uses the input KafkaOffsetRangeLimit to return the mapping of offsets per Kafka TopicPartition fetched:

  1. For EarliestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -2L (as the offset)

  2. For LatestOffsetRangeLimit, getPartitionOffsets returns a map with every TopicPartition and -1L (as the offset)

  3. For SpecificOffsetRangeLimit, getPartitionOffsets returns a map from validateTopicPartitions

Note
getPartitionOffsets is used exclusively when KafkaRelation is requested to build a distributed data scan with column pruning (as a TableScan).
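
Put together, a simplified Scala sketch of the resolution logic (EARLIEST and LATEST are the -2L and -1L constants of the KafkaOffsetRangeLimit object):

def getPartitionOffsets(
    kafkaReader: KafkaOffsetReader,
    kafkaOffsets: KafkaOffsetRangeLimit): Map[TopicPartition, Long] = {
  val partitions = kafkaReader.fetchTopicPartitions()
  kafkaOffsets match {
    case EarliestOffsetRangeLimit =>
      partitions.map { tp => tp -> KafkaOffsetRangeLimit.EARLIEST }.toMap // -2L
    case LatestOffsetRangeLimit =>
      partitions.map { tp => tp -> KafkaOffsetRangeLimit.LATEST }.toMap   // -1L
    case SpecificOffsetRangeLimit(partitionOffsets) =>
      validateTopicPartitions(partitions, partitionOffsets)
  }
}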

Validating TopicPartitions (Against Partition Offsets) — validateTopicPartitions Inner Method

validateTopicPartitions(
  partitions: Set[TopicPartition],
  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
Note
validateTopicPartitions is a Scala inner method of getPartitionOffsets, i.e. it is defined within the body of getPartitionOffsets and so is visible to (and can only be used by) getPartitionOffsets.

validateTopicPartitions asserts that the input set of Kafka TopicPartitions is exactly the set of the keys in the input partitionOffsets.

validateTopicPartitions prints out the following DEBUG message to the logs:

Partitions assigned to consumer: [partitions]. Seeking to [partitionOffsets]

In the end, validateTopicPartitions returns the input partitionOffsets.

If the input set of Kafka TopicPartitions is not the set of the keys in the input partitionOffsets, validateTopicPartitions throws an AssertionError:

assertion failed: If startingOffsets contains specific offsets, you must specify all TopicPartitions.
Use -1 for latest, -2 for earliest, if you don't care.
Specified: [partitionOffsets] Assigned: [partitions]
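
A simplified sketch of the inner method (logDebug comes from Spark's internal Logging trait):

def validateTopicPartitions(
    partitions: Set[TopicPartition],
    partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
  assert(partitions == partitionOffsets.keySet,
    "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
      "Use -1 for latest, -2 for earliest, if you don't care.\n" +
      s"Specified: ${partitionOffsets.keySet} Assigned: $partitions")
  logDebug(s"Partitions assigned to consumer: $partitions. Seeking to $partitionOffsets")
  partitionOffsets
}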
