KStream API — Record Stream

KStream is the abstraction of a record stream (of key-value pairs).

KStream can be created directly from one or many Kafka topics (using StreamsBuilder.stream operator) or as a result of transformations on an existing KStream.

Tip
Use Scala API for Kafka Streams to make your Kafka Streams development more pleasant if Scala is your programming language.
// Scala API for Kafka Streams
import org.apache.kafka.streams.scala._
import ImplicitConversions._
import Serdes._

val builder = new StreamsBuilder

// Use type annotation to describe the stream, i.e. stream[String, String]
// Else...Scala type inferencer gives us a stream of "nothing", i.e. KStream[Nothing, Nothing]
val input = builder.stream[String, String]("input")

scala> :type input
org.apache.kafka.streams.scala.kstream.KStream[String,String]

KStream comes with a rich set of operators (aka KStream API) that allow for building topologies to consume, process and produce key-value records.

Table 1. KStream API / Operators
Operator Description

branch

KStream<K, V>[] branch(
  Predicate<? super K, ? super V>... predicates)

filter

KStream<K, V> filter(
  Predicate<? super K, ? super V> predicate)

filterNot

KStream<K, V> filterNot(
  Predicate<? super K, ? super V> predicate)

flatMap

KStream<KR, VR> flatMap(
  KeyValueMapper<
    ? super K,
    ? super V,
    ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper)

flatMapValues

KStream<K, VR> flatMapValues(
  ValueMapper<
    ? super V,
    ? extends Iterable<? extends VR>> mapper)
KStream<K, VR> flatMapValues(
  ValueMapperWithKey<
    ? super K,
    ? super V,
    ? extends Iterable<? extends VR>> mapper)

flatTransform

KStream<K1, V1> flatTransform(
  TransformerSupplier<
    ? super K,
    ? super V,
    Iterable<KeyValue<K1, V1>>> transformerSupplier,
  String... stateStoreNames)

flatTransformValues

KStream<K, VR> flatTransformValues(
  ValueTransformerSupplier<
    ? super V,
    Iterable<VR>> valueTransformerSupplier,
  String... stateStoreNames)
KStream<K, VR> flatTransformValues(
  ValueTransformerWithKeySupplier<
    ? super K,
    ? super V,
    Iterable<VR>> valueTransformerSupplier,
  String... stateStoreNames)

foreach

void foreach(
  ForeachAction<? super K, ? super V> action)

groupBy

KGroupedStream<KR, V> groupBy(
  KeyValueMapper<? super K, ? super V, KR> selector)
KGroupedStream<KR, V> groupBy(
  KeyValueMapper<? super K, ? super V, KR> selector,
  Grouped<KR, V> grouped)

Creates a KGroupedStream with a given KeyValueMapper

groupByKey

KGroupedStream<K, V> groupByKey()
KGroupedStream<K, V> groupByKey(
  Grouped<K, V> grouped)

Creates a KGroupedStream

join

KStream<K, RV> join(
  GlobalKTable<GK, GV> globalKTable,
  KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
  ValueJoiner<? super V, ? super GV, ? extends RV> joiner)
KStream<K, VR> join(
  KStream<K, VO> otherStream,
  ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows)
KStream<K, VR> join(
  KStream<K, VO> otherStream,
  ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows,
  Joined<K, V, VO> joined)
KStream<K, VR> join(
  KTable<K, VT> table,
  ValueJoiner<? super V, ? super VT, ? extends VR> joiner)
KStream<K, VR> join(
  KTable<K, VT> table,
  ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
  Joined<K, V, VT> joined)

leftJoin

KStream<K, RV> leftJoin(
  GlobalKTable<GK, GV> globalKTable,
  KeyValueMapper<? super K, ? super V, ? extends GK> keyValueMapper,
  ValueJoiner<? super V, ? super GV, ? extends RV> valueJoiner)
KStream<K, VR> leftJoin(
  KStream<K, VO> otherStream,
  ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows)
KStream<K, VR> leftJoin(
  KStream<K, VO> otherStream,
  ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows,
  Joined<K, V, VO> joined)
KStream<K, VR> leftJoin(
  KTable<K, VT> table,
  ValueJoiner<? super V, ? super VT, ? extends VR> joiner)
KStream<K, VR> leftJoin(
  KTable<K, VT> table,
  ValueJoiner<? super V, ? super VT, ? extends VR> joiner,
  Joined<K, V, VT> joined)

map

KStream<KR, VR> map(
  KeyValueMapper<
    ? super K,
    ? super V,
    ? extends KeyValue<? extends KR, ? extends VR>> mapper)

mapValues

KStream<K, VR> mapValues(
  ValueMapper<? super V, ? extends VR> mapper)
KStream<K, VR> mapValues(
  ValueMapperWithKey<? super K, ? super V, ? extends VR> mapper)

merge

KStream<K, V> merge(
  KStream<K, V> stream)

outerJoin

KStream<K, VR> outerJoin(
  KStream<K, VO> otherStream,
  ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows)
KStream<K, VR> outerJoin(
  KStream<K, VO> otherStream,
  ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
  JoinWindows windows,
  Joined<K, V, VO> joined)

peek

KStream<K, V> peek(
  ForeachAction<? super K, ? super V> action)

print

void print(
  Printed<K, V> printed)

process

void process(
  ProcessorSupplier<? super K, ? super V> processorSupplier,
  String... stateStoreNames)

selectKey

KStream<KR, V> selectKey(
  KeyValueMapper<
    ? super K,
    ? super V,
    ? extends KR> mapper)

through

KStream<K, V> through(
  String topic)
KStream<K, V> through(
  String topic,
  Produced<K, V> produced)

Materializes the stream to a given topic (passes it through) and creates a new KStream from the topic (using the Produced for configuration)

to

void to(
  String topic)
void to(
  String topic,
  Produced<K, V> produced)
void to(
  TopicNameExtractor<K, V> topicExtractor)
void to(
  TopicNameExtractor<K, V> topicExtractor,
  Produced<K, V> produced)

Produces records to a given topic or using dynamic routing based on TopicNameExtractor

Note
Topics should be created manually before the Kafka Streams application is started.

transform

KStream<K1, V1> transform(
  TransformerSupplier<
    ? super K,
    ? super V,
    KeyValue<K1, V1>> transformerSupplier,
  String... stateStoreNames)

Stateful record transformation

transformValues

KStream<K, VR> transformValues(
  ValueTransformerSupplier<
    ? super V,
    ? extends VR> valueTransformerSupplier,
  String... stateStoreNames)
KStream<K, VR> transformValues(
  ValueTransformerWithKeySupplier<
    ? super K,
    ? super V,
    ? extends VR> valueTransformerSupplier,
  String... stateStoreNames)

Stateful record-by-record value transformation

transformValues uses ValueTransformerSupplier to create a ValueTransformer that is used for a stateful transformation of record values in a stream.

Note
KStreamImpl is the one and only known implementation of the KStream Contract in Kafka Streams.

results matching ""

    No results matching ""