When akka stream meets RabbitMQ


“Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.”  – that is how reactive streams are defined in wikipedia. There are two other implementation to reactive streams then Akka-streams i.e. reactor and Netflix’s RxJava. However, since the reactive stream manifesto published the only mature implementation available is akka-stream. According to reactive manifesto the implementation must follow the following properties i.e. Responsive, Resilient, Elastic and Messaage driven.

reactive-systems-9-638

Now the question arise why do we need reactive streams. Well wikipedia gives that answer too “The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—like passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.”. Though Kafka is a perfect fit for this, we are going to RabbitMQ with this. No doubt many would debate on using of RabbitMQ, however for different application requirement we need to use different tactics. The reason for using RabbitMQ instead of Kafka here is to avoid the hassle of setting up Kafka with Zookeeper. For a small application with less load, using kafka and zookeeper feels a bit overhead, I know you may debate over it but that is how I feel and many other too.

We have talked about reactive streams, akka streams and a bit of Kafka too, but let’s talk a bit about RabbitMQ too before we go to the implementation part. Well, RabbitMQ is a lightweight, easy to deploy messaging service. It supports multiple messaging protocols. It is the most widely deployed open source message broker. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Without further delay let us focus on implementing Akka-stream with RabbitMQ now. For the integration part we are going use open source library called op-rabbit. It is a high-level, type-safe, opinionated, composable, fault-tolerant library for interacting with RabbitMQ. It has different features like Recovery, Integration, Modular, Modeled, Reliability, Graceful shutdown and etc. Like we have mentioned earlier, combining an akka-stream rabbitmq consumer and publisher allows for guaranteed at-least-once message delivery from head to tail. In other words, don’t acknowledge the original message from the message queue until any and all side-effect events have been published to other queues and persisted. And we can do so with following code snippet

import com.spingo.op_rabbit.RabbitControl
import akka.actor.{ActorSystem, Props}
object OpRabbitController {
  implicit val actorSystem = ActorSystem("such-system")
  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
}
import com.spingo.op_rabbit._
import com.spingo.op_rabbit.stream._
import com.timcharper.acked.AckedSource
import play.api.libs.json.Format

object OpRabbitProducer {
  import OpRabbitController._
  implicit val workFormat = Format[Work]
  AckedSource(1 to 15).
    map(Message.queue(_, "queueName")).
    to(MessagePublisherSink(rabbitControl))
    .run
}
case class Work(id: String)

This example is similar to what is mentioned in the library.

Basically this piece of code describes how we can produce the message and send it to the rabbitmq broker so that it could be consume by the consumer at the other end. Similarly in order to consume messages we can do the following.

import com.spingo.op_rabbit._
import com.spingo.op_rabbit.stream._
import Directives._

object OpRabbitConsumer {
  import OpRabbitController._
  implicit val recoveryStrategy = RecoveryStrategy.drop()
  RabbitSource(
  rabbitControl,
  channel(qos = 3),
  consume(queue(
    "such-queue",
    durable = true,
    exclusive = false,
    autoDelete = false)),
  body(as[Person])).
  runForeach { person =>
    greet(person)
  }
  def greet(person: Person) = {
    println(person.id)
  }
}
case class Person(id: String)

Finally, we can see that how easy it is to integrate both of them. In order to use RabbitMQ with Akka-stream don’t forget to install RabbitMQ in your system. You can do so by going the this link. And for more detail on the op-rabbit you can visit there github repository.

KNOLDUS-advt-sticker

This entry was posted in Scala. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s