Unit Testing Of Kafka

Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and lets you pass messages from one endpoint to another.

Typically, data is published to a topic through the Producer API, and the Consumer API reads data from the topics it subscribes to.

In this blog, we will see how to unit test Kafka code.

Unit testing your Kafka code is incredibly important, because it transports your most important data. Until now, we had to explicitly run ZooKeeper and a Kafka server to test a producer and a consumer.

There is now an alternative: testing Kafka code without running a separate ZooKeeper and Kafka broker yourself.

Wondering how? EmbeddedKafka is there for you.

Embedded Kafka is a library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.10.2.1 and ZooKeeper 3.4.8.

It starts ZooKeeper and the Kafka broker before the test and stops them after the test.

It is also possible to start and stop ZooKeeper and the Kafka server programmatically.

How to use it?

Before testing, follow these steps:

1. Add the following dependency to your build.sbt:

libraryDependencies += "net.manub" %% "scalatest-embedded-kafka" % "0.14.0" % "test"

2. Have your test spec extend the EmbeddedKafka trait.

The withRunningKafka closure gives you a running Kafka instance. It starts ZooKeeper on port 6000 and the Kafka broker on port 6001 before the enclosed code runs, and shuts both down automatically when the block finishes.

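import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.WordSpec

// WordSpec is used because the "..." should { ... } syntax belongs to it, not to FlatSpec.
class KafkaSpec extends WordSpec with EmbeddedKafka {

  "runs with embedded kafka" should {

    "have a broker available" in {
      withRunningKafka {
        // test cases go here
      }
    }

  }

}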
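To make the start-and-stop behaviour concrete, here is a minimal sketch of the loan pattern that a closure such as withRunningKafka follows. It is not the library's implementation: FakeBroker and withRunningBroker are hypothetical names invented for this illustration, and the fake broker only records lifecycle events instead of starting anything.

```scala
object LoanPatternSketch {

  // Hypothetical stand-in for a broker; it only records what happened to it.
  final class FakeBroker {
    val events = scala.collection.mutable.ListBuffer.empty[String]
    def start(): Unit = events += "started"
    def stop(): Unit = events += "stopped"
  }

  // Starts the broker, runs the body, and always stops the broker afterwards,
  // even if the body throws.
  def withRunningBroker[T](broker: FakeBroker)(body: => T): T = {
    broker.start()
    try body
    finally broker.stop()
  }
}
```

The test body is passed by name, so it only runs once the broker has been started, and the finally clause guarantees the shutdown step regardless of whether the test passes.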

An EmbeddedKafka companion object is provided for usage without the EmbeddedKafka trait. With it, ZooKeeper and Kafka can be started and stopped programmatically.

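import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

// The trait is not mixed in here; the companion object is called directly.
class KafkaSpec extends FlatSpec with BeforeAndAfterAll {

  // Start ZooKeeper and the Kafka broker once, before any test runs.
  override def beforeAll(): Unit = {
    EmbeddedKafka.start()
  }

  // test cases go here

  // Stop both once all tests have finished.
  override def afterAll(): Unit = {
    EmbeddedKafka.stop()
  }

}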

EmbeddedKafka also supports custom configuration. For example, you can change the ports on which ZooKeeper and Kafka start by providing an implicit EmbeddedKafkaConfig. You can also provide an implicit serializer that suits your message type.

implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2182)

implicit val serializer = new StringSerializer

The same implicit EmbeddedKafkaConfig can be used to define custom producer/consumer properties.
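As a rough sketch of how these implicits fit into a spec, the example below declares a custom config plus a string serializer and deserializer, then round-trips one message. The class name, topic name, and port numbers are arbitrary choices made for illustration, and the parameter name zooKeeperPort as well as the generic publishToKafka and consumeFirstMessageFrom signatures are assumed to match version 0.14.0 of the library; check them against the version you use.

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.scalatest.{Matchers, WordSpec}

class CustomConfigSpec extends WordSpec with Matchers with EmbeddedKafka {

  // Arbitrary ports, chosen only to differ from the defaults 6000 and 6001.
  implicit val config: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort = 7001, zooKeeperPort = 7000)

  // Picked up implicitly by publishToKafka and consumeFirstMessageFrom.
  implicit val serializer: StringSerializer = new StringSerializer
  implicit val deserializer: StringDeserializer = new StringDeserializer

  "a broker with custom ports" should {
    "round-trip a message using the implicit serializers" in {
      withRunningKafka {
        publishToKafka("custom-topic", "payload")
        consumeFirstMessageFrom[String]("custom-topic") shouldBe "payload"
      }
    }
  }
}
```

Because the implicit values are declared inside the spec, they take precedence over the library's default configuration for every call made in that class.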

The EmbeddedKafka trait also provides utility methods for interacting with the embedded Kafka broker, so that we can test our Kafka producers and consumers.

def publishToKafka(topic: String, message: String): Unit

def consumeFirstMessageFrom(topic: String): String

It also provides many more methods which can be used according to need.
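A minimal round-trip test using the string-specific helpers might look like the sketch below. The spec and topic names are made up for illustration, and publishStringMessageToKafka and consumeFirstStringMessageFrom are assumed to be available in the version of the library you depend on (they are present in the 0.14.0 line as far as I know).

```scala
import net.manub.embeddedkafka.EmbeddedKafka
import org.scalatest.{Matchers, WordSpec}

class RoundTripSpec extends WordSpec with Matchers with EmbeddedKafka {

  "an embedded Kafka broker" should {
    "return the message that was published to a topic" in {
      withRunningKafka {
        // Publish one string message to a topic on the embedded broker.
        publishStringMessageToKafka("greetings", "hello")

        // Read the first message from the same topic and compare it.
        consumeFirstStringMessageFrom("greetings") shouldBe "hello"
      }
    }
  }
}
```

In a real project, the publish step would typically be replaced by a call to your own producer code, or the consume step by your own consumer, so that the helper on the other side verifies its behaviour.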

For complete example click here.

The good thing is that we can test Kafka Streams code in a similar way. To do so, add the following dependency to build.sbt and have your spec extend EmbeddedKafkaStreamsAllInOne.

libraryDependencies += "net.manub" %% "scalatest-embedded-kafka-streams" % "0.14.0" % "test"

For more information on testing Kafka Streams, see the links in the references.

So, Embedded Kafka makes unit testing Kafka code easier, and it is also very easy to use.

I hope this blog helps you 🙂

References:

  1. https://github.com/Mayvenn/embedded-kafka
  2. https://github.com/tuplejump/embedded-kafka
  3. https://github.com/manub/scalatest-embedded-kafka


