A Java Lagom service which only consumes from a Kafka topic (subscriber-only service)


A subscriber-only service is an application which only consumes messages and never produces any.

Most applications we see both produce to and consume from a Kafka topic, but sometimes we need to write an application which only consumes data, for example data published by a 3rd-party service. So in this blog I am going to explain how to write a Lagom service which only consumes data and does not produce any.

Add the following dependency to the pom.xml of your impl module:

<dependency>
  <groupId>com.lightbend.lagom</groupId>
  <artifactId>lagom-javadsl-kafka-client_2.11</artifactId>
  <version>${lagom.version}</version>
</dependency>

We need to create a separate module, kafka-connect, in the Lagom project. This module contains a service descriptor declaring the topic the service will consume from.

package com.knoldus.kafka;

import com.lightbend.lagom.javadsl.api.Descriptor;
import com.lightbend.lagom.javadsl.api.Service;
import com.lightbend.lagom.javadsl.api.broker.Topic;

import static com.lightbend.lagom.javadsl.api.Service.named;
import static com.lightbend.lagom.javadsl.api.Service.topic;

public interface KafkaService extends Service {

    Topic<String> greetingsTopic();

    String GREETINGS_TOPIC = "greetings";

    @Override
    default Descriptor descriptor() {
        return named("kafkaservice").withTopics(
                topic(GREETINGS_TOPIC, this::greetingsTopic)
        ).withAutoAcl(true);
    }
}

Then, add the kafka-connect dependency to the impl module:

<dependency>
  <artifactId>kafka-connect</artifactId>
  <groupId>com.knoldus.lagom-kafka-consumer-only</groupId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Then, to consume data, implement the consumer as below:

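import akka.Done;
import akka.stream.javadsl.Flow;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.knoldus.kafka.KafkaService;
// Assumption: StringUtils comes from Apache Commons Lang 3.
import org.apache.commons.lang3.StringUtils;

import javax.inject.Inject;
import javax.inject.Singleton;

@Singleton
public class KafkaConsumer {

    private final KafkaService kafkaService;
    private final ObjectMapper jsonMapper = new ObjectMapper();

    @Inject
    public KafkaConsumer(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
        // Subscribe as soon as the singleton is created; every message
        // from the greetings topic is passed to displayMessage.
        kafkaService.greetingsTopic().subscribe()
                .atLeastOnce(Flow.fromFunction(this::displayMessage));
    }

    private Done displayMessage(String message) {
        System.out.println("Message :::::::::::  " + message);
        try {
            // Parse the raw JSON string into a GreetingMessage.
            GreetingMessage greetingMessage = jsonMapper.readValue(message, GreetingMessage.class);
            if (StringUtils.isNotEmpty(greetingMessage.message)) {
                System.out.println("Action performed :::::::::::  " + message);

                // Do your action here
            }
        } catch (Exception ex) {
            // Report the cause instead of hiding it. Because Done is still
            // returned below, a malformed message is skipped, not retried.
            System.out.println("Error in consuming kafka message: " + ex.getMessage());
        }
        return Done.getInstance();
    }
}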
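
The consumer refers to a GreetingMessage class which is not shown in this post. Below is a minimal sketch of what it could look like, assuming the payload is a JSON object with a single message field. Only the field name is taken from the greetingMessage.message access above; the rest is an assumption and the real class in the linked project may differ.

```java
// Hypothetical payload class, not copied from the original project.
class GreetingMessage {

    // Public field so Jackson can populate it without setters or annotations.
    public String message;

    // Jackson's default deserialization uses the no-argument constructor.
    public GreetingMessage() {
    }

    public GreetingMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "GreetingMessage{message='" + message + "'}";
    }
}
```

With this shape, a message such as {"message": "Hello"} produced to the greetings topic would pass the isNotEmpty check in displayMessage. The class is package-private here, so it is assumed to live in the same package as KafkaConsumer; declare it public in its own file otherwise.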

Finally, bind KafkaConsumer as an eager singleton and bind a client for KafkaService in HelloModule:

bind(KafkaConsumer.class).asEagerSingleton();
bindClient(KafkaService.class);

To connect to an external Kafka, make the following change in the root pom.xml so that Lagom does not start its embedded Kafka in the development environment:

<plugin>
  <groupId>com.lightbend.lagom</groupId>
  <artifactId>lagom-maven-plugin</artifactId>
  <version>${lagom.version}</version>
  <configuration>
    <kafkaEnabled>false</kafkaEnabled>
  </configuration>
</plugin>

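and add the following configuration to the application.conf file. Setting service-name to an empty string makes Lagom use the static brokers list instead of looking Kafka up through the service locator: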

lagom.broker.kafka {
  service-name = ""

  brokers = "127.0.0.1:9092"
  brokers = ${?KAFKA_BROKERS}
}
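
The two brokers lines work together: the second one is an optional substitution, so it overrides the first only when the KAFKA_BROKERS environment variable is defined. The plain-Java sketch below mimics that fallback rule to make it concrete. It is not how Lagom reads its configuration, and BrokerSettings and resolveBrokers are hypothetical names used only for illustration.

```java
import java.util.Map;

// Hypothetical helper that mirrors the "default, then optional override" rule.
class BrokerSettings {

    static final String DEFAULT_BROKERS = "127.0.0.1:9092";

    // Returns the KAFKA_BROKERS value when the variable is defined,
    // otherwise falls back to the default broker address.
    static String resolveBrokers(Map<String, String> env) {
        String override = env.get("KAFKA_BROKERS");
        return override != null ? override : DEFAULT_BROKERS;
    }

    public static void main(String[] args) {
        // Uses the real process environment.
        System.out.println(resolveBrokers(System.getenv()));
    }
}
```

So the service talks to 127.0.0.1:9092 on a developer machine, and the same build can be pointed at another cluster by exporting KAFKA_BROKERS before starting it.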

That’s it. Now start ZooKeeper and Kafka on your local machine, create the topic greetings, and produce a message to it. As soon as a message is produced, the consumer will consume it and print the message.
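
One thing to keep in mind: the subscriber is wired with atLeastOnce, so the same message can be delivered more than once, for instance if the service restarts before the offset is committed. If your action is not safe to repeat, one option is to skip messages which were already handled. The sketch below is a plain-Java illustration and not part of the original project. IdempotentHandler and the messageId argument are hypothetical (the greetings messages in this blog carry no id), and the in-memory set is lost on restart, so a real service would keep the handled ids in durable storage.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: runs an action at most once per message id.
class IdempotentHandler {

    private final Set<String> handledIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> action;

    IdempotentHandler(Consumer<String> action) {
        this.action = action;
    }

    // Returns true if the action ran, false if the id was seen before.
    // The id is recorded before the action runs, so a failed action
    // is not retried by this helper.
    boolean handle(String messageId, String payload) {
        if (!handledIds.add(messageId)) {
            return false;
        }
        action.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandler handler =
                new IdempotentHandler(p -> System.out.println("Action performed: " + p));
        handler.handle("1", "hello");
        handler.handle("1", "hello"); // duplicate id, action is skipped
    }
}
```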

I hope you enjoyed the blog.

You can get the full code here.

 

About Rishi Khandelwal

Sr. Software Consultant with more than 6 years of industry experience. He has worked with various technologies such as Scala, Java, Play, Akka, Spark, Hive, Cassandra, Akka-http, ElasticSearch, Backbone.js, HTML5, JavaScript, Less, Amazon EC2, WebRTC, SBT.