val kafka = "0.10.2.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % kafka
// You should also define the logging binding for slf4j
// Kafka uses slf4j for logging
val logback = "1.2.3"
libraryDependencies += "ch.qos.logback" % "logback-core" % logback
libraryDependencies += "ch.qos.logback" % "logback-classic" % logback
Kafka Consumers
Multiple concurrent consumers read messages from topics however they want. Unlike typical messaging systems, Kafka consumers pull messages from a topic using offsets.
A Kafka consumer consumes records from a Kafka cluster.
Note: Kafka 0.9.0.0 introduced a brand new Consumer API (aka the New Consumer).
When a consumer is created, it requires bootstrap.servers, the initial list of brokers from which it discovers the full set of alive brokers in the cluster.
A consumer has to subscribe to the topics it wants to read messages from. This is called topic subscription.
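As a minimal sketch of the settings mentioned above, the following builds the configuration a consumer is typically created with. The property names are Kafka's consumer settings; the broker addresses, the group name, and the class and method names (`ConsumerConfigSketch`, `consumerProperties`) are placeholders made up for illustration. It uses only the JDK, so it does not create a real consumer.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Collects the settings a consumer needs before it can connect.
    // bootstrapServers is a comma-separated host:port list (placeholder values here).
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Initial brokers used to discover the rest of the cluster
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Group identifier shared by all members of a consumer group
        props.setProperty("group.id", groupId);
        // Deserializers shipped with kafka-clients for String keys and values
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("broker1:9092,broker2:9092", "example-group");
        props.list(System.out);
    }
}
```

With kafka-clients on the classpath, a `Properties` object like this is what you would pass to the `KafkaConsumer` constructor before calling `subscribe`.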
Caution: FIXME Building a custom consumption strategy
Using the Kafka Consumer API requires the kafka-clients dependency in your project.
Consumer Contract
public interface Consumer<K, V> extends Closeable {
  // FIXME more methods...
  ConsumerRecords<K, V> poll(long timeout);
}
Method | Description |
---|---|
Used to… |
Topic Subscription
Topic Subscription is the process of announcing the topics a consumer wants to read messages from.
void subscribe(Collection<String> topics)
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
void subscribe(Pattern pattern, ConsumerRebalanceListener callback)
Note: The subscribe method is not incremental. You must always include the full list of topics that you want to consume from.
You can change the set of topics a consumer is subscribed to at any time. Given the note above, any topics previously subscribed to are replaced by the new list after subscribe.
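The replace-not-append behaviour can be illustrated with a toy model. `SubscriptionModel` below is a hypothetical stand-in written for this example, not a Kafka class, and the topic names are made up. It only mimics how each subscribe call discards the previous list.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a consumer's subscription state; not part of kafka-clients.
class SubscriptionModel {
    private final Set<String> topics = new TreeSet<>();

    // Each call replaces the whole subscription instead of adding to it.
    void subscribe(Collection<String> newTopics) {
        topics.clear();
        topics.addAll(newTopics);
    }

    // Returns a copy of the current subscription.
    Set<String> subscription() {
        return new TreeSet<>(topics);
    }

    public static void main(String[] args) {
        SubscriptionModel consumer = new SubscriptionModel();
        consumer.subscribe(Arrays.asList("orders", "payments"));
        // To add "refunds", the full list has to be passed again.
        consumer.subscribe(Arrays.asList("orders", "payments", "refunds"));
        System.out.println(consumer.subscription());
        // Passing a single topic drops the other two.
        consumer.subscribe(Arrays.asList("refunds"));
        System.out.println(consumer.subscription());
    }
}
```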
Automatic and Manual Partition Assignment
Caution: FIXME
Consumer Groups
A consumer group is a set of Kafka consumers that share a common group identifier (group.id).
Each partition in a topic is assigned to exactly one member of a consumer group.
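To make the one-owner-per-partition rule concrete, here is a simplified round-robin sketch. It is an assumption-laden illustration, not one of Kafka's actual assignors: the class `AssignmentSketch`, the method `assign`, and the member names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class AssignmentSketch {
    // Spreads partitions 0..partitionCount-1 over the group members round-robin,
    // so every partition ends up with exactly one owner.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two members sharing a topic with five partitions
        System.out.println(assign(Arrays.asList("consumer-a", "consumer-b"), 5));
    }
}
```

A member can own several partitions, but no partition is listed under two members of the same group.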
Group Coordination Protocol
Caution: FIXME
- The new consumer uses a group coordination protocol built into Kafka.
- For each group, one of the brokers is selected as the group coordinator. The coordinator is responsible for managing the state of the group. Its main job is to mediate partition assignment when new members arrive, old members depart, and when topic metadata changes. The act of reassigning partitions is known as rebalancing the group.
- When a group is first initialized, the consumers typically begin reading from either the earliest or latest offset in each partition. The messages in each partition log are then read sequentially. As the consumer makes progress, it commits the offsets of messages it has successfully processed.
- When a partition gets reassigned to another consumer in the group, the initial position is set to the last committed offset. If a consumer suddenly crashed, then the group member taking over the partition would begin consumption from the last committed offset (possibly reprocessing messages that the failed consumer would have processed already but not committed yet).
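The commit-and-resume behaviour described in the last item can be modelled with a small simulation. `PartitionModel` is a hypothetical class written for this example. It tracks a single committed offset and commits only every few records, which is an assumption chosen to make the reprocessing visible, not a description of how the Kafka client schedules commits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one partition's committed offset; not part of kafka-clients.
class PartitionModel {
    // Offset of the next record the group should read after a restart or reassignment
    private long committed = 0;

    long committedOffset() {
        return committed;
    }

    // Simulates one consumer owning the partition: it starts at the last committed
    // offset, processes records up to (but excluding) stopBefore, and commits its
    // position after every commitEvery records. Returns the offsets it processed.
    List<Long> consume(long stopBefore, int commitEvery) {
        List<Long> processed = new ArrayList<>();
        long position = committed;
        int sinceCommit = 0;
        while (position < stopBefore) {
            processed.add(position);
            position++;
            sinceCommit++;
            if (sinceCommit == commitEvery) {
                committed = position;
                sinceCommit = 0;
            }
        }
        // Returning here stands for a crash: progress since the last commit is lost.
        return processed;
    }

    public static void main(String[] args) {
        PartitionModel partition = new PartitionModel();
        // First owner processes offsets 0..6 but only commits after the fifth record.
        System.out.println("first owner:  " + partition.consume(7, 5));
        System.out.println("committed:    " + partition.committedOffset());
        // The member taking over resumes from the committed offset.
        System.out.println("second owner: " + partition.consume(10, 5));
    }
}
```

In this run the second owner processes offsets 5 and 6 again, because the first owner had handled them but never committed past offset 5.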