Example of stream transformation

I found Source(Iterator.continually(mySource)).flatten(Concat) in the issue "add Source.repeated(...)", and it got me curious about how to turn it into something that works.

What is Source?

From the scaladoc of akka.stream.scaladsl.Source:

A Source is a set of stream processing steps that has one open output and an attached input. Can be used as a Publisher

That makes sense, but the point here is to get the expression up and running, with io.StdIn.readLine() as the input.

What is Concat?

The scaladoc of Concat:

Takes two streams and outputs an output stream formed from the two input streams by consuming one stream first emitting all of its elements, then consuming the second stream emitting all of its elements.
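As a rough analogy using only standard-library iterators (this is not the Akka Streams API), concatenation drains the first input completely before it touches the second. The names first, second and concatenated are made up for this sketch.

```scala
// Plain Scala iterators standing in for two streams (an analogy, not Akka API).
val first  = Iterator(1, 2, 3)
val second = Iterator(10, 20)

// `++` consumes `first` to exhaustion, then continues with `second`.
val concatenated = (first ++ second).toList
```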

When I tried the line with import akka.stream.scaladsl.{Concat, Source}, I got the following error:

scala> Source(Iterator.continually(io.StdIn.readLine())).flatten(Concat)
<console>:12: error: overloaded method value apply with alternatives:
  [T](props: akka.actor.Props)akka.stream.scaladsl.PropsSource[T] <and>
  [T](graph: akka.stream.scaladsl.PartialFlowGraph)(block: akka.stream.scaladsl.FlowGraphBuilder => akka.stream.scaladsl.UndefinedSink[T])akka.stream.scaladsl.Source[T] <and>
  [T](future: scala.concurrent.Future[T])akka.stream.scaladsl.Source[T] <and>
  [T](iterable: scala.collection.immutable.Iterable[T])akka.stream.scaladsl.Source[T] <and>
  [T](f: () => Iterator[T])akka.stream.scaladsl.Source[T] <and>
  [T](publisher: org.reactivestreams.Publisher[T])akka.stream.scaladsl.Source[T]
 cannot be applied to (Iterator[String])
              Source(Iterator.continually(io.StdIn.readLine())).flatten(Concat)
              ^

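It appears that a few things have changed since the JIRA issue was reported: none of the Source.apply alternatives accepts an Iterator[String] directly, and the closest one takes a function, f: () => Iterator[T].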
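Wrapping the iterator in a function literal should match the f: () => Iterator[T] alternative listed in the error above. The sketch below is plain Scala with a hypothetical firstN helper (not part of Akka) that takes the same kind of factory argument. My assumption is that the factory form exists so that a fresh iterator can be created each time one is needed.

```scala
// Hypothetical helper, not Akka API: accepts a factory shaped like the
// `f: () => Iterator[T]` overload listed in the error message.
def firstN[T](f: () => Iterator[T], n: Int): List[T] =
  f().take(n).toList

// A function literal that builds a new iterator on every call.
val factory: () => Iterator[Int] = () => Iterator.from(0)

val run1 = firstN(factory, 3)
val run2 = firstN(factory, 3) // the factory is called again, so counting restarts at 0
```

Passing a bare Iterator instead would hand over a one-shot object that is exhausted after its first traversal.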

Final Solution

After much trial and error I ended up with the following data flow:

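// FlowMaterializer() needs an implicit ActorSystem; running the flow needs an implicit materializer.
implicit val as = ActorSystem("akka-http-sandbox")
implicit val fm = FlowMaterializer()

// Source.apply takes a function that creates the iterator, not the iterator itself.
lazy val source = Source(() => Iterator.from(0))
// Wrap the source in a one-element list, flatten by concatenation, and print the first 4 elements.
Source(List(source)).flatten(FlattenStrategy.concat).take(4).to(Sink.foreach(println)).run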

I'm unsure what and how much I lost during the (self-taught) flow transformation.
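One way to reason about the difference, again as an analogy with plain Scala iterators rather than Akka Streams: the original expression repeats a source forever, while the final data flow wraps a single source in a one-element list. The mkSource name and the finite three-element source are assumptions made purely for illustration.

```scala
// Analogy only: a factory for a small finite "source".
def mkSource(): Iterator[Int] = Iterator(0, 1, 2)

// Shape of the original idea: repeat the source forever, then flatten by concatenation.
val repeated = Iterator.continually(mkSource()).flatten.take(7).toList

// Shape of the final solution: a one-element list of sources, flattened.
val once = List(mkSource()).iterator.flatten.take(7).toList
```

In this analogy the repetition is what gets lost: repeated starts over once the inner source runs out, whereas once simply ends. With an infinite inner source such as Iterator.from(0), the first source never completes, so both shapes would produce the same prefix.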
