AMQP and AKKA


AMQP is a message protocol that deals with publishers and consumers. It would look a lot like JMS but it is not. The main entities are Exchanges, Queues and Bindings. Look at the following diagram

So a producer would send a message to the exchange and it is the job of the message broker (RabbitMQ in our case) to ensure that the messages are delivered to the right queue.
But first the connections have to be built so that publishers can publish on the exchange and consumers can listen from it. Let us see how this is done in Akka in our project. Our framework gets initialized by the following method

def initializeFramework(xml: Elem) {
    connectionInfo = getAMQPChannelInformation(xml)
    val connection = getAMQPChannel(connectionInfo)
    val exchangeParameters = getExchangeParameters(connectionInfo)
    instantiateProducer(connection, exchangeParameters)

    logger.info("VajraPM is now initialized and ready to process messages...")
  }

I have taken out the business logic and left out the methods which would form the core of this post. Let us go method by method, first, we get all the connection information which would be relevant for us to make the connection

def getAMQPChannelInformation(xml: Elem): ConnectionInfo = {
    val momServer = (xml  "VAJRA"  "RABBITSERVER").text
    val momPort = (xml  "VAJRA"  "RABBITPORT").text
    val userName = (xml  "VAJRA"  "USERNAME").text
    val password = (xml  "VAJRA"  "PASSWORD").text
    val queueName = (xml  "VAJRA"  "QUEUENAME").text
    val exchangeName = (xml  "VAJRA"  "EXCHANGENAME").text
    val durable = (xml  "VAJRA"  "DURABLE").text
    val connectionInfo = new ConnectionInfo(momServer, momPort, userName, password, queueName, exchangeName, durable)
    connectionInfo
  }

Here, use the Scala XML magic and get done soon. Our sample XML through which

<VAJRA>
		<RABBITSERVER>localhost</RABBITSERVER>
		<RABBITPORT>5672</RABBITPORT>
		<USERNAME>guest</USERNAME>
		<PASSWORD>guest</PASSWORD>
		<QUEUENAME>myqueue</QUEUENAME>
		<EXCHANGENAME>myexchange</EXCHANGENAME>
	</VAJRA>

Now let us see how we get a connection

def getAMQPChannel(connectionInfo: ConnectionInfo): ActorRef = {
    val myAddresses = Array(new Address(connectionInfo.momServer, connectionInfo.momPort.toInt))
    val connectionParameters = ConnectionParameters(myAddresses, connectionInfo.userName, connectionInfo.password)
    val connection = AMQP.newConnection(connectionParameters)
    connection
  }

As you would notice, all the connection parameters are passed to the akka.amqp.AMQP.ConnectionParameters and a connection object is received.

Once we have the connection, let us instantiate the producer. The method instantiateProducer(connection, exchangeParameters) looks like this

 def instantiateProducer(connection: ActorRef, exchangeParameters: ExchangeParameters) {
    producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters)))
  }

As you would notice, we pass exchangeParameters to the producer. You would see in the diagram above that AMQP producer requires a reference of the exchange to which it can send messages. We get the exchange information as follows

  def getExchangeParameters(connectionInfo: ConnectionInfo): ExchangeParameters = {
    var exchangeParameters: ExchangeParameters = ExchangeParameters(connectionInfo.exchangeName, Topic)
    connectionInfo.durable match {
      case "DURABLE" => exchangeParameters = ExchangeParameters(connectionInfo.exchangeName, Topic, ActiveDeclaration(durable = true, autoDelete = false))
      case _ => exchangeParameters
    }
    exchangeParameters
  }

If in our configuration we define the exchange to be durable, then we set the durable and autodelete properties of the exchange.

This would set up our producer actor and we would be able to send messages to the producer like this with the bang operator

producer ! Message(message)

As soon as the producer actor would receive the message, it would put it on the exchange that we have specified. Now, let us look at the consumer

 def createConsumer(exchangeParameters: ExchangeParameters, connection: ActorRef, queueName: String, actor: ActorRef, connectionInfo: ConnectionInfo): ActorRef = {
    connectionInfo.durable match {
      case "DURABLE" => {
        val queueDeclaration = ActiveDeclaration(durable = true, autoDelete = false)
        AMQP.newConsumer(connection, ConsumerParameters("@vajrafeed", actor, Some(queueName), Some(exchangeParameters), queueDeclaration))
      }
      case _ => AMQP.newConsumer(connection, ConsumerParameters("@vajrafeed", actor, Some(queueName), Some(exchangeParameters)))
    }
  }

Hence, now as shown in the figure above, we have a consumer listening on the queue called myqueue which is listening to the exchange myexchange. You would notice that we have a “@vajrafeed” This is the binding key which lets the messgage broker decide, which messages it should pass on onto the consumer. If there was another consumer attached to the queue with a binding key say @nyse then the messages for @vajrafeed would not be delivered to it. You can get more information about AMQP here.

About Vikas Hazrati

Vikas is the CTO @ Knoldus which is a group of software industry veterans who have joined hands to add value to the art of software development. We do niche product and project development on Scala and Java. We consult and coach on effective software development and agile practices. With our focus on software craftsmanship you can be assured of a good quality at the right price. To know more, send a mail to info@knoldus.com or visit www.knoldus.com
This entry was posted in Architecture, Scala and tagged , , , . Bookmark the permalink.

2 Responses to AMQP and AKKA

  1. Pingback: Power of Parallel Processing in Akka « Inphina Thoughts

  2. Pingback: AMQP and AKKA « Inphina Thoughts

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s