How to Use Graph DSL with Akka Streams

Knoldus Blog Audio
Reading Time: 3 minutes

This blog is for the Scala programmer with a little bit of Akka background, but who is utterly baffled by the seemingly magical Akka Streams Graph DSL. This article will use bits of Scala code that are easy to understand.

Introduction

In Akka Streams computation graphs are not expressed using a fluent DSL like linear computations are, instead they are written in a more graph-resembling DSL which aims to make translating graph drawings.

Graphs are needed whenever you want to perform any kind of fan-in (“multiple inputs”) or fan-out (“multiple outputs”) operations. Considering linear Flows to be like roads, we can picture graph operations as junctions: multiple flows being connected at a single point. Some operators which are common enough and fit the linear style of Flows, such as concat (which concatenates two streams, such that the second one is consumed after the first one has completed), may have shorthand methods defined on Flow or Source themselves, however, you should keep in mind that those are also implemented as graph junctions.

If you want to code with me, add the following to your build.sbt file:

val akkaVersion = "2.12.11"

libraryDependencies ++= Seq(
  // akka streams
  "com.typesafe.akka" %% "akka-stream" % akkaVersion,
  "com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
)

Steps for Creating graph

I’m assuming you know what Akka tries to do with actors and a bit of Akka Streams, at least what a Source, Sink and Flow do. Here is an example where I would like to create a stream that deals with the numbers. So it has some kind of number source as an input.

I am going to declare an an input as source 1 to 1000.

For every one of these numbers, not one but two hard computations should be evaluated in parallel. They should be tuples paired together. Also, I have a flow going to call this incremental which has one of the hard computations. The other one is for the multiplier.

I would like to execute both of these floes in parallel. Somehow merge back the results in a tuple or a pair. So I have an output as a sink for each.

Here’s code for an implementations:

  implicit val system = ActorSystem("Graphs")
  implicit val materializer = ActorMaterializer()

  val input = Source(1 to 1000)
  val incrementer = Flow[Int].map(x => x+1)
  val multiplier = Flow[Int].map(x => x * 10)
  val output = Sink.foreach[(Int,Int)](println)

Step 1: Setting up the Fundamentals

 val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => // builder = MUTABLE data structure
      import GraphDSL.Implicits._ // brings some nice operators into scope

The GraphDSL.create is a curried function. The first argument list is empty second argument is a function. That function takes a mutable data structure called a Builder which is typed with a materialized value, which in our case will be NotUsed, as we aren’t surfacing anything outside of the stream. Inside the function block, we are already importing import GraphDSL.Implicits._ to bring some alien operators in scope.

Step 2: Add the Components

After the implicit import, still inside the block of the function, we need to add the individual components.

val broadcast = builder.add(Broadcast[Int](2)) //fan-out operator
val zip = builder.add(Zip[Int,Int]) //fan-in operator

Step 3: Tying up the Components

The squiggly arrow mutates the Builder which (internally) describes the layout of our stream. This step is one of the most powerful in Akka Streams.

input ~> broadcast
broadcast.out(0) ~> incrementer ~> zip.in0
broadcast.out(1) ~> multiplier ~> zip.in1
zip.out ~> output

Step 4: Closing

val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] => 
      import GraphDSL.Implicits._ // brings some nice operators into scope


ClosedShape
}

ClosedShape is an object which is a marker for Akka when you will materialize this graph, to make sure you didn’t leave any internal component with any input or output hanging or unconnected.

Final code

After you are done creating the graph, you will need to materialize it to run:

Akka Stream code
Graph code

Reference:

https://doc.akka.io/docs/akka/current/stream/stream-graphs.html

Leave a Reply