Akka Stream: Map And MapAsync

In this blog, we will discuss what are “map” and “mapAsync” when used in the Akka stream and how to use them.

The difference is highlighted in their signatures:-

Flow.map takes in a function that returns a type T, while

Flow.mapAsync takes in a function that returns a type Future[T].

Let’s take one practical example to understand both:-

Problem – Suppose we have a user with a userId and we want to fetch his details from some Database on basis of userId.

Let’s start with solving the problem which may not be the most efficient solution.

Possible Solution – Given an Akka stream Source of UserId values, we could use Flow.map within a Stream to query the database and print the full names to the console:

package com.example.stream

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

case class UserID(id:String)

object UserDetails extends App {

  implicit val actorSystem: ActorSystem = ActorSystem("akka-streams-example")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val userIDSource: Source[UserId, NotUsed] = Source(List(UserID("id1")))

  val stream =
    userIDSource.via(Flow[UserID].map(getFromDatabase))
      .to(Sink.foreach(println))
      .run()

  def getFromDatabase(userID: UserID): String =
  {
    userId.id match {
      case "id1" => "piyush"
      case "id2" => "girish"
      case "id3" => "vidisha"
      case _ => "anuj"
    }
  }
}

This is a simple stream and a not so efficient solution to the problem.

One limitation that I can think of in this approach is that this stream will only make 1 database query at a time. Thus serial querying will become a “bottleneck” and will surely prevent maximum throughput in our stream.

We could try to improve performance through concurrent queries using a Future:

 

val userIDSource: Source[UserID, NotUsed] = Source(List(UserID("id1"),UserID("id2")))

val stream =
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
    .to(Sink.foreach[Future[String]](_ foreach println))
      .run()

def concurrentDBLookup(userID : UserID) : Future[String] =
  Future { getFromDatabase(userID) }

Though we have effectively removed the backpressure by making the Sink just pulling in the Future and adding a “foreach println”, which is relatively fast compared to database queries.

But now the streams will continuously propagate demand to the Source and spawn off more Futures inside of the Flow.map. Therefore, there is no limit to the number of databaseLookup running concurrently. Hence we have introduced a slowness in our program as un-restricted parallel querying could eventually overload the database.

Solution – we have now the Flow.mapAsync to the rescue; we can have concurrent database access while at the same time limiting the number of simultaneous lookups:

val maxLookupCount = 10

val maxLookupConcurrentStream = 
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach(println))
              .run()

Also, notice that the Sink.foreach has again got simpler, it no longer takes in a Future but just the result instead.

Unordered Async Map

If maintaining a sequential order of the UserIDs to Names is unnecessary then you can use Flow.mapAsyncUnordered. For example, you just need to print all of the names to the console but didn’t care about the order they were printed.

knoldus-advt-sticker

References:

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/mapAsync.html#description

https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/mapAsyncUnordered.html

Written by 

Piyush Rana is a Senior Software Consultant having experience of more than 6 years. He is familiar with Object Oriented Programming Paradigms and .NET based technologies. From the past 2 years he has been handling Big Data and is working on technologies like Hadoop, Hive, Pig, Hbase. His hobbies includes gaming (strategy based, FPS and role-playing), watching series, and listening songs.

Leave a Reply

Knoldus Pune Careers - Hiring Freshers

Get a head start on your career at Knoldus. Join us!