KafkaSink

KafkaSink is a streaming sink that KafkaSourceProvider registers as the kafka format.

// start spark-shell or a Spark application with spark-sql-kafka-0-10 module
// spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0-SNAPSHOT
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...
spark.
  readStream.
  format("text").
  load("server-logs/*.out").
  as[String].
  writeStream.
  queryName("server-logs processor").
  format("kafka").  // <-- uses KafkaSink
  option("topic", "topic1").
  option("checkpointLocation", "/tmp/kafka-sink-checkpoint"). // <-- mandatory
  start

// in another terminal
$ echo hello > server-logs/hello.out

// in the terminal with Spark
FIXME

Creating KafkaSink Instance

KafkaSink takes the following when created:

  • SQLContext

  • Kafka parameters (used on executor) as a map of (String, Object) pairs

  • Optional topic name

addBatch Method

addBatch(batchId: Long, data: DataFrame): Unit

Internally, addBatch requests KafkaWriter to write the input data to the topic (if defined) or a topic in executorKafkaParams.

Note
addBatch is a part of Sink Contract to "add" a batch of data to the sink.

results matching ""

    No results matching ""