KStreamPeek — ProcessorSupplier of KStreamPeekProcessors

KStreamPeek is the ProcessorSupplier that creates KStreamPeekProcessors for the KStream.foreach and KStream.peek operators.

// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

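val builder = new StreamsBuilder
builder
  .stream[String, String]("input")
  // peek runs the action and passes every record on downstream
  .peek { (k, v) => println(s"($k, $v)") }
  // foreach runs the action and ends the stream (it returns Unit)
  .foreach { (k, v) => println(s"($k, $v)") }
val topology = builder.build()

scala> println(topology.describe())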
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-FOREACH-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FOREACH-0000000002 (stores: [])
      --> none
      <-- KSTREAM-PEEK-0000000001

KStreamPeek takes the following to be created:

  • ForeachAction

  • forwardDownStream flag

KStreamPeek is created when KStreamImpl is requested to foreach (with the forwardDownStream flag disabled) or to peek (with the forwardDownStream flag enabled).

Note
KStreamImpl is the default KStream.

When requested for a Processor, KStreamPeek gives a new KStreamPeekProcessor.
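
The relationship between the two operators, the flag, and the supplier can be sketched without any Kafka dependency. This is a simplified model, not the Kafka Streams source: PeekSupplierModel, PeekProc, forForeach and forPeek are hypothetical names, and a plain Scala function stands in for ForeachAction.

```scala
// Hypothetical stand-in for KStreamPeekProcessor; it only carries the two inputs.
final class PeekProc[K, V](val action: (K, V) => Unit, val forwardDownStream: Boolean)

// Hypothetical stand-in for KStreamPeek (the supplier side).
final class PeekSupplierModel[K, V](action: (K, V) => Unit, forwardDownStream: Boolean) {
  // Every call hands out a new processor instance.
  def get(): PeekProc[K, V] = new PeekProc(action, forwardDownStream)
}

object PeekSupplierModel {
  // foreach: the flag is disabled, so records are not forwarded.
  def forForeach[K, V](action: (K, V) => Unit): PeekSupplierModel[K, V] =
    new PeekSupplierModel(action, forwardDownStream = false)

  // peek: the flag is enabled, so records continue downstream.
  def forPeek[K, V](action: (K, V) => Unit): PeekSupplierModel[K, V] =
    new PeekSupplierModel(action, forwardDownStream = true)
}

object PeekSupplierDemo {
  def main(args: Array[String]): Unit = {
    val supplier = PeekSupplierModel.forPeek[String, String]((k, v) => println(s"($k, $v)"))
    val first = supplier.get()
    val second = supplier.get()
    println(first ne second)          // true: two distinct processor instances
    println(first.forwardDownStream)  // true: created for peek
  }
}
```

Handing out a fresh instance per request matters because a topology may run the same processor node in several tasks, each of which needs its own processor.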

KStreamPeekProcessor

KStreamPeekProcessor is a custom record processor (a Processor indirectly, by extending AbstractProcessor) that executes an action on each record it processes (peeks at it).

When requested to process a record, KStreamPeekProcessor executes the ForeachAction with the record. If the forwardDownStream flag is enabled, KStreamPeekProcessor then requests the ProcessorContext to forward the record downstream.
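
The processing step described above can be modelled in a few lines of plain Scala. This is a minimal sketch under stated assumptions rather than the actual implementation: RecordingContext and PeekProcessorModel are hypothetical names, the context only records what it is asked to forward, and a plain function stands in for ForeachAction.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for ProcessorContext: it only records forwarded records.
final class RecordingContext[K, V] {
  val forwarded: ListBuffer[(K, V)] = ListBuffer.empty
  def forward(key: K, value: V): Unit = forwarded += ((key, value))
}

// Hypothetical model of the processor; the function stands in for ForeachAction.
final class PeekProcessorModel[K, V](
    action: (K, V) => Unit,
    forwardDownStream: Boolean,
    context: RecordingContext[K, V]) {

  def process(key: K, value: V): Unit = {
    action(key, value)                                  // always execute the action
    if (forwardDownStream) context.forward(key, value)  // forward only when the flag is enabled
  }
}

object PeekProcessorDemo {
  def main(args: Array[String]): Unit = {
    val seen = ListBuffer.empty[String]
    val action: (String, String) => Unit = (k, v) => seen += s"($k, $v)"

    // peek-like behaviour: the flag is enabled
    val peekCtx = new RecordingContext[String, String]
    new PeekProcessorModel(action, forwardDownStream = true, context = peekCtx)
      .process("k1", "v1")

    // foreach-like behaviour: the flag is disabled
    val foreachCtx = new RecordingContext[String, String]
    new PeekProcessorModel(action, forwardDownStream = false, context = foreachCtx)
      .process("k2", "v2")

    println(seen.mkString(", "))        // (k1, v1), (k2, v2)
    println(peekCtx.forwarded.size)     // 1
    println(foreachCtx.forwarded.size)  // 0
  }
}
```

In both cases the action sees the record; only the peek-like processor hands it on, which is why foreach shows no downstream node (--> none) in the topology description.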
