KafkaUtils — Creating Kafka DStreams and RDDs

KafkaUtils is the object with the factory methods that create input DStreams and RDDs from records in Apache Kafka topics.

import org.apache.spark.streaming.kafka010.KafkaUtils

Enable WARN logging level for org.apache.spark.streaming.kafka010.KafkaUtils logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka010.KafkaUtils=WARN

Refer to Logging.

Creating Kafka DStream — createDirectStream Method

createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]]

createDirectStream is a method that creates a DirectKafkaInputDStream from a StreamingContext, LocationStrategy, and ConsumerStrategy.


Enable DEBUG logging level for org.apache.kafka.clients.consumer.KafkaConsumer logger to see what happens inside the Kafka consumer that is used to communicate with Kafka broker(s).

The following DEBUG messages are logged when a DirectKafkaInputDStream is started.

DEBUG KafkaConsumer: Starting the Kafka consumer
DEBUG KafkaConsumer: Kafka consumer created
DEBUG KafkaConsumer: Subscribed to topic(s): basic1, basic2, basic3

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=DEBUG

Refer to Logging.

Using KafkaUtils.createDirectStream to Connect to Kafka Brokers
// Include org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT dependency in the CLASSPATH, e.g.
// $ ./bin/spark-shell --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT

import org.apache.spark.streaming._
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))

import org.apache.spark.streaming.kafka010._

val preferredHosts = LocationStrategies.PreferConsistent
val topics = List("topic1", "topic2", "topic3")
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest"
)
import org.apache.kafka.common.TopicPartition
val offsets = Map(new TopicPartition("topic3", 0) -> 2L)

val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

dstream.foreachRDD { rdd =>
  // Get the offset ranges in the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
  }
}

// Start the streaming computation
ssc.start

// The code above prints the offset range of every topic partition every 5 seconds
// until you stop the streaming context.

ssc.stop(stopSparkContext = false)

Creating Kafka RDD — createRDD Method

def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy): RDD[ConsumerRecord[K, V]]

createRDD creates a KafkaRDD.
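
As a rough illustration of the signature above, the following spark-shell sketch reads a fixed range of records as a batch. It is only a sketch under assumptions: a broker at localhost:9092, a topic named topic1 whose partition 0 holds at least 10 records, and the spark-streaming-kafka-0-10 package on the classpath are all hypothetical choices made for this example.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}
import scala.collection.JavaConverters._

val sc = SparkContext.getOrCreate()

// createRDD expects a java.util.Map, so the Scala Map is converted explicitly
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // assumed broker address
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes"
).asJava

// Offsets 0 (inclusive) to 10 (exclusive) of partition 0 of topic1 (assumed to exist)
val offsetRanges = Array(OffsetRange("topic1", 0, 0L, 10L))

val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)

// ConsumerRecord is not serializable, so extract keys and values before collecting
rdd.map(record => (record.key, record.value)).collect().foreach(println)
```

Unlike createDirectStream, no StreamingContext is involved here: the offset ranges fully determine which records end up in the RDD.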


fixKafkaParams Internal Method

fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit

fixKafkaParams adjusts the given Kafka parameters in place (hence the Unit return type) so that the Kafka consumers running on Spark executors are configured safely.
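
To make the kind of adjustment more concrete, here is a stand-alone sketch that does not depend on Spark or Kafka. It is not the library's code: the object name FixKafkaParamsSketch is hypothetical, plain string keys stand in for Kafka's ConsumerConfig constants, and the four overrides reflect the Spark 2.1-era implementation as best understood, so they should be checked against the Spark version in use.

```scala
import java.{util => ju}

object FixKafkaParamsSketch {
  // Hypothetical re-creation of the overrides applied for executor-side consumers
  def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
    // Executors should never commit offsets on their own
    kafkaParams.put("enable.auto.commit", java.lang.Boolean.FALSE)
    // Executors read explicit offset ranges, so automatic resets are disabled
    kafkaParams.put("auto.offset.reset", "none")
    // Keep the driver and the executors in different consumer groups
    val originalGroupId = kafkaParams.get("group.id")
    kafkaParams.put("group.id", "spark-executor-" + originalGroupId)
    // Raise a missing or too small receive buffer to 65536 bytes
    val rbb = kafkaParams.get("receive.buffer.bytes")
    if (rbb == null || rbb.asInstanceOf[java.lang.Integer] < 65536) {
      kafkaParams.put("receive.buffer.bytes", Integer.valueOf(65536))
    }
  }
}

val params = new ju.HashMap[String, Object]()
params.put("group.id", "spark-streaming-notes")
FixKafkaParamsSketch.fixKafkaParams(params)
println(params.get("group.id")) // prints "spark-executor-spark-streaming-notes"
```

Because the map is mutated rather than copied, the caller sees the overridden values after the call, which matches the Unit return type in the signature.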

