How to throttle messages using Akka FSM?


A few days ago, I ran into a problem: the rate of incoming message requests to my process was too high. Each request was processed in a Future, so by default every request was handled in parallel. That was too much parallelism, and it flooded the thread pool with simultaneous work. A better approach is to limit how much work runs in parallel.

After some research, I read that Akka FSM can help throttle messages, so I decided to try it.

The goal is to implement a size-based message throttler: a piece of code that ensures messages are not sent out at too high a rate.

According to the Akka documentation, an FSM can be described as a set of relations of the form:

State(S) x Event(E) -> Actions (A), State(S’)

If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.
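
To make the relation concrete, here is a minimal sketch in plain Scala (no Akka involved) that writes the transitions as a single function, using the two states and two messages of the throttler built later in this post. The Action type and the step function are hypothetical names introduced only for this illustration, and ignoring Flush while Active is an assumption. Akka FSM expresses the same idea through its own DSL.

```scala
object FsmRelationSketch {
  // States and events mirror the throttler described in this post.
  sealed trait State
  case object Waiting extends State
  case object Active extends State

  sealed trait Event
  final case class Msg(a: Int) extends Event
  case object Flush extends Event

  // Hypothetical action type, used only for this illustration.
  sealed trait Action
  final case class Enqueue(msg: Msg) extends Action
  case object ProcessQueue extends Action

  // State(S) x Event(E) -> Actions(A), State(S')
  def step(state: State, event: Event): (List[Action], State) =
    (state, event) match {
      case (Waiting, m: Msg) => (List(Enqueue(m)), Waiting)
      case (Waiting, Flush)  => (List(ProcessQueue), Active)
      case (Active, m: Msg)  => (List(Enqueue(m)), Waiting)
      // Assumption: Flush is ignored while Active; the post's FSM does not handle this case.
      case (Active, Flush)   => (Nil, Active)
    }
}
```

Each case reads as one row of the relation: the pair on the left is the current state and the event, the pair on the right is the list of actions and the next state.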

Abstract Idea:

  1. We will build a throttler machine with two states, Waiting and Active.
  2. There will be two types of messages: incoming messages and Flush (a message that tells the throttler to start processing the queued messages).
  3. Start in the Waiting state with an empty queue and accumulate incoming message requests.
  4. A Flush message decides when to change state from Waiting to Active.
  5. While changing state from Waiting to Active, process the requests in onTransition.
  6. onTransition takes a partial function whose input is a pair of states: the current state and the next state.
  7. From Active, go back to the Waiting state after dequeuing the processed requests.

Implementation:

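import akka.actor.FSM

import scala.collection.immutable.Queue
import scala.concurrent.duration._

// The two states of the throttler
sealed trait State
case object Waiting extends State
case object Active extends State

// An incoming request, and the signal to start processing
case class Msg(a: Int)
case object Flush

// The requests accumulated so far
case class StateData(queue: Queue[Msg])

class SizeBasedThrottler extends FSM[State, StateData] {

  // Timestamp helper for the log lines (assumed; its definition is not shown in the original snippet)
  private def curTime: Long = System.currentTimeMillis()

  startWith(Waiting, StateData(Queue.empty))

  // The queued requests are processed while moving from Waiting to Active
  onTransition {
    case Waiting -> Active =>
      nextStateData match {
        case StateData(queue) =>
          queue.foreach(x => println(s"$curTime processing ${x.a}"))
          Thread.sleep(2000L) // only simulates the time a real request takes to process
      }
  }

  when(Active) {
    // A message arriving in Active starts a new queue and moves back to Waiting
    case Event(msg: Msg, _) =>
      println(s"$curTime at Active $msg")
      goto(Waiting) using StateData(Queue(msg))
  }

  when(Waiting, stateTimeout = 2.seconds) {
    case Event(msg: Msg, StateData(oldQueue)) =>
      val newQueue = oldQueue :+ msg
      println(s"$curTime at Waiting $newQueue")
      stay() using StateData(newQueue)

    case Event(Flush, StateData(queue)) =>
      goto(Active) using StateData(queue)

    // No message arrived within the timeout: process whatever has been queued
    case Event(StateTimeout, StateData(queue)) =>
      goto(Active) using StateData(queue)
  }

  initialize()
}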

Send the Flush message as soon as the number of requests reaches the configured value. The processing actually happens in the transition Waiting -> Active. Probably the trickiest point is to remember that while the FSM is in the Active state, new messages will still arrive and must be handled by adding them to a queue (or rather, by starting a new queue with the data from that message). There is also a chance that the number of requests never reaches the configured value. For that case we use StateTimeout: if no message arrives within the state timeout period, all queued requests are processed.
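
The size check itself can be sketched as a small pure function. This is only an illustration under stated assumptions: FlushTriggerSketch, maxBatchSize and enqueue are hypothetical names that do not appear in the code above, and the threshold of 3 stands in for "the configured value".

```scala
import scala.collection.immutable.Queue

object FlushTriggerSketch {
  final case class Msg(a: Int)

  // Hypothetical threshold; the post calls this "the configured value".
  val maxBatchSize: Int = 3

  // Adds a message to the pending queue and reports whether the batch is full,
  // i.e. whether the caller should now send Flush to the throttler.
  def enqueue(pending: Queue[Msg], msg: Msg): (Queue[Msg], Boolean) = {
    val updated = pending :+ msg
    (updated, updated.size >= maxBatchSize)
  }
}
```

Whoever forwards requests to the throttler could call a check like this after each message and send Flush when the second element of the result is true.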

The aim of this blog is to demonstrate how Akka FSM can be used to throttle messages and save the application from crashing.

For the complete example, the entire code is available on GitHub.

References:

  1. Akka FSM
  2. Throttling Messages in Akka


About Mahesh Chand Kandpal

Explorer + Technology Enthusiast + Foodie + Movie Buff