About Akka Streams
Notes (aka shameless copies of entire paragraphs) from the article akka-stream: Akka's reactive stream implementation. An excellent blog post I wholeheartedly recommend reading from beginning to end a few times!
- Reactive Streams "is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure on the JVM".
- The Akka project has an experimental akka-stream module that implements the Reactive Streams specification with a useful set of abstractions for stream processing.
- Use "com.typesafe.akka" %% "akka-stream-experimental" % "1.0-M4" to include the module in an sbt project
- Review akka-http-stream-example for the code accompanying the article (and the notes).
- Non-blocking back pressure allows subscribers to signal that they can handle more items, and publishers to push items to the subscribers when they're ready
- the publisher can push data to a subscriber only if the subscriber has indicated that it can handle more data.
- the Subscriber interface
- The publisher must provide stream elements to the subscriber by invoking the onNext function, but must not exceed the total number of elements that the subscriber has expressed demand for.
- the Subscription interface = A subscription is a subscriber's communication channel back to the publisher that allows it to either cancel the subscription or signal demand by requesting more data.
- request signals to the publisher how many more elements a subscriber can handle = the mechanism by which non-blocking back pressure is achieved in Reactive Streams.
- cancel allows a subscriber to terminate a subscription, meaning the publisher will no longer send data to the subscriber
- the Publisher interface
- a subscriber is subscribed via the subscribe function: publisher.subscribe(subscriber)
- the publisher must honor the subscriber's cancel request if the cancel function is invoked on the subscription, and is responsible for invoking the onError and onComplete functions if an error is encountered or the stream is completed, respectively.
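The demand rules above can be sketched without Akka. This is a minimal illustration only, assuming JDK 9+ and using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Publisher, Subscriber and Subscription interfaces. The names DemandSketch, CountingPublisher and CollectingSubscriber are hypothetical and do not come from the article; the publisher is synchronous and single-threaded to keep the example short.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ListBuffer

object DemandSketch {
  // Hypothetical publisher: emits 1..limit, but never more than was requested.
  final class CountingPublisher(limit: Int) extends Flow.Publisher[Integer] {
    override def subscribe(subscriber: Flow.Subscriber[_ >: Integer]): Unit =
      subscriber.onSubscribe(new Flow.Subscription {
        private var next = 1
        private var finished = false

        // Demand signalled by the subscriber: push at most n more elements.
        override def request(n: Long): Unit = {
          var remaining = n
          while (!finished && remaining > 0 && next <= limit) {
            val value = next
            next += 1
            remaining -= 1
            subscriber.onNext(Integer.valueOf(value))
          }
          // Complete exactly once, after the last element has been pushed.
          if (!finished && next > limit) {
            finished = true
            subscriber.onComplete()
          }
        }

        // After cancel, nothing more is sent to the subscriber.
        override def cancel(): Unit = finished = true
      })
  }

  // Hypothetical subscriber: records what it receives and exposes the subscription.
  final class CollectingSubscriber extends Flow.Subscriber[Integer] {
    val received = ListBuffer.empty[Int]
    var completed = false
    private var subscription: Option[Flow.Subscription] = None

    override def onSubscribe(s: Flow.Subscription): Unit = subscription = Some(s)
    override def onNext(item: Integer): Unit = received += item.intValue
    override def onError(t: Throwable): Unit = throw t
    override def onComplete(): Unit = completed = true

    def request(n: Long): Unit = subscription.foreach(_.request(n))
    def cancel(): Unit = subscription.foreach(_.cancel())
  }
}
```

Subscribing a CollectingSubscriber to new CountingPublisher(5) and calling request(2) delivers only the first two elements; the rest arrive once more demand is signalled.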
- the Processor interface
- combines the Subscriber and Publisher interfaces
- allows stream processors to be created that can consume, transform, and publish streams while retaining back pressure propagation with the original stream source
- each of the intermediary steps that both take in a stream and produce a corresponding stream are themselves processors.
- the processor concept is integral to retaining back pressure propagation
- the blog post presents an example with a publisher that produces Fibonacci values, a transformer that doubles every value that comes through, and a subscriber that merely logs values as they are consumed along with a fixed delay to simulate a slow consumer.
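The pipeline described above (Fibonacci source, doubling step, consumer) can be sketched with the JDK's Flow.Processor interface. This is not the article's Akka code: it is a simplified, synchronous illustration assuming JDK 9+, and the names ProcessorSketch, FibonacciSource, Doubler and Collector are hypothetical. It also assumes the sequence starts at 0, 1 and that the source is subscribed to the processor before the consumer subscribes.

```scala
import java.util.concurrent.Flow
import scala.collection.mutable.ListBuffer

object ProcessorSketch {
  // Hypothetical source: Fibonacci numbers, pushed only when requested.
  final class FibonacciSource extends Flow.Publisher[BigInt] {
    override def subscribe(subscriber: Flow.Subscriber[_ >: BigInt]): Unit =
      subscriber.onSubscribe(new Flow.Subscription {
        private var prev = BigInt(0)
        private var curr = BigInt(1)
        private var cancelled = false

        override def request(n: Long): Unit = {
          var remaining = n
          while (!cancelled && remaining > 0) {
            val value = prev
            prev = curr
            curr = value + curr
            remaining -= 1
            subscriber.onNext(value)
          }
        }

        override def cancel(): Unit = cancelled = true
      })
  }

  // Processor = Subscriber + Publisher: doubles each value and forwards
  // downstream demand upstream, so back pressure reaches the source.
  final class Doubler extends Flow.Processor[BigInt, BigInt] {
    private var upstream: Option[Flow.Subscription] = None
    private var downstream: Option[Flow.Subscriber[_ >: BigInt]] = None

    override def onSubscribe(s: Flow.Subscription): Unit = upstream = Some(s)
    override def onNext(item: BigInt): Unit = downstream.foreach(_.onNext(item * 2))
    override def onError(t: Throwable): Unit = downstream.foreach(_.onError(t))
    override def onComplete(): Unit = downstream.foreach(_.onComplete())

    override def subscribe(subscriber: Flow.Subscriber[_ >: BigInt]): Unit = {
      downstream = Some(subscriber)
      subscriber.onSubscribe(new Flow.Subscription {
        override def request(n: Long): Unit = upstream.foreach(_.request(n))
        override def cancel(): Unit = upstream.foreach(_.cancel())
      })
    }
  }

  // Hypothetical consumer that collects values instead of logging them.
  final class Collector extends Flow.Subscriber[BigInt] {
    val received = ListBuffer.empty[BigInt]
    private var subscription: Option[Flow.Subscription] = None

    override def onSubscribe(s: Flow.Subscription): Unit = subscription = Some(s)
    override def onNext(item: BigInt): Unit = received += item
    override def onError(t: Throwable): Unit = throw t
    override def onComplete(): Unit = ()

    def request(n: Long): Unit = subscription.foreach(_.request(n))
  }
}
```

Wiring is source.subscribe(doubler) followed by doubler.subscribe(collector); nothing flows until the collector calls request, because the doubler only asks the source for what was asked of it.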
- How to use akka-stream to create a publisher?
- Akka == an actor-based concurrency framework, so it should come as no surprise that Akka Streams allows us to create reactive streams completely out of actors.
- the ActorPublisher[T] trait
- can be mixed into any actor and provides important publisher-specific lifecycle management.
- handles most of the bookkeeping required to properly manage the life cycle of a publisher
- helps implement a reactive publisher
- provides a number of functions to interrogate the current state of the subscription and manipulate the stream.
- provides functions that track the state of the subscription
- isActive - Returns a boolean indicating if the stream is in an active state
- isCompleted - Returns a boolean indicating if the stream has been completed
- isErrorEmitted - Returns a boolean indicating if the stream has encountered an exception
- totalDemand - Returns a long indicating the total demand that has been signaled by the subscriber
- these values are updated automatically during the lifetime of the subscription
- as the subscriber signals that it can handle more elements, the ActorPublisher trait automatically increases the totalDemand.
- also defines several functions for interacting with the stream
- onNext pushes an element into the stream (throws an exception if isActive is false or totalDemand < 1)
- onComplete completes the stream
- onError terminates the stream with an error
- two messages are used to signal when the subscriber either cancels the subscription or signals more demand: Cancel and Request(n: Long)
- Cancel leads to context.stop(self)
- Request checks isActive && totalDemand > 0 before proceeding using onNext
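Put together, the Cancel and Request handling described above might look roughly like the actor below. This is a sketch reconstructed from these notes, not the article's exact code. It assumes the akka-stream-experimental module is on the classpath and that ActorPublisher and ActorPublisherMessage live in the akka.stream.actor package. The helper name emitWhileDemanded and the starting values 0 and 1 are assumptions made for illustration.

```scala
import java.math.BigInteger
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}

// Sketch only: needs akka-stream-experimental to compile.
class FibonacciPublisher extends ActorPublisher[BigInteger] {
  // Actor-local state: the last two Fibonacci values (assumed to start at 0 and 1).
  private var prev = BigInteger.ZERO
  private var curr = BigInteger.ONE

  def receive: Receive = {
    // The trait has already added the requested amount to totalDemand.
    case Request(_) => emitWhileDemanded()
    // The subscriber is gone, so the actor has nothing left to do.
    case Cancel => context.stop(self)
  }

  // Hypothetical helper: push values only while the stream is active
  // and the subscriber still has outstanding demand.
  private def emitWhileDemanded(): Unit =
    while (isActive && totalDemand > 0) {
      onNext(prev)
      val next = prev.add(curr)
      prev = curr
      curr = next
    }
}
```

Because onNext throws when isActive is false or totalDemand < 1, the loop condition doubles as the guard that keeps the actor within the demand it was given.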
- The actor maintains its own state
- val pubRef = system.actorOf(Props[FibonacciPublisher]) to instantiate the actor
- val publisher = ActorPublisher[BigInteger](pubRef)
- note to myself: I'm unsure whether it's really needed
- the ActorSubscriber trait
- a trait to manage the lifecycle of a subscriber
- behaves very similarly to ActorPublisher[T] in that it provides several facilities to help us behave the way that a reactive subscriber should.
- provides a number of messages that will be sent to the actor as normal stream lifecycle events occur
- OnNext - Emitted when publisher pushes a new element into the stream
- OnComplete - Emitted when publisher completes the stream
- OnError - Emitted when the publisher ends the stream due to an exception
- the ActorSubscriber trait forces any subclass to define a request strategy.
- A request strategy is used to control stream back pressure and as such dictates when a request is made of the subscription and for how many elements.
- A request strategy is a simple object that the subscriber uses after every element pushed into the stream to determine whether or not more elements should be requested.
- the RequestStrategy trait
- several implementations provided by akka-stream
- WatermarkRequestStrategy - This strategy allows us to specify a high and low watermark that is used to request more elements. If the number of unhandled messages is less than the low watermark this strategy will request enough elements to meet the high watermark. This is a good choice if all work is done directly inside the current actor, however it would be a poor choice to use if this actor is delegating work to a third party.
- MaxInFlightRequestStrategy - This strategy tracks the number of elements that are currently "in flight", meaning they may or may not have been consumed, but not fully handled. To use this strategy we would need to implement a subclass of this strategy that defines the function inFlightInternally: Int that tracks the current number of elements being processed. We will use this strategy to implement our Doubling Processor.
- ZeroRequestStrategy - Finally, we can define no request strategy at all and simply call the request function provided by the ActorSubscriber trait explicitly when we want more elements.
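The arithmetic behind the three strategies can be sketched in plain Scala. This is a simplified model for illustration, not the akka-stream classes themselves: the names RequestStrategySketch, Strategy, Watermark, MaxInFlight and Zero are hypothetical, requestDemand is assumed to receive the number of elements already requested but not yet received, and the real implementations may differ in detail (for example by batching requests).

```scala
object RequestStrategySketch {
  // Hypothetical model of a request strategy: given the number of elements
  // still outstanding, decide how many more to request.
  trait Strategy {
    def requestDemand(remainingRequested: Int): Int
  }

  // Below the low watermark, top the outstanding demand back up to the high watermark.
  final case class Watermark(high: Int, low: Int) extends Strategy {
    def requestDemand(remainingRequested: Int): Int =
      if (remainingRequested < low) high - remainingRequested else 0
  }

  // Keep "in flight" work plus outstanding demand at or below max.
  // inFlight is supplied by the caller, e.g. the size of an internal queue.
  final class MaxInFlight(max: Int, inFlight: () => Int) extends Strategy {
    def requestDemand(remainingRequested: Int): Int =
      math.max(0, max - inFlight() - remainingRequested)
  }

  // Never requests automatically; the subscriber requests explicitly instead.
  case object Zero extends Strategy {
    def requestDemand(remainingRequested: Int): Int = 0
  }
}
```

For instance, with a high watermark of 10 and a low watermark of 3, two outstanding elements lead to a request for 8 more, while three outstanding elements lead to no request.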
- A middle man in a stream processing pipeline = a processor, being an actor that is both an ActorPublisher[T] and an ActorSubscriber
- The example uses mutable.Queue to store all unpublished doubled values
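The buffering idea can be sketched in isolation from the actor. This is an assumption-laden illustration rather than the article's code: DoublingBufferSketch, DoublingBuffer, offer, drain and inFlight are hypothetical names, and the link between the queue size and the "in flight" count used by a MaxInFlightRequestStrategy is my reading of the notes above.

```scala
import scala.collection.mutable

object DoublingBufferSketch {
  // Hypothetical buffer: holds doubled values until downstream demand allows publishing.
  final class DoublingBuffer {
    private val queue = mutable.Queue.empty[BigInt]

    // Called when an element arrives from upstream: double it and park it.
    def offer(value: BigInt): Unit = queue.enqueue(value * 2)

    // Number of values received but not yet published downstream.
    def inFlight: Int = queue.size

    // Called when downstream signals demand: release at most `demand` values, oldest first.
    def drain(demand: Long): List[BigInt] = {
      val out = List.newBuilder[BigInt]
      var remaining = demand
      while (remaining > 0 && queue.nonEmpty) {
        out += queue.dequeue()
        remaining -= 1
      }
      out.result()
    }
  }
}
```

In an actor-based processor, the values returned by drain would be the ones passed to onNext, and inFlight would feed the request strategy so that upstream is not asked for more than the buffer should hold.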