Apache Kafka with Spring Boot

Overview of Apache Kafka


Apache Kafka is a high-throughput distributed streaming platform. It is publish-subscribe messaging rethought as a distributed commit log.

A streaming platform has three key capabilities:

  1. Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  2. Store streams of records in a fault-tolerant, durable way.
  3. Process streams of records as they occur.
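
To make the three capabilities above more concrete, here is a minimal in-memory sketch in plain Java. It is not how Kafka is implemented (there is no persistence, partitioning, or replication), and CommitLogSketch and its method names are hypothetical, chosen only for illustration. It shows records being appended (publish), kept in order (store), and read by offset so that independent readers can each process the same records at their own pace.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an append-only log that is read by offset.
// This is NOT Kafka's implementation: nothing is persisted or replicated.
class CommitLogSketch {
    private final List<String> records = new ArrayList<>();
    // Next offset to read, tracked separately for each named reader.
    private final Map<String, Integer> nextOffsets = new HashMap<>();

    // Publish: append a record and return the offset it was stored at.
    synchronized long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Process: return the records this reader has not seen yet and advance
    // its offset. The records stay in the log for other readers.
    synchronized List<String> poll(String readerId) {
        int from = nextOffsets.getOrDefault(readerId, 0);
        List<String> unread = new ArrayList<>(records.subList(from, records.size()));
        nextOffsets.put(readerId, records.size());
        return Collections.unmodifiableList(unread);
    }
}
```

Because reading does not remove anything from the log, a reader that starts late still sees every record from offset 0, which is the main difference from a traditional queue where a message is gone once it is consumed.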

Apache Kafka’s Distributed Architecture:

Kafka is generally used for two broad classes of applications:

  1. Building real-time streaming data pipelines that reliably get data between systems or applications
  2. Building real-time streaming applications that transform or react to the streams of data.

Create a Spring Boot Kafka Example:


Kafka Server Port

Define the Kafka server address (host and port) in application.properties as given below.

knoldusExample.kafka.bootstrap-servers: localhost:9092
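
Note that the colon after the key is a valid key/value separator in the Java properties format, equivalent to =. The sketch below uses java.util.Properties from the JDK to demonstrate how such a line is parsed. Spring Boot uses its own loader for application.properties, so treat this only as an illustration of the format; BootstrapServersSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper showing how a "key: value" properties line is read.
class BootstrapServersSketch {

    // Parse properties-format text and return the bootstrap servers value.
    static String bootstrapServers(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("knoldusExample.kafka.bootstrap-servers");
    }

    // Extract the port number from a single "host:port" entry.
    static int port(String hostAndPort) {
        return Integer.parseInt(hostAndPort.substring(hostAndPort.lastIndexOf(':') + 1));
    }
}
```

Only the first colon ends the key, so the value keeps its own colon and is read as localhost:9092.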


  1. Create a Spring Boot Maven project.
  2. Define the pom.xml as follows, adding the spring-kafka dependency.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.3.9.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.techgeeknext</groupId>
	<artifactId>kafaExample</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>kafaExample</name>
	<description>Spring Boot Kafka</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

Message Producer:

3. Define the configuration class used to produce messages.

package com.knoldusExample.config;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    @Value("${knoldusExample.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

In the above example, ProducerFactory is responsible for creating Kafka producer instances. In producerFactory(), we configure the following properties:

  1. BOOTSTRAP_SERVERS_CONFIG – Host and port on which Kafka is running.
  2. KEY_SERIALIZER_CLASS_CONFIG – Serializer class to be used for the key.
  3. VALUE_SERIALIZER_CLASS_CONFIG – Serializer class to be used for the value. We are using StringSerializer for both keys and values.
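
For orientation, the ProducerConfig constants listed above stand for plain string property names, and StringSerializer encodes strings as UTF-8 bytes by default. The sketch below uses only the JDK to show the same three settings with their literal keys and what the serialization step roughly amounts to. ProducerSettingsSketch is a hypothetical name, and the serializer classes are given as class-name strings so that the snippet compiles without Kafka on the classpath.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, JDK-only illustration of the producer settings.
class ProducerSettingsSketch {

    // The same three settings as in producerFactory(), written with the
    // literal property names that the ProducerConfig constants stand for.
    static Map<String, Object> producerSettings(String bootstrapServers) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Roughly what serializing a String means: turning it into UTF-8 bytes.
    static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }
}
```

Using the ProducerConfig constants, as the configuration class does, is generally preferable in real code because a misspelled key then fails at compile time.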

4. Define the service class, which auto-wires the KafkaTemplate object and publishes messages to knoldusexample-topic, as shown.

package com.knoldusExample.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    String kafkaTopic = "knoldusexample-topic";

    public void send(String message) {

        kafkaTemplate.send(kafkaTopic, message);
    }
}

5. Now define the controller class for publishing the message through Kafka.

package com.knoldusExample.controller;

import com.knoldusExample.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping(value = "/knoldusexample-kafka/")
public class KafkaExampleController {

    @Autowired
    KafkaProducerService kafkaProducer;


    @GetMapping(value = "/producer")
    public String sendMessage(@RequestParam("message") String message)
    {
        kafkaProducer.send(message);
        return "Message sent Successfully to the Kafka topic knoldusexample-topic";
    }

}

Message Consumer:

6. Define the consumer configuration class to consume messages.

package com.knoldusExample.config;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${knoldusExample.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String>
                factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

7. Now, write a listener method using the @KafkaListener annotation to receive messages from knoldusexample-topic as part of the consumer group group-id.

package com.knoldusExample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class KafaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafaExampleApplication.class, args);
    }

    @KafkaListener(topics = "knoldusexample-topic", groupId = "group-id")
    public void listen(String message) {

        System.out.println("Received Message : " + message);
    }
}

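8. Next, start the Spring Boot application by running the spring-boot:run Maven goal. Zookeeper and Kafka (steps 9 and 10) need to be running for the application to reach the broker at localhost:9092.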

9. Start Zookeeper: the Kafka installation comes with a bundled Zookeeper. Zookeeper is mainly used to track the status of the nodes in the Kafka cluster and to keep track of cluster metadata such as topics and partitions.

Refer to Install Apache Kafka for the steps to install Zookeeper and Kafka.

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

10. Start Apache Kafka: use the command below to start the Kafka broker.

.\bin\windows\kafka-server-start.bat .\config\server.properties

11. Finally, open the URL with the message Hello as follows:

http://localhost:8080/knoldusexample-kafka/producer?message=Hello

12. Then, open the URL with a different message, Next Message Testing, as follows (a browser will URL-encode the spaces):

http://localhost:8080/knoldusexample-kafka/producer?message=Next Message Testing

Each of the two URLs above triggers a message to be sent to knoldusexample-topic, where it is picked up by the listener in the consumer group group-id.
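
The same requests can also be sent from code instead of a browser. The following is a minimal sketch using only the JDK, assuming the application is running locally on port 8080 as in the URLs above. ProducerEndpointClient and its method names are hypothetical. The message is URL-encoded before it is placed in the query string, which matters for values containing spaces such as Next Message Testing.

```java
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Scanner;

// Hypothetical client for the /producer endpoint defined in the controller.
class ProducerEndpointClient {
    // Assumes the Spring Boot application runs locally on the default port.
    static final String BASE = "http://localhost:8080/knoldusexample-kafka/producer";

    // Build the request URL, encoding the message so that spaces and other
    // reserved characters are safe inside the query string.
    static String buildUrl(String message) {
        try {
            return BASE + "?message=" + URLEncoder.encode(message, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    // Send a GET request and return the response body as text.
    // Requires the application (and Kafka) to be running.
    static String send(String message) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(buildUrl(message)).openConnection();
        conn.setRequestMethod("GET");
        try (Scanner scanner = new Scanner(conn.getInputStream(), "UTF-8")) {
            scanner.useDelimiter("\\A"); // read the whole stream as one token
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }
}
```

Calling ProducerEndpointClient.send("Next Message Testing") requests the URL with message=Next+Message+Testing, and the controller receives the decoded text with its spaces restored.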

13. On the Spring Boot console, we can see that the consumer has started and that it receives the messages.

14. You can also verify in Kafka Tool that knoldusexample-topic has been created, along with related details such as its partitions and the consumers in group-id.

