Overview of Apache Kafka
Apache Kafka is a high-throughput, distributed streaming platform. It is publish-subscribe messaging rethought as a distributed commit log.
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system
- Store streams of records in a fault-tolerant durable way
- Process streams of records as they occur.
Apache Kafka’s Distributed Architecture:

Kafka is generally used for two broad classes of applications:
- Building real-time streaming data pipelines that reliably get data between systems or applications
- Building real-time streaming applications that transform or react to the streams of data.
Create a Spring Boot Kafka Example:
Kafka Server Port
Define the Kafka server port in application.properties as given below.
knoldusExample.kafka.bootstrap-servers: localhost:9092
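The property key above is custom to this example and is read via @Value in the configuration classes. For reference, Spring Boot also ships built-in spring.kafka.* properties that auto-configure Kafka when no manual factory beans are defined (not used by this example, shown only as an alternative):

```properties
# Built-in Spring Boot Kafka properties (alternative to the custom key above)
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-id
```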
- Create Spring Boot Maven project
- Define the pom.xml as follows, adding the spring-kafka dependency:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.techgeeknext</groupId>
<artifactId>kafaExample</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafaExample</name>
<description>Spring Boot Kafka</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Message Producer:
3. Define the configuration class to produce the messages.
package com.knoldusExample.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaProducerConfig {
@Value("${knoldusExample.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
In the above example, ProducerFactory is responsible for creating Kafka producer instances. In producerFactory(), we configure the following properties:
- BOOTSTRAP_SERVERS_CONFIG – Host and port on which Kafka is running.
- KEY_SERIALIZER_CLASS_CONFIG – Serializer class to be used for the key.
- VALUE_SERIALIZER_CLASS_CONFIG – Serializer class to be used for the value. We are using StringSerializer for both keys and values.
4. Define the service class that auto-wires the KafkaTemplate object and publishes messages to knoldusexample-topic, as shown.
package com.knoldusExample.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
String kafkaTopic = "knoldusexample-topic";
public void send(String message) {
kafkaTemplate.send(kafkaTopic, message);
}
}
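The send() call above is fire-and-forget. As a hypothetical variation (not part of the original example), a callback can be attached to log delivery results, assuming spring-kafka 2.x where send() returns a ListenableFuture<SendResult>:

```java
package com.knoldusExample.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

// Hypothetical alternative service that logs delivery results.
@Service
public class LoggingKafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private final String kafkaTopic = "knoldusexample-topic";

    public void send(String message) {
        kafkaTemplate.send(kafkaTopic, message).addCallback(
            // Success callback: record metadata tells us where the message landed
            result -> System.out.println("Sent to partition "
                + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset()),
            // Failure callback: e.g. broker unreachable, serialization error
            ex -> System.err.println("Send failed: " + ex.getMessage()));
    }
}
```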
5. Now define the controller class for publishing the message through Kafka.
package com.knoldusExample.controller;
import com.knoldusExample.service.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping(value = "/knoldusexample-kafka/")
public class KafkaExampleController {
@Autowired
KafkaProducerService kafkaProducer;
@GetMapping(value = "/producer")
public String sendMessage(@RequestParam("message") String message)
{
kafkaProducer.send(message);
return "Message sent Successfully to the Kafka topic knoldusexample-topic";
}
}
Message Consumer:
6. Define the consumer configuration class to consume messages.
package com.knoldusExample.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${knoldusExample.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
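Beyond the required properties above, the consumer can be tuned further. A small sketch of optional additions (not in the original example; the string keys are standard Kafka consumer configuration names):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: optional consumer tuning properties that could be merged into
// the props map in consumerFactory() above.
public class ExtraConsumerProps {

    static Map<String, Object> extraProps() {
        Map<String, Object> props = new HashMap<>();
        // Where to start when the group has no committed offset:
        // "earliest" replays the topic from the beginning.
        props.put("auto.offset.reset", "earliest");
        // Cap the number of records returned by a single poll() call.
        props.put("max.poll.records", 100);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(extraProps().get("auto.offset.reset"));
    }
}
```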
7. Now, write a listener method using the @KafkaListener annotation to listen to messages arriving on knoldusexample-topic for the group-id consumer group.
package com.knoldusExample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
public class KafaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(KafaExampleApplication.class, args);
}
@KafkaListener(topics = "knoldusexample-topic", groupId = "group-id")
public void listen(String message) {
System.out.println("Received Message : " + message);
}
}
8. Next, start the Spring Boot application by running mvn spring-boot:run.
9. Start ZooKeeper: this Kafka installation comes with a built-in ZooKeeper. ZooKeeper is mainly used to track the status of the nodes in the Kafka cluster and to keep track of Kafka topics, partitions, etc.
Refer to Install Apache Kafka to know the steps to install Zookeeper and Kafka.
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
10. Start Apache Kafka: use the command below.
.\bin\windows\kafka-server-start.bat .\config\server.properties
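If automatic topic creation is disabled on the broker, the topic can also be created explicitly before sending (this command assumes Kafka 2.2+, where kafka-topics accepts --bootstrap-server):

```shell
.\bin\windows\kafka-topics.bat --create --topic knoldusexample-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```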
11. Finally, hit the URL with the message Hello as follows:
http://localhost:8080/knoldusexample-kafka/producer?message=Hello
12. Then, hit the URL with a different message, Next Message Testing, as follows:
http://localhost:8080/knoldusexample-kafka/producer?message=Next Message Testing
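Note that the spaces in the second message must be URL-encoded; a browser does this automatically, but a programmatic client should encode explicitly. A small stdlib-only sketch (the URL matches this example's endpoint):

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Builds the producer URL with a properly URL-encoded message parameter.
public class UrlEncodeExample {

    static String producerUrl(String message) throws Exception {
        // URLEncoder encodes a space as "+", which the server decodes back.
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8.name());
        return "http://localhost:8080/knoldusexample-kafka/producer?message=" + encoded;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(producerUrl("Next Message Testing"));
        // → http://localhost:8080/knoldusexample-kafka/producer?message=Next+Message+Testing
    }
}
```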
The two URLs above will trigger messages to be sent to knoldusexample-topic, which the group-id consumer group receives.
13. On the Spring Boot console, we can see that the consumer has started and the messages are received.
14. You can also verify in Kafka Tool that knoldusexample-topic has been created for the group-id consumer group, along with details such as partitions, consumers, and producers.
