KafkaConsumer — Primary Kafka Client
KafkaConsumer is the concrete Kafka client that Kafka developers use to write Kafka consumers, i.e. applications that consume records from Kafka topics.
KafkaConsumer is created with configuration properties and (key and value) deserializers.
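Note
bootstrap.servers is a mandatory property. group.id is required as soon as the consumer uses consumer group management (subscribe) or commits offsets to Kafka. Use the ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG constants in your source code, respectively.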
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer
// Comma-separated list of host:port pairs of the brokers to bootstrap from
val bootstrapServers = "localhost:9092"
val groupId = "kafka-sandbox"
val requiredConfigsOnly = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> bootstrapServers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId
)
val keyDeserializer = new StringDeserializer
val valueDeserializer = new StringDeserializer
val consumer = new KafkaConsumer[String, String](
  requiredConfigsOnly.asJava,
  keyDeserializer,
  valueDeserializer)
Once created, KafkaConsumer is supposed to either subscribe to topics or be (manually) assigned partitions.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import scala.collection.JavaConverters._
val topics = Seq("input").asJava
consumer.subscribe(topics)
KafkaConsumer is then requested to poll for records (in a loop).
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration.ZERO
val records = consumer.poll(ZERO)
records.asScala.foreach(println)
KafkaConsumer registers itself in JMX with the kafka.consumer prefix.
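One way to see the registration is to list the MBeans in the kafka.consumer domain of the platform MBean server. The following is a minimal sketch that assumes kafka-clients is on the classpath. The client id jmx-demo is made up. Constructing a KafkaConsumer does not contact the brokers, so no running cluster is needed.

```scala
import java.lang.management.ManagementFactory
import java.util.Properties
import javax.management.ObjectName
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Hypothetical client id, used only to recognize our MBeans
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "jmx-demo")

// Creating the consumer registers its MBeans; no connection is made yet
val consumer = new KafkaConsumer[String, String](props, new StringDeserializer, new StringDeserializer)

// Query every MBean in the kafka.consumer JMX domain
val mbeanServer = ManagementFactory.getPlatformMBeanServer
val consumerMBeans = mbeanServer
  .queryNames(new ObjectName("kafka.consumer:*"), null)
  .asScala.map(_.toString).toList.sorted
consumerMBeans.foreach(println)

consumer.close()
```

The same names show up in a JMX console such as jconsole while the consumer is open.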
Important
Name | Description
---|---
 | Zero or more PartitionAssignors configured using ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG (aka …). Used exclusively to create the ConsumerCoordinator.
 | Used mainly (?) to create the Fetcher and ConsumerCoordinator. Used also in poll, pollOnce and wakeup (but I think the usage should be limited to creating the Fetcher and ConsumerCoordinator).
 | Created right when… Used when…FIXME
 | Used when…FIXME
 | Created for a KafkaConsumer with the following: … Used to create a NetworkClient (for the ConsumerNetworkClient), the ConsumerNetworkClient itself, the ConsumerCoordinator and Fetcher. Used in…FIXME
 | retry.backoff.ms property or a user-defined value
 | Corresponds to request.timeout.ms property
 | Created when…
Tip
Enable … Add the following line to … Refer to Logging.
(Manually) Assigning Partitions — assign Method
void assign(Collection<TopicPartition> partitions)
Note
assign is part of the Consumer Contract to…FIXME.
assign …FIXME
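Manual assignment can be tried out without a broker using MockConsumer, the Consumer implementation that kafka-clients ships for testing. The following is a minimal sketch that assumes kafka-clients 2.x or later. The topic input and the record contents are made up. updateBeginningOffsets and addRecord are MockConsumer-only setup that stand in for a real partition.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val tp = new TopicPartition("input", 0)

// Manual assignment: no consumer group and no rebalancing are involved
consumer.assign(Seq(tp).asJava)
val assigned = consumer.assignment().asScala.toSet

// MockConsumer-only setup: the partition starts at offset 0 and holds 3 records
consumer.updateBeginningOffsets(Map(tp -> java.lang.Long.valueOf(0L)).asJava)
(0L until 3L).foreach { offset =>
  consumer.addRecord(new ConsumerRecord[String, String]("input", 0, offset, s"key-$offset", s"value-$offset"))
}

val records = consumer.poll(Duration.ofMillis(100)).asScala.toList
records.foreach(r => println(s"${r.topic()}-${r.partition()}@${r.offset()}: ${r.value()}"))
consumer.close()
```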
unsubscribe Method
void unsubscribe()
Note
unsubscribe is part of the Consumer Contract to…FIXME.
unsubscribe …FIXME
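The effect of unsubscribe is visible through subscription(). The following minimal sketch uses kafka-clients' MockConsumer so that no broker is needed. The topic name input is made up.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)

consumer.subscribe(Seq("input").asJava)
val subscribedBefore = consumer.subscription().asScala.toSet

// Drops the subscription (and any partitions assigned because of it)
consumer.unsubscribe()
val subscribedAfter = consumer.subscription().asScala.toSet

consumer.close()
```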
Subscribing to Topics — subscribe Method
void subscribe(Collection<String> topics) (1)
void subscribe(Collection<String> topics, ConsumerRebalanceListener listener)
void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
(1) A shorthand for the other subscribe with NoOpConsumerRebalanceListener as the ConsumerRebalanceListener
subscribe subscribes KafkaConsumer to the given topics.
Note
subscribe is part of the Consumer Contract to…FIXME
val topics = Seq("topic1")
println(s"Subscribing to ${topics.mkString(", ")}")
import scala.collection.JavaConverters._
consumer.subscribe(topics.asJava)
Internally, subscribe prints out the following DEBUG message to the logs:
DEBUG Subscribed to topic(s): [comma-separated topics]
subscribe then requests SubscriptionState to subscribe for the topics and listener.
In the end, subscribe requests SubscriptionState for groupSubscription that it then passes along to Metadata to set the topics to track.
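A ConsumerRebalanceListener is notified when partitions of the subscribed topics are revoked from or assigned to the consumer. The following minimal sketch uses kafka-clients' MockConsumer, whose rebalance method simulates the group coordinator handing out partitions. The listener only logs and is hypothetical. Whether MockConsumer.rebalance itself invokes the listener depends on the kafka-clients version, so only the resulting subscription and assignment are relied upon.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)

// Hypothetical listener that only logs; a real one might commit offsets on revocation
val listener = new ConsumerRebalanceListener {
  override def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]): Unit =
    println(s"Revoked: $partitions")
  override def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]): Unit =
    println(s"Assigned: $partitions")
}

consumer.subscribe(Seq("topic1").asJava, listener)
val subscribed = consumer.subscription().asScala.toSet

// MockConsumer-only: simulate the group coordinator assigning two partitions
consumer.rebalance(Seq(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1)).asJava)
val assigned = consumer.assignment().asScala.toSet

consumer.close()
```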
Poll For ConsumerRecords (per TopicPartitions) — poll Method
ConsumerRecords<K, V> poll(
Duration timeout)
Note
poll is part of the Consumer Contract to poll for ConsumerRecords.
poll fetches new records. It returns as soon as records are available, or otherwise waits until the timeout expires.
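scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration
import scala.collection.JavaConverters._
// A non-zero timeout avoids busy-looping when no records are available
val timeout = Duration.ofSeconds(1)
while (true) {
  println(s"Polling for records for up to ${timeout.getSeconds} sec(s)")
  val records = consumer.poll(timeout)
  // do something with the records,
  // e.g. print them out to the console
  records.asScala.foreach(println)
}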
Note
KafkaConsumer has to be subscribed to topics or assigned partitions before calling poll.
scala> :type consumer
org.apache.kafka.clients.consumer.KafkaConsumer[String,String]
import java.time.Duration
scala> val records = consumer.poll(Duration.ZERO)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1171)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
... 36 elided
commitSync Method
void commitSync()
Note
commitSync is part of the Consumer Contract to…FIXME.
commitSync …FIXME
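commitSync() commits the offsets returned by the last poll, i.e. the current position of every assigned partition. That position is the offset of the next record to consume. The following minimal sketch uses kafka-clients' MockConsumer. It assumes kafka-clients 2.4 or later for committed(Set). The topic and records are made up.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val tp = new TopicPartition("input", 0)
consumer.assign(Seq(tp).asJava)

// MockConsumer-only setup: records at offsets 0, 1 and 2
consumer.updateBeginningOffsets(Map(tp -> java.lang.Long.valueOf(0L)).asJava)
(0L until 3L).foreach(o => consumer.addRecord(new ConsumerRecord[String, String]("input", 0, o, s"k$o", s"v$o")))

consumer.poll(Duration.ofMillis(100)) // consumes offsets 0..2, so the position becomes 3
consumer.commitSync()                 // blocks until the position (3) is committed

val committedOffset = consumer.committed(Set(tp).asJava).get(tp).offset()
consumer.close()
```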
seek Method
void seek(TopicPartition partition, long offset)
Note
seek is part of the Consumer Contract to…FIXME.
seek …FIXME
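seek overrides the offset the next poll starts fetching from for the given partition. The following minimal sketch uses kafka-clients' MockConsumer with five made-up records and skips the first three.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val tp = new TopicPartition("input", 0)
consumer.assign(Seq(tp).asJava)

// MockConsumer-only setup: records at offsets 0..4
consumer.updateBeginningOffsets(Map(tp -> java.lang.Long.valueOf(0L)).asJava)
(0L until 5L).foreach(o => consumer.addRecord(new ConsumerRecord[String, String]("input", 0, o, null, s"value-$o")))

// The next poll starts at offset 3
consumer.seek(tp, 3L)
val offsets = consumer.poll(Duration.ofMillis(100)).asScala.map(_.offset()).toList
consumer.close()
```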
updateFetchPositions Internal Method
boolean updateFetchPositions(final Timer timer)
updateFetchPositions …FIXME
Note
updateFetchPositions is used when…FIXME
Polling One-Off for ConsumerRecords per TopicPartition — pollOnce Internal Method
Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout)
pollOnce …FIXME
Note
pollOnce is used exclusively when KafkaConsumer is requested to poll.
Requesting Metadata for All Topics (From Brokers) — listTopics Method
Map<String, List<PartitionInfo>> listTopics()
Internally, listTopics simply requests Fetcher for metadata for all topics and returns it.
import scala.collection.JavaConverters._
consumer.listTopics().asScala.foreach { case (name, partitions) =>
  println(s"topic: $name (partitions: ${partitions.size()})")
}
Note
listTopics uses requestTimeoutMs that corresponds to the request.timeout.ms property.
beginningOffsets Method
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
beginningOffsets requests Fetcher for the beginning offsets of the given partitions and returns them.
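Combined with endOffsets, beginningOffsets tells how many offsets each partition currently spans. That number is an upper bound on the retained records, since compaction or transaction markers can leave gaps. The following minimal sketch uses kafka-clients' MockConsumer. The offsets are made up and stand in for what brokers would report.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val partitions = Seq(new TopicPartition("input", 0), new TopicPartition("input", 1))

// MockConsumer-only setup standing in for broker-side log start and end offsets
consumer.updateBeginningOffsets(Map(partitions(0) -> java.lang.Long.valueOf(5L), partitions(1) -> java.lang.Long.valueOf(0L)).asJava)
consumer.updateEndOffsets(Map(partitions(0) -> java.lang.Long.valueOf(12L), partitions(1) -> java.lang.Long.valueOf(4L)).asJava)

val beginning = consumer.beginningOffsets(partitions.asJava).asScala
val end = consumer.endOffsets(partitions.asJava).asScala

// Offsets spanned by each partition (end offset is exclusive)
val retained: Map[TopicPartition, Long] =
  partitions.map(tp => tp -> (end(tp).longValue() - beginning(tp).longValue())).toMap
retained.foreach { case (tp, n) => println(s"$tp spans $n offset(s)") }
consumer.close()
```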
Creating KafkaConsumer Instance
KafkaConsumer takes the following when created:
- Consumer configuration (that is converted internally to ConsumerConfig)
- Deserializer for keys
- Deserializer for values
KafkaConsumer initializes the internal registries and counters.
Note
The KafkaConsumer API offers other constructors that eventually use the public 3-argument constructor, which in turn passes the call on to the private internal constructor.
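One of those other constructors accepts a java.util.Properties only. In that case, the deserializers are given as class names through the key.deserializer and value.deserializer properties. The following minimal sketch assumes kafka-clients is on the classpath. No broker is needed, because construction does not connect to the cluster.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-sandbox")
// Deserializers given as class names instead of instances
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

val consumer = new KafkaConsumer[String, String](props)
// Neither subscribed nor assigned yet
val assignmentAtStart = consumer.assignment()
consumer.close()
```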
KafkaConsumer Public Constructor
// Public API
KafkaConsumer(
Map<String, Object> configs,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
When created, KafkaConsumer adds the keyDeserializer and valueDeserializer to configs (as the key.deserializer and value.deserializer properties, respectively) and creates a ConsumerConfig.
KafkaConsumer passes the call on to the internal constructor.
KafkaConsumer Internal Constructor
KafkaConsumer(
ConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer)
When called, the internal KafkaConsumer constructor prints out the following DEBUG message to the logs:
DEBUG Starting the Kafka consumer
KafkaConsumer sets the internal requestTimeoutMs to the request.timeout.ms property.
KafkaConsumer sets the internal clientId to client.id or, if not set, generates one with the prefix consumer- (starting from 1).
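The client id rule above fits in a few lines of plain Scala. This is a hypothetical model of the documented behavior, not Kafka's source code: ClientIds and clientIdFor are made-up names. Newer kafka-clients versions may format the generated id differently.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model: use client.id when set (non-empty),
// else "consumer-" followed by a JVM-wide sequence starting at 1
object ClientIds {
  private val sequence = new AtomicInteger(1)

  def clientIdFor(configured: Option[String]): String =
    // getOrElse takes its default by name, so a configured id does not consume a number
    configured.filter(_.nonEmpty).getOrElse(s"consumer-${sequence.getAndIncrement()}")
}

val first = ClientIds.clientIdFor(None)
val explicit = ClientIds.clientIdFor(Some("my-app"))
val second = ClientIds.clientIdFor(None)
println(Seq(first, explicit, second).mkString(", "))
```

An AtomicInteger keeps the sequence unique when consumers are created concurrently from several threads.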
KafkaConsumer sets the internal Metrics (and JmxReporter with the kafka.consumer prefix).
KafkaConsumer sets the internal retryBackoffMs to the retry.backoff.ms property.
Caution
FIXME Finish me!
KafkaConsumer creates the internal Metadata with the following arguments:
- allowAutoTopicCreation enabled
- topicExpiryEnabled disabled
- ClusterResourceListeners with the user-defined list of ConsumerInterceptors in the interceptor.classes property
KafkaConsumer updates metadata with bootstrap.servers.
Caution
FIXME Finish me!
KafkaConsumer creates a NetworkClient with…FIXME
Caution
FIXME Finish me!
KafkaConsumer creates Fetcher with the following properties:
In the end, KafkaConsumer prints out the following DEBUG message to the logs:
DEBUG Kafka consumer created
Any issues while creating a KafkaConsumer are reported as KafkaException.
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
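A construction failure can be provoked with an unparsable bootstrap.servers entry. The following minimal sketch assumes kafka-clients is on the classpath. The value no-port is deliberately invalid. The exact exception class and message may vary across versions (e.g. a ConfigException, which is itself a KafkaException, possibly wrapped), so the sketch only relies on catching KafkaException.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringDeserializer

val props = new Properties()
// Deliberately broken: a bootstrap server without a port cannot be parsed
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "no-port")

val failure: Option[KafkaException] =
  try {
    new KafkaConsumer[String, String](props, new StringDeserializer, new StringDeserializer).close()
    None
  } catch {
    case e: KafkaException => Some(e)
  }

failure.foreach { e =>
  println(s"${e.getClass.getName}: ${e.getMessage}")
  Option(e.getCause).foreach(cause => println(s"Caused by: ${cause.getMessage}"))
}
```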
wakeup Method
void wakeup()
Note
wakeup is part of the Consumer Contract.
wakeup simply requests ConsumerNetworkClient to wakeup.
Note
Quoting … Read about Selection in java.nio.channels.Selector's javadoc.
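The Selector behavior referenced above can be observed with the JDK alone. A wakeup that happens while no selection is in progress makes the next select() return at once. The following minimal sketch uses plain JDK classes only, with no Kafka involved.

```scala
import java.nio.channels.Selector

val selector = Selector.open()
// No select() is in progress yet, so this wakeup is remembered...
selector.wakeup()
// ...and this select(), which would otherwise block forever (no channels are registered),
// returns immediately with zero selected keys
val selectedKeys = selector.select()
selector.close()
println(s"select() returned $selectedKeys selected key(s)")
```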
Note
wakeup is used when…FIXME
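A common use of wakeup is to stop a poll loop from another thread. The interrupted poll throws a WakeupException, and the consumer is then closed. The following minimal sketch uses kafka-clients' MockConsumer, which throws WakeupException on the next poll after wakeup. To keep it deterministic, wakeup is called up front rather than from a shutdown hook.

```scala
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val tp = new TopicPartition("input", 0)
consumer.assign(Seq(tp).asJava)
consumer.updateBeginningOffsets(Map(tp -> java.lang.Long.valueOf(0L)).asJava)

// Normally called from another thread (e.g. a shutdown hook) while the loop below runs
consumer.wakeup()

var wokenUp = false
try {
  while (true) {
    val records = consumer.poll(Duration.ofMillis(100))
    records.asScala.foreach(record => println(record.value()))
  }
} catch {
  case _: WakeupException => wokenUp = true // expected: the loop was asked to stop
} finally {
  consumer.close()
}
println(s"Woken up: $wokenUp")
```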
Configuring ClusterResourceListeners — configureClusterResourceListeners Internal Method
ClusterResourceListeners configureClusterResourceListeners(
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
List<?>... candidateLists)
configureClusterResourceListeners creates a ClusterResourceListeners and registers ClusterResourceListener instances from the input candidateLists, keyDeserializer and valueDeserializer.
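Registration boils down to keeping those candidates (deserializers, interceptors and so on) that also implement ClusterResourceListener. The consumer then notifies them with the cluster id once metadata arrives. The following minimal sketch uses the public ClusterResourceListener and ClusterResource types and assumes kafka-clients 2.1 or later (for Deserializer's default configure/close). ClusterAwareDeserializer, clusterResourceListenersOf and the cluster id demo-cluster-id are hypothetical.

```scala
import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

// Hypothetical deserializer that also wants to learn the cluster id
class ClusterAwareDeserializer extends Deserializer[String] with ClusterResourceListener {
  @volatile var clusterId: Option[String] = None
  private val delegate = new StringDeserializer
  override def deserialize(topic: String, data: Array[Byte]): String = delegate.deserialize(topic, data)
  override def onUpdate(clusterResource: ClusterResource): Unit =
    clusterId = Option(clusterResource.clusterId())
}

// Hypothetical helper: keep only the candidates that implement ClusterResourceListener
def clusterResourceListenersOf(candidates: Seq[Any]): Seq[ClusterResourceListener] =
  candidates.collect { case listener: ClusterResourceListener => listener }

val keyDeserializer = new ClusterAwareDeserializer
val valueDeserializer = new StringDeserializer
val listeners = clusterResourceListenersOf(Seq(keyDeserializer, valueDeserializer))

// What happens once cluster metadata (with the cluster id) arrives
listeners.foreach(_.onUpdate(new ClusterResource("demo-cluster-id")))
println(keyDeserializer.clusterId)
```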
throwIfNoAssignorsConfigured Internal Method
void throwIfNoAssignorsConfigured()
throwIfNoAssignorsConfigured …FIXME
Note
throwIfNoAssignorsConfigured is used exclusively when KafkaConsumer is requested to subscribe to topics.
updateAssignmentMetadataIfNeeded Internal Method
boolean updateAssignmentMetadataIfNeeded(
Timer timer)
updateAssignmentMetadataIfNeeded requests the ConsumerCoordinator to poll until the Timer expires.
updateAssignmentMetadataIfNeeded returns false if the poll was unsuccessful, i.e. FIXME
If the poll was successful, updateAssignmentMetadataIfNeeded calls updateFetchPositions.
Note
updateAssignmentMetadataIfNeeded is used exclusively when KafkaConsumer is requested to poll for records.
commitAsync Method
void commitAsync()
void commitAsync(
OffsetCommitCallback callback)
void commitAsync(
Map<TopicPartition, OffsetAndMetadata> offsets,
OffsetCommitCallback callback)
Note
commitAsync is part of the Consumer Contract to…FIXME.
commitAsync …FIXME
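The following minimal sketch uses kafka-clients' MockConsumer to commit explicit offsets asynchronously with an OffsetCommitCallback. It assumes kafka-clients 2.4 or later for committed(Set). MockConsumer completes the commit immediately. A real KafkaConsumer typically invokes the callback later, from a subsequent consumer call such as poll. The topic and offset are made up.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetAndMetadata, OffsetCommitCallback, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val tp = new TopicPartition("input", 0)
consumer.assign(Seq(tp).asJava)

val callback = new OffsetCommitCallback {
  override def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit =
    if (exception == null) println(s"Committed: $offsets")
    else println(s"Commit failed: ${exception.getMessage}")
}

// Offset 2 means "the next record to consume is at offset 2"
consumer.commitAsync(Map(tp -> new OffsetAndMetadata(2L)).asJava, callback)

val committedOffset = consumer.committed(Set(tp).asJava).get(tp).offset()
consumer.close()
```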
committed Method
OffsetAndMetadata committed(
TopicPartition partition)
OffsetAndMetadata committed(
TopicPartition partition,
Duration timeout)
Note
committed is part of the Consumer Contract to…FIXME.
committed …FIXME
close Method
void close()
void close(
Duration timeout)
Note
close is part of the Consumer Contract to…FIXME.
close …FIXME
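Once closed, a consumer rejects further calls with an IllegalStateException. The following minimal sketch demonstrates this with kafka-clients' MockConsumer.

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
// Wait at most 5 seconds for the close to complete
consumer.close(Duration.ofSeconds(5))

// Any further use of the closed consumer fails
val pollAfterClose: Option[String] =
  try { consumer.poll(Duration.ZERO); None }
  catch { case e: IllegalStateException => Some(e.getMessage) }
println(pollAfterClose)
```

In application code, closing in a finally block (or a shutdown hook) makes sure the consumer leaves its group and releases its resources.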
position Method
long position(
TopicPartition partition)
long position(
TopicPartition partition,
Duration timeout)
Note
position is part of the Consumer Contract.
position …FIXME
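position returns the offset of the next record to be fetched. When a partition has no position yet, the position is resolved first, from the committed offset or auto.offset.reset. In KafkaConsumer, that resolution goes through updateFetchPositions. The following minimal sketch uses kafka-clients' MockConsumer. The beginning offset 5 is made up.

```scala
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
val tp = new TopicPartition("input", 0)
consumer.assign(Seq(tp).asJava)
// MockConsumer-only setup: pretend older records were deleted and the log now starts at offset 5
consumer.updateBeginningOffsets(Map(tp -> java.lang.Long.valueOf(5L)).asJava)

// No committed offset and no seek yet, so the position comes from the reset strategy (earliest)
val initialPosition = consumer.position(tp)

consumer.seek(tp, 7L)
val positionAfterSeek = consumer.position(tp)
consumer.close()
```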
poll Internal Method
ConsumerRecords<K, V> poll(
Timer timer,
boolean includeMetadataInTimeout)
poll first calls acquireAndEnsureOpen.
poll requests the ConsumerNetworkClient to maybeTriggerWakeup.
poll …FIXME
pollForFetches Internal Method
Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(
Timer timer)
pollForFetches …FIXME
Note
pollForFetches is used exclusively when KafkaConsumer is requested to poll for records.