// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._
val builder = new StreamsBuilder
def alwaysTrue(k: String, v: String) = true
builder
.stream[String, String]("input")
.branch(alwaysTrue)
val topology = builder.build
scala> println(topology.describe)
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [input])
--> KSTREAM-BRANCH-0000000001
Processor: KSTREAM-BRANCH-0000000001 (stores: [])
--> KSTREAM-BRANCHCHILD-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-BRANCHCHILD-0000000002 (stores: [])
--> none
<-- KSTREAM-BRANCH-0000000001
KStreamBranch — ProcessorSupplier of KStreamBranchProcessors
KStreamBranch
is a custom ProcessorSupplier of KStreamBranchProcessors for KStream.branch operator.
KStreamBranch
takes the following to be created:
Note
|
KStreamImpl is the default KStream.
|
When requested for a Processor, KStreamBranch
gives a new KStreamBranchProcessor.
KStreamBranchProcessor
KStreamBranchProcessor
is a custom record processor (indirectly as AbstractProcessor) that allows for forwarding a record to exactly one of the child processors (branching on them).
When requested to process a record, KStreamBranchProcessor
walks over the predicates and requests each and every predicate to test the record. When true
, process
requests the ProcessorContext
to forward the record to a corresponding child processor.
Note
|
process requests the predicates until positive is found or finishes without forwarding a record.
|