// start Spark application like spark-shell with the following package
// --packages org.apache.spark:spark-sql-kafka-0-10_2.12:2.4.5
scala> val fromKafkaTopic1 = spark.
read.
format("kafka").
option("subscribe", "topic1"). // subscribe, subscribepattern, or assign
option("kafka.bootstrap.servers", "localhost:9092").
load("gauge_one")
KafkaSourceProvider is a DataSourceRegister and registers itself to handle the kafka data source format.
Note
|
KafkaSourceProvider uses META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file for the registration which is available in the source code of Apache Spark.
|
KafkaSourceProvider is a RelationProvider and a CreatableRelationProvider.
KafkaSourceProvider uses a fixed schema (and makes sure that a user did not set a custom one).
import org.apache.spark.sql.types.StructType
val schema = new StructType().add($"id".int)
scala> spark
.read
.format("kafka")
.option("subscribe", "topic1")
.option("kafka.bootstrap.servers", "localhost:9092")
.schema(schema) // <-- defining a custom schema is not supported
.load
org.apache.spark.sql.AnalysisException: kafka does not allow user-specified schemas.;
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
... 48 elided
Note
|
You can find more on Spark Structured Streaming in my gitbook Spark Structured Streaming. |
Creating BaseRelation — createRelation Method (from RelationProvider)
createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation
Note
|
createRelation is part of RelationProvider Contract to create a BaseRelation (for reading or writing).
|
createRelation starts by validating the Kafka options (for batch queries) in the input parameters.
createRelation collects all kafka.-prefixed options (in the input parameters) and creates a local specifiedKafkaParams with the keys without the kafka. prefix (e.g. kafka.whatever becomes simply whatever).
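The following is a minimal sketch (not Spark's actual implementation) of collecting kafka.-prefixed options and dropping the prefix; the parameters map is made up for illustration.
// Illustrative only: strip the kafka. prefix from the user-specified options
val parameters = Map(
  "subscribe" -> "topic1",
  "kafka.bootstrap.servers" -> "localhost:9092",
  "kafka.fetch.min.bytes" -> "1")
val specifiedKafkaParams = parameters
  .filter { case (key, _) => key.toLowerCase(java.util.Locale.ROOT).startsWith("kafka.") }
  .map { case (key, value) => key.drop("kafka.".length) -> value }
// specifiedKafkaParams: Map(bootstrap.servers -> localhost:9092, fetch.min.bytes -> 1)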
createRelation gets the desired KafkaOffsetRangeLimit with the startingoffsets offset option key (in the given parameters) and EarliestOffsetRangeLimit as the default offsets.
createRelation makes sure that this KafkaOffsetRangeLimit is not LatestOffsetRangeLimit or throws an AssertionError (the starting offsets of a batch query cannot be the latest offsets).
createRelation gets the desired KafkaOffsetRangeLimit again, this time with the endingoffsets offset option key (in the given parameters) and LatestOffsetRangeLimit as the default offsets.
createRelation makes sure that this KafkaOffsetRangeLimit is not EarliestOffsetRangeLimit or throws an AssertionError.
In the end, createRelation creates a KafkaRelation with the subscription strategy (in the given parameters), the failOnDataLoss option, and the starting and ending offsets.
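For example, a batch query can pin both range limits explicitly (a sketch; the broker address, topic and offsets are made up):
// Batch read with explicit offset range limits
val fromOffsets = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")                // EarliestOffsetRangeLimit (the default)
  .option("endingOffsets", """{"topic1":{"0":100}}""")  // SpecificOffsetRangeLimit for partition 0
  .load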
Validating Kafka Options (for Batch Queries) — validateBatchOptions Internal Method
validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit
validateBatchOptions gets the desired KafkaOffsetRangeLimit for the startingoffsets option in the input caseInsensitiveParams with EarliestOffsetRangeLimit as the default KafkaOffsetRangeLimit.
validateBatchOptions then matches the returned KafkaOffsetRangeLimit as follows:

- EarliestOffsetRangeLimit is acceptable and validateBatchOptions simply does nothing

- LatestOffsetRangeLimit is not acceptable and validateBatchOptions throws an IllegalArgumentException: starting offset can't be latest for batch queries on Kafka

- SpecificOffsetRangeLimit is acceptable unless one of the offsets is -1L, for which validateBatchOptions throws an IllegalArgumentException: startingOffsets for [tp] can't be latest for batch queries on Kafka
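For instance, asking for the latest starting offsets in a batch query is rejected (a sketch against a hypothetical local broker; output abbreviated):
scala> spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "latest") // not allowed for batch queries
  .load
java.lang.IllegalArgumentException: starting offset can't be latest for batch queries on Kafka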
Note
|
validateBatchOptions is used exclusively when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).
|
Writing DataFrame to Kafka Topic — createRelation Method (from CreatableRelationProvider)
createRelation(
sqlContext: SQLContext,
mode: SaveMode,
parameters: Map[String, String],
df: DataFrame): BaseRelation
Note
|
createRelation is part of the CreatableRelationProvider Contract to write the rows of a structured query (a DataFrame) to an external data source.
|
createRelation gets the topic option from the input parameters.
createRelation gets the Kafka-specific options for writing from the input parameters.
createRelation then uses the KafkaWriter helper object to write the rows of the DataFrame to the Kafka topic.
In the end, createRelation creates a fake BaseRelation that simply throws an UnsupportedOperationException for all its methods.
createRelation supports the Append and ErrorIfExists save modes only. createRelation throws an AnalysisException for the other save modes:

Save mode [mode] not allowed for Kafka. Allowed save modes are [Append] and [ErrorIfExists] (default).
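As an illustration, writing a DataFrame to a topic only needs a value column and the topic option (a minimal sketch; the broker address and topic name are made up):
// Write a DataFrame to a Kafka topic (uses the default ErrorIfExists save mode, which is allowed)
spark.range(5)
  .selectExpr("CAST(id AS STRING) AS value") // the Kafka data source expects a value column
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "topic1")
  .save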
sourceSchema Method
sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType)
sourceSchema …FIXME
val fromKafka = spark.read.format("kafka")...
scala> fromKafka.printSchema
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
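Since key and value are binary, they are usually cast to strings before further processing (assuming the fromKafka query above and spark-shell, where spark.implicits._ is already imported):
// Cast the binary key and value columns to strings
val records = fromKafka
  .select($"key".cast("string"), $"value".cast("string"), $"topic", $"partition", $"offset")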
Note
|
sourceSchema is part of Structured Streaming’s StreamSourceProvider Contract.
|
Getting Desired KafkaOffsetRangeLimit (for Offset Option) — getKafkaOffsetRangeLimit Object Method
getKafkaOffsetRangeLimit(
params: Map[String, String],
offsetOptionKey: String,
defaultOffsets: KafkaOffsetRangeLimit): KafkaOffsetRangeLimit
getKafkaOffsetRangeLimit tries to find the given offsetOptionKey in the input params and converts the value found to a KafkaOffsetRangeLimit as follows:
- latest becomes LatestOffsetRangeLimit

- earliest becomes EarliestOffsetRangeLimit

- For a JSON text, getKafkaOffsetRangeLimit uses the JsonUtils helper object to read per-TopicPartition offsets from it and creates a SpecificOffsetRangeLimit
When the input offsetOptionKey is not found, getKafkaOffsetRangeLimit returns the input defaultOffsets.
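A minimal sketch of that mapping (hypothetical types and names, not Spark's own):
// Illustrative only: hypothetical types mirroring the three possible range limits
sealed trait OffsetRangeLimitSketch
case object EarliestLimit extends OffsetRangeLimitSketch
case object LatestLimit extends OffsetRangeLimitSketch
final case class SpecificLimit(json: String) extends OffsetRangeLimitSketch

def rangeLimitFor(
    params: Map[String, String],
    offsetOptionKey: String,
    defaultOffsets: OffsetRangeLimitSketch): OffsetRangeLimitSketch =
  params.get(offsetOptionKey).map(_.trim) match {
    case Some("latest")   => LatestLimit
    case Some("earliest") => EarliestLimit
    case Some(json)       => SpecificLimit(json) // per-TopicPartition offsets as JSON
    case None             => defaultOffsets
  }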
Note
|
getKafkaOffsetRangeLimit is used when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider) and to validate the Kafka options for batch queries.
|
Getting ConsumerStrategy per Subscription Strategy Option — strategy Internal Method
strategy(caseInsensitiveParams: Map[String, String]): ConsumerStrategy
strategy finds one of the strategy options: subscribe, subscribepattern and assign.
For assign, strategy uses the JsonUtils helper object to deserialize TopicPartitions from JSON (e.g. {"topicA":[0,1],"topicB":[0,1]}) and returns a new AssignStrategy.
For subscribe, strategy splits the value by , (comma) and returns a new SubscribeStrategy.
For subscribepattern, strategy returns a new SubscribePatternStrategy.
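The three subscription options are mutually exclusive, so exactly one should be set per query (a sketch with a made-up broker and topics):
// Use exactly one of subscribe, subscribepattern or assign
spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1,topic2")                       // comma-separated topic names
  // .option("subscribepattern", "topic.*")                   // regular expression of topic names
  // .option("assign", """{"topicA":[0,1],"topicB":[0,1]}""") // explicit TopicPartitions as JSON
  .load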
Note
|
strategy is used when KafkaSourceProvider is requested to create a BaseRelation (as a RelationProvider).
|
failOnDataLoss Internal Method
failOnDataLoss(caseInsensitiveParams: Map[String, String]): Boolean
failOnDataLoss …FIXME
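The option is set per query; to my understanding it defaults to true, and setting it to false makes a query log a warning instead of failing when data is lost (e.g. a deleted topic or out-of-range offsets). A sketch with a made-up broker and topic:
// failOnDataLoss = false: warn instead of failing the query on data loss
spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("failOnDataLoss", "false")
  .load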
Note
|
failOnDataLoss is used when KafkaSourceProvider is requested to create a BaseRelation (and also in createSource and createContinuousReader for Spark Structured Streaming).
|
Setting Kafka Configuration Parameters for Driver — kafkaParamsForDriver Object Method
kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): java.util.Map[String, Object]
kafkaParamsForDriver simply sets the additional Kafka configuration parameters for the driver.
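A rough sketch of the idea (not Spark's actual code; the exact set of driver-side settings is an assumption): the user-specified parameters are overlaid with driver-only consumer settings such as byte-array deserializers and disabled auto-commit.
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Illustrative only: overlay driver-side consumer settings on the user-specified ones
def driverKafkaParamsSketch(specifiedKafkaParams: Map[String, String]): java.util.Map[String, Object] = {
  val params: Map[String, Object] = specifiedKafkaParams ++ Map(
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName,
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest", // the driver resolves offsets itself
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false")   // Spark, not Kafka, tracks the offsets
  params.asJava
}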
Tip
|
Enable ALL logging level for org.apache.spark.sql.kafka010.KafkaSourceProvider logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.sql.kafka010.KafkaSourceProvider=ALL
Refer to Logging. |