Testing Spring Embedded Kafka consumer and producer

Reading Time: 2 minutes

This blog I’m talking about the Kafka testing without physical installation of Kafka services or docker container.
For testing, I’m going to use another Spring library that is called spring-kafka-test. It provides much functionality to ease our job in the testing process and takes care of Kafka consumer or a producer works as expected.

Maven Test Dependencies



application.yml props file

These are the minimum configuration for Kafka producer and consumer API.

    active: test
       name: test-topic
      auto-offset-reset: earliest
      group-id: test-group
      ack-mode: manual
      client-id: test-client-id
      concurrency: 10
      type: batch
      client-id: admin-test-client-id

Kafka Producer

The Producer API allows applications to send streams of data to topics in the Kafka cluster.

public class KafkaProducer {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
    private final KafkaTemplate<String, BankModel> kafkaTemplate;
    private String topic;

public KafkaProducer(KafkaTemplate<String, BankModel> kafkaTemplate){

public void send(BankModel model){
        kafkaTemplate.send(topic, model.getAccountNumber(), model);


Kafka Consumer

The Consumer API allows applications to read streams of data from topics in the Kafka cluster.

public class KafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    private ObjectMapper mapper=new ObjectMapper();

    @KafkaListener(topics = "#{'${spring.kafka.topic.name}'.split(',')}")
    public void listen(List<BankModel> recordBatch,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("Payload size= "+recordBatch.size());
        recordBatch.forEach(record -> {
            try {
                log.info("Bank model Json: " + mapper.writeValueAsString(recordBatch));
            } catch (JsonProcessingException e) {
                log.error("Exception occurred during messages processing ", e);

Test Class

Test Kafka consumer that takes care of reading messages from Kafka. But before that, we need to do some configuration for EmbeddedKafka.

@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class EmbeddedKafkaIntegrationTest {
    private BankModel event = new BankModel(UUID.randomUUID().toString(), UUID.randomUUID().toString(), "7703", 111, "12/05/2021", "Abid", "Khan", 10000d);

    private KafkaConsumerService consumer;

    private KafkaProducer producer;

    ArgumentCaptor<List<BankModel>> bankModelArgumentCaptor;

    private String TOPIC_NAME;

    ArgumentCaptor<String> topicArgumentCaptor;

    public void embeddedKafka_whenSendingToSimpleProducer_thenMessageReceived() {

        verify(consumer, timeout(1000).times(1)).listen(BankModelArgumentCaptor.capture(),
        List<BankModel> batchPayload = BankModelArgumentCaptor.getValue();
        assertThat(batchPayload.size(), equalTo(01));

    private void testEvents(List<BankAccount> eventsPayload) {
        eventsPayload.forEach(record -> {
            assertEquals(event.getAccountNumber(), record.getAccountNumber());
            assertEquals(event.getTransactionId(), record.getTransactionId());
            assertEquals(event.getIdentificationNumber(), record.getIdentificationNumber());
            assertEquals(event.getSecurityCode(), record.getSecurityCode());
            assertEquals(event.getDateOfBirth(), record.getDateOfBirth());
            assertEquals(event.getFirstName(), record.getFirstName());
            assertEquals(event.getLastName(), record.getLastName());
            assertEquals(event.getBalance(), record.getBalance());

The test class has three annotations,

  • @EmbeddedKafka – annotation that can be specified on a test class that runs Spring for Apache Kafka based tests.
  • @SpringBootTest – annotation tells Spring Boot to look for a main configuration class (one with @SpringBootApplication, for instance) and use that to start a Spring application context. using Spring Test and Spring Boot features to test the interactions between Spring and your code.
  • @DirtiesContext – test annotation which indicates that the ApplicationContext associated with a test is dirty and should therefore be closed and removed from the context cache.


It’s very easy to test a Kafka integration. The @EmbeddedKafka is providing a handy annotation to get started. With the JUnit 5 approach you can do similar tests.

Written by 

Abid Khan is a Lead Consultant at Knoldus Inc., postgraduate (MCA), and having 5+ years of experience in JavaSE, JavaEE, ORM framework, Spring, Spring-boot, RESTful Web Services, Kafka, MQTT, Rabbitmq, Docker, Redis, MySQL, Maven, GIT, etc. He is a well-developed professional with a prolific track record of designing, testing, and monitoring software as well as upgrading the existing programs.