KafkaConsumer — Primary Kafka Client
KafkaConsumer is a concrete Kafka client that Kafka developers use to write Kafka consumers, i.e. applications that consume records from Kafka topics.
KafkaConsumer is created with properties and (key and value) deserializers.
|
Note
|
bootstrap.servers and group.id properties are mandatory. Use ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG values in your source code, respectively. |
val bootstrapServers = ":9092,localhost:9092"
val groupId = "kafka-sandbox"
import org.apache.kafka.clients.consumer.ConsumerConfig
val requiredConfigsOnly = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId
)
import org.apache.kafka.common.serialization.StringDeserializer
val keyDeserializer = new StringDeserializer
val valueDeserializer = new StringDeserializer
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
val consumer = new KafkaConsumer[String, String](
requiredConfigsOnly.asJava,
keyDeserializer,
valueDeserializer)
Once created, KafkaConsumer has to subscribe to topics or be assigned partitions (before it can poll for records).
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import scala.collection.JavaConverters._
val topics = Seq("input").asJava
consumer.subscribe(topics)
KafkaConsumer is then requested to poll for records (in a loop).
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration.ZERO
val records = consumer.poll(ZERO)
records.asScala.foreach(println)
KafkaConsumer registers itself in JMX with kafka.consumer prefix.
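One way to see what ends up under that prefix is to ask the platform MBean server for everything in the kafka.consumer domain. The following is only a sketch: it assumes it runs in the same JVM as the consumer, and consumerMBeans is a hypothetical helper name, not part of Kafka.

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import scala.collection.JavaConverters._

// Hypothetical helper: names of all MBeans registered in the kafka.consumer JMX domain
def consumerMBeans(): Seq[String] = {
  val server = ManagementFactory.getPlatformMBeanServer
  server.queryNames(new ObjectName("kafka.consumer:*"), null)
    .asScala.toSeq.map(_.getCanonicalName).sorted
}

consumerMBeans().foreach(println)
```

The list is empty until a KafkaConsumer has been created in the JVM.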
| Name | Description |
|---|---|
| | Zero or more PartitionAssignors. Configured using ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG (aka …). Used exclusively to create the ConsumerCoordinator |
| | Used mainly (?) to create the Fetcher and ConsumerCoordinator. Used also in poll, pollOnce and wakeup (but I think the usage should be limited to create Fetcher and ConsumerCoordinator) |
| | Created right when… Used when…FIXME |
| | Used when…FIXME |
| | Created for a KafkaConsumer with the following: … Used to create a NetworkClient (for the ConsumerNetworkClient), the ConsumerNetworkClient itself, the ConsumerCoordinator and Fetcher. Used in…FIXME |
| retryBackoffMs | retry.backoff.ms property or a user-defined value |
| requestTimeoutMs | Corresponds to request.timeout.ms property |
| | Created when… |
Tip
|
Enable Add the following line to
Refer to Logging. |
(Manually) Assigning Partitions — assign Method
void assign(Collection<TopicPartition> partitions)
|
Note
|
assign is part of Consumer Contract to…FIXME.
|
assign…FIXME
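As a usage illustration, manual assignment could look like the sketch below. assignPartitions is a hypothetical helper (not part of Kafka); it relies only on Consumer.assign and Consumer.assignment. Per the Consumer javadoc, manual assignment replaces any previous assignment and does not use the consumer group's management, so it cannot be mixed with subscribe.

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical helper: manually assign the given partitions of a single topic
// and return what the consumer reports as its current assignment
def assignPartitions[K, V](
    consumer: Consumer[K, V],
    topic: String,
    partitions: Seq[Int]): Set[TopicPartition] = {
  val tps = partitions.map(p => new TopicPartition(topic, p))
  consumer.assign(tps.asJava)
  consumer.assignment().asScala.toSet
}
```

With the consumer created earlier, assignPartitions(consumer, "input", Seq(0)) would assign partition 0 of the input topic.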
unsubscribe Method
void unsubscribe()
|
Note
|
unsubscribe is part of Consumer Contract to…FIXME.
|
unsubscribe…FIXME
Subscribing to Topics — subscribe Method
void subscribe(Collection<String> topics) (1)
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
(1) A short-hand for the other subscribe with NoOpConsumerRebalanceListener as the ConsumerRebalanceListener
subscribe subscribes KafkaConsumer to the given topics.
|
Note
|
subscribe is a part of Consumer Contract to…FIXME
|
val topics = Seq("topic1")
println(s"Subscribing to ${topics.mkString(", ")}")
import scala.collection.JavaConverters._
consumer.subscribe(topics.asJava)
Internally, subscribe prints out the following DEBUG message to the logs:
DEBUG Subscribed to topic(s): [comma-separated topics]
subscribe then requests SubscriptionState to subscribe for the topics and listener.
In the end, subscribe requests SubscriptionState for groupSubscription that it then passes along to Metadata to set the topics to track.
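The two-argument subscribe accepts a custom ConsumerRebalanceListener. The following sketch uses a hypothetical LoggingRebalanceListener that merely prints partition changes; both the class and the subscribeWithLogging helper are illustrative names only.

```scala
import java.util.{Collection => JCollection}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRebalanceListener}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical listener that only prints partition changes to the console
class LoggingRebalanceListener extends ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
    println(s"Revoked: ${partitions.asScala.mkString(", ")}")
  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit =
    println(s"Assigned: ${partitions.asScala.mkString(", ")}")
}

// Hypothetical helper: subscribe to the topics with the logging listener
def subscribeWithLogging[K, V](consumer: Consumer[K, V], topics: Seq[String]): Unit =
  consumer.subscribe(topics.asJava, new LoggingRebalanceListener)
```

The listener callbacks are invoked from within poll, so nothing is printed until the consumer starts polling.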
Poll For ConsumerRecords (per TopicPartitions) — poll Method
ConsumerRecords<K, V> poll(
Duration timeout)
|
Note
|
poll is part of the Consumer Contract to poll for ConsumerRecords.
|
poll polls for new records until timeout expires.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
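import java.time.Duration
import scala.collection.JavaConverters._
// a non-zero timeout avoids spinning in a busy loop
val timeout = Duration.ofSeconds(1)
while (true) {
  println(s"Polling for records for ${timeout.getSeconds} secs")
  val records = consumer.poll(timeout)
  // do something with the records
  // e.g. print them out to the console
  records.asScala.foreach(println)
}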
|
Note
|
KafkaConsumer has to be subscribed to topics or assigned partitions before calling poll.
|
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration
scala> val records = consumer.poll(Duration.ZERO)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1171)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
... 36 elided
commitSync Method
void commitSync()
|
Note
|
commitSync is part of Consumer Contract to…FIXME.
|
commitSync…FIXME
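As a usage illustration only (the internals are not covered here), a poll-then-commit step might look like the sketch below. It assumes enable.auto.commit is disabled; pollAndCommit is a hypothetical helper name. Per the Consumer javadoc, the no-argument commitSync commits the offsets returned by the last poll and blocks until the commit succeeds or fails.

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import scala.collection.JavaConverters._

// Hypothetical helper: poll once, handle every record, then commit synchronously.
// Returns the number of records handled.
def pollAndCommit[K, V](consumer: Consumer[K, V], timeout: Duration)(
    handle: (K, V) => Unit): Int = {
  val records = consumer.poll(timeout)
  records.asScala.foreach(r => handle(r.key, r.value))
  // Commit only after the records were handled (at-least-once processing)
  if (!records.isEmpty) consumer.commitSync()
  records.count()
}
```

Committing after handling means a crash in between leads to reprocessing rather than data loss.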
seek Method
void seek(TopicPartition partition, long offset)
|
Note
|
seek is part of Consumer Contract to…FIXME.
|
seek…FIXME
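A small usage sketch, assuming the partition is already assigned to the consumer (seek throws an IllegalStateException otherwise). rewind is a hypothetical helper; it combines seek with position, which reports the offset of the next record to be fetched.

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: move the fetch position of an assigned partition
// and return the position the consumer reports afterwards
def rewind[K, V](consumer: Consumer[K, V], tp: TopicPartition, offset: Long): Long = {
  consumer.seek(tp, offset) // overrides the offset used by the next poll
  consumer.position(tp)     // offset of the next record to be fetched
}
```

seek changes only the consumer's in-memory position; committed offsets stay untouched until the next commit.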
updateFetchPositions Internal Method
boolean updateFetchPositions(final Timer timer)
updateFetchPositions…FIXME
|
Note
|
updateFetchPositions is used when…FIXME
|
Polling One-Off for ConsumerRecords per TopicPartition — pollOnce Internal Method
Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout)
pollOnce…FIXME
|
Note
|
pollOnce is used exclusively when KafkaConsumer is requested to poll
|
Requesting Metadata for All Topics (From Brokers) — listTopics Method
Map<String, List<PartitionInfo>> listTopics()
Internally, listTopics simply requests Fetcher for metadata for all topics and returns it.
consumer.listTopics().asScala.foreach { case (name, partitions) =>
println(s"topic: $name (partitions: ${partitions.size()})")
}
|
Note
|
listTopics uses requestTimeoutMs that corresponds to request.timeout.ms property.
|
beginningOffsets Method
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
beginningOffsets requests Fetcher for beginningOffsets and returns it.
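For illustration, the result can be converted to a Scala map as in the sketch below. earliestOffsets is a hypothetical helper name; the partitions do not have to be assigned to the consumer.

```scala
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical helper: first available offset of every given partition
def earliestOffsets[K, V](
    consumer: Consumer[K, V],
    partitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
  consumer.beginningOffsets(partitions.asJava).asScala
    .map { case (tp, offset) => tp -> offset.longValue }
    .toMap
```

For example, earliestOffsets(consumer, Seq(new TopicPartition("input", 0))) with the consumer created earlier.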
Creating KafkaConsumer Instance
KafkaConsumer takes the following when created:
- Consumer configuration (that is converted internally to ConsumerConfig)
- Deserializer for keys
- Deserializer for values
KafkaConsumer initializes the internal registries and counters.
|
Note
|
KafkaConsumer API offers other constructors that all end up calling the public 3-argument constructor, which in turn passes the call on to the private internal constructor.
|
KafkaConsumer Public Constructor
// Public API
KafkaConsumer(
Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
When created, KafkaConsumer adds the keyDeserializer and valueDeserializer to configs (as key.deserializer and value.deserializer properties respectively) and creates a ConsumerConfig.
KafkaConsumer passes the call on to the internal constructor.
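Since the deserializers end up as key.deserializer and value.deserializer properties anyway, they can also be given by class name in the configuration itself. The sketch below shows that alternative; withStringDeserializers is a hypothetical helper, while the two ConsumerConfig constants are part of the Kafka API.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: add String deserializers (by class name) to the given properties
def withStringDeserializers(configs: Map[String, Object]): Map[String, Object] =
  configs ++ Map[String, Object](
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName,
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer].getName)
```

With requiredConfigsOnly from the beginning of this page, new KafkaConsumer[String, String](withStringDeserializers(requiredConfigsOnly).asJava) uses the single-argument constructor and lets KafkaConsumer instantiate the deserializers.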
KafkaConsumer Internal Constructor
KafkaConsumer(
ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
When called, the internal KafkaConsumer constructor prints out the following DEBUG message to the logs:
DEBUG Starting the Kafka consumer
KafkaConsumer sets the internal requestTimeoutMs to request.timeout.ms property.
KafkaConsumer sets the internal clientId to client.id property or, if not set, generates one with the consumer- prefix followed by a sequence number (starting from 1).
KafkaConsumer sets the internal Metrics (and JmxReporter with kafka.consumer prefix).
KafkaConsumer sets the internal retryBackoffMs to retry.backoff.ms property.
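The client.id fallback described above can be modelled as follows. This is a toy model under the assumption that an unset client.id shows up as an empty string; ClientIds and resolve are hypothetical names and this is not Kafka's own code.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Toy model of the client.id fallback (hypothetical, not Kafka's code)
object ClientIds {
  // JVM-wide sequence that starts from 1
  private val sequence = new AtomicInteger(1)

  // Use the configured client.id if given, otherwise generate consumer-N
  def resolve(configured: String): String =
    if (configured.isEmpty) s"consumer-${sequence.getAndIncrement()}"
    else configured
}
```

Every generated ID consumes one sequence number, so two consumers without client.id in the same JVM get distinct IDs.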
|
Caution
|
FIXME Finish me! |
KafkaConsumer creates the internal Metadata with the following arguments:
- allowAutoTopicCreation enabled
- topicExpiryEnabled disabled
- ClusterResourceListeners with the user-defined list of ConsumerInterceptors in interceptor.classes property
KafkaConsumer updates metadata with bootstrap.servers.
|
Caution
|
FIXME Finish me! |
KafkaConsumer creates a NetworkClient with…FIXME
|
Caution
|
FIXME Finish me! |
KafkaConsumer creates Fetcher with the following properties:
In the end, KafkaConsumer prints out the following DEBUG message to the logs:
DEBUG Kafka consumer created
Any issues while creating a KafkaConsumer are reported as KafkaException.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
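Application code may want to handle that exception explicitly. A minimal sketch follows, with tryCreate as a hypothetical helper; the exact exception message and cause depend on what went wrong and on the Kafka version, so the code does not rely on them.

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

// Hypothetical helper: Left(description) when the consumer could not be constructed
def tryCreate(configs: Map[String, Object]): Either[String, KafkaConsumer[String, String]] =
  Try(new KafkaConsumer[String, String](
    configs.asJava, new StringDeserializer, new StringDeserializer)) match {
    case Success(consumer)          => Right(consumer)
    case Failure(e: KafkaException) => Left(s"${e.getMessage} (cause: ${e.getCause})")
    case Failure(other)             => throw other // not a Kafka problem, rethrow
  }
```

A successfully created consumer (the Right case) still has to be closed by the caller.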
wakeup Method
void wakeup()
|
Note
|
wakeup is a part of Consumer Contract.
|
wakeup simply requests ConsumerNetworkClient to wakeup.
|
Note
|
Quoting
Read about Selection in java.nio.channels.Selector's javadoc. |
|
Note
|
wakeup is used when…FIXME
|
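A common use of wakeup is to break out of a poll loop from another thread. Per the Consumer javadoc, the thread blocked in poll (or the next call to poll) then throws a WakeupException. The sketch below assumes that usage; pollUntilWakeup is a hypothetical helper.

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.errors.WakeupException
import scala.collection.JavaConverters._

// Hypothetical helper: polls until another thread calls consumer.wakeup(),
// then closes the consumer
def pollUntilWakeup[K, V](consumer: Consumer[K, V], timeout: Duration)(
    handle: (K, V) => Unit): Unit =
  try {
    while (true) {
      consumer.poll(timeout).asScala.foreach(r => handle(r.key, r.value))
    }
  } catch {
    case _: WakeupException => // expected: poll was interrupted by wakeup
  } finally {
    consumer.close()
  }
```

A shutdown hook such as sys.addShutdownHook(consumer.wakeup()) could trigger the exit, since wakeup is the one KafkaConsumer method that is safe to call from another thread.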
Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method
ClusterResourceListeners configureClusterResourceListeners(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<?>... candidateLists)
configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keyDeserializer and valueDeserializer.
throwIfNoAssignorsConfigured Internal Method
void throwIfNoAssignorsConfigured()
throwIfNoAssignorsConfigured…FIXME
|
Note
|
throwIfNoAssignorsConfigured is used exclusively when KafkaConsumer is requested to subscribe to topics.
|
updateAssignmentMetadataIfNeeded Internal Method
boolean updateAssignmentMetadataIfNeeded(
Timer timer)
updateAssignmentMetadataIfNeeded requests the ConsumerCoordinator to poll until the Timer expires.
updateAssignmentMetadataIfNeeded returns false if the poll was unsuccessful, i.e. FIXME
If the poll was successful, updateAssignmentMetadataIfNeeded requests updateFetchPositions.
|
Note
|
updateAssignmentMetadataIfNeeded is used exclusively when KafkaConsumer is requested to poll for records.
|
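The control flow described for updateAssignmentMetadataIfNeeded can be modelled as below. This is a toy model, not Kafka's code: both steps are passed in as hypothetical functions, and it assumes that the result of updateFetchPositions becomes the overall result.

```scala
// Toy model (hypothetical): coordinatorPoll stands in for ConsumerCoordinator.poll,
// updateFetchPositions for the internal method of the same name
def updateAssignmentMetadataSketch(
    coordinatorPoll: () => Boolean,
    updateFetchPositions: () => Boolean): Boolean =
  // && short-circuits: fetch positions are updated only after a successful poll
  coordinatorPoll() && updateFetchPositions()
```

An unsuccessful coordinator poll therefore yields false without touching the fetch positions.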
commitAsync Method
void commitAsync()
void commitAsync(
OffsetCommitCallback callback)
void commitAsync(
Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback)
|
Note
|
commitAsync is part of the Consumer Contract to…FIXME.
|
commitAsync…FIXME
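As a usage illustration, the callback variant can be wrapped as in the sketch below. commitAsyncWith is a hypothetical helper; OffsetCommitCallback.onComplete receives a null exception when the commit succeeded, which the sketch turns into an Option.

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper: commit asynchronously and report the outcome
// as None (success) or Some(exception) (failure)
def commitAsyncWith[K, V](consumer: Consumer[K, V])(
    onResult: Option[Exception] => Unit): Unit =
  consumer.commitAsync(new OffsetCommitCallback {
    override def onComplete(
        offsets: JMap[TopicPartition, OffsetAndMetadata],
        exception: Exception): Unit =
      onResult(Option(exception)) // exception is null when the commit succeeded
  })
```

commitAsync does not block, and with KafkaConsumer the callback is typically invoked during a later poll or commit call, so the outcome may be reported with a delay.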
committed Method
OffsetAndMetadata committed(
TopicPartition partition)
OffsetAndMetadata committed(
TopicPartition partition,
Duration timeout)
|
Note
|
committed is part of the Consumer Contract to…FIXME.
|
committed…FIXME
close Method
void close()
void close(
Duration timeout)
|
Note
|
close is part of the Consumer Contract to…FIXME.
|
close…FIXME
position Method
long position(
TopicPartition partition)
long position(
TopicPartition partition,
Duration timeout)
|
Note
|
position is part of the Consumer contract.
|
position…FIXME
poll Internal Method
ConsumerRecords<K, V> poll(
Timer timer,
boolean includeMetadataInTimeout)
poll first executes acquireAndEnsureOpen.
poll requests the ConsumerNetworkClient to maybeTriggerWakeup.
poll…FIXME
pollForFetches Internal Method
Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(
Timer timer)
pollForFetches…FIXME
|
Note
|
pollForFetches is used exclusively when KafkaConsumer is requested to poll for records.
|