ConsumerStrategy — Kafka Consumers' Post-Configuration API

ConsumerStrategy is a contract for creating Kafka consumers in a Spark Streaming application and for configuring them after they have been created.

Note
Kafka consumers read records from topic partitions in a Kafka cluster.

ConsumerStrategy[K, V] is an abstract class with two methods, i.e. executorKafkaParams and onStart.

Table 1. ConsumerStrategy Contract and DirectKafkaInputDStream

Method                 DirectKafkaInputDStream Usage
executorKafkaParams    Used when a DirectKafkaInputDStream is created to initialize internal state.
onStart                Used to create a Kafka consumer (in DirectKafkaInputDStream).

The following table lists the Kafka consumer strategies available in Spark 2.0.

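Table 2. Kafka Consumer Strategies in Spark Streaming

Consumer Strategy    Description
Assign               Assigns a fixed collection of topic partitions to the consumer.
Subscribe            Subscribes the consumer to a fixed collection of topics.
SubscribePattern     Subscribes the consumer to the topics that match a regular expression pattern.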

You can access the predefined ConsumerStrategy implementations using the ConsumerStrategies factory object.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Topics to subscribe to
val topics = List("topic1")

// Kafka consumer configuration
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest"
)

// Starting offset for partition 0 of topic1 (a partition of a subscribed topic)
val offsets = Map(new TopicPartition("topic1", 0) -> 2L)

val subscribeStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
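The other two predefined strategies are created through the same factory object. The following is a minimal sketch, assuming the spark-streaming-kafka-0-10 and kafka-clients libraries are on the classpath; the topic names, the pattern and the value names (params, partitions, startingOffsets) are made up for illustration.

```scala
import java.util.regex.Pattern

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Kafka consumer configuration shared by both strategies (illustrative values)
val params = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes"
)

// Assign: a fixed set of partitions, here with a starting offset for each
val partitions = List(new TopicPartition("topic1", 0))
val startingOffsets = Map(new TopicPartition("topic1", 0) -> 2L)
val assignStrategy =
  ConsumerStrategies.Assign[String, String](partitions, params, startingOffsets)

// SubscribePattern: topics selected by a regular expression, no explicit offsets
val patternStrategy =
  ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("topic.*"), params)
```

Creating a strategy does not contact Kafka. A consumer is only created once onStart is called, e.g. after the strategy has been passed to KafkaUtils.createDirectStream together with a location strategy.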

ConsumerStrategy Contract

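executorKafkaParams Method

def executorKafkaParams: ju.Map[String, Object]

onStart Method

def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]

In the signatures above, ju and jl are aliases for the java.util and java.lang packages, respectively.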
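To make the contract concrete, here is a sketch of a custom strategy. AssignFromBeginning is a hypothetical class, not part of Spark, and the sketch assumes the kafka-clients 0.10 API (where assign and seekToBeginning accept a collection of partitions). It only illustrates where the two methods of the contract fit.

```scala
import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategy

// Hypothetical strategy (not part of Spark): assigns fixed partitions and,
// when no current offsets are given, starts from the beginning of each one.
class AssignFromBeginning[K, V](
    topicPartitions: ju.Collection[TopicPartition],
    kafkaParams: ju.Map[String, Object])
  extends ConsumerStrategy[K, V] with Serializable {

  // Kafka configuration handed back to DirectKafkaInputDStream
  override def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  // Creates the consumer and configures it before it is returned
  override def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.assign(topicPartitions)
    if (currentOffsets.isEmpty) {
      consumer.seekToBeginning(topicPartitions)
    } else {
      currentOffsets.asScala.foreach { case (tp, offset) => consumer.seek(tp, offset) }
    }
    consumer
  }
}
```

The class mixes in Serializable on the assumption that the stream may be checkpointed; drop it if that does not apply to your setup.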

Assign Strategy

class Assign[K, V](
  topicPartitions: java.util.Collection[TopicPartition],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
) extends ConsumerStrategy[K, V]

Assign's executorKafkaParams method returns the input kafkaParams unchanged.

For onStart, Assign creates a KafkaConsumer (with kafkaParams) and explicitly assigns the topicPartitions to this consumer (using Kafka’s KafkaConsumer.assign method). It then overrides the fetch offsets that the consumer will use on the next poll (using Kafka’s KafkaConsumer.seek method). The offsets come from onStart's input currentOffsets if it is not empty, and from offsets otherwise.
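The rule for choosing which offsets to seek to can be sketched without any Kafka dependency. offsetsToSeek is a hypothetical helper name, and plain strings stand in for TopicPartition so that the snippet runs on its own.

```scala
import java.{lang => jl, util => ju}

// Prefer the offsets passed to onStart; fall back to the strategy's own offsets.
def offsetsToSeek[P](
    currentOffsets: ju.Map[P, jl.Long],
    offsets: ju.Map[P, jl.Long]): ju.Map[P, jl.Long] =
  if (currentOffsets.isEmpty) offsets else currentOffsets

// Strings stand in for TopicPartition to keep the sketch dependency-free.
val configured = ju.Collections.singletonMap("topic1-0", jl.Long.valueOf(2L))
val current = ju.Collections.singletonMap("topic1-0", jl.Long.valueOf(42L))
val empty = ju.Collections.emptyMap[String, jl.Long]()

val fromConfigured = offsetsToSeek(empty, configured) // currentOffsets is empty
val fromCurrent = offsetsToSeek(current, configured)  // currentOffsets wins
```

If both maps are empty, the result is empty as well and no seek would be needed.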

Subscribe Strategy

class Subscribe[K, V](
  topics: java.util.Collection[java.lang.String],
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
) extends ConsumerStrategy[K, V]

Subscribe's executorKafkaParams method returns the input kafkaParams unchanged.

For onStart, Subscribe creates a KafkaConsumer (with kafkaParams) and subscribes to topics (using Kafka’s KafkaConsumer.subscribe method). If currentOffsets or offsets is not empty (checked in that order), onStart first polls (using Kafka’s KafkaConsumer.poll method) so that the consumer gets its partitions assigned. It then overrides the fetch offsets that the consumer will use on the next poll with the non-empty offsets (using Kafka’s KafkaConsumer.seek method).

Tip
You can suppress Kafka’s NoOffsetForPartitionException (thrown by the poll in onStart) by setting Kafka’s auto.offset.reset to none in kafkaParams.

When a NoOffsetForPartitionException is suppressed this way, you should see the following WARN message in the logs:

WARN Catching NoOffsetForPartitionException since auto.offset.reset is none.  See KAFKA-3370
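The poll-then-seek sequence with exception suppression can be sketched as follows. pollThenSeek is a hypothetical helper, println stands in for Spark's logging, and the sketch assumes a kafka-clients version that still offers poll(long), e.g. 0.10.x. It is not Spark's code, only an illustration of the behaviour described above.

```scala
import java.{lang => jl, util => ju}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: poll once so the consumer gets its partitions,
// then move every partition in toSeek to the requested offset.
def pollThenSeek[K, V](
    consumer: Consumer[K, V],
    kafkaParams: ju.Map[String, Object],
    toSeek: ju.Map[TopicPartition, jl.Long]): Unit = {
  if (!toSeek.isEmpty) {
    // Suppress the exception only when auto.offset.reset is none
    val reset = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
    val shouldSuppress = reset != null && reset.toString.equalsIgnoreCase("none")
    try {
      consumer.poll(0)
    } catch {
      case _: NoOffsetForPartitionException if shouldSuppress =>
        println("Catching NoOffsetForPartitionException since auto.offset.reset is none")
    }
    toSeek.asScala.foreach { case (tp, offset) => consumer.seek(tp, offset) }
  }
}
```

With any other auto.offset.reset value the exception is not caught and propagates to the caller.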

SubscribePattern Strategy

class SubscribePattern[K, V](
  pattern: java.util.regex.Pattern,
  kafkaParams: java.util.Map[String, Object],
  offsets: java.util.Map[TopicPartition, java.lang.Long]
) extends ConsumerStrategy[K, V]

SubscribePattern's executorKafkaParams method returns the input kafkaParams unchanged.

For onStart, SubscribePattern creates a KafkaConsumer (with kafkaParams) and subscribes to the topics that match pattern, passing Kafka’s internal NoOpConsumerRebalanceListener (using Kafka’s KafkaConsumer.subscribe method).

Note
The only difference between the SubscribePattern and Subscribe consumer strategies is how they subscribe: SubscribePattern uses Kafka’s KafkaConsumer.subscribe(Pattern, ConsumerRebalanceListener) method while Subscribe uses KafkaConsumer.subscribe(Collection).
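To get a feel for which topics a pattern selects, the matching can be tried with java.util.regex alone. The topic names below are made up, and the sketch assumes that the pattern has to match the whole topic name.

```scala
import java.util.regex.Pattern

// Pattern as it would be passed to SubscribePattern
val pattern = Pattern.compile("topic\\d+")

// Hypothetical topic names in a cluster
val clusterTopics = List("topic1", "topic22", "topics", "my-topic1")

// Keep only the names that the pattern matches in full
val subscribed = clusterTopics.filter(name => pattern.matcher(name).matches())
```

Here only topic1 and topic22 are selected: topics has no digits and my-topic1 has a prefix that the pattern does not cover.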
