In my previous blog post, I discussed the basics of Akka Streams. It was an introduction to get you started. There I mentioned materialization, which is a really important concept in Akka Streams. In this post I'm going to discuss it and some related topics in detail. So, let's begin!
Let's revisit the basics first. In Akka Streams, data is processed as it flows through the different stages of a graph. Each stage has one or more input and/or output ports. The basic building blocks are Sources (one output), Sinks (one input) and Flows (one input and one output). Using them, we can build linear pipelines (graphs). For example:
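import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future
val numbers: Source[Int, NotUsed] = Source(1 to 10) // emits the numbers 1 to 10
val sideEffectSink: Sink[Any, Future[Done]] = Sink.foreach(element => println(element)) // prints every element it receives
val flowToDouble: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2) // doubles every element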
Even if we connect these stages into a graph, for example numbers.via(flowToDouble).to(sideEffectSink), nothing will be printed when the code runs. Here lies the border between stream description and stream execution: the graph we created is only a blueprint, a description of the stream.
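To make the "blueprint" idea concrete, here is a minimal sketch, assuming Akka 2.6.x with akka-stream on the classpath and code run as a Scala script or REPL session. The counter `processed` and the sink `countingSink` are illustrative names, not part of the original post. Building the `RunnableGraph` leaves the counter at 0; only `run()` makes elements flow.

```scala
// Sketch assuming Akka 2.6.x; run as a Scala script / REPL session.
import java.util.concurrent.atomic.AtomicInteger
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// In Akka 2.6 an implicit ActorSystem also provides the Materializer.
implicit val system: ActorSystem = ActorSystem("BlueprintDemo")

// Hypothetical helper: counts how many elements actually reach the sink.
val processed = new AtomicInteger(0)

val numbers: Source[Int, NotUsed] = Source(1 to 10)
val flowToDouble: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
val countingSink: Sink[Int, Future[Done]] =
  Sink.foreach[Int](_ => { processed.incrementAndGet(); () })

// Only a description: no actors are started and no element flows yet.
val graph: RunnableGraph[Future[Done]] =
  numbers.via(flowToDouble).toMat(countingSink)(Keep.right)
val countBeforeRun = processed.get() // still 0

// run() materializes the graph; the Future completes when the stream finishes.
Await.result(graph.run(), 3.seconds)
val countAfterRun = processed.get() // 10

system.terminate()
```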
To execute a stream, we need to materialize it, and for that we need a Materializer: the component that actually runs a stream. Let's dig deeper into it!
Stream materialization is the process of taking a stream description (a RunnableGraph) and allocating all the resources it needs in order to run. In Akka Streams, this means starting up the actors that work underneath the stream and do the actual processing. Depending on what the stream requires, it can also mean opening a file, a socket connection, and so on.
Materialization is triggered by so-called "terminal operations". These include the various forms of run() and runWith() defined on Sources and Flows, as well as syntactic sugar such as runForeach(elem => …), which is the same as runWith(Sink.foreach(elem => …)).
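The equivalence between runForeach and runWith(Sink.foreach(...)) can be checked with a small sketch. It assumes Akka 2.6.x and a Scala script or REPL session; the buffers `viaSugar` and `viaRunWith` are hypothetical names used only to capture what each terminal operation saw.

```scala
// Sketch assuming Akka 2.6.x; run as a Scala script / REPL session.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("TerminalOpsDemo")

// Hypothetical buffers recording the elements each run received.
val viaSugar = ListBuffer.empty[Int]
val viaRunWith = ListBuffer.empty[Int]

// Syntactic sugar: runForeach materializes and runs the stream.
val done1: Future[Done] = Source(1 to 5).runForeach(elem => viaSugar += elem)
// Spelled-out form: attach Sink.foreach explicitly and run with runWith.
val done2: Future[Done] = Source(1 to 5).runWith(Sink.foreach[Int](elem => viaRunWith += elem))

// Wait for both streams to complete before reading the buffers.
Await.result(done1, 3.seconds)
Await.result(done2, 3.seconds)
system.terminate()
```

Both calls return the Sink's materialized value, a Future[Done] that completes when the stream finishes.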
For all of this you need an ActorSystem and a Materializer in scope, preferably as implicit values:
implicit val system: ActorSystem = ActorSystem("Demo")
implicit val materializer: Materializer = Materializer(system)
Now we can run the stream, and this time it will actually be executed:
numbers.via(flowToDouble).to(sideEffectSink).run()
It prints each number doubled: 2, 4, 6 and so on, up to 20.
- Materialization produces a materialized value.
- Each stage is capable of producing a materialized value.
- When stages are combined, it is up to the user to decide which materialized values to keep and how to combine them.
- A new materialized value is produced each time the stream is run.
- It is independent of the values produced by earlier runs.
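A minimal sketch of "a new materialized value on every run", assuming Akka 2.6.x and a Scala script or REPL session (`sumGraph` is an illustrative name). The same RunnableGraph is run twice; each run materializes its own Future, and each Future holds its own result.

```scala
// Sketch assuming Akka 2.6.x; run as a Scala script / REPL session.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("MatValueDemo")

// One blueprint whose materialized value is the Sink's Future[Int].
val sumGraph: RunnableGraph[Future[Int]] =
  Source(1 to 3).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

// Each run() materializes the graph again and yields a fresh Future.
val firstRun: Future[Int] = sumGraph.run()
val secondRun: Future[Int] = sumGraph.run()

val firstSum = Await.result(firstRun, 3.seconds)   // 6
val secondSum = Await.result(secondRun, 3.seconds) // 6, computed independently
system.terminate()
```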
Accessing Materialized Values
Here, we will look at some functions related to materialized values:
- to: connects a Source or Flow to a Sink and keeps the materialized value of the stage it is called on, discarding the Sink's value.
- toMat: like to, but also accepts a function that decides how to combine the two materialized values.
- viaMat: just like toMat, but accepts a Flow instead of a Sink (plain via keeps the left value).
- run: executes a RunnableGraph and returns its materialized value.
val single: Source[String, NotUsed] = Source.single("no repetition")
val streamToUpper = Flow[String].map(_.toUpperCase)
single.viaMat(streamToUpper)(Keep.left).toMat(sideEffectSink)(Keep.left).run() // prints NO REPETITION and returns the Source's NotUsed
Keep provides the common combine functions:
- Keep.both: keeps the materialized values of both stages, as a tuple.
- Keep.left: keeps the materialized value of the left stage.
- Keep.right: keeps the materialized value of the right stage.
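The explicit type annotations in this sketch show what each Keep function leaves you with. It assumes Akka 2.6.x and a Scala script or REPL session; `source` and `sumSink` are illustrative names.

```scala
// Sketch assuming Akka 2.6.x; run as a Scala script / REPL session.
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("KeepDemo")

val source: Source[Int, NotUsed] = Source(1 to 4)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

// Keep.left: keep the Source's NotUsed, drop the Sink's Future.
val leftGraph: RunnableGraph[NotUsed] = source.toMat(sumSink)(Keep.left)
// Keep.right: keep the Sink's Future[Int].
val rightGraph: RunnableGraph[Future[Int]] = source.toMat(sumSink)(Keep.right)
// Keep.both: keep both values as a tuple.
val bothGraph: RunnableGraph[(NotUsed, Future[Int])] = source.toMat(sumSink)(Keep.both)

val leftValue: NotUsed = leftGraph.run()
val rightSum: Int = Await.result(rightGraph.run(), 3.seconds) // 10
val (_, bothFuture) = bothGraph.run()
val bothSum: Int = Await.result(bothFuture, 3.seconds)        // 10
system.terminate()
```

Keep.right is the usual choice when the Sink produces the result you care about, which is also what runWith returns.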
Apart from the above functions, there are also some shortcuts that are good to know:
- runWith: attaches the given Sink, runs the stream, and returns the Sink's materialized value.
- runForeach: creates a Sink.foreach, runs the Source with it, and returns a Future[Done].
- runFold: creates a Sink.fold, runs the Source with it, and returns a Future of the final accumulated value.
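The three shortcuts side by side, as a minimal sketch assuming Akka 2.6.x and a Scala script or REPL session (`fiveNumbers` is an illustrative name):

```scala
// Sketch assuming Akka 2.6.x; run as a Scala script / REPL session.
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("ShortcutsDemo")

val fiveNumbers = Source(1 to 5)

// runWith: attach a Sink and run; the result is the Sink's materialized value.
val collected: Future[Seq[Int]] = fiveNumbers.runWith(Sink.seq[Int])
// runFold: shorthand for runWith(Sink.fold(zero)(f)).
val product: Future[Int] = fiveNumbers.runFold(1)(_ * _)
// runForeach: shorthand for runWith(Sink.foreach(f)); completes with Done.
val printed: Future[Done] = fiveNumbers.runForeach(n => println(s"element: $n"))

val collectedResult = Await.result(collected, 3.seconds)
val productResult = Await.result(product, 3.seconds)
val printedResult = Await.result(printed, 3.seconds)
system.terminate()
```

Each call materializes `fiveNumbers` separately, so the three streams run independently of each other.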
Fusion in Streams
Materialization is performed synchronously on the materializing thread. By default, the stages of a stream are fused together and run inside a single actor. Fusing has two consequences:
- passing elements from one operator to the next is a lot faster between fused operators due to avoiding the asynchronous messaging overhead
- fused stream operators do not run in parallel to each other, meaning that only up to one CPU core is used for each fused part
So, to run stages asynchronously, you have to insert asynchronous boundaries manually into your flows and operators. You can do this by calling async on Sources, Sinks and Flows. Each async call creates a boundary that divides the data flow into separately running parts.
val streamSum = Flow[Int].reduce(_ + _) // sums all elements into a single value
numbers.via(streamSum).async.runWith(sideEffectSink) // prints 55
Now the stream uses two actors instead of one: the first runs numbers and streamSum (everything before the async call), and the second runs sideEffectSink. You can add multiple async calls to create more boundaries. All the stages within a single async boundary are fused together.
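A minimal sketch with more than one boundary, assuming Akka 2.6.x and a Scala script or REPL session. The two async calls should split this linear stream into three fused regions; elements still arrive at the Sink in their original order, because async boundaries in a linear stream do not reorder elements.

```scala
// Sketch assuming Akka 2.6.x; run as a Scala script / REPL session.
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("AsyncDemo")

// Two async boundaries give three fused regions:
//   region 1: Source + map(_ + 1)
//   region 2: map(_ * 2)
//   region 3: Sink.seq
// Regions exchange elements asynchronously and can use different CPU cores.
val result: Future[Seq[Int]] =
  Source(1 to 10)
    .map(_ + 1).async
    .map(_ * 2).async
    .runWith(Sink.seq[Int])

val values = Await.result(result, 3.seconds)
system.terminate()
```

For such cheap map stages the messaging overhead usually outweighs the gain; boundaries pay off when the stages on each side do substantial work.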
So that's all about materialization in Akka Streams. I hope this concept is now clearer. For any queries, feel free to comment below. I have also added the code here for reference.