Apache Camel ActiveMQ with Producer and Consumer Service

Apache Camel Introduction

  • Apache Camel is a Java-based integration framework.
  • It provides runtime support for Spring Boot, Quarkus, Java EE, MicroProfile, OSGi, standalone applications, and more.
  • Camel is based on Enterprise Integration Patterns.
  • It comes with 300+ components (connectors).
  • It offers a DSL (Java or XML) to describe integration flows (routes).
  • Through these components it can connect to a wide range of systems.

Apache Camel can be viewed as a black box that receives messages from one endpoint and sends them to another. Within the black box, the messages may be processed or simply redirected.
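
To make the black-box idea concrete, here is a minimal plain-Java sketch of a flow that takes messages from a source, optionally processes them, and forwards them to a target. It is an illustration only and does not use the Camel API: the MiniRoute class, the in-memory queue, and the list are hypothetical stand-ins for real endpoints.

```java
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

/** Plain-Java illustration of "receive, process, forward"; this is NOT the Camel API. */
final class MiniRoute {
    private final Queue<String> source;                // stands in for the endpoint messages come from
    private final Function<String, String> processor;  // processing step inside the "black box"
    private final List<String> target;                 // stands in for the endpoint messages go to

    MiniRoute(Queue<String> source, Function<String, String> processor, List<String> target) {
        this.source = source;
        this.processor = processor;
        this.target = target;
    }

    /** Drains the source, applies the processor to each message, and forwards the result. */
    int run() {
        int forwarded = 0;
        String message;
        while ((message = source.poll()) != null) {
            target.add(processor.apply(message));
            forwarded++;
        }
        return forwarded;
    }
}
```

Passing Function.identity() as the processor gives the "simply redirected" case. In Camel itself, the same shape is written with the DSL inside a RouteBuilder by chaining from(...), an optional process(...), and to(...).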

How to install ActiveMQ and log in to the ActiveMQ console on Ubuntu

1. Download ActiveMQ from the Apache archive.

$ wget http://archive.apache.org/dist/activemq/5.16.3/apache-activemq-5.16.3-bin.tar.gz

2. Extract the downloaded file.

$ sudo tar -xvzf apache-activemq-5.16.3-bin.tar.gz

3. Create a directory named /opt/activemq.

$ sudo mkdir /opt/activemq

4. Move the extracted files to the /opt/activemq directory.

$ sudo mv apache-activemq-5.16.3/* /opt/activemq

5. Create a group account activemq to run Apache ActiveMQ.

$ sudo addgroup --quiet --system activemq

6. Create a user for the group.

$ sudo adduser --quiet --system --ingroup activemq --no-create-home --disabled-password activemq

7. Change the ownership of the /opt/activemq directory to the activemq user and group.

$ sudo chown -R activemq:activemq /opt/activemq

8. Create an ActiveMQ systemd service file to control the Apache ActiveMQ service.

$ sudo nano /etc/systemd/system/activemq.service

9. Add the following content to the file, then save and close it.

[Unit]
Description=Apache ActiveMQ
After=network.target
[Service]
Type=forking
User=activemq
Group=activemq

ExecStart=/opt/activemq/bin/activemq start
ExecStop=/opt/activemq/bin/activemq stop

[Install]
WantedBy=multi-user.target

10. Edit the jetty.xml configuration file so that the web console listens on all network interfaces instead of only on localhost.

$ sudo nano /opt/activemq/conf/jetty.xml

11. Find the line below:

<property name="host" value="127.0.0.1"/>

and replace it with:

<property name="host" value="0.0.0.0"/>

12. Save and close the file.

13. Reload the system daemon.

$ sudo systemctl daemon-reload

14. Start the Apache ActiveMQ service.

$ sudo systemctl start activemq

15. Verify the status of the service, for example with sudo systemctl status activemq.
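
Besides the service status, a quick way to check that the broker is reachable is to probe its ports. The sketch below assumes the defaults used elsewhere in this post, 61616 for the broker URL and 8161 for the web console; the PortProbe class is a hypothetical helper and not part of ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not reachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports assumed from this post: 61616 (broker) and 8161 (web console).
        System.out.println("broker 61616 open: " + isOpen("localhost", 61616, 1000));
        System.out.println("console 8161 open: " + isOpen("localhost", 8161, 1000));
    }
}
```

An open port only shows that something is listening there; the web console remains the place to confirm that it is really ActiveMQ.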

Let’s get started on the project now. In this section we will create two services:

  • Apache Camel ActiveMQ Message Producer Service
  • Apache Camel ActiveMQ Message Consumer Service

Apache Camel Message Producer and Consumer Service

Project Structure

1. Create a Spring Boot Maven project with the Apache Camel dependency from Spring Initializr.

2. Add the spring-boot-starter-activemq and camel-spring-boot-starter dependencies to pom.xml. The REST controllers shown later also need spring-boot-starter-web on the classpath.

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
	<groupId>org.apache.camel.springboot</groupId>
	<artifactId>camel-spring-boot-starter</artifactId>
	<version>3.17.0</version>
</dependency>

3. Configure ActiveMQ in application.properties.

server.port=9091
activemq.broker.url=tcp://localhost:61616
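
Note that activemq.broker.url is a custom key that the configuration class reads with @Value, so the name only has to match on both sides. As a rough sketch of what these two settings contain, the following plain-Java snippet loads the same text with java.util.Properties and splits the broker URL into host and port. The BrokerSettings class is a hypothetical helper for illustration; Spring Boot reads application.properties on its own and does not need it.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

/** Hypothetical helper for illustration; not required by Spring Boot. */
final class BrokerSettings {
    final int serverPort;     // HTTP port of the Spring Boot application
    final String brokerHost;  // host part of the broker URL
    final int brokerPort;     // port part of the broker URL

    private BrokerSettings(int serverPort, String brokerHost, int brokerPort) {
        this.serverPort = serverPort;
        this.brokerHost = brokerHost;
        this.brokerPort = brokerPort;
    }

    /** Parses properties text that contains server.port and activemq.broker.url. */
    static BrokerSettings parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        int serverPort = Integer.parseInt(props.getProperty("server.port"));
        URI broker = URI.create(props.getProperty("activemq.broker.url"));
        return new BrokerSettings(serverPort, broker.getHost(), broker.getPort());
    }

    public static void main(String[] args) {
        BrokerSettings s = parse("server.port=9091\nactivemq.broker.url=tcp://localhost:61616\n");
        System.out.println(s.brokerHost + ":" + s.brokerPort + ", HTTP port " + s.serverPort);
    }
}
```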

4. Write the code for the producer and consumer as shown below:

Create SpringActiveMQConfig.java

package com.spring.apachecamel.config;

import javax.jms.Queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;

@Configuration
@EnableJms
public class SpringActiveMQConfig {
    @Value("${activemq.broker.url}")
    private String brokerUrl;

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("camelConsumerExample-One");
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(brokerUrl);
        return activeMQConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        return new JmsTemplate(activeMQConnectionFactory());
    }

}

Create the DTO Product.java

package com.spring.apachecamel.dto;

import java.io.Serializable;

public class Product implements Serializable {
    private static final long serialVersionUID = 1L;
    private String productId;
    private String name;
    private String productDetails;

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getProductDetails() {
        return productDetails;
    }

    public void setProductDetails(String productDetails) {
        this.productDetails = productDetails;
    }

}

Consumer Route

package com.spring.apachecamel.consumer;


import javax.jms.Queue;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.spring.apachecamel.dto.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/consume")
public class Consumer {
    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue queue;

    @GetMapping("/message")
    public Product consumeMessage() {

        Product product = null;
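        try {
            ObjectMapper mapper = new ObjectMapper();
            // With JmsTemplate's default receive timeout, this call waits until a message is available.
            String jsonMessage = (String) jmsTemplate.receiveAndConvert(queue);
            // Convert the JSON payload back into a Product.
            product = mapper.readValue(jsonMessage, Product.class);

        } catch (Exception e) {
            // Demo-level error handling: log the failure and return null.
            e.printStackTrace();
        }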
        return product;
    }
}
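
One detail of the consumer is worth illustrating. With JmsTemplate's default receive timeout, receiveAndConvert waits until a message is available, so the endpoint does not return while the queue is empty. JmsTemplate offers setReceiveTimeout to bound that wait, in which case the call returns null once the timeout expires. The sketch below shows the same bounded-wait behaviour in plain Java, with a BlockingQueue standing in for the JMS queue; BoundedReceive is a hypothetical name and the code does not use JMS.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Plain-Java stand-in for a receive call with a timeout; not JMS code. */
final class BoundedReceive {

    /** Returns the next message, or null if none arrives within timeoutMillis. */
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            // poll removes the head of the queue, waiting up to the timeout if it is empty.
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A caller that gets null back can then answer with an empty response instead of keeping the HTTP request open.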

Producer Route

package com.spring.apachecamel.producer;

import com.spring.apachecamel.dto.Product;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import com.fasterxml.jackson.databind.ObjectMapper;

import javax.jms.Queue;

@RestController
@RequestMapping("/produce")
public class Producer {
    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    private Queue queue;

    @PostMapping("/message")
    public Product sendMessage(@RequestBody Product product) {

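        try {
            ObjectMapper mapper = new ObjectMapper();
            // Serialize the Product to JSON.
            String productAsJson = mapper.writeValueAsString(product);

            // Send the JSON string to the queue as a text message.
            jmsTemplate.convertAndSend(queue, productAsJson);
        } catch (Exception e) {
            // Demo-level error handling: log the failure and still return the request body.
            e.printStackTrace();
        }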
        return product;
    }
}

5. Deploy the application and send a message with Postman: a POST request to http://localhost:9091/produce/message with a JSON body containing productId, name, and productDetails.

6. Log in to the ActiveMQ console at

http://localhost:8161/admin/
with the username "admin" and the password "admin".

7. Click Queues and verify that the number of pending messages is one.

8. Call the http://localhost:9091/consume/message endpoint to consume the message.

9. Check the ActiveMQ console again. The number of pending messages should now be zero.
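
The produce and consume calls can also be scripted instead of sent by hand. The following sketch uses the JDK's java.net.http client (Java 11 or later) and assumes the application runs on localhost:9091 as configured earlier. The ProductClient class and the sample product values are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client for the two REST endpoints of this post. */
final class ProductClient {
    // Assumption: the application runs locally on the port set in application.properties.
    static final String BASE_URL = "http://localhost:9091";

    /** Builds the POST request that puts a product (as JSON) on the queue. */
    static HttpRequest produceRequest(String productJson) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/produce/message"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(productJson))
                .build();
    }

    /** Builds the GET request that takes one product off the queue. */
    static HttpRequest consumeRequest() {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/consume/message"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Sample values are invented; the field names come from Product.java.
        String json = "{\"productId\":\"P-1\",\"name\":\"Pen\",\"productDetails\":\"Blue ink\"}";

        HttpResponse<String> produced =
                client.send(produceRequest(json), HttpResponse.BodyHandlers.ofString());
        System.out.println("produce: " + produced.statusCode() + " " + produced.body());

        HttpResponse<String> consumed =
                client.send(consumeRequest(), HttpResponse.BodyHandlers.ofString());
        System.out.println("consume: " + consumed.statusCode() + " " + consumed.body());
    }
}
```

Running main needs both the broker and the application to be up. To watch the pending count in the console change, pause between the two calls.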

Conclusion

We have seen what Apache Camel is and how to install ActiveMQ on Ubuntu. Along with this, we also learned how to write a Spring Boot ActiveMQ producer and consumer service (using REST endpoints) from scratch.

I hope the above information helps you get started with Apache Camel integration with ActiveMQ.