KafkaSourceProvider

KafkaSourceProvider is a DataSourceRegister that registers itself to handle the kafka data source format.

Note
KafkaSourceProvider is registered through the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file, which is available in the source code of Apache Spark.

KafkaSourceProvider is a RelationProvider and a CreatableRelationProvider.

// start Spark application like spark-shell with the following package
// --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5
scala> val fromKafkaTopic1 = spark.
  read.
  format("kafka").
  option("subscribe", "topic1").  // subscribe, subscribepattern, or assign
  option("kafka.bootstrap.servers", "localhost:9092").
  load()

KafkaSourceProvider uses a fixed schema (and makes sure that a user did not set a custom one).

import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".int)
scala> spark
  .read
  .format("kafka")
  .option("subscribe", "topic1")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .schema(schema) // <-- defining a custom schema is not supported
  .load
org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
  ... 48 elided
Note

KafkaSourceProvider is also a StreamSourceProvider, a StreamSinkProvider, a StreamWriteSupport and a ContinuousReadSupport that are contracts used in Spark Structured Streaming.

You can find more on Spark Structured Streaming in my gitbook Spark Structured Streaming.

Creating BaseRelation — createRelation Method (from RelationProvider)

createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
Note
createRelation is part of the RelationProvider Contract to create a BaseRelation (for reading or writing).

createRelation starts by validating the Kafka options (for batch queries) in the input parameters.

createRelation collects all kafka.-prefixed key options (in the input parameters) and creates a local specifiedKafkaParams with the keys without the kafka. prefix (e.g. kafka.whatever is simply whatever).
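
The prefix handling described above can be sketched in plain Scala without any Spark dependency. The object and method names here are hypothetical, and the case-insensitive prefix check is an assumption of this sketch rather than something the text above states.

```scala
object KafkaPrefixSketch {
  private val Prefix = "kafka."

  // Keep only the kafka.-prefixed options and drop the prefix from their keys.
  // Assumption: the prefix is matched case-insensitively.
  def specifiedKafkaParams(parameters: Map[String, String]): Map[String, String] =
    parameters.collect {
      case (key, value) if key.toLowerCase(java.util.Locale.ROOT).startsWith(Prefix) =>
        key.substring(Prefix.length) -> value
    }
}
```

With this sketch, an option such as kafka.bootstrap.servers ends up under the key bootstrap.servers, while non-prefixed options such as subscribe are left out.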

createRelation gets the desired starting KafkaOffsetRangeLimit using the startingoffsets option key (in the given parameters) with EarliestOffsetRangeLimit as the default.

createRelation makes sure that the starting KafkaOffsetRangeLimit is not LatestOffsetRangeLimit or throws an AssertionError.

createRelation gets the desired ending KafkaOffsetRangeLimit using the endingoffsets option key (in the given parameters) with LatestOffsetRangeLimit as the default.

createRelation makes sure that the ending KafkaOffsetRangeLimit is not EarliestOffsetRangeLimit or throws an AssertionError.

In the end, createRelation creates a KafkaRelation with the subscription strategy (in the given parameters), failOnDataLoss option, and the starting and ending offsets.

Validating Kafka Options (for Batch Queries) — validateBatchOptions Internal Method

validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit

validateBatchOptions gets the desired KafkaOffsetRangeLimit for the startingoffsets option in the input caseInsensitiveParams and with EarliestOffsetRangeLimit as the default KafkaOffsetRangeLimit.

validateBatchOptions then matches the returned KafkaOffsetRangeLimit as follows:

  1. EarliestOffsetRangeLimit is acceptable and validateBatchOptions simply does nothing.

  2. LatestOffsetRangeLimit is not acceptable and validateBatchOptions throws an IllegalArgumentException:

    starting offset can't be latest for batch queries on Kafka

  3. SpecificOffsetRangeLimit is acceptable unless one of the offsets is -1L, for which validateBatchOptions throws an IllegalArgumentException:

    startingOffsets for [tp] can't be latest for batch queries on Kafka
Note
validateBatchOptions is used exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).
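
The three cases for the starting offsets can be illustrated with a small, Spark-free model. The types below (OffsetLimit, Earliest, Latest, Specific) are simplified stand-ins invented for this sketch and are not Spark's own classes; topic partitions are represented as plain strings.

```scala
object ValidateBatchSketch {
  // Hypothetical stand-ins for the KafkaOffsetRangeLimit variants.
  sealed trait OffsetLimit
  case object Earliest extends OffsetLimit
  case object Latest extends OffsetLimit
  // Maps a topic-partition label to an offset; -1L stands for "latest".
  final case class Specific(partitionOffsets: Map[String, Long]) extends OffsetLimit

  def validateStartingOffsets(limit: OffsetLimit): Unit = limit match {
    case Earliest => // acceptable, nothing to do
    case Latest =>
      throw new IllegalArgumentException(
        "starting offset can't be latest for batch queries on Kafka")
    case Specific(partitionOffsets) =>
      partitionOffsets.foreach {
        case (tp, -1L) =>
          throw new IllegalArgumentException(
            s"startingOffsets for $tp can't be latest for batch queries on Kafka")
        case _ => // any other offset is acceptable
      }
  }
}
```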

Writing DataFrame to Kafka Topic — createRelation Method (from CreatableRelationProvider)

createRelation(
  sqlContext: SQLContext,
  mode: SaveMode,
  parameters: Map[String, String],
  df: DataFrame): BaseRelation
Note
createRelation is part of the CreatableRelationProvider Contract to write the rows of a structured query (a DataFrame) to an external data source.

createRelation gets the topic option from the input parameters.

createRelation gets the Kafka-specific options for writing from the input parameters.

createRelation then uses the KafkaWriter helper object to write the rows of the DataFrame to the Kafka topic.

In the end, createRelation creates a fake BaseRelation that simply throws an UnsupportedOperationException for all its methods.

createRelation supports Append and ErrorIfExists only. createRelation throws an AnalysisException for the other save modes:

Save mode [mode] not allowed for Kafka. Allowed save modes are [Append] and [ErrorIfExists] (default).
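
As a rough illustration of the save-mode check, the following Spark-free sketch models the four save modes with a hypothetical Mode type. It returns an Either instead of throwing an AnalysisException, which is a simplification chosen for this sketch only.

```scala
object SaveModeSketch {
  // Hypothetical stand-ins for Spark's SaveMode values.
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  // Right for the supported modes, Left with an error message otherwise.
  def checkMode(mode: Mode): Either[String, Mode] = mode match {
    case Append | ErrorIfExists => Right(mode)
    case other =>
      Left(s"Save mode $other not allowed for Kafka. " +
        "Allowed save modes are Append and ErrorIfExists (default).")
  }
}
```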

sourceSchema Method

sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)

sourceSchema…​FIXME

val fromKafka = spark.read.format("kafka")...
scala> fromKafka.printSchema
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
Note
sourceSchema is part of Structured Streaming’s StreamSourceProvider Contract.

Getting Desired KafkaOffsetRangeLimit (for Offset Option) — getKafkaOffsetRangeLimit Object Method

getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit

getKafkaOffsetRangeLimit tries to find the given offsetOptionKey in the input params and converts the value found to a KafkaOffsetRangeLimit.

When the input offsetOptionKey is not found, getKafkaOffsetRangeLimit returns the input defaultOffsets.
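
A minimal sketch of the lookup-with-default behaviour follows. The types are hypothetical stand-ins, and the way a found value is converted (the earliest and latest keywords, anything else kept as a raw JSON string) is an assumption of this sketch, since the text above does not spell out the conversion rules.

```scala
object OffsetLimitSketch {
  // Hypothetical stand-ins for the KafkaOffsetRangeLimit variants.
  sealed trait OffsetLimit
  case object Earliest extends OffsetLimit
  case object Latest extends OffsetLimit
  final case class SpecificJson(json: String) extends OffsetLimit

  def offsetLimit(
      params: Map[String, String],
      offsetOptionKey: String,
      defaultOffsets: OffsetLimit): OffsetLimit =
    params.get(offsetOptionKey).map(_.trim) match {
      case Some(v) if v.equalsIgnoreCase("latest")   => Latest   // assumed keyword
      case Some(v) if v.equalsIgnoreCase("earliest") => Earliest // assumed keyword
      case Some(json)                                => SpecificJson(json) // left unparsed here
      case None                                      => defaultOffsets
    }
}
```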

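Note

getKafkaOffsetRangeLimit is used (among other places) when:

  • KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider)

  • KafkaSourceProvider is requested to validate Kafka options (for batch queries)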

Getting ConsumerStrategy per Subscription Strategy Option — strategy Internal Method

strategy(caseInsensitiveParams: Map[String, String]): ConsumerStrategy

strategy finds one of the strategy options: subscribe, subscribepattern and assign.

For assign, strategy uses the JsonUtils helper object to deserialize TopicPartitions from JSON (e.g. {"topicA":[0,1],"topicB":[0,1]}) and returns a new AssignStrategy.

For subscribe, strategy splits the value by , (comma) and returns a new SubscribeStrategy.

For subscribepattern, strategy returns a new SubscribePatternStrategy.

Note

strategy is used when:

  • KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider)

  • (Spark Structured Streaming) KafkaSourceProvider is requested to createSource and createContinuousReader
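
The option-to-strategy mapping can be sketched without Spark as follows. The Strategy types are hypothetical stand-ins for the ConsumerStrategy implementations; the assign value is kept as a raw JSON string rather than deserialized, and trimming topic names and returning an Option are assumptions of this sketch.

```scala
object StrategySketch {
  // Hypothetical stand-ins for the ConsumerStrategy implementations.
  sealed trait Strategy
  final case class Assign(rawJson: String) extends Strategy // JSON left unparsed in this sketch
  final case class Subscribe(topics: List[String]) extends Strategy
  final case class SubscribePattern(pattern: String) extends Strategy

  // Picks the first strategy option found; None when no strategy option is present.
  def strategy(caseInsensitiveParams: Map[String, String]): Option[Strategy] =
    caseInsensitiveParams.collectFirst {
      case ("assign", json) => Assign(json)
      case ("subscribe", topics) =>
        Subscribe(topics.split(",").map(_.trim).filter(_.nonEmpty).toList)
      case ("subscribepattern", pattern) => SubscribePattern(pattern.trim)
    }
}
```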

failOnDataLoss Internal Method

failOnDataLoss(caseInsensitiveParams: Map[String, String]): Boolean

failOnDataLoss…​FIXME

Note
failOnDataLoss is used when KafkaSourceProvider is requested to create a BaseRelation (and also in createSource and createContinuousReader for Spark Structured Streaming).

Setting Kafka Configuration Parameters for Driver — kafkaParamsForDriver Object Method

kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): java.util.Map[String, Object]

kafkaParamsForDriver simply sets the additional Kafka configuration parameters for the driver.

Table 1. Driver’s Kafka Configuration Parameters
key.deserializer
  Value: org.apache.kafka.common.serialization.ByteArrayDeserializer
  ConsumerConfig: KEY_DESERIALIZER_CLASS_CONFIG
  Description: Deserializer class for keys that implements the Kafka Deserializer interface.

value.deserializer
  Value: org.apache.kafka.common.serialization.ByteArrayDeserializer
  ConsumerConfig: VALUE_DESERIALIZER_CLASS_CONFIG
  Description: Deserializer class for values that implements the Kafka Deserializer interface.

auto.offset.reset
  Value: earliest
  ConsumerConfig: AUTO_OFFSET_RESET_CONFIG
  Description: What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

    • earliest: automatically reset the offset to the earliest offset

    • latest: automatically reset the offset to the latest offset

    • none: throw an exception to the Kafka consumer if no previous offset is found for the consumer's group

    • anything else: throw an exception to the Kafka consumer

enable.auto.commit
  Value: false
  ConsumerConfig: ENABLE_AUTO_COMMIT_CONFIG
  Description: If true, the Kafka consumer's offset will be periodically committed in the background.

max.poll.records
  Value: 1
  ConsumerConfig: MAX_POLL_RECORDS_CONFIG
  Description: The maximum number of records returned in a single call to Consumer.poll().

receive.buffer.bytes
  Value: 65536
  ConsumerConfig: RECEIVE_BUFFER_CONFIG
  Description: Only set if not set already.
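
The difference between parameters that are always set and receive.buffer.bytes, which is only set if not set already, can be sketched with a plain java.util.HashMap. The object and method names are hypothetical, and treating every other parameter in the table as an unconditional overwrite of user-specified values is an assumption of this sketch.

```scala
object DriverParamsSketch {
  private val ByteArrayDeserializer =
    "org.apache.kafka.common.serialization.ByteArrayDeserializer"

  def driverParams(specifiedKafkaParams: Map[String, String]): java.util.Map[String, Object] = {
    val params = new java.util.HashMap[String, Object]()
    // Start from the user-specified (already un-prefixed) Kafka parameters.
    specifiedKafkaParams.foreach { case (k, v) => params.put(k, v) }
    // Assumed to overwrite whatever the user specified.
    params.put("key.deserializer", ByteArrayDeserializer)
    params.put("value.deserializer", ByteArrayDeserializer)
    params.put("auto.offset.reset", "earliest")
    params.put("enable.auto.commit", "false")
    params.put("max.poll.records", Integer.valueOf(1))
    // Only set if not set already.
    params.putIfAbsent("receive.buffer.bytes", Integer.valueOf(65536))
    params
  }
}
```

In this sketch a user-supplied auto.offset.reset is replaced with earliest, while a user-supplied receive.buffer.bytes is kept.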

Tip

Enable DEBUG logging level for org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater logger to see updates of Kafka configuration parameters.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider.ConfigUpdater=DEBUG

Refer to Logging.

Note

kafkaParamsForDriver is used when:

kafkaParamsForExecutors Object Method

kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): java.util.Map[String, Object]

kafkaParamsForExecutors…​FIXME

Note
kafkaParamsForExecutors is used when…​FIXME

kafkaParamsForProducer Object Method

kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String]

kafkaParamsForProducer…​FIXME

Note
kafkaParamsForProducer is used when…​FIXME
