Communicating Through RabbitMQ In Scala

Introduction to RabbitMQ

In this blog, we will see how to integrate RabbitMQ with Scala. We first briefly discuss what RabbitMQ is, then move on to the steps required to use it in our application.

First things first: what is RabbitMQ?

RabbitMQ is a message broker that helps microservices communicate with each other. With the growing adoption of microservice architecture in application development, reliance on message brokers like RabbitMQ has grown as well.

A message broker acts as a middleman between two microservices. One microservice sends a message to the broker, and the broker then delivers it to the other microservices that are waiting for it.

This helps us in developing applications that are asynchronous and resilient.

Integrating RabbitMQ with Scala

There are a few steps we need to follow to create a RabbitMQ producer and consumer in Scala:

  1. Add the required dependencies.
  2. Establish a connection with RabbitMQ.
  3. Create a channel using the connection obtained.
  4. Declare the queue to which the publisher will publish and from which the consumer will consume messages.
  5. Publish the message in the case of the publisher, or consume the message in the case of the consumer.
  6. For the consumer, one additional step is required: create a callback that runs when a message is received.

Adding Dependencies

Add the following dependency to your build.sbt file:

libraryDependencies += "com.rabbitmq" % "amqp-client" % "5.14.2"

Creating a RabbitMQ Connection

We use the ConnectionFactory class from the com.rabbitmq.client package to create a connection. We can then set the username and password if required.

Finally, we call the newConnection() method to establish a connection.

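  import scala.util.Try
  import com.rabbitmq.client.{Connection, ConnectionFactory}

  // Wraps connection setup in Try so that failures (for example, an
  // unreachable broker) are returned as values instead of thrown.
  def getConnection: Try[Connection] = Try {
    val connectionFactory: ConnectionFactory = new ConnectionFactory()
    connectionFactory.setUsername("admin")
    connectionFactory.setPassword("password")
    // With no host or port set, the client uses its defaults.
    connectionFactory.newConnection()
  }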
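The factory also exposes setters for the broker location. The following is a minimal sketch, assuming the settings arrive as a map such as sys.env; the object name RabbitMQSettings, the helper buildFactory, and the RABBITMQ_* variable names are hypothetical and not part of the client library. The fallbacks reuse the credentials shown above.

```scala
import com.rabbitmq.client.ConnectionFactory

object RabbitMQSettings {
  // Hypothetical helper: builds a factory from a map of settings
  // (for example sys.env), falling back to local defaults.
  def buildFactory(settings: Map[String, String]): ConnectionFactory = {
    val factory = new ConnectionFactory()
    factory.setHost(settings.getOrElse("RABBITMQ_HOST", "localhost"))
    // Note: toInt throws if the port value is not a number.
    factory.setPort(settings.get("RABBITMQ_PORT").map(_.toInt).getOrElse(5672))
    factory.setVirtualHost(settings.getOrElse("RABBITMQ_VHOST", "/"))
    factory.setUsername(settings.getOrElse("RABBITMQ_USER", "admin"))
    factory.setPassword(settings.getOrElse("RABBITMQ_PASSWORD", "password"))
    factory
  }
}
```

A caller could then obtain a connection with Try(RabbitMQSettings.buildFactory(sys.env).newConnection()).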

Creating the channel

We can think of channels as lightweight connections over a single TCP connection. It is unwise to open multiple connections to the broker as they are resource-heavy.

Instead, we use the Channel interface from the com.rabbitmq.client package. A Channel cannot exist independently of the Connection on which it was created.

When we close the connection, all the associated channels are closed automatically.

import scala.util.{Failure, Success, Try}
import com.rabbitmq.client.Channel

def initializeConnection(): Try[Channel] =
  // Get the connection, then create a channel if it was established
  RabbitMQConnection.getConnection match {
    case Failure(exception) =>
      logger.error(s"Could not establish connection with RabbitMQ: ${exception.getMessage}")
      // Return the failure instead of throwing, so callers can rely on the Try
      Failure(exception)
    case Success(connection) => Try(connection.createChannel())
  }
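Since connections and channels hold resources, it can help to release them deterministically. Below is a sketch of one way to do that, assuming Scala 2.13 or later for scala.util.Using; the names GreetPublisher and publishOnce are made up for illustration. It relies on Connection and Channel both being closeable in the 5.x client.

```scala
import java.nio.charset.StandardCharsets
import scala.util.{Try, Using}
import com.rabbitmq.client.ConnectionFactory

object GreetPublisher {
  // Opens a connection and a channel, publishes one message, then closes
  // both (channel first, then connection) even if publishing fails.
  def publishOnce(factory: ConnectionFactory, queue: String, message: String): Try[Unit] =
    Using.Manager { use =>
      val connection = use(factory.newConnection())
      val channel = use(connection.createChannel())
      channel.queueDeclare(queue, false, false, false, null)
      channel.basicPublish("", queue, null, message.getBytes(StandardCharsets.UTF_8))
    }
}
```

Opening a connection per message is wasteful in a real service; a long-lived application would more likely keep one connection open and close it on shutdown. The sketch only shows the lifecycle in a compact form.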

Declare the RabbitMQ Queue

We declare the queue using the queueDeclare method. The method takes, among other things, the name of the queue as a parameter.

channel.queueDeclare("greet", false, false, false, null)

The first argument is the name of the queue.

The second argument (durable) is a boolean that denotes whether the queue will survive a server restart.

The third argument (exclusive) denotes whether the queue is restricted to the connection that declared it.

The fourth argument (autoDelete) denotes whether the server will delete the queue when it is no longer in use.

The fifth argument is a map of additional arguments for the queue; we pass null because we need none.
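To illustrate the second and fifth arguments, here is a sketch that declares a durable queue with a per-queue message TTL. It assumes Scala 2.13 or later for scala.jdk.CollectionConverters; the object name DurableQueue and its methods are hypothetical, while "x-message-ttl" is an optional queue argument that RabbitMQ recognizes. Use a new queue name with it: redeclaring an existing queue such as "greet" with different properties makes the broker reject the declaration.

```scala
import scala.jdk.CollectionConverters._
import com.rabbitmq.client.Channel

object DurableQueue {
  // Optional queue arguments; "x-message-ttl" is the per-queue
  // message time-to-live in milliseconds.
  def arguments(ttlMillis: Int): java.util.Map[String, AnyRef] =
    Map[String, AnyRef]("x-message-ttl" -> Int.box(ttlMillis)).asJava

  // Declares a queue that survives a broker restart (durable = true),
  // is not exclusive, and is not deleted automatically.
  def declare(channel: Channel, name: String, ttlMillis: Int): Unit = {
    channel.queueDeclare(name, true, false, false, arguments(ttlMillis))
    ()
  }
}
```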

Publishing/Consuming messages

// To publish a message
channel.basicPublish("", "greet", null, message.getBytes(StandardCharsets.UTF_8))

// To consume a message
channel.basicConsume("greet", true, "", deliverCallback, cancelCallback)

The basicPublish method takes the following arguments:

  1. exchange: The name of the exchange. We can think of an exchange as a postman: it decides which message goes to which queue. For simplicity, we pass an empty string, which selects the default exchange; it routes the message to the queue whose name matches the routing key.
  2. routingKey: The key that helps the exchange decide which queue the message should be routed to.
  3. props: Other properties of the message, such as routing headers.
  4. body: The message body.
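The props argument can carry more than null. As a sketch, the snippet below builds properties that mark a message as persistent plain text; the object name PersistentPublisher and its members are hypothetical. Persistence only helps if the target queue is itself declared durable.

```scala
import java.nio.charset.StandardCharsets
import com.rabbitmq.client.{AMQP, Channel}

object PersistentPublisher {
  // deliveryMode 2 marks the message as persistent; 1 would be transient.
  val textProps: AMQP.BasicProperties =
    new AMQP.BasicProperties.Builder()
      .contentType("text/plain")
      .deliveryMode(2)
      .build()

  // Publishes through the default exchange, so the routing key is the queue name.
  def publish(channel: Channel, queue: String, message: String): Unit =
    channel.basicPublish("", queue, textProps, message.getBytes(StandardCharsets.UTF_8))
}
```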

The basicConsume method takes the following arguments:

  1. queue: The name of the queue.
  2. autoAck: If true, the server treats a message as acknowledged as soon as it is delivered, so no explicit acknowledgment is required.
  3. consumerTag: A client-generated tag that identifies the consumer. We pass an empty string, in which case the server generates one.
  4. deliverCallback: The callback invoked when a message is delivered to the consumer.
  5. cancelCallback: The callback that is notified when the consumer is cancelled.
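With autoAck set to true, a message is lost if the consumer crashes while processing it. A possible alternative, sketched below, is to pass false and acknowledge each message explicitly once it has been handled. The object name ManualAckConsumer and the handle parameter are hypothetical; basicAck and the delivery tag come from the client library.

```scala
import java.nio.charset.StandardCharsets
import com.rabbitmq.client.{CancelCallback, Channel, DeliverCallback, Delivery}

object ManualAckConsumer {
  // Registers a consumer with autoAck = false and acknowledges each
  // message only after `handle` returns without throwing.
  def start(channel: Channel, queue: String)(handle: String => Unit): String = {
    val onDeliver: DeliverCallback = (_: String, message: Delivery) => {
      handle(new String(message.getBody, StandardCharsets.UTF_8))
      // multiple = false: acknowledge only this delivery tag
      channel.basicAck(message.getEnvelope.getDeliveryTag, false)
    }
    val onCancel: CancelCallback = (_: String) => ()
    // Returns the consumer tag reported by the client
    channel.basicConsume(queue, false, "", onDeliver, onCancel)
  }
}
```

If handle throws, the acknowledgment is skipped and the message stays unacknowledged, so the broker can deliver it again once the channel or connection closes.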

Creating callbacks for the consumer

// Callback invoked when a message is delivered
val deliverCallback: DeliverCallback = (consumerTag: String, message: Delivery) => {
  val messageBody = new String(message.getBody, "UTF-8")
  logger.info(s"[x] Received '$messageBody'")
}

// Callback invoked when the consumer is cancelled
val cancelCallback: CancelCallback = (consumerTag: String) =>
  logger.info("Cancelled")

Conclusion

After following all the above steps, we will have a simple producer and a simple consumer application that communicate by passing messages through the RabbitMQ broker.

We can also inspect the messages by logging into the management UI provided by RabbitMQ at http://localhost:15672/

References: https://www.rabbitmq.com/tutorials/tutorial-one-java.html