When Akka Streams meets RabbitMQ


“Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.” – that is how Reactive Streams is defined on Wikipedia. Besides Akka Streams there are other implementations of Reactive Streams, namely Project Reactor and Netflix’s RxJava; however, since the Reactive Streams initiative was published, Akka Streams has been the most mature implementation available. According to the Reactive Manifesto, an implementation must have the following properties: Responsive, Resilient, Elastic and Message-driven.
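The standard itself is tiny: on the JVM it boils down to a few interfaces in the org.reactivestreams package. Here is a simplified Scala rendering for illustration only (the real interfaces live in the reactive-streams artifact):

```scala
// Simplified sketch of the Reactive Streams SPI. Back pressure is expressed
// through Subscription.request(n): a Subscriber asks for at most n more
// elements, so a Publisher can never push faster than the consumer allows.
trait Publisher[T] {
  def subscribe(s: Subscriber[_ >: T]): Unit
}

trait Subscriber[T] {
  def onSubscribe(s: Subscription): Unit
  def onNext(t: T): Unit
  def onError(t: Throwable): Unit
  def onComplete(): Unit
}

trait Subscription {
  def request(n: Long): Unit // ask for up to n more elements
  def cancel(): Unit
}
```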


Now the question arises: why do we need reactive streams? Well, Wikipedia gives that answer too: “The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—like passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, back pressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded.” Though Kafka is a perfect fit for this, we are going with RabbitMQ here. No doubt many would debate the use of RabbitMQ, but different application requirements call for different tactics. The reason for using RabbitMQ instead of Kafka here is to avoid the hassle of setting up Kafka with ZooKeeper. For a small application with a light load, running Kafka and ZooKeeper feels like overhead; you may debate that, but it is how I, and many others, feel.
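Before wiring in RabbitMQ, here is a minimal, self-contained sketch of back pressure in Akka Streams itself (this example is not from the original post; it assumes an Akka 2.5-era API). The throttle stage only signals demand upstream once per interval, and because demand flows from consumer to producer, the source emits only when asked:

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

object BackpressureDemo extends App {
  implicit val system = ActorSystem("backpressure-demo")
  implicit val materializer = ActorMaterializer()

  // The throttle stage requests at most one element per 100ms from upstream,
  // so no unbounded in-memory buffer can build up between the stages.
  Source(1 to 10)
    .throttle(1, 100.millis, 1, ThrottleMode.Shaping)
    .runForeach(i => println(s"consumed $i"))
    .onComplete(_ => system.terminate())(system.dispatcher)
}
```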

We have talked about reactive streams, akka streams and a bit of Kafka, so let us also talk a little about RabbitMQ before we get to the implementation. RabbitMQ is a lightweight, easy-to-deploy message broker that supports multiple messaging protocols. It is the most widely deployed open source message broker, and it can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements.

Without further delay, let us focus on implementing Akka Streams with RabbitMQ. For the integration we are going to use an open source library called op-rabbit. It is a high-level, type-safe, opinionated, composable, fault-tolerant library for interacting with RabbitMQ, with features such as recovery, Akka Streams integration, modular design, modeled exceptions, message reliability and graceful shutdown. As the op-rabbit documentation notes, combining an akka-stream RabbitMQ consumer and publisher allows for guaranteed at-least-once message delivery from head to tail: in other words, the original message is not acknowledged from the message queue until any and all side-effect events have been published to other queues and persisted. We can do so with the following code snippet:

import akka.actor.{ActorSystem, Props}
import com.spingo.op_rabbit.RabbitControl

object OpRabbitController {
  implicit val actorSystem = ActorSystem("such-system")
  // The RabbitControl actor manages the connection to the broker
  val rabbitControl = actorSystem.actorOf(Props[RabbitControl])
}

import akka.stream.ActorMaterializer
import com.spingo.op_rabbit._
import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit.stream._
import com.timcharper.acked.AckedSource
import play.api.libs.json.{Format, Json}

case class Work(id: String)

object OpRabbitProducer {
  import OpRabbitController._
  implicit val materializer = ActorMaterializer()
  // play-json serializer, used by op-rabbit to marshal Work onto the wire
  implicit val workFormat: Format[Work] = Json.format[Work]

  // Publish 15 Work messages; the acked sink completes each element only
  // after the broker confirms it, giving at-least-once delivery.
  AckedSource(1 to 15).
    map(i => Message.queue(Work(i.toString), "queueName")).
    to(MessagePublisherSink(rabbitControl))
    .run
}

This example is similar to the one in the op-rabbit documentation.

Basically, this piece of code describes how we can produce messages and send them to the RabbitMQ broker so that they can be consumed by the consumer at the other end. Similarly, in order to consume messages we can do the following:

import akka.stream.ActorMaterializer
import com.spingo.op_rabbit._
import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit.stream._
import play.api.libs.json.{Format, Json}
import Directives._

case class Person(id: String)

object OpRabbitConsumer {
  import OpRabbitController._
  implicit val materializer = ActorMaterializer()
  // play-json deserializer, used by body(as[Person]) below
  implicit val personFormat: Format[Person] = Json.format[Person]
  // If message processing fails, drop the message rather than requeue it
  implicit val recoveryStrategy = RecoveryStrategy.drop()

  RabbitSource(
    rabbitControl,
    channel(qos = 3), // prefetch at most 3 unacknowledged messages
    consume(queue(
      "such-queue",
      durable = true,
      exclusive = false,
      autoDelete = false)),
    body(as[Person])).
    runForeach { person =>
      greet(person) // the message is acknowledged after each successful iteration
    }

  def greet(person: Person) =
    println(person.id)
}
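Both snippets assume the op-rabbit modules are on the classpath. A build.sbt sketch follows; the module names come from the op-rabbit project, but the version number here is an assumption, so check the op-rabbit README for the current one:

```scala
// build.sbt -- the version is illustrative, not pinned by this post
libraryDependencies ++= Seq(
  "com.spingo" %% "op-rabbit-core"        % "2.1.0",
  "com.spingo" %% "op-rabbit-play-json"   % "2.1.0",
  "com.spingo" %% "op-rabbit-akka-stream" % "2.1.0"
)
```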

Finally, we can see how easy it is to integrate the two. In order to use RabbitMQ with Akka Streams, don’t forget to install RabbitMQ on your system; you can do so by following the installation guide on the RabbitMQ website. And for more detail on op-rabbit, you can visit its GitHub repository.
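If you have Docker available, one quick way to get a local broker running (an alternative to a native install; the image tag is the official rabbitmq image with the management plugin) is:

```shell
# Run RabbitMQ with the management UI:
# broker on localhost:5672, web console on http://localhost:15672 (guest/guest)
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management
```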


Written by

Pranjut Gogoi is an enthusiast of machine learning and AI with 8+ years of experience. He has been implementing different machine learning projects at Knoldus. He started an initiative called MachineX, through which he shares knowledge with the world, broadcasting free webinars, writing blogs and contributing to open source communities on machine learning and AI.
