// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
val builder = new StreamsBuilder
builder
  .stream[String, String]("input")
  .peek { (k, v) => println(s"($k, $v)") }    // runs the action, then forwards the record
  .foreach { (k, v) => println(s"($k, $v)") } // runs the action; terminal operator
val topology = builder.build
scala> println(topology.describe)
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-PEEK-0000000001
    Processor: KSTREAM-PEEK-0000000001 (stores: [])
      --> KSTREAM-FOREACH-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FOREACH-0000000002 (stores: [])
      --> none
      <-- KSTREAM-PEEK-0000000001
KStreamPeek — ProcessorSupplier of KStreamPeekProcessors
KStreamPeek is a custom ProcessorSupplier of KStreamPeekProcessors for the KStream.foreach and KStream.peek operators.
KStreamPeek takes the following to be created: a ForeachAction and the forwardDownStream flag.
KStreamPeek is created when KStreamImpl is requested to foreach (with the forwardDownStream flag disabled) or to peek (with the forwardDownStream flag enabled).
Note: KStreamImpl is the default KStream implementation.
When requested for a Processor, KStreamPeek gives a new KStreamPeekProcessor.
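The relationship between the two operators and the supplier can be sketched without any Kafka dependency. This is a simplified model, not the library's code: PeekSupplierSketch, PeekSupplier, PeekProcessor, foreachSupplier and peekSupplier are hypothetical names, and Action stands in for ForeachAction. It only illustrates that both operators build the same kind of supplier, differing in the flag, and that every request for a processor yields a new instance.

```scala
// Dependency-free model; none of these names are Kafka Streams classes.
object PeekSupplierSketch {
  // Stand-in for ForeachAction: a side-effecting function over key and value.
  type Action[K, V] = (K, V) => Unit

  // Stand-in for KStreamPeekProcessor; carries what the supplier hands over.
  final class PeekProcessor[K, V](val action: Action[K, V], val forwardDownStream: Boolean)

  // Stand-in for KStreamPeek: created with an action and the forwarding flag.
  final class PeekSupplier[K, V](action: Action[K, V], val forwardDownStream: Boolean) {
    // Every call builds a fresh processor that shares the action and the flag.
    def get(): PeekProcessor[K, V] = new PeekProcessor[K, V](action, forwardDownStream)
  }

  // What foreach is described as doing: flag disabled.
  def foreachSupplier[K, V](action: Action[K, V]): PeekSupplier[K, V] =
    new PeekSupplier[K, V](action, forwardDownStream = false)

  // What peek is described as doing: flag enabled.
  def peekSupplier[K, V](action: Action[K, V]): PeekSupplier[K, V] =
    new PeekSupplier[K, V](action, forwardDownStream = true)
}
```

Calling get() twice on the same supplier returns two distinct processor objects, which matters because each stream task needs its own processor instance.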
KStreamPeekProcessor
KStreamPeekProcessor is a custom record processor (indirectly, as an AbstractProcessor) that executes an action on each record it receives, i.e. it peeks at the record.
When requested to process a record, KStreamPeekProcessor executes the ForeachAction with the record. If the forwardDownStream flag is enabled, KStreamPeekProcessor then requests the ProcessorContext to forward the record downstream.
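The processing step described above can be illustrated with a small, dependency-free sketch. It is a model under stated assumptions rather than the actual implementation: PeekProcessorSketch, RecordingContext, PeekProcessor and run are hypothetical names, and RecordingContext stands in for ProcessorContext by merely recording what would be forwarded.

```scala
// Dependency-free model; none of these names are Kafka Streams classes.
object PeekProcessorSketch {
  import scala.collection.mutable.ListBuffer

  // Stand-in for ProcessorContext; only forwarding is modelled.
  final class RecordingContext[K, V] {
    val forwarded: ListBuffer[(K, V)] = ListBuffer.empty
    def forward(key: K, value: V): Unit = forwarded += ((key, value))
  }

  final class PeekProcessor[K, V](
      action: (K, V) => Unit,
      forwardDownStream: Boolean,
      context: RecordingContext[K, V]) {

    def process(key: K, value: V): Unit = {
      action(key, value)                                 // always executed
      if (forwardDownStream) context.forward(key, value) // only when the flag is enabled
    }
  }

  // Runs one record through a processor and returns
  // (records seen by the action, records forwarded downstream).
  def run(forwardDownStream: Boolean): (List[(String, String)], List[(String, String)]) = {
    val seen = ListBuffer.empty[(String, String)]
    val context = new RecordingContext[String, String]
    val processor = new PeekProcessor[String, String](
      (k, v) => seen += ((k, v)), forwardDownStream, context)
    processor.process("k1", "v1")
    (seen.toList, context.forwarded.toList)
  }
}
```

With the flag enabled (the peek case) the record is both seen by the action and forwarded. With the flag disabled (the foreach case) the action still sees it but nothing is forwarded, which matches the `--> none` entry for the foreach processor in the topology description.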