This blog I’m talking about the Kafka testing without physical installation of Kafka services or docker container.
For testing, I’m going to use another Spring library that is called spring-kafka-test. It provides much functionality to ease our job in the testing process and takes care of Kafka consumer or a producer works as expected.
Maven Test Dependencies
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
application.yml props file
These are the minimum configuration for Kafka producer and consumer API.
spring:
profiles:
active: test
kafka:
topic:
name: test-topic
consumer:
auto-offset-reset: earliest
group-id: test-group
listener:
ack-mode: manual
client-id: test-client-id
concurrency: 10
type: batch
admin:
client-id: admin-test-client-id
Kafka Producer
The Producer API allows applications to send streams of data to topics in the Kafka cluster.
@Service
public class KafkaProducer {
private static final Logger log = LoggerFactory.getLogger(KafkaProducer.class);
private final KafkaTemplate<String, BankModel> kafkaTemplate;
@Value("${spring.kafka.topic.name}")
private String topic;
public KafkaProducer(KafkaTemplate<String, BankModel> kafkaTemplate){
this.kafkaTemplate=kafkaTemplate;
}
public void send(BankModel model){
kafkaTemplate.send(topic, model.getAccountNumber(), model);
}
}
Kafka Consumer
The Consumer API allows applications to read streams of data from topics in the Kafka cluster.
@Service
public class KafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
private ObjectMapper mapper=new ObjectMapper();
@KafkaListener(topics = "#{'${spring.kafka.topic.name}'.split(',')}")
public void listen(List<BankModel> recordBatch,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info("------------------------------------------");
log.info("Payload size= "+recordBatch.size());
recordBatch.forEach(record -> {
try {
log.info("Bank model Json: " + mapper.writeValueAsString(recordBatch));
} catch (JsonProcessingException e) {
log.error("Exception occurred during messages processing ", e);
}
});
log.info("------------------------------------------");
}
}
Test Class
Test Kafka consumer that takes care of reading messages from Kafka. But before that, we need to do some configuration for EmbeddedKafka
.
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class EmbeddedKafkaIntegrationTest {
private BankModel event = new BankModel(UUID.randomUUID().toString(), UUID.randomUUID().toString(), "7703", 111, "12/05/2021", "Abid", "Khan", 10000d);
@SpyBean
private KafkaConsumerService consumer;
@Autowired
private KafkaProducer producer;
@Captor
ArgumentCaptor<List<BankModel>> bankModelArgumentCaptor;
@Value("${spring.kafka.topic.name}")
private String TOPIC_NAME;
@Captor
ArgumentCaptor<String> topicArgumentCaptor;
@Test
public void embeddedKafka_whenSendingToSimpleProducer_thenMessageReceived() {
//Producer
producer.send(event);
//consumer
verify(consumer, timeout(1000).times(1)).listen(BankModelArgumentCaptor.capture(),
topicArgumentCaptor.capture());
List<BankModel> batchPayload = BankModelArgumentCaptor.getValue();
assertNotNull(batchPayload);
assertThat(batchPayload.size(), equalTo(01));
assertTrue(TOPIC_NAME.contains(topicArgumentCaptor.getValue()));
testEvents(batchPayload);
}
private void testEvents(List<BankAccount> eventsPayload) {
eventsPayload.forEach(record -> {
assertNotNull(record);
assertEquals(event.getAccountNumber(), record.getAccountNumber());
assertEquals(event.getTransactionId(), record.getTransactionId());
assertEquals(event.getIdentificationNumber(), record.getIdentificationNumber());
assertEquals(event.getSecurityCode(), record.getSecurityCode());
assertEquals(event.getDateOfBirth(), record.getDateOfBirth());
assertEquals(event.getFirstName(), record.getFirstName());
assertEquals(event.getLastName(), record.getLastName());
assertEquals(event.getBalance(), record.getBalance());
});
}
The test class has three annotations,
@EmbeddedKafka
– annotation that can be specified on a test class that runs Spring for Apache Kafka based tests.@SpringBootTest
– annotation tells Spring Boot to look for a main configuration class (one with@SpringBootApplication
, for instance) and use that to start a Spring application context. using Spring Test and Spring Boot features to test the interactions between Spring and your code.@
DirtiesContext – test annotation which indicates that the ApplicationContext associated with a test is dirty and should therefore be closed and removed from the context cache.
Conclusion
It’s very easy to test a Kafka integration. The @EmbeddedKafka
is providing a handy annotation to get started. With the JUnit 5 approach you can do similar tests.