KafkaSourceProvider — Data Source Provider for Apache Kafka

KafkaSourceProvider is a DataSourceRegister that registers the developer-friendly alias kafka for the Kafka data source format in Spark Structured Streaming.

KafkaSourceProvider uses configuration options that you can set using the option method of DataStreamReader or DataStreamWriter.

Tip
Refer to Kafka Data Source’s Options for the supported configuration options.
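For example, a streaming query over Kafka could be set up as follows (a sketch; the broker address and topic name are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaSourceProviderDemo").getOrCreate()

// "kafka" is the alias that KafkaSourceProvider registers as a DataSourceRegister
val records = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
  .option("subscribe", "topic1")                       // placeholder topic name
  .load()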

Internally, KafkaSourceProvider sets the properties for Kafka Consumers on executors (that are passed on to InternalKafkaConsumer when requested to create a Kafka consumer with a single TopicPartition manually assigned).

Table 1. KafkaSourceProvider’s Properties for Kafka Consumers on Executors

  ConsumerConfig’s Key               Value                    Description
  KEY_DESERIALIZER_CLASS_CONFIG      ByteArrayDeserializer    FIXME
  VALUE_DESERIALIZER_CLASS_CONFIG    ByteArrayDeserializer    FIXME
  AUTO_OFFSET_RESET_CONFIG           none                     FIXME
  GROUP_ID_CONFIG                    uniqueGroupId-executor   FIXME
  ENABLE_AUTO_COMMIT_CONFIG          false                    FIXME
  RECEIVE_BUFFER_CONFIG              65536                    Only when not already set in specifiedKafkaParams

Tip

Enable ALL logging levels for org.apache.spark.sql.kafka010.KafkaSourceProvider logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider=ALL

Refer to Logging.

Creating Streaming Source — createSource Method

createSource(
  sqlContext: SQLContext,
  metadataPath: String,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): Source
Note
createSource is part of the StreamSourceProvider Contract to create a streaming source for a format or system (to continually read data).

createSource first validates stream options.

createSource…​FIXME

Validating General Options For Batch And Streaming Queries — validateGeneralOptions Internal Method

validateGeneralOptions(parameters: Map[String, String]): Unit
Note
Parameters are case-insensitive, i.e. OptioN and option are equal.

validateGeneralOptions makes sure that exactly one of the following topic subscription strategies is used in parameters:

  • subscribe

  • subscribepattern

  • assign

validateGeneralOptions reports an IllegalArgumentException when no subscription strategy is in use or more than one strategy is used.

validateGeneralOptions makes sure that the values of the subscription strategies meet the following requirements:

  • assign strategy starts with { (the opening curly brace)

  • subscribe strategy has at least one topic (in a comma-separated list of topics)

  • subscribepattern strategy has the pattern defined
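A minimal sketch of the exactly-one-strategy requirement described above (a sketch only, assuming caseInsensitiveParams already uses lower-cased keys; not the actual implementation):

// Sketch: exactly one of the subscription strategy options must be present
def checkSubscriptionStrategy(caseInsensitiveParams: Map[String, String]): Unit = {
  val strategyKeys = Seq("assign", "subscribe", "subscribepattern")
  val specified = strategyKeys.filter(caseInsensitiveParams.contains)
  if (specified.size != 1) {
    throw new IllegalArgumentException(
      s"One and only one of ${strategyKeys.mkString(", ")} options can be specified, got: $specified")
  }
}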

validateGeneralOptions makes sure that group.id has not been specified and reports an IllegalArgumentException otherwise.

Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.

validateGeneralOptions makes sure that auto.offset.reset has not been specified and reports an IllegalArgumentException otherwise.

Kafka option 'auto.offset.reset' is not supported.
Instead set the source option 'startingoffsets' to 'earliest' or 'latest' to specify where to start. Structured Streaming manages which offsets are consumed internally, rather than relying on the kafkaConsumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that 'startingoffsets' only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. See the docs for more details.

validateGeneralOptions makes sure that the following options have not been specified and reports an IllegalArgumentException otherwise:

  • kafka.key.deserializer

  • kafka.value.deserializer

  • kafka.enable.auto.commit

  • kafka.interceptor.classes

In the end, validateGeneralOptions makes sure that kafka.bootstrap.servers option was specified and reports an IllegalArgumentException otherwise.

Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer
Note
validateGeneralOptions is used when KafkaSourceProvider validates options for streaming and batch queries.

Creating ConsumerStrategy — strategy Internal Method

strategy(caseInsensitiveParams: Map[String, String])

Internally, strategy finds the keys in the input caseInsensitiveParams that are one of the following and creates a corresponding ConsumerStrategy.

Table 2. KafkaSourceProvider.strategy’s Key to ConsumerStrategy Conversion

  • assign: AssignStrategy with Kafka TopicPartitions. strategy uses the JsonUtils.partitions method to parse a JSON with topic names and partitions, e.g. {"topicA":[0,1],"topicB":[0,1]}. The topic names and partitions are mapped directly to Kafka’s TopicPartition objects.

  • subscribe: SubscribeStrategy with topic names. strategy extracts topic names from a comma-separated string, e.g. topic1,topic2,topic3

  • subscribepattern: SubscribePatternStrategy with a topic subscription regex pattern (that uses Java’s java.util.regex.Pattern for the pattern), e.g. topic\d
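The three subscription strategies correspond to the following option forms (a sketch; broker address and topic names are placeholders, and only one strategy can be used per query):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

val byAssign = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("assign", """{"topicA":[0,1],"topicB":[0,1]}""") // JSON with topic names and partitions
  .load()

val bySubscribe = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2,topic3")             // comma-separated topic names
  .load()

val byPattern = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribepattern", "topic\\d")                  // regex (java.util.regex.Pattern)
  .load()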
Note

strategy is used when:

Describing Streaming Source with Name and Schema — sourceSchema Method

sourceSchema(
  sqlContext: SQLContext,
  schema: Option[StructType],
  providerName: String,
  parameters: Map[String, String]): (String, StructType)
Note
sourceSchema is part of the StreamSourceProvider Contract to describe a streaming source with a name and the schema.

sourceSchema gives the short name (i.e. kafka) and the fixed schema.
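For example, printing the schema of a Kafka streaming Dataset shows the fixed columns (a sketch, assuming a spark-shell session and placeholder connection options; the listing in the comment is the fixed schema of the kafka data source):

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("subscribe", "topic1")                       // placeholder
  .load()
  .printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)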

Internally, sourceSchema validates Kafka options and makes sure that the optional input schema is indeed undefined.

When the input schema is defined, sourceSchema reports an IllegalArgumentException.

Kafka source has a fixed schema and cannot be set with a custom one

Validating Kafka Options for Streaming Queries — validateStreamOptions Internal Method

validateStreamOptions(caseInsensitiveParams: Map[String, String]): Unit

Firstly, validateStreamOptions makes sure that the endingoffsets option is not used. Otherwise, validateStreamOptions reports an IllegalArgumentException.

ending offset not valid in streaming queries

validateStreamOptions then validates the general options.
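A minimal sketch of this guard (a sketch only, assuming caseInsensitiveParams already uses lower-cased keys; not the actual implementation):

def validateStreamOptionsSketch(caseInsensitiveParams: Map[String, String]): Unit = {
  // endingoffsets makes no sense for an unbounded streaming query
  if (caseInsensitiveParams.contains("endingoffsets")) {
    throw new IllegalArgumentException("ending offset not valid in streaming queries")
  }
  // ...followed by the general (batch and streaming) option validation
}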

Note
validateStreamOptions is used when KafkaSourceProvider is requested for the schema of the Kafka source and to create a KafkaSource.

Creating ContinuousReader for Continuous Stream Processing — createContinuousReader Method

createContinuousReader(
  schema: Optional[StructType],
  metadataPath: String,
  options: DataSourceOptions): KafkaContinuousReader
Note
createContinuousReader is part of the ContinuousReadSupport Contract to create a ContinuousReader.

createContinuousReader…​FIXME

Converting Configuration Options to KafkaOffsetRangeLimit — getKafkaOffsetRangeLimit Object Method

getKafkaOffsetRangeLimit(
  params: Map[String, String],
  offsetOptionKey: String,
  defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit

getKafkaOffsetRangeLimit finds the given offsetOptionKey in the params and does the following conversion:
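A sketch of how the conversion typically works ("latest", "earliest", a per-partition JSON, or the default when the key is absent). This assumes access to the internal org.apache.spark.sql.kafka010 types (KafkaOffsetRangeLimit, its variants, and JsonUtils), so it only compiles inside that package:

import java.util.Locale

def getKafkaOffsetRangeLimitSketch(
    params: Map[String, String],
    offsetOptionKey: String,
    defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit =
  params.get(offsetOptionKey).map(_.trim) match {
    case Some(offset) if offset.toLowerCase(Locale.ROOT) == "latest" =>
      LatestOffsetRangeLimit
    case Some(offset) if offset.toLowerCase(Locale.ROOT) == "earliest" =>
      EarliestOffsetRangeLimit
    case Some(json) =>
      // Per-partition offsets as JSON, e.g. {"topicA":{"0":23,"1":-1}}
      SpecificOffsetRangeLimit(JsonUtils.partitionOffsets(json))
    case None =>
      defaultOffsets
  }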

Note
getKafkaOffsetRangeLimit is used when KafkaSourceProvider is requested to createSource, createMicroBatchReader, createContinuousReader, createRelation, and validateBatchOptions.

Creating MicroBatchReader for Micro-Batch Stream Processing — createMicroBatchReader Method

createMicroBatchReader(
  schema: Optional[StructType],
  metadataPath: String,
  options: DataSourceOptions): KafkaMicroBatchReader
Note
createMicroBatchReader is part of the MicroBatchReadSupport Contract to create a MicroBatchReader in Micro-Batch Stream Processing.

createMicroBatchReader first validates the stream options (in the given DataSourceOptions).

createMicroBatchReader generates a unique group ID of the format spark-kafka-source-[randomUUID]-[metadataPath_hashCode] (to make sure that a new streaming query creates a new consumer group).

createMicroBatchReader finds all the parameters (in the given DataSourceOptions) that start with kafka. prefix, removes the prefix, and creates the current Kafka parameters.
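The two steps above can be sketched as follows (a sketch only; the metadata path and the options map are placeholders):

import java.util.{Locale, UUID}

// Unique group ID so every streaming query gets its own consumer group
val metadataPath = "/tmp/checkpoints/query1/sources/0" // placeholder
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

// Keep only the options with the "kafka." prefix and strip the prefix
val parameters = Map(
  "kafka.bootstrap.servers" -> "localhost:9092", // placeholder
  "subscribe" -> "topic1")                       // placeholder
val specifiedKafkaParams = parameters
  .filterKeys(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
  .map { case (k, v) => k.drop("kafka.".length) -> v }
  .toMap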

createMicroBatchReader creates a KafkaOffsetReader with the following:

  • strategy (in the given DataSourceOptions)

  • Properties for Kafka consumers on the driver (given the current Kafka parameters, i.e. without kafka. prefix)

  • The given DataSourceOptions

  • spark-kafka-source-[randomUUID]-[metadataPath_hashCode]-driver for the driverGroupIdPrefix

In the end, createMicroBatchReader creates a KafkaMicroBatchReader with the following:

Creating BaseRelation — createRelation Method

createRelation(
  sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation
Note
createRelation is part of the RelationProvider contract to create a BaseRelation.

createRelation…​FIXME

Validating Configuration Options for Batch Processing — validateBatchOptions Internal Method

validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit

validateBatchOptions…​FIXME

Note
validateBatchOptions is used exclusively when KafkaSourceProvider is requested to createSource.

kafkaParamsForDriver Method

kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): Map[String, Object]

kafkaParamsForDriver…​FIXME

Note
kafkaParamsForDriver is used when…​FIXME

kafkaParamsForExecutors Method

kafkaParamsForExecutors(
  specifiedKafkaParams: Map[String, String],
  uniqueGroupId: String): Map[String, Object]

kafkaParamsForExecutors sets the Kafka properties for executors.

While setting the properties, kafkaParamsForExecutors prints out the following DEBUG message to the logs:

executor: Set [key] to [value], earlier value: [value]
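A sketch that mirrors the properties in Table 1 (not the actual implementation; it requires the kafka-clients dependency):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

def kafkaParamsForExecutorsSketch(
    specifiedKafkaParams: Map[String, String],
    uniqueGroupId: String): Map[String, Object] = {
  val params = scala.collection.mutable.Map[String, Object](specifiedKafkaParams.toSeq: _*)
  // Keys and values are always deserialized as byte arrays
  params(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG) = classOf[ByteArrayDeserializer].getName
  params(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG) = classOf[ByteArrayDeserializer].getName
  // Spark manages offsets itself, so the consumer must not reset or auto-commit them
  params(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) = "none"
  params(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) = "false"
  // Executor-side consumer group derived from the unique group ID
  params(ConsumerConfig.GROUP_ID_CONFIG) = s"$uniqueGroupId-executor"
  // Only when not already set in specifiedKafkaParams
  params.getOrElseUpdate(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "65536")
  params.toMap
}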
Note

kafkaParamsForExecutors is used when:

Looking Up failOnDataLoss Configuration Property — failOnDataLoss Internal Method

failOnDataLoss(caseInsensitiveParams: Map[String, String]): Boolean

failOnDataLoss simply looks up the failOnDataLoss configuration property in the given caseInsensitiveParams (in a case-insensitive manner) or defaults to true.
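A minimal sketch (assuming caseInsensitiveParams already uses lower-cased keys):

def failOnDataLossSketch(caseInsensitiveParams: Map[String, String]): Boolean =
  caseInsensitiveParams.getOrElse("failondataloss", "true").toBoolean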

Note
failOnDataLoss is used when KafkaSourceProvider is requested to createSource (for a KafkaSource), createMicroBatchReader (for a KafkaMicroBatchReader), createContinuousReader (for a KafkaContinuousReader), and createRelation (for a KafkaRelation).
