KStreamBranch — ProcessorSupplier of KStreamBranchProcessors

KStreamBranch is a custom ProcessorSupplier of KStreamBranchProcessors for the KStream.branch operator.

// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

val builder = new StreamsBuilder
def alwaysTrue(k: String, v: String) = true
builder
  .stream[String, String]("input")
  .branch(alwaysTrue)
val topology = builder.build
scala> println(topology.describe)
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input])
      --> KSTREAM-BRANCH-0000000001
    Processor: KSTREAM-BRANCH-0000000001 (stores: [])
      --> KSTREAM-BRANCHCHILD-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-BRANCHCHILD-0000000002 (stores: [])
      --> none
      <-- KSTREAM-BRANCH-0000000001

KStreamBranch takes the following to be created:

  • Predicate<K, V>[]

  • Child nodes (one per predicate, in the same order)

KStreamBranch is created exclusively when KStreamImpl is requested to branch.

Note
KStreamImpl is the default KStream.

When requested for a Processor, KStreamBranch gives a new KStreamBranchProcessor.

KStreamBranchProcessor

KStreamBranchProcessor is a custom record processor (indirectly, as an AbstractProcessor) that forwards a record to at most one of the child processors (branching on them).

When requested to process a record, KStreamBranchProcessor walks over the predicates in order and requests each one to test the record. At the first predicate that returns true, process requests the ProcessorContext to forward the record to the corresponding child processor and stops.

Note
process tests the predicates until one of them holds. If none does, process finishes without forwarding the record.
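
The first-match behaviour described above can be sketched without any Kafka dependency. The following is only an illustrative model, not the KStreamBranchProcessor source: BranchModel, route and the child names child-0 and child-1 are hypothetical, and "forwarding" is reduced to returning the name of the selected child.

```scala
// A dependency-free model of first-match branching; NOT Kafka Streams code.
// BranchModel, route and the child names are hypothetical.
object BranchModel {
  type Predicate[K, V] = (K, V) => Boolean

  /** Returns the child paired with the first predicate that accepts the
    * record, or None when no predicate matches (the record is not forwarded).
    */
  def route[K, V](
      predicates: Seq[Predicate[K, V]],
      childNodes: Seq[String]
  )(key: K, value: V): Option[String] = {
    require(predicates.size == childNodes.size, "one child node per predicate")
    predicates
      .zip(childNodes)
      // collectFirst stops at the first pair whose predicate holds
      .collectFirst { case (test, child) if test(key, value) => child }
  }

  def main(args: Array[String]): Unit = {
    val predicates: Seq[Predicate[String, Int]] = Seq(
      (_, v) => v < 0,  // negative values
      (_, v) => v < 100 // also true for negatives, but the first match wins
    )
    val children = Seq("child-0", "child-1")

    println(route(predicates, children)("k", -5))  // Some(child-0)
    println(route(predicates, children)("k", 42))  // Some(child-1)
    println(route(predicates, children)("k", 500)) // None
  }
}
```

Because predicates are tested in order, a record that satisfies several of them still reaches only the child of the first match, and a record that satisfies none is dropped. An always-true predicate placed last therefore acts as a catch-all branch.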
