KafkaSourceProvider — Data Source Provider for Apache Kafka
KafkaSourceProvider is a DataSourceRegister that registers the developer-friendly alias kafka for the data source format in Spark Structured Streaming.
Tip
|
Read up on DataSourceRegister in The Internals of Spark SQL book. |
KafkaSourceProvider supports micro-batch stream processing (through the MicroBatchReadSupport contract) and creates a specialized KafkaMicroBatchReader.
KafkaSourceProvider requires the following options (that you can set using the option method of DataStreamReader or DataStreamWriter):

- Exactly one of the following options: subscribe, subscribePattern or assign
Tip
|
Refer to Kafka Data Source’s Options for the supported configuration options. |
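For example, a streaming query over the kafka data source could be set up as follows (the topic name and the bootstrap servers are made-up placeholders):

```scala
// A minimal streaming query over the kafka data source.
// Topic name and bootstrap servers are made-up placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("kafka-source-demo").getOrCreate()

val records = spark.readStream
  .format("kafka")                                      // the alias registered by KafkaSourceProvider
  .option("kafka.bootstrap.servers", "localhost:9092")  // required
  .option("subscribe", "demo-topic")                    // exactly one of subscribe, subscribePattern, assign
  .load()
```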
Internally, KafkaSourceProvider sets the properties for Kafka consumers on executors (that are passed on to InternalKafkaConsumer when requested to create a Kafka consumer with a single TopicPartition manually assigned).
ConsumerConfig's Key | Value | Description
---|---|---
key.deserializer | ByteArrayDeserializer | Keys are always deserialized as byte arrays
value.deserializer | ByteArrayDeserializer | Values are always deserialized as byte arrays
auto.offset.reset | none | Consumers on executors read only the offsets the driver tells them to
group.id | [uniqueGroupId]-executor | So that consumers on executors do not interfere with any existing consumer group
enable.auto.commit | false | So that consumers on executors do not commit offsets unnecessarily
receive.buffer.bytes | 65536 | Only when not set in the specifiedKafkaParams already
Tip
|
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaSourceProvider logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider=ALL
Refer to Logging. |
Creating Streaming Source — createSource Method
createSource(
sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source
Note
|
createSource is part of the StreamSourceProvider Contract to create a streaming source for a format or system (to continually read data).
|
createSource first validates stream options.

createSource …FIXME
Validating General Options For Batch And Streaming Queries — validateGeneralOptions Internal Method
validateGeneralOptions(parameters: Map[String, String]): Unit
Note
|
Parameters are case-insensitive, i.e. OptioN and option are equal.
|
validateGeneralOptions makes sure that exactly one topic subscription strategy is used in parameters and can be:

- subscribe
- subscribepattern
- assign
validateGeneralOptions reports an IllegalArgumentException when no subscription strategy is in use or more than one strategy is used.
validateGeneralOptions makes sure that the values of the subscription strategies meet the following requirements (see the examples after this list):

- assign strategy starts with { (the opening curly brace)
- subscribe strategy has at least one topic (in a comma-separated list of topics)
- subscribepattern strategy has the pattern defined
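For example, each strategy expects its value in the following shape (topic names, partitions and servers below are made-up placeholders, and spark is the SparkSession from the first example; only one of the three options can be set on a single query):

```scala
// Example values for the three subscription strategies (placeholders only).
// Exactly one of the three options may be set on a query.
val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topicA,topicB")                     // comma-separated list of topics
  // .option("subscribePattern", "topic.*")                 // a java.util.regex.Pattern-compatible regex
  // .option("assign", """{"topicA":[0,1],"topicB":[0]}""") // JSON mapping topic names to partitions
  .load()
```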
validateGeneralOptions makes sure that group.id has not been specified and reports an IllegalArgumentException otherwise.
Kafka option 'group.id' is not supported as user-specified consumer groups are not used to track offsets.
validateGeneralOptions makes sure that auto.offset.reset has not been specified and reports an IllegalArgumentException otherwise.
Kafka option 'auto.offset.reset' is not supported. Instead set the source option 'startingoffsets' to 'earliest' or 'latest' to specify where to start. Structured Streaming manages which offsets are consumed internally, rather than relying on the kafkaConsumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that 'startingoffsets' only applies when a new Streaming query is started, and that resuming will always pick up from where the query left off. See the docs for more details.
validateGeneralOptions makes sure that the following options have not been specified and reports an IllegalArgumentException otherwise:

- kafka.key.deserializer
- kafka.value.deserializer
- kafka.enable.auto.commit
- kafka.interceptor.classes
In the end, validateGeneralOptions makes sure that the kafka.bootstrap.servers option was specified and reports an IllegalArgumentException otherwise.
Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer
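A condensed sketch of these checks (illustrative only; the real validateGeneralOptions lives in KafkaSourceProvider and produces the exact messages quoted above):

```scala
// A minimal sketch of option validation, assuming case-insensitive keys.
// Illustrative only, not the exact Spark implementation.
def validateGeneralOptionsSketch(params: Map[String, String]): Unit = {
  val caseInsensitive = params.map { case (k, v) => (k.toLowerCase, v) }
  val strategies = Seq("subscribe", "subscribepattern", "assign").filter(caseInsensitive.contains)
  require(strategies.size == 1,
    s"Exactly one of 'subscribe', 'subscribePattern' or 'assign' must be set (found: $strategies)")
  require(!caseInsensitive.contains("group.id"),
    "Kafka option 'group.id' is not supported")
  require(!caseInsensitive.contains("auto.offset.reset"),
    "Kafka option 'auto.offset.reset' is not supported; use 'startingoffsets' instead")
  require(caseInsensitive.contains("kafka.bootstrap.servers"),
    "Option 'kafka.bootstrap.servers' must be specified for configuring Kafka consumer")
}
```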
Creating ConsumerStrategy — strategy Internal Method
strategy(caseInsensitiveParams: Map[String, String])
Internally, strategy finds the keys in the input caseInsensitiveParams that are one of the following and creates a corresponding ConsumerStrategy.

Key | ConsumerStrategy
---|---
assign | AssignStrategy with Kafka's TopicPartitions. The topic names and partitions are mapped directly to Kafka's TopicPartition objects.
subscribe | SubscribeStrategy with the topic names (in a comma-separated list)
subscribepattern | SubscribePatternStrategy with the topic subscription regex pattern (that uses Java's java.util.regex.Pattern for the pattern)
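A self-contained sketch of that key-to-strategy mapping, using illustrative case classes in place of Spark's internal ConsumerStrategy implementations:

```scala
// Illustrative sketch of the key-to-strategy mapping.
// The *Sketch names are made up; Spark's ConsumerStrategy classes are internal to kafka010.
sealed trait ConsumerStrategySketch
case class AssignSketch(json: String) extends ConsumerStrategySketch
case class SubscribeSketch(topics: Seq[String]) extends ConsumerStrategySketch
case class SubscribePatternSketch(pattern: String) extends ConsumerStrategySketch

def strategySketch(caseInsensitiveParams: Map[String, String]): ConsumerStrategySketch =
  caseInsensitiveParams.collectFirst {
    case ("assign", value)           => AssignSketch(value)
    case ("subscribe", value)        => SubscribeSketch(value.split(",").map(_.trim).filter(_.nonEmpty).toSeq)
    case ("subscribepattern", value) => SubscribePatternSketch(value.trim)
  }.getOrElse(throw new IllegalArgumentException("No subscription strategy specified"))
```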
Describing Streaming Source with Name and Schema — sourceSchema Method
sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType)
Note
|
sourceSchema is part of the StreamSourceProvider Contract to describe a streaming source with a name and the schema.
|
sourceSchema gives the short name (i.e. kafka) and the fixed schema.
Internally, sourceSchema validates Kafka options and makes sure that the optional input schema is indeed undefined.

When the input schema is defined, sourceSchema reports an IllegalArgumentException.
Kafka source has a fixed schema and cannot be set with a custom one
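For reference, printing the schema of a Kafka-sourced streaming Dataset shows that fixed schema (records is the streaming DataFrame from the first example above; the nullability flags shown are indicative):

```scala
// The fixed schema of the kafka data source, as seen from a streaming DataFrame.
records.printSchema()
// root
//  |-- key: binary (nullable = true)
//  |-- value: binary (nullable = true)
//  |-- topic: string (nullable = true)
//  |-- partition: integer (nullable = true)
//  |-- offset: long (nullable = true)
//  |-- timestamp: timestamp (nullable = true)
//  |-- timestampType: integer (nullable = true)
```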
Validating Kafka Options for Streaming Queries — validateStreamOptions Internal Method
validateStreamOptions(caseInsensitiveParams: Map[String, String]): Unit
Firstly, validateStreamOptions makes sure that the endingoffsets option is not used. Otherwise, validateStreamOptions reports an IllegalArgumentException.
ending offset not valid in streaming queries
validateStreamOptions then validates the general options.
Note
|
validateStreamOptions is used when KafkaSourceProvider is requested the schema for Kafka source and to create a KafkaSource.
|
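As a usage-level illustration of the endingoffsets check above, the option is only valid for batch queries over the kafka data source (topic and servers are placeholders; spark is the SparkSession from the first example):

```scala
// endingOffsets is valid for batch queries (spark.read) only.
val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "demo-topic")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()

// The same option on spark.readStream makes the streaming query fail with
// "ending offset not valid in streaming queries".
```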
Creating ContinuousReader for Continuous Stream Processing — createContinuousReader Method
createContinuousReader(
schema: Optional[StructType],
metadataPath: String,
options: DataSourceOptions): KafkaContinuousReader
Note
|
createContinuousReader is part of the ContinuousReadSupport Contract to create a ContinuousReader.
|
createContinuousReader …FIXME
Converting Configuration Options to KafkaOffsetRangeLimit — getKafkaOffsetRangeLimit Object Method
getKafkaOffsetRangeLimit(
params: Map[String, String],
offsetOptionKey: String,
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
getKafkaOffsetRangeLimit finds the given offsetOptionKey in the params and does the following conversion (see the sketch after this list):

- latest becomes LatestOffsetRangeLimit
- earliest becomes EarliestOffsetRangeLimit
- A JSON-formatted text becomes SpecificOffsetRangeLimit
- When the given offsetOptionKey is not found, getKafkaOffsetRangeLimit returns the given defaultOffsets
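A self-contained sketch of this conversion, using illustrative types in place of Spark's internal KafkaOffsetRangeLimit hierarchy:

```scala
// Illustrative sketch of getKafkaOffsetRangeLimit; the *Limit types below are made up.
sealed trait OffsetRangeLimitSketch
case object LatestLimit extends OffsetRangeLimitSketch
case object EarliestLimit extends OffsetRangeLimitSketch
case class SpecificLimit(json: String) extends OffsetRangeLimitSketch

def getKafkaOffsetRangeLimitSketch(
    params: Map[String, String],
    offsetOptionKey: String,
    defaultOffsets: OffsetRangeLimitSketch): OffsetRangeLimitSketch =
  params.get(offsetOptionKey) match {
    case Some(offset) if offset.trim.equalsIgnoreCase("latest")   => LatestLimit
    case Some(offset) if offset.trim.equalsIgnoreCase("earliest") => EarliestLimit
    case Some(json)                                               => SpecificLimit(json) // JSON with per-partition offsets
    case None                                                     => defaultOffsets
  }
```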
Note
|
getKafkaOffsetRangeLimit is used when KafkaSourceProvider is requested to createSource, createMicroBatchReader, createContinuousReader, createRelation, and validateBatchOptions.
|
Creating MicroBatchReader for Micro-Batch Stream Processing — createMicroBatchReader Method
createMicroBatchReader(
schema: Optional[StructType],
metadataPath: String,
options: DataSourceOptions): KafkaMicroBatchReader
Note
|
createMicroBatchReader is part of the MicroBatchReadSupport Contract to create a MicroBatchReader in Micro-Batch Stream Processing.
|
createMicroBatchReader first validates the stream options (in the given DataSourceOptions).

createMicroBatchReader generates a unique group ID of the format spark-kafka-source-[randomUUID]-[metadataPath_hashCode] (to make sure that a new streaming query creates a new consumer group).

createMicroBatchReader finds all the parameters (in the given DataSourceOptions) that start with the kafka. prefix, removes the prefix, and creates the current Kafka parameters (as sketched below).
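A minimal sketch of those two steps (the checkpoint path, topic and servers are made-up placeholders):

```scala
// Illustrative sketch of generating the unique group ID and extracting kafka.-prefixed options.
import java.util.UUID

val metadataPath = "/tmp/checkpoint/sources/0"   // made-up checkpoint path
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"

val options = Map(
  "subscribe" -> "demo-topic",
  "kafka.bootstrap.servers" -> "localhost:9092",
  "kafka.fetch.min.bytes" -> "1")

// Keep only the kafka.-prefixed options and strip the prefix.
val specifiedKafkaParams: Map[String, String] = options.collect {
  case (k, v) if k.toLowerCase.startsWith("kafka.") => k.drop("kafka.".length) -> v
}
```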
createMicroBatchReader creates a KafkaOffsetReader with the following:

- The strategy (in the given DataSourceOptions)
- Properties for Kafka consumers on the driver (given the current Kafka parameters, i.e. without the kafka. prefix)
- The given DataSourceOptions
- spark-kafka-source-[randomUUID]-[metadataPath_hashCode]-driver for the driverGroupIdPrefix
In the end, createMicroBatchReader creates a KafkaMicroBatchReader with the following:

- The KafkaOffsetReader
- Properties for Kafka consumers on executors (given the current Kafka parameters, i.e. without the kafka. prefix) and the unique group ID (spark-kafka-source-[randomUUID]-[metadataPath_hashCode])
- The given DataSourceOptions and the metadataPath
- Starting stream offsets (the startingOffsets option with the default of LatestOffsetRangeLimit offsets)
Creating BaseRelation — createRelation Method
createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
Note
|
createRelation is part of the RelationProvider contract to create a BaseRelation.
|
createRelation …FIXME
Validating Configuration Options for Batch Processing — validateBatchOptions Internal Method
validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit
validateBatchOptions …FIXME
Note
|
validateBatchOptions is used exclusively when KafkaSourceProvider is requested to createSource.
|
kafkaParamsForDriver Method
kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): Map[String, Object]
kafkaParamsForDriver …FIXME
Note
|
kafkaParamsForDriver is used when…FIXME
|
kafkaParamsForExecutors Method
kafkaParamsForExecutors(
specifiedKafkaParams: Map[String, String],
uniqueGroupId: String): Map[String, Object]
kafkaParamsForExecutors sets the Kafka properties for executors.
While setting the properties, kafkaParamsForExecutors prints out the following DEBUG message to the logs:
executor: Set [key] to [value], earlier value: [value]
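A minimal sketch of what setting these properties could look like, assuming the executor-side defaults listed in the table at the top of this page (an illustrative helper, not Spark's internal ConfigUpdater):

```scala
// Illustrative sketch of building executor-side consumer properties with DEBUG-style logging.
// Property values follow the table at the top of this page.
import org.apache.kafka.common.serialization.ByteArrayDeserializer

def kafkaParamsForExecutorsSketch(
    specifiedKafkaParams: Map[String, String],
    uniqueGroupId: String): Map[String, Object] = {
  var params: Map[String, Object] = specifiedKafkaParams

  def set(key: String, value: Object): Unit = {
    println(s"executor: Set [$key] to [$value], earlier value: [${params.get(key).orNull}]")
    params += key -> value
  }

  set("key.deserializer", classOf[ByteArrayDeserializer].getName)
  set("value.deserializer", classOf[ByteArrayDeserializer].getName)
  set("auto.offset.reset", "none")            // executors read only what the driver tells them
  set("group.id", s"$uniqueGroupId-executor")
  set("enable.auto.commit", "false")          // offsets are tracked by Structured Streaming
  if (!params.contains("receive.buffer.bytes")) set("receive.buffer.bytes", "65536")
  params
}
```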
Looking Up failOnDataLoss Configuration Property — failOnDataLoss Internal Method
failOnDataLoss(caseInsensitiveParams: Map[String, String]): Boolean
failOnDataLoss simply looks up the failOnDataLoss configuration property in the given caseInsensitiveParams (in a case-insensitive manner) or defaults to true.
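A one-liner sketch of that lookup (assuming the keys in caseInsensitiveParams are already lower-cased):

```scala
// Illustrative sketch: look up 'failondataloss' and default to true.
def failOnDataLossSketch(caseInsensitiveParams: Map[String, String]): Boolean =
  caseInsensitiveParams.getOrElse("failondataloss", "true").toBoolean
```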
Note
|
failOnDataLoss is used when KafkaSourceProvider is requested to createSource (for a KafkaSource), createMicroBatchReader (for a KafkaMicroBatchReader), createContinuousReader (for a KafkaContinuousReader), and createRelation (for a KafkaRelation).
|