KafkaConsumer — Primary Kafka Client

KafkaConsumer is the concrete Kafka client that developers use to write Kafka consumers, i.e. applications that consume records from Kafka topics.

Figure 1. KafkaConsumer

KafkaConsumer is created with properties and (key and value) deserializers.

Note

bootstrap.servers and group.id properties are mandatory.

Use ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG constants in your source code, respectively.

val bootstrapServers = ":9092,localhost:9092"
val groupId = "kafka-sandbox"
import org.apache.kafka.clients.consumer.ConsumerConfig
val requiredConfigsOnly = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId
)
import org.apache.kafka.common.serialization.StringDeserializer
val keyDeserializer = new StringDeserializer
val valueDeserializer = new StringDeserializer

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
val consumer = new KafkaConsumer[String, String](
  requiredConfigsOnly.asJava,
  keyDeserializer,
  valueDeserializer)

Once created, KafkaConsumer has to be subscribed to topics or assigned partitions before it can poll for records.

scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]

import scala.collection.JavaConverters._
val topics = Seq("input").asJava
consumer.subscribe(topics)

KafkaConsumer is then requested to poll for records (in a loop).

scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]

import java.time.Duration.ZERO
val records = consumer.poll(ZERO)
records.asScala.foreach(println)

KafkaConsumer registers itself in JMX with kafka.consumer prefix.

Important

KafkaConsumer is not thread-safe, i.e. you should not use the same single instance of KafkaConsumer from multiple threads. You should use only one thread per KafkaConsumer instance.

KafkaConsumer uses a light lock to protect itself from multi-threaded access and throws a ConcurrentModificationException when it detects a call from a second thread.
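
The idea behind such a light lock can be pictured with a small sketch. SingleThreadGuard below is a hypothetical class written for this illustration only (it is not KafkaConsumer's code): it keeps the id of the owning thread in an atomic variable, lets the owner re-enter, and throws ConcurrentModificationException for any other thread.

```scala
import java.util.ConcurrentModificationException
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}

// Hypothetical illustration, not KafkaConsumer's actual code
class SingleThreadGuard {
  private val NoOwner = -1L
  private val owner = new AtomicLong(NoOwner) // id of the thread currently inside
  private val depth = new AtomicInteger(0)    // number of nested calls by the owner

  def acquire(): Unit = {
    val me = Thread.currentThread().getId
    // Either this thread already owns the guard or it has to win the CAS from "no owner"
    if (me != owner.get() && !owner.compareAndSet(NoOwner, me))
      throw new ConcurrentModificationException("not safe for multi-threaded access")
    depth.incrementAndGet()
  }

  def release(): Unit =
    if (depth.decrementAndGet() == 0) owner.set(NoOwner)
}

val guard = new SingleThreadGuard
guard.acquire() // the current thread becomes the owner

val rejected = new AtomicBoolean(false)
val intruder = new Thread(new Runnable {
  override def run(): Unit =
    try guard.acquire()
    catch { case _: ConcurrentModificationException => rejected.set(true) }
})
intruder.start()
intruder.join()

guard.release()
println(s"Second thread rejected: ${rejected.get}")
```

No thread ever blocks in this scheme. The guard only detects misuse, which is why the fix on the application side is to keep one thread per instance.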

Table 1. KafkaConsumer’s Internal Properties (e.g. Registries and Counters)
Name Description

assignors

Zero or more PartitionAssignors

Configured using ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG (aka partition.assignment.strategy) configuration property

Used exclusively to create the ConsumerCoordinator

client

ConsumerNetworkClient

Used mainly (?) to create the Fetcher and ConsumerCoordinator

Used also in poll, pollOnce and wakeup (but I think the usage should be limited to create Fetcher and ConsumerCoordinator)

clientId

coordinator

fetcher

Fetcher

Created right when KafkaConsumer is created.

Used when…​FIXME

interceptors

ConsumerInterceptors that holds ConsumerInterceptor instances (defined using interceptor.classes setting).

Used when…​FIXME

metadata

Created for a KafkaConsumer with the following:

metadata is requested to update itself immediately

Used to create a NetworkClient (for the ConsumerNetworkClient), the ConsumerNetworkClient itself, the ConsumerCoordinator and Fetcher

Used in…​FIXME

metrics

Metrics

retryBackoffMs

retry.backoff.ms property or a user-defined value

requestTimeoutMs

Corresponds to request.timeout.ms property

KafkaConsumer reports ConfigException when requestTimeoutMs is less than or equal to session.timeout.ms or fetch.max.wait.ms.

subscriptions

SubscriptionState (created with the offset reset strategy from the auto.offset.reset setting).

Created when KafkaConsumer is created

Tip

Enable DEBUG or TRACE logging levels for org.apache.kafka.clients.consumer.KafkaConsumer logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.org.apache.kafka.clients.consumer.KafkaConsumer=TRACE

Refer to Logging.

(Manually) Assigning Partitions — assign Method

void assign(Collection<TopicPartition> partitions)
Note
assign is part of Consumer Contract to…​FIXME.

assign…​FIXME
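
As a usage sketch only (it does not describe the internals), manual assignment could look as follows. The broker address, the group id and the topic input with two partitions are assumptions made for the example. A fresh consumer is created because manual assignment and topic subscription are mutually exclusive on the same instance.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Assumed values for the example
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox")
val consumer = new KafkaConsumer[String, String](
  config.asJava, new StringDeserializer, new StringDeserializer)

// Assumption: a topic called "input" with (at least) partitions 0 and 1
val partitions = Seq(new TopicPartition("input", 0), new TopicPartition("input", 1))
consumer.assign(partitions.asJava)

// assignment gives back what the consumer currently owns
val assigned = consumer.assignment().asScala.toSet
println(s"Assigned partitions: ${assigned.mkString(", ")}")

consumer.close()
```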

unsubscribe Method

void unsubscribe()
Note
unsubscribe is part of Consumer Contract to…​FIXME.

unsubscribe…​FIXME

Subscribing to Topics — subscribe Method

void subscribe(Collection<String> topics) (1)
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
  1. A short-hand for the other subscribe with NoOpConsumerRebalanceListener as ConsumerRebalanceListener

subscribe subscribes KafkaConsumer to the given topics.

Note
subscribe is a part of Consumer Contract to…​FIXME
val topics = Seq("topic1")
println(s"Subscribing to ${topics.mkString(", ")}")

import scala.collection.JavaConverters._
consumer.subscribe(topics.asJava)

Internally, subscribe prints out the following DEBUG message to the logs:

DEBUG Subscribed to topic(s): [comma-separated topics]

subscribe then requests SubscriptionState to subscribe for the topics and listener.

In the end, subscribe requests SubscriptionState for groupSubscription that it then passes along to Metadata to set the topics to track.

Figure 2. KafkaConsumer subscribes to topics
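
The 2-argument subscribe can be sketched as follows. The listener below merely prints what it is given, and the broker address, group id and topic name are assumptions of the example. The callbacks are invoked from within poll once the consumer has joined the group, so nothing is printed by subscribe itself.

```scala
import java.util.{Collection => JCollection}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Assumed values for the example
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox")
val consumer = new KafkaConsumer[String, String](
  config.asJava, new StringDeserializer, new StringDeserializer)

// A listener that only reports rebalance events
val listener = new ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    println(s"Revoked: ${partitions.asScala.mkString(", ")}")
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    println(s"Assigned: ${partitions.asScala.mkString(", ")}")
}

consumer.subscribe(Seq("topic1").asJava, listener)

// subscription gives back the topics the consumer is subscribed to
val subscribed = consumer.subscription().asScala.toSet
println(s"Subscribed to: ${subscribed.mkString(", ")}")

consumer.close()
```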

Poll For ConsumerRecords (per TopicPartitions) — poll Method

ConsumerRecords<K, V> poll(final Duration timeout)
Note
poll is part of the Consumer Contract to poll for ConsumerRecords.

Figure 3. KafkaConsumer polls topics

poll polls for new records until timeout expires.

scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]

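import java.time.Duration
// A non-zero timeout avoids spinning in a tight loop when there are no records
val timeout = Duration.ofMillis(100)
while (true) {
  println(s"Polling for records for up to ${timeout.toMillis} ms")
  val records = consumer.poll(timeout)
  // do something with the records
  // e.g. print them out to the console
  records.asScala.foreach(println)
}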
Note
KafkaConsumer has to be subscribed to topics or assigned partitions before calling poll.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]

import java.time.Duration
scala> val records = consumer.poll(Duration.ZERO)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1171)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
  ... 36 elided

Internally, poll simply calls the internal poll method with a Timer that expires after the given timeout and the includeMetadataInTimeout flag on.

poll Internal Method

ConsumerRecords<K, V> poll(
  final Timer timer,
  final boolean includeMetadataInTimeout)

poll first calls acquireAndEnsureOpen (to acquire the light lock and to make sure that the consumer has not been closed).

poll…​FIXME

commitSync Method

void commitSync()
Note
commitSync is part of Consumer Contract to…​FIXME.

commitSync…​FIXME
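
A common reason to call commitSync is manual offset management. The following is only a usage sketch under assumptions: enable.auto.commit is switched off, a broker is expected at localhost:9092, and the topic is named input (all three are choices made for this example). It does a single poll, processes whatever came back and then commits the consumed offsets.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Assumed values for the example, with auto-commit disabled
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false")
val consumer = new KafkaConsumer[String, String](
  config.asJava, new StringDeserializer, new StringDeserializer)

consumer.subscribe(Seq("input").asJava)

val records = consumer.poll(Duration.ofMillis(500))
records.asScala.foreach { r =>
  println(s"${r.topic()}-${r.partition()}@${r.offset()}: ${r.value()}")
}

// Commit the offsets of the records returned by the last poll
// (only after they have been processed)
consumer.commitSync()

consumer.close()
```

Committing after processing gives at-least-once behaviour: a crash between processing and commitSync leads to the same records being delivered again.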

seek Method

void seek(TopicPartition partition, long offset)
Note
seek is part of Consumer Contract to…​FIXME.

seek…​FIXME
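
As a usage sketch (not a description of the internals), seek overrides the offset that the next poll fetches from for a partition that is currently assigned. The broker address, the topic input and the offset 42 are assumptions of the example.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Assumed values for the example
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox")
val consumer = new KafkaConsumer[String, String](
  config.asJava, new StringDeserializer, new StringDeserializer)

// seek requires the partition to be assigned to this consumer first
val partition = new TopicPartition("input", 0)
consumer.assign(Seq(partition).asJava)

// The next poll for this partition starts at offset 42 (an arbitrary choice)
consumer.seek(partition, 42L)

// position reports the offset of the next record to be fetched
val position = consumer.position(partition)
println(s"Next offset for $partition: $position")

consumer.close()
```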

Getting Partitions For Topic — partitionsFor Method

Caution
FIXME

endOffsets Method

Caution
FIXME

offsetsForTimes Method

Caution
FIXME

updateFetchPositions Internal Method

boolean updateFetchPositions(final Timer timer)

updateFetchPositions…​FIXME

Note
updateFetchPositions is used when…​FIXME

Polling One-Off for ConsumerRecords per TopicPartition — pollOnce Internal Method

Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout)

pollOnce…​FIXME

Note
pollOnce is used exclusively when KafkaConsumer is requested to poll

Requesting Metadata for All Topics (From Brokers) — listTopics Method

Map<String, List<PartitionInfo>> listTopics()

Internally, listTopics simply requests Fetcher for metadata for all topics and returns it.

consumer.listTopics().asScala.foreach { case (name, partitions) =>
  println(s"topic: $name (partitions: ${partitions.size()})")
}
Note
listTopics uses requestTimeoutMs that corresponds to request.timeout.ms property.

beginningOffsets Method

Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)

beginningOffsets requests Fetcher for beginningOffsets and returns it.

Creating KafkaConsumer Instance

KafkaConsumer takes the following when created:

KafkaConsumer initializes the internal registries and counters.

Note
KafkaConsumer API offers other public constructors. They all end up calling the public 3-argument constructor, which in turn passes the call on to the private internal constructor.

KafkaConsumer Public Constructor

// Public API
KafkaConsumer(
  Map<String, Object> configs,
  Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)

When created, KafkaConsumer adds the keyDeserializer and valueDeserializer to configs (as key.deserializer and value.deserializer properties respectively) and creates a ConsumerConfig.

KafkaConsumer passes the call on to the internal constructor.
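
Since the deserializers end up in the configuration as key.deserializer and value.deserializer anyway, they can also be given as configuration properties (by class name) with the 1-argument constructor. A minimal sketch, assuming the broker address and group id used earlier:

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Deserializers are named in the configuration rather than passed in as instances
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName)

val consumerFromConfig = new KafkaConsumer[String, String](config.asJava)
println(s"Created: ${consumerFromConfig.getClass.getName}")

consumerFromConfig.close()
```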

KafkaConsumer Internal Constructor

KafkaConsumer(
  ConsumerConfig config,
  Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer)

When called, the internal KafkaConsumer constructor prints out the following DEBUG message to the logs:

DEBUG Starting the Kafka consumer

KafkaConsumer sets the internal requestTimeoutMs to request.timeout.ms property.

KafkaConsumer sets the internal clientId to client.id or generates one with prefix consumer- (starting from 1) if not set.

KafkaConsumer sets the internal Metrics (and JmxReporter with kafka.consumer prefix).

KafkaConsumer sets the internal retryBackoffMs to retry.backoff.ms property.

Caution
FIXME Finish me!

KafkaConsumer creates the internal Metadata with the following arguments:

  1. retryBackoffMs

  2. metadata.max.age.ms

  3. allowAutoTopicCreation enabled

  4. topicExpiryEnabled disabled

  5. ClusterResourceListeners with user-defined list of ConsumerInterceptors in interceptor.classes property

KafkaConsumer updates metadata with bootstrap.servers.

Caution
FIXME Finish me!

KafkaConsumer creates a NetworkClient with…​FIXME

Caution
FIXME Finish me!

KafkaConsumer creates Fetcher with the following properties:

In the end, KafkaConsumer prints out the following DEBUG message to the logs:

DEBUG Kafka consumer created

Any issues while creating a KafkaConsumer are reported as KafkaException.

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
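
A sketch of handling such a failure follows. The example provokes it by leaving bootstrap.servers out (a choice made for the illustration). The exact subtype and message depend on what fails and on the Kafka version, e.g. a configuration problem may surface as ConfigException, which is itself a KafkaException.

```scala
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringDeserializer

// bootstrap.servers is deliberately left out to provoke a failure
val incomplete = Map.empty[String, Object]

val attempt = Try {
  new KafkaConsumer[String, String](
    incomplete.asJava, new StringDeserializer, new StringDeserializer)
}

attempt match {
  case Failure(e: KafkaException) =>
    println(s"Could not create the consumer: ${e.getMessage}")
  case Failure(other) =>
    throw other // not a Kafka problem, let it propagate
  case Success(created) =>
    created.close()
}
```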

wakeup Method

void wakeup()
Note
wakeup is a part of Consumer Contract.

wakeup simply requests ConsumerNetworkClient to wakeup.

Figure 4. KafkaConsumer’s wakeup Method
Note

wakeup simply passes through the intermediaries and in the end triggers wakeup of Java’s java.nio.channels.Selector. Quoting its javadoc:

Causes the first selection operation that has not yet returned to return immediately.

Read about Selection in java.nio.channels.Selector's javadoc.
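
From the application's point of view, wakeup makes a blocking poll end with a WakeupException. It is typically called from another thread (e.g. a shutdown hook) and is the one KafkaConsumer method meant to be called that way. The sketch below calls it up front instead, which (per KafkaConsumer's javadoc) makes the next poll throw, so the example does not depend on timing. The broker address, group id and topic name are assumptions.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer

// Assumed values for the example
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox")
val consumer = new KafkaConsumer[String, String](
  config.asJava, new StringDeserializer, new StringDeserializer)
consumer.subscribe(Seq("input").asJava)

// Usually done by another thread while this one is blocked in poll
consumer.wakeup()

val wokenUp =
  try {
    consumer.poll(Duration.ofSeconds(5))
    false
  } catch {
    case _: WakeupException => true // the signal to leave the poll loop
  } finally {
    consumer.close()
  }
println(s"poll was woken up: $wokenUp")
```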

Note
wakeup is used when…​FIXME

Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method

ClusterResourceListeners configureClusterResourceListeners(
  Deserializer<K> keyDeserializer,
  Deserializer<V> valueDeserializer,
  List<?>... candidateLists)

configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keyDeserializer and valueDeserializer.

Note

configureClusterResourceListeners is used exclusively when KafkaConsumer is created (to create the Metadata) with the following input arguments:

throwIfNoAssignorsConfigured Internal Method

void throwIfNoAssignorsConfigured()

throwIfNoAssignorsConfigured…​FIXME

Note
throwIfNoAssignorsConfigured is used exclusively when KafkaConsumer is requested to subscribe to topics.

updateAssignmentMetadataIfNeeded Internal Method

boolean updateAssignmentMetadataIfNeeded(final Timer timer)

updateAssignmentMetadataIfNeeded requests the ConsumerCoordinator to poll until the Timer expires.

updateAssignmentMetadataIfNeeded returns false if the poll was unsuccessful, i.e. FIXME

If the poll was successful, updateAssignmentMetadataIfNeeded calls updateFetchPositions and returns its result.

Note
updateAssignmentMetadataIfNeeded is used exclusively when KafkaConsumer is requested to poll for records.

pollForFetches Internal Method

Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer)

pollForFetches…​FIXME

Note
pollForFetches is used exclusively when KafkaConsumer is requested to poll for records.

commitAsync Method

void commitAsync()
void commitAsync(
  OffsetCommitCallback callback)
void commitAsync(
  final Map<TopicPartition, OffsetAndMetadata> offsets,
  OffsetCommitCallback callback)
Note
commitAsync is part of the Consumer Contract to…​FIXME.

commitAsync…​FIXME

committed Method

OffsetAndMetadata committed(
  TopicPartition partition)
OffsetAndMetadata committed(
  TopicPartition partition,
  final Duration timeout)
Note
committed is part of the Consumer Contract to…​FIXME.

committed…​FIXME

close Method

void close()
void close(Duration timeout)
Note
close is part of the Consumer Contract to…​FIXME.

close…​FIXME
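
As a usage sketch, close is usually placed in a finally block so that the consumer is closed whatever happens in the poll loop. The 5-second timeout, the broker address, the group id and the topic name are assumptions of the example. Once closed, the instance cannot be used again.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

// Assumed values for the example
val config = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "kafka-sandbox")
val consumer = new KafkaConsumer[String, String](
  config.asJava, new StringDeserializer, new StringDeserializer)

try {
  consumer.subscribe(Seq("input").asJava)
  // the poll loop would go here
} finally {
  // Wait up to 5 seconds for the consumer to shut down cleanly
  consumer.close(Duration.ofSeconds(5))
}

// Using a closed consumer fails with IllegalStateException
val afterClose = Try(consumer.poll(Duration.ZERO))
println(s"poll after close: $afterClose")
```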

close Internal Method

void close(long timeoutMs, boolean swallowException)

close…​FIXME
