In my previous blogs, I discussed about the basics of akka-streams and materialization. Now let’s dig deeper into Graphs in Akka-Streams.
Graphs
Till now we know how to create a linear pipeline/linear graph. But in real life scenario we generally don’t have linear graphs to implement. The graphs can be complex. In Akka Streams computation graphs are written in a more graph-resembling DSL. It aims to make translating graph drawings (e.g. from notes taken from design discussions, or illustrations in protocol specifications) to and from code simpler.
Graphs are used to perform fan-in and fan-out operations. You can consider graph operations as junctions(multiple flows connected at a single point).
- Fan-in : It takes multiple inputs and produces a single output.
- Fan-out : It produces multiple outputs by taking a single input.
Constructing Graphs
For building graphs, we use linear flows that serves as linear connection in graphs. We also use junctions which serve as fan-in and fan-out points.
Some of the junctions provided by Akka-Streams are following :
Fan-out Junctions
- Broadcast[T] – (1 input, N outputs) given an input element emits to each output
- Balance[T] – (1 input, N outputs) given an input element emits to one of its output ports
- UnzipWith[In,A,B,…] – (1 input, N outputs) takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
- UnZip[A,B] – (1 input, 2 outputs) splits a stream of (A,B) tuples into two streams, one of type A and one of type B
Fan-in Junctions
- Merge[In] – (N inputs , 1 output) picks randomly from inputs pushing them one by one to its output
- MergePrioritized[In] – like Merge but if elements are available on all input ports, it picks from them randomly based on their priority
- ZipWith[A,B,…,Out] – (N inputs, 1 output) which takes a function of N inputs that given a value for each input emits 1 output element
- Zip[A,B] – (2 inputs, 1 output) is a specialised ZipWith for zipping input streams of A and B into a (A,B) tuple stream
Let’s look at an example before moving forward. Consider the following graph :
So it basically connects a source named numbers to sink named sideEffectSink through a graph. In this all the elements of the stream are broadcasted to each outlet. One output stream goes through addTwo and other through prodTwo which adds 2 and multiply the element by 2 respectively. After that the streams are merged to give a single output stream.
In the following way we can implement this graph:
val sideEffectSink: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))
val numbers: Source[Int, NotUsed] = Source(1 to 10)
val graphAddProd =
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val addTwo: Flow[Int, Int, NotUsed] = Flow[Int].map(_ + 2)
val prodTwo: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
numbers ~> broadcast ~> addTwo ~> merge ~> sideEffectSink
broadcast ~> prodTwo ~> merge
ClosedShape
})
The import GraphDSL.Implicits._ brings into scope ~> operator (knows as ‘edge’, ‘via’ or ‘to’) and it’s inverted counterpart <~ (for noting downflows in the opposite direction).
Constructing Partial Graphs
Partial Graphs are not complete computational Graphs rather they are a part/phase of a computational graph. We can build the partial graph at different places and join them later to create a complete graph.
The need of partial graph rises as it is not always possible to build a computational graph or sometimes it is not even needed.
So, now question arises, how to build partial graphs? To build a partial graph we can return a different shape in place of ClosedShape. There are different shapes. For example :
- SourceShape
val pairs = Source.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(Zip[Int, Int]())
numbers.filter(_ % 2 != 0) ~> zip.in0
numbers.filter(_ % 2 == 0) ~> zip.in1
SourceShape(zip.out)
})
- FlowShape
val pairUpWithToString =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Int, String]())
broadcast.out(0).map(identity) ~> zip.in0
broadcast.out(1).map(_.toString) ~> zip.in1
FlowShape(broadcast.in, zip.out)
})
- Sinkshape
val extractOne = Sink.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val unzip = builder.add(Unzip[Int, Int])
unzip.out1 ~> ignoreSink
unzip.out0 ~> sideEffectSink
SinkShape(unzip.in)
})
Also, to make a RunnableGraph all ports has to be connected,so there won’t be errors in wiring these parts. You will know any such error at construction time.
Using Simplified API to Combine Source and Sink
There is a simplified API that can connect Sinks and Sources with junctions without using Graph DSL. For this, we use combine method that takes care of building the necessary graph underneath.
Look at the following example in which we combine two sources into one:
val sourceOne = Source(List("hi", "hello"))
val sourceTwo = Source(List(1, 2))
val source = Source.combine(sourceOne, sourceTwo)(Merge(_))
We can do the same for Sink but with that you have to use a Fan-out junction.
So that’s all about Graphs in Akka-Streams. I hope it was helpful to you.
Visit here for the full code.
Reference