object KafkaStreamsApp extends App {
// Step 0. Imports for Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
// Step 1. Describe Topology
// Consume records from the "input" topic and produce records to the "upper" topic
val builder = new StreamsBuilder
val uppers = builder
.stream[String, String]("input") // Consume records as a stream
.mapValues(_.toUpperCase) // Transform (map) the values
// Produce records to "upper" topic
uppers.to("upper")
// Print out records to stdout for debugging purposes
import org.apache.kafka.streams.kstream.Printed
val sysout = Printed
.toSysOut[String, String]
.withLabel("stdout")
uppers.print(sysout)
// Step 2. Build Topology
val topology = builder.build
// You can describe the topology and just finish
println(topology.describe())
// That finishes the "declaration" part of developing a Kafka Streams application
// Nothing is executed at this time (no threads have started yet)
// Step 3. Specify Configuration
import java.util.Properties
val props = new Properties()
import org.apache.kafka.streams.StreamsConfig
val appId = this.getClass.getSimpleName.replace("$", "")
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
props.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Step 4. Create Kafka Streams Client
import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, props)
// Step 5. Start Stream Processing, i.e. consuming, processing and producing records
ks.start()
}
Kafka Streams — Stream Processing Library on Apache Kafka
Kafka Streams is a library for developing applications that process records from topics in Apache Kafka.
A Kafka Streams application processes record streams through a topology (of stream processors). A Kafka Streams developer defines the processing logic using either the high-level Streams DSL (with streams and tables) or the low-level Processor API (with stream processing nodes).
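For comparison, here is a minimal sketch of the same uppercase topology written with the low-level Processor API (as of Kafka 2.3; the node names "Source", "Uppercase" and "Sink" are arbitrary, and default key/value serdes are assumed to be configured):
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, ProcessorSupplier}
val topology = new Topology()
// Source node reading from the "input" topic (default key/value serdes assumed)
topology.addSource("Source", "input")
topology.addProcessor("Uppercase", new ProcessorSupplier[String, String] {
  override def get() = new AbstractProcessor[String, String] {
    // Forward every record downstream with an upper-cased value
    override def process(key: String, value: String): Unit =
      context().forward(key, value.toUpperCase)
  }
}, "Source")
// Sink node writing to the "upper" topic
topology.addSink("Sink", "upper", "Uppercase")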
Once you have defined the processing logic in the form of a Kafka Streams topology, you define the execution environment of the application using the KafkaStreams API and start it in the end.
Since Kafka Streams is a library, you should define it as a dependency of your Kafka Streams application.
val kafkaVer = "2.3.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % kafkaVer
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVer
On the outside, a typical Kafka Streams application is made up of two main objects: a Topology and a KafkaStreams instance (aka the streams client).
On the inside, the KafkaStreams object creates a StateDirectory, a Metrics (with a JmxReporter), a StreamsMetadataState, a GlobalStreamThread and one or more StreamThreads.
KafkaStreams uses the num.stream.threads configuration property for the number of StreamThreads to create (default: 1 processing thread).
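For example, a minimal sketch of requesting more processing threads (the property is exposed as StreamsConfig.NUM_STREAM_THREADS_CONFIG):
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
val props = new Properties()
// Ask KafkaStreams to create 4 StreamThreads instead of the default 1
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "4")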
A single StreamThread creates a restore Kafka Consumer (restoreConsumer), a StoreChangelogReader, a StreamsMetricsThreadImpl, a ThreadCache, a TaskCreator (for StreamTasks), a StandbyTaskCreator (for StandbyTasks) and the TaskManager.
It is through the TaskManager that a StreamThread manages the active stream tasks and processes records.
A StreamThread also creates a RebalanceListener for…FIXME
Eventually, KafkaStreams is started and the Kafka Streams application starts consuming, processing, and producing records (as described by the Topology).
When started, KafkaStreams starts the GlobalStreamThread and the StreamThreads.
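As an illustrative sketch (the state listener and shutdown hook are optional additions, not required by the API), you can observe those threads starting by watching the client's state transitions:
import org.apache.kafka.streams.KafkaStreams
val ks = new KafkaStreams(topology, props) // topology and props as built earlier
// Print every state transition, e.g. CREATED -> REBALANCING -> RUNNING
ks.setStateListener((newState: KafkaStreams.State, oldState: KafkaStreams.State) =>
  println(s"KafkaStreams transitioned from $oldState to $newState"))
ks.start()
// Close the streams client (stopping its threads) at JVM shutdown
sys.addShutdownHook(ks.close())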
A StreamThread runs the main record processing loop, using a KafkaConsumer to subscribe to the source topics of the topology (with a ConsumerRebalanceListener for dynamically assigned partitions).
The StreamThread then uses the KafkaConsumer to fetch records (from the source topics).
StreamThread uses the poll.ms configuration property for the poll time (default: 100 milliseconds), or 0 when in the PARTITIONS_ASSIGNED state.
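As a sketch, lowering the poll time (the property is exposed as StreamsConfig.POLL_MS_CONFIG; the props object is the one built earlier):
import org.apache.kafka.streams.StreamsConfig
// Wait at most 50 ms (instead of the default 100 ms) for records in each poll
props.put(StreamsConfig.POLL_MS_CONFIG, "50")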
If there are records to process and there are active stream tasks, StreamThread …FIXME