Spring Cloud GCP – Send Pub/Sub Messages and its implementation with SpringBoot

Reading Time: 5 minutes

Overview

Pub/Sub is an asynchronous communications system that is both dependable and scalable. The service is based on a Google infrastructure component that has been used by numerous Google products for over a decade.
This infrastructure is used by Google products such as Ads, Search, and G-mail to send over 500 million messages per second, totaling over 1TB/s of data.

Basics of a Publish/Subscribe Service

Pub/Sub is a publish/subscribe (Pub/Sub) service, which is a messaging service in which message senders and receivers are separated. In a Pub/Sub service, there are several fundamental concepts:

1.The data that goes through the service is referred to as a message.

2. A named entity that represents a message feed is referred to as a topic.

3. A subscription is a named entity that expresses a desire to receive messages on a specific topic.

4. Publisher (also known as a producer): generates messages and sends them to the messaging service on a certain topic (publishes).

5. A subscriber (also known as a consumer) is someone who receives messages based on a subscription.

Implementation

In your pom.xml, include the dependencies. The spring-cloud-gcp-starter-pubsub, spring-integration-core, and spring-cloud-gcp-dependencies are the most important dependencies for Pub/Sub.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.2</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.gcp.pubsub</groupId>
	<artifactId>demo</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>11</java.version>
		<spring-cloud-gcp.version>1.2.5.RELEASE</spring-cloud-gcp.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-core</artifactId>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-gcp-dependencies</artifactId>
				<version>${spring-cloud-gcp.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

The main file with the annotation SpringBootApplication

package com.cloudgcp.pubsub.demo;

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication implements CommandLineRunner {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}

	@Override
	public void run(String... args) throws Exception {
		System.out.println("Application Started!!");
	}
}

Consuming Pub/Sub Messages

We have created Topics, subscriptions, and set up a maven project. The first thing we need to add the subscription that we created in application.properties

spring.cloud.gcp.project-id=staticweb-test
pubsub.subscription=projects/staticweb-test/subscriptions/s-dummy-bucket

There are two abstract methods one is for subscription and another is consume.

package com.cloudgcp.pubsub.demo.consumer;

import com.google.cloud.pubsub.v1.Subscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;

import java.util.function.Consumer;

public abstract class PubSubConsumer {

    @Autowired
    private PubSubTemplate pubSubTemplate;

    /* Name of the Subscription */
    public abstract String subscription();

    protected abstract void consume(BasicAcknowledgeablePubsubMessage message);

    public Consumer<BasicAcknowledgeablePubsubMessage> consumer() {
        return basicAcknowledgeablePubsubMessage -> consume(basicAcknowledgeablePubsubMessage);
    }

    public Subscriber consumeMessage() {
        return this.pubSubTemplate.subscribe(this.subscription(), this.consumer());
    }
}

we are getting the subscription from the application.properties file. We are using EventListener annotation to start listening when the application is ready.

package com.cloudgcp.pubsub.demo.consumer;

import com.google.pubsub.v1.PubsubMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
public class DemoConsumer extends PubSubConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(DemoConsumer.class);

    @Autowired
    private PubSubTemplate pubSubTemplate;

    @Value("${pubsub.subscription}")
    private String subscription;

    @Override
    public String subscription() {
        return this.subscription;
    }

    @Override
    protected void consume(BasicAcknowledgeablePubsubMessage basicAcknowledgeablePubsubMessage) {

        PubsubMessage message = basicAcknowledgeablePubsubMessage.getPubsubMessage();

        try {
            System.out.println(message.getData().toStringUtf8());
            System.out.println(message.getAttributesMap());
            String objectName = message.getAttributesMap().get("objectId");
            String bucketName = message.getAttributesMap().get("bucketId");
            String eventType = message.getAttributesMap().get("eventType");

            LOG.info("Event Type:::::" + eventType);
            LOG.info("File Name::::::" + objectName);
            LOG.info("Bucket Name::::" + bucketName);


        }catch(Exception ex) {
            LOG.error("Error Occured while receiving pubsub message:::::", ex);
        }
        basicAcknowledgeablePubsubMessage.ack();
    }

    @EventListener(ApplicationReadyEvent.class)
    public void subscribe() {
        LOG.info("Subscribing {} to {} ", this.getClass().getSimpleName(), this.subscription());
        pubSubTemplate.subscribe(this.subscription(), this.consumer());
    }
}

We print the EventType, FileName, and Bucket Name before acknowledging the message when we receive it.

We upload the input.txt file to the appropriate bucket, and our Spring boot App receives the notification, and printing object Name, fileName, and other information.

We get the body and attribute map by accessing these methods on the message object.

message.getData().toStringUtf8()
message.getAttributesMap()

Publishing Pub/Sub Messages

