KafkaConsumer — Main Class For Kafka Consumers

KafkaConsumer is the main public class that you and other Kafka developers use to write Kafka consumers that consume records from a Kafka cluster.

KafkaConsumer.png
Figure 1. KafkaConsumer

KafkaConsumer is a part of the public API and is created with properties and (key and value) deserializers as configuration.

Note
bootstrap.servers and group.id properties are mandatory. They usually appear in the code as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG values, respectively.

KafkaConsumer follows the Consumer contract.

// sandbox/kafka-sandbox
val bootstrapServers = "localhost:9092"
val groupId = "kafka-sandbox"
import org.apache.kafka.clients.consumer.ConsumerConfig
val configs: Map[String, Object] = Map(
  // required properties
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId
)
import org.apache.kafka.common.serialization.StringDeserializer
val keyDeserializer = new StringDeserializer
val valueDeserializer = new StringDeserializer

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
val consumer = new KafkaConsumer[String, String](
  configs.asJava,
  keyDeserializer,
  valueDeserializer)

KafkaConsumer registers itself in JMX with kafka.consumer prefix.

Caution
FIXME How does the JMX registration happen?
Important

KafkaConsumer does not support multi-threaded access. You should use only one thread per KafkaConsumer instance.

KafkaConsumer uses a light lock to protect itself from multi-threaded access and throws a ConcurrentModificationException when it detects one.

KafkaConsumer is not safe for multi-threaded access
Table 1. KafkaConsumer’s Internal Properties (e.g. Registries and Counters)
Name Description

client

ConsumerNetworkClient

Used mainly (?) to create the Fetcher and ConsumerCoordinator

Also used in poll, pollOnce and wakeup (though arguably the usage should be limited to creating the Fetcher and ConsumerCoordinator)

clientId

coordinator

  • Initialized when KafkaConsumer is created

  • Used for…​FIXME

fetcher

Fetcher

Created right when KafkaConsumer is created.

Used when…​FIXME

interceptors

ConsumerInterceptors that holds ConsumerInterceptor instances (defined using interceptor.classes setting).

Used when…​FIXME

metadata

Metadata

Created right when KafkaConsumer is created.

Used when…​FIXME

metrics

Metrics

retryBackoffMs

retry.backoff.ms property or a user-defined value

requestTimeoutMs

Corresponds to request.timeout.ms property

KafkaConsumer reports a ConfigException when request.timeout.ms is smaller than or equal to either session.timeout.ms or fetch.max.wait.ms.

subscriptions

SubscriptionState for auto.offset.reset setting.

Created when KafkaConsumer is created.

Tip

Enable DEBUG or TRACE logging levels for org.apache.kafka.clients.consumer.KafkaConsumer logger to see what happens inside.

Add the following line to config/tools-log4j.properties:

log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=TRACE

Refer to Logging.

(Manually) Assigning Partitions — assign Method

void assign(Collection<TopicPartition> partitions)
Note
assign is part of Consumer Contract to…​FIXME.

assign…​FIXME
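
assign can be sketched as a self-contained variant of the sandbox session above (topic1 and the partition numbers are assumptions; manual assignment bypasses consumer group management, so no rebalancing happens):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// A consumer as in the sandbox session above (constructing one opens no connection)
val configs: Map[String, Object] = Map(
  "bootstrap.servers" -> "localhost:9092",
  "group.id" -> "kafka-sandbox")
val consumer = new KafkaConsumer[String, String](
  configs.asJava, new StringDeserializer, new StringDeserializer)

// Manually take partitions 0 and 1 of topic1 (no consumer group rebalancing happens)
val partitions = Seq(
  new TopicPartition("topic1", 0),
  new TopicPartition("topic1", 1))
consumer.assign(partitions.asJava)

// assignment reflects the manual assignment immediately, without a broker round-trip
println(consumer.assignment.asScala.mkString(", "))
```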

unsubscribe Method

void unsubscribe()
Note
unsubscribe is part of Consumer Contract to…​FIXME.

unsubscribe…​FIXME
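
Continuing the sandbox session, unsubscribe drops an active subscription as well as a manual assignment:

```scala
import scala.collection.JavaConverters._

// After unsubscribe the consumer has no subscribed topics and no assigned partitions
consumer.unsubscribe()
println(consumer.subscription.asScala)
println(consumer.assignment.asScala)
```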

Subscribing to Topics — subscribe Method

void subscribe(Collection<String> topics) (1)
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
  1. A short-hand for the other subscribe with NoOpConsumerRebalanceListener as ConsumerRebalanceListener

subscribe subscribes KafkaConsumer to the given topics.

Note
subscribe is a part of Consumer Contract to…​FIXME
val topics = Seq("topic1")
println(s"Subscribing to ${topics.mkString(", ")}")

import scala.collection.JavaConverters._
consumer.subscribe(topics.asJava)

Internally, subscribe prints out the following DEBUG message to the logs:

DEBUG Subscribed to topic(s): [comma-separated topics]

subscribe then requests SubscriptionState to subscribe for the topics and listener.

In the end, subscribe requests SubscriptionState for groupSubscription that it then passes along to Metadata to set the topics to track.

KafkaConsumer subscribe.png
Figure 2. KafkaConsumer subscribes to topics
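
The pattern-based variant can be sketched the same way (reusing the sandbox consumer; the topic.* pattern is an assumption, and the listener below only logs rebalance events):

```scala
import java.util.regex.Pattern
import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// Subscribe to every topic whose name matches topic.* and log rebalances
val listener = new ConsumerRebalanceListener {
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    println(s"Assigned: $partitions")
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    println(s"Revoked: $partitions")
}
consumer.subscribe(Pattern.compile("topic.*"), listener)
```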

Polling Specified Milliseconds For ConsumerRecords per TopicPartition — poll Method

ConsumerRecords<K, V> poll(long timeout)

poll polls for new records until timeout expires.

Note
KafkaConsumer has to be subscribed to some topics or assigned partitions before calling poll.
Note
The input timeout should be 0 or greater and represents the milliseconds to poll for records.
val seconds = 10
while (true) {
  println(s"Polling for records for $seconds secs")
  val records = consumer.poll(seconds * 1000)
  // print out every record's coordinates and value
  for (record <- records.asScala) {
    println(s"${record.topic}-${record.partition}@${record.offset}: ${record.value}")
  }
}

Internally, poll starts by polling once (for timeout milliseconds).

If there are records available, poll requests Fetcher to send any new fetch requests (sendFetches) and checks ConsumerNetworkClient's pendingRequestCount. If either count is positive, poll requests ConsumerNetworkClient to pollNoWakeup.

Caution
FIXME Make the above more user-friendly

poll returns the available ConsumerRecords directly when no ConsumerInterceptors are defined, or passes them through the ConsumerInterceptors (using onConsume) otherwise.

Caution
FIXME Make the above more user-friendly, e.g. when could interceptors be empty?
KafkaConsumer poll.png
Figure 3. KafkaConsumer polls topics
Note
poll is a part of Consumer contract to…​FIXME

commitSync Method

void commitSync()
Note
commitSync is part of Consumer Contract to…​FIXME.

commitSync…​FIXME
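
A common use is committing offsets manually after the records of a poll have been processed, with enable.auto.commit set to false (a sketch that assumes a running broker and the sandbox consumer above):

```scala
import scala.collection.JavaConverters._

// With enable.auto.commit=false, commit the offsets of the last poll by hand
val records = consumer.poll(10 * 1000)
for (record <- records.asScala) {
  // process a record before its offset is committed
  println(s"${record.topic}-${record.partition}@${record.offset}")
}
consumer.commitSync() // blocks until the offsets are committed (or an error is reported)
```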

seek Method

void seek(TopicPartition partition, long offset)
Note
seek is part of Consumer Contract to…​FIXME.

seek…​FIXME
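
seek only affects partitions the consumer currently owns, so a sketch pairs it with assign (reusing the sandbox consumer; topic1 and offset 0 are assumptions):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

// Rewind partition 0 of topic1 to the very beginning before the next poll
val partition = new TopicPartition("topic1", 0)
consumer.assign(Seq(partition).asJava)
consumer.seek(partition, 0)
```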

Getting Partitions For Topic — partitionsFor Method

Caution
FIXME
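
In the meantime, a usage sketch (reusing the sandbox consumer; requires a running broker, and topic1 is an assumption):

```scala
import scala.collection.JavaConverters._

// One PartitionInfo (leader, replicas, in-sync replicas) per partition of topic1
consumer.partitionsFor("topic1").asScala.foreach(println)
```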

endOffsets Method

Caution
FIXME
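
In the meantime, a usage sketch (reusing the sandbox consumer; requires a running broker, and topic1 is an assumption):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

// The end offset of a partition is the offset of the next record to be appended
val parts = Seq(new TopicPartition("topic1", 0))
consumer.endOffsets(parts.asJava).asScala.foreach { case (tp, offset) =>
  println(s"$tp ends at $offset")
}
```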

offsetsForTimes Method

Caution
FIXME
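
In the meantime, a usage sketch (reusing the sandbox consumer; requires a running broker, and topic1 plus the one-minute lookback are assumptions). A partition maps to null when no record with a timestamp at or after the given one exists:

```scala
import java.lang.{Long => JLong}
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

// Look up the earliest offsets whose timestamps are at most one minute old
val timestamps = Map[TopicPartition, JLong](
  new TopicPartition("topic1", 0) -> (System.currentTimeMillis - 60 * 1000))
consumer.offsetsForTimes(timestamps.asJava).asScala.foreach {
  case (tp, null) => println(s"$tp: no matching offset")
  case (tp, ot)   => println(s"$tp: offset=${ot.offset} timestamp=${ot.timestamp}")
}
```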

updateFetchPositions Method

Caution
FIXME

Polling One-Off for ConsumerRecords per TopicPartition — pollOnce Internal Method

Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout)

pollOnce…​FIXME

Note
pollOnce is used exclusively when KafkaConsumer is requested to poll

Requesting Metadata for All Topics (From Brokers) — listTopics Method

Map<String, List<PartitionInfo>> listTopics()

Internally, listTopics simply requests Fetcher for metadata for all topics and returns it.

consumer.listTopics().asScala.foreach { case (name, partitions) =>
  println(s"topic: $name (partitions: ${partitions.size()})")
}
Note
listTopics uses requestTimeoutMs that corresponds to request.timeout.ms property.

beginningOffsets Method

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)

beginningOffsets requests Fetcher for beginningOffsets and returns it.
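
A usage sketch (reusing the sandbox consumer; requires a running broker, and topic1 is an assumption):

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

// The beginning offset is the offset of the oldest record still in the partition
consumer.beginningOffsets(Seq(new TopicPartition("topic1", 0)).asJava)
  .asScala.foreach { case (tp, offset) => println(s"$tp starts at $offset") }
```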

Creating KafkaConsumer Instance

KafkaConsumer takes the following when created:

KafkaConsumer initializes the internal registries and counters.

Note
KafkaConsumer API offers other constructors that in the end use the public 3-argument constructor that in turn passes the call on to the private internal constructor.

KafkaConsumer Public Constructor

// Public API
KafkaConsumer(
  Map<String, Object> configs,
  Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)

When created, KafkaConsumer adds the keyDeserializer and valueDeserializer to configs (as key.deserializer and value.deserializer properties respectively) and creates a ConsumerConfig.

KafkaConsumer passes the call on to the internal constructor.

KafkaConsumer Internal Constructor

KafkaConsumer(
  ConsumerConfig config,
  Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)

When called, the internal KafkaConsumer constructor prints out the following DEBUG message to the logs:

DEBUG Starting the Kafka consumer

KafkaConsumer sets the internal requestTimeoutMs to request.timeout.ms property.

KafkaConsumer sets the internal clientId to client.id or generates one with prefix consumer- (starting from 1) if not set.

KafkaConsumer sets the internal Metrics (and JmxReporter with kafka.consumer prefix).

KafkaConsumer sets the internal retryBackoffMs to retry.backoff.ms property.

Caution
FIXME Finish me!

KafkaConsumer creates the internal Metadata with the following arguments:

  1. retryBackoffMs

  2. metadata.max.age.ms

  3. allowAutoTopicCreation enabled

  4. topicExpiryEnabled disabled

  5. ClusterResourceListeners with user-defined list of ConsumerInterceptors in interceptor.classes property

KafkaConsumer updates metadata with bootstrap.servers.

Caution
FIXME Finish me!

KafkaConsumer creates a NetworkClient with…​FIXME

Caution
FIXME Finish me!

KafkaConsumer creates Fetcher with the following properties:

In the end, KafkaConsumer prints out the following DEBUG message to the logs:

DEBUG Kafka consumer created

Any issues while creating a KafkaConsumer are reported as KafkaException.

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

wakeup Method

void wakeup()
Note
wakeup is a part of Consumer Contract.

wakeup simply requests ConsumerNetworkClient to wakeup.

KafkaConsumer wakeup.png
Figure 4. KafkaConsumer’s wakeup Method
Note

wakeup simply passes through the intermediaries and in the end triggers wakeup of Java’s java.nio.channels.Selector. Quoting its javadoc:

Causes the first selection operation that has not yet returned to return immediately.

Read about selection in java.nio.channels.Selector's javadoc.

Note
wakeup is used when…​FIXME
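
wakeup is the one KafkaConsumer method that is safe to call from another thread, typically to abort a blocking poll. A sketch (reusing the sandbox consumer; the thread name is an assumption):

```scala
import org.apache.kafka.common.errors.WakeupException

// A dedicated thread blocks in poll; the main thread aborts it with wakeup
val poller = new Thread("poller") {
  override def run(): Unit =
    try {
      while (true) consumer.poll(10 * 1000)
    } catch {
      case _: WakeupException => println("poll aborted by wakeup")
    }
}
poller.start()

Thread.sleep(1000)
consumer.wakeup() // the pending poll throws WakeupException in the poller thread
poller.join()
```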

Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method

ClusterResourceListeners configureClusterResourceListeners(
  Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer,
  List<?>... candidateLists)

configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keyDeserializer and valueDeserializer.

Note

configureClusterResourceListeners is used exclusively when KafkaConsumer is created (to create the Metadata) with the following input arguments:
