Kafka Streams — Stream Processing Library on Apache Kafka

Kafka Streams is a library for developing applications that process records from topics in Apache Kafka.

A Kafka Streams application processes record streams through a topology of stream processors. A Kafka Streams developer defines the processing logic using either the high-level Streams DSL (with streams and tables) or the low-level Processor API (with stream processing nodes).

Once you have defined the processing logic in the form of a Kafka Streams topology, you define the execution environment of the application using the KafkaStreams API and, as the last step, start it.

object KafkaStreamsApp extends App {

  // Step 0. Imports for Scala API for Kafka Streams
  import org.apache.kafka.streams.scala._
  import ImplicitConversions._
  import Serdes._

  // Step 1. Describe Topology
  // Consume records from input topic and produce records to upper topic
  val builder = new StreamsBuilder

  val uppers = builder
    .stream[String, String]("input") // Consume records as a stream
    .mapValues(_.toUpperCase)        // Transform (map) the values

  // Produce records to "upper" topic
  uppers.to("upper")

  // Print out records to stdout for debugging purposes
  import org.apache.kafka.streams.kstream.Printed
  val sysout = Printed
    .toSysOut[String, String]
    .withLabel("stdout")
  uppers.print(sysout)

  // Step 2. Build Topology
  val topology = builder.build

  // You can describe the topology and just finish
  println(topology.describe())

  // That finishes the "declaration" part of developing a Kafka Streams application
  // Nothing is executed at this time (no threads have started yet)

  // Step 3. Specify Configuration
  import java.util.Properties
  val props = new Properties()
  import org.apache.kafka.streams.StreamsConfig
  val appId = this.getClass.getSimpleName.replace("$", "")
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId)
  props.put(StreamsConfig.CLIENT_ID_CONFIG, appId)
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ":9092")

  // Step 4. Create Kafka Streams Client
  import org.apache.kafka.streams.KafkaStreams
  val ks = new KafkaStreams(topology, props)

  // Step 5. Start Stream Processing, i.e. consuming, processing and producing records
  ks.start()
}
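
For comparison, the same upper-casing topology could also be written with the low-level Processor API. The following is only a minimal sketch, assuming Kafka Streams 2.2.0 (where AbstractProcessor and ProcessorSupplier are in the org.apache.kafka.streams.processor package). The names UpperCaseProcessor and ProcessorApiTopology and the node names "source", "to-upper" and "sink" are made up for illustration.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.processor.{AbstractProcessor, Processor, ProcessorSupplier}

// Hypothetical stream processing node that upper-cases record values
class UpperCaseProcessor extends AbstractProcessor[String, String] {
  override def process(key: String, value: String): Unit =
    // Forward the transformed record to all child nodes
    context().forward(key, value.toUpperCase)
}

object ProcessorApiTopology {

  private val stringSerde = Serdes.String()

  private val upperCaseSupplier = new ProcessorSupplier[String, String] {
    override def get(): Processor[String, String] = new UpperCaseProcessor
  }

  // Source node -> processor node -> sink node, wired together by node names
  val topology: Topology = new Topology()
    .addSource("source", stringSerde.deserializer(), stringSerde.deserializer(), "input")
    .addProcessor("to-upper", upperCaseSupplier, "source")
    .addSink("sink", "upper", stringSerde.serializer(), stringSerde.serializer(), "to-upper")

  def main(args: Array[String]): Unit =
    println(topology.describe())
}
```

Either way, the result is a Topology that you hand over to KafkaStreams. The Streams DSL simply builds the processor nodes for you.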

Since Kafka Streams is a library, you should declare it as a dependency of your Kafka Streams application (e.g. in build.sbt).

val kafkaVer = "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % kafkaVer
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVer

On the outside, a typical Kafka Streams application is made up of two main objects, i.e. Topology and KafkaStreams (aka streams client).

On the inside, the KafkaStreams object creates a StateDirectory, a Metrics (with a JmxReporter), a StreamsMetadataState, a GlobalStreamThread and one or more StreamThreads.

KafkaStreams uses the num.stream.threads configuration property for the number of StreamThreads to create (default: 1 processing thread).
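
As an illustration, the number of processing threads could be changed as follows. This is a minimal sketch: the object name StreamThreadsConfig, the application ID and the localhost:9092 broker address are assumptions for the example only.

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object StreamThreadsConfig {

  val props = new Properties()
  // application.id and bootstrap.servers are required by StreamsConfig
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-threads-demo") // hypothetical ID
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")   // assumed broker address
  // Ask for two StreamThreads instead of the default one
  props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2")

  // StreamsConfig parses and validates the properties
  val config = new StreamsConfig(props)
  val numThreads: Int = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG).intValue

  def main(args: Array[String]): Unit =
    println(s"num.stream.threads = $numThreads")
}
```

The same props object is what you would pass to the KafkaStreams constructor together with the topology.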

A single StreamThread creates a restore Kafka Consumer (restoreConsumer), a StoreChangelogReader, a StreamsMetricsThreadImpl, a ThreadCache, a TaskCreator (for StreamTasks), a StandbyTaskCreator (for StandbyTasks) and the TaskManager.

It is through the TaskManager that a StreamThread manages the active stream tasks and processes records.

A StreamThread also creates a RebalanceListener for…FIXME

Eventually, KafkaStreams is started and the Kafka Streams application starts consuming, processing, and producing records (as described by the Topology).

When started, KafkaStreams starts the GlobalStreamThread and the StreamThreads.
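
From the application's point of view, starting (and eventually stopping) the streams client could look like the following sketch. It is not taken from the example above: the helper name startWithShutdownHook is hypothetical, and registering a state listener and a JVM shutdown hook is just one common way to observe and end the lifecycle.

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, Topology}

object StreamsLifecycle {

  // Hypothetical helper: creates the streams client, logs its state changes
  // and closes it on JVM shutdown
  def startWithShutdownHook(topology: Topology, props: Properties): KafkaStreams = {
    val ks = new KafkaStreams(topology, props)

    // The listener has to be registered before the client is started
    ks.setStateListener(new KafkaStreams.StateListener {
      override def onChange(newState: KafkaStreams.State, oldState: KafkaStreams.State): Unit =
        println(s"State changed from $oldState to $newState")
    })

    // Stop the stream threads when the JVM shuts down
    sys.addShutdownHook {
      ks.close()
    }

    // Starts the threads of the streams client
    ks.start()
    ks
  }
}
```

The topology and props arguments are the ones created in Step 2 and Step 3 of the example application.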

A StreamThread runs the main record processing loop that uses a KafkaConsumer to subscribe to the source topics of the topology (with the ConsumerRebalanceListener for dynamically assigned partitions).

A StreamThread then uses the KafkaConsumer to fetch records (from the source topics).

StreamThread uses the poll.ms configuration property for the poll time (default: 100 milliseconds), or 0 when in the PARTITIONS_ASSIGNED state.
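
The poll time can be inspected or overridden through StreamsConfig. The sketch below only reads the configuration value. The zero poll time in the PARTITIONS_ASSIGNED state is internal to StreamThread and is not shown. The object name PollMsConfig, the helper configWith, the application ID and the broker address are assumptions for illustration.

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

object PollMsConfig {

  // Hypothetical helper: a StreamsConfig with the required properties plus overrides
  private def configWith(overrides: (String, String)*): StreamsConfig = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "poll-ms-demo")      // hypothetical ID
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
    overrides.foreach { case (key, value) => props.put(key, value) }
    new StreamsConfig(props)
  }

  // Value used when poll.ms is not set explicitly
  val defaultPollMs: Long = configWith().getLong(StreamsConfig.POLL_MS_CONFIG)

  // Value after overriding poll.ms
  val customPollMs: Long =
    configWith(StreamsConfig.POLL_MS_CONFIG -> "250").getLong(StreamsConfig.POLL_MS_CONFIG)

  def main(args: Array[String]): Unit =
    println(s"default poll.ms = $defaultPollMs, custom poll.ms = $customPollMs")
}
```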

If there are records to process and there are active stream tasks, StreamThread…FIXME