We need to create another topic using the following command and after that we need to verify in the GCP console.

gcloud pubsub topics create t-another-topic

we have to create a subscription for this topic, otherwise, all the published messages will be lost.


We need to create an abstract class for the publisher. We are using PubSubTemplate from the Spring framework to publish the messages.

package com.cloudgcp.pubsub.demo.publisher;

import com.google.pubsub.v1.PubsubMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;

import java.util.concurrent.ExecutionException;

public abstract class PubSubPublisher {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubPublisher.class);

    @Autowired
    private PubSubTemplate pubSubTemplate;

    protected abstract String topic();

    public void publish(PubsubMessage pubsubMessage) throws ExecutionException, InterruptedException {
        LOG.info("Publishing to the topic [{}], message [{}]", topic(), pubsubMessage);
        pubSubTemplate.publish(topic(), pubsubMessage).get();
    }
}

Now configure the topic in the application.properties

package com.cloudgcp.pubsub.demo.publisher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class DemoPublisher extends PubSubPublisher {

    private static Logger LOG = LoggerFactory.getLogger(DemoPublisher.class);

    @Value("${pubsub.topic}")
    private String topic;

    @Override
    protected String topic() {
        return this.topic;
    }
}

We have already seen the consuming part. Now We are publishing another topic once we receive the message. Here is the Consumer file where we are publishing the message as soon as we received it.

package com.cloudgcp.pubsub.demo.consumer;

import com.cloudgcp.pubsub.demo.publisher.DemoPublisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

@Component
public class DemoConsumer extends PubSubConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(DemoConsumer.class);

    @Autowired
    private PubSubTemplate pubSubTemplate;

    @Autowired
    private DemoPublisher demoPublisher;

    @Value("${pubsub.subscription}")
    private String subscription;

    @Override
    public String subscription() {
        return this.subscription;
    }

    @Override
    protected void consume(BasicAcknowledgeablePubsubMessage basicAcknowledgeablePubsubMessage) {

        PubsubMessage message = basicAcknowledgeablePubsubMessage.getPubsubMessage();

        try {
            System.out.println(message.getData().toStringUtf8());
            System.out.println(message.getAttributesMap());
            String objectName = message.getAttributesMap().get("objectId");
            String bucketName = message.getAttributesMap().get("bucketId");
            String eventType = message.getAttributesMap().get("eventType");

            LOG.info("Event Type:::::" + eventType);
            LOG.info("File Name::::::" + objectName);
            LOG.info("Bucket Name::::" + bucketName);

            String messageId = "messageId " + UUID.randomUUID();
            String pubMessage = "File Name Received " + objectName + "From Bucket " + bucketName + "For the event type::" + eventType;
            publishMessage(messageId, message.getAttributesMap(), pubMessage);
        }catch(Exception ex) {
            LOG.error("Error Occured while receiving pubsub message:::::", ex);
        }
        basicAcknowledgeablePubsubMessage.ack();
    }

    public void publishMessage(String messageId, Map<String, String> attributeMap, String message) throws ExecutionException, InterruptedException {
        LOG.info("Sending Message to the topic:::");
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                .putAllAttributes(attributeMap)
                .setData(ByteString.copyFromUtf8(message))
                .setMessageId(messageId)
                .build();

        demoPublisher.publish(pubsubMessage);
    }

    @EventListener(ApplicationReadyEvent.class)
    public void subscribe() {
        LOG.info("Subscribing {} to {} ", this.getClass().getSimpleName(), this.subscription());
        pubSubTemplate.subscribe(this.subscription(), this.consumer());
    }
}

We have autowired demoPublisher and creates a separate method for publishing. First, we need to build the PubSubMessage and pass these messages as an argument to the publish method.

Summary

  • Google Pub/Sub is an asynchronous messaging service that decouples services that produce events from services that process events.
  • Pub/Sub is a messaging-oriented middleware or event ingestion and delivery for streaming analytics pipelines and it can integrate components in GCP.
  • The topic is a resource in which all the publishers publish their messages. All the subscribers which are subscribed to this topic receive messages.
  • The Subscription is a resource that represents the stream of messages from a single Topic. You create a subscription for a specific topic.
  • The Message is the actual message that is sent to the topic and subscribers get this message when they are subscribed to the Topic. This contains an actual message and attributes.
  • The Message Attributes are the key value pairs that can be sent along with the message so that it can signify some information about the message.
  • One more thing we need to understand with this Cloud Pub/Sub is that communication can be fan-out(one to many) or fan-in (many to one) or many to many.

Conclusion

On GCP Cloud Pub/Sub, we learned how to subscribe to topics and publish messages. This Pub/Sub Service has a lot of applications, and it’s a great way to decouple application components.

Leave a Reply