Build your own Kafka Producer

“It’s Not Whether You Get Knocked Down, It’s Whether You Get Up.” 
– Vince Lombardi

The Kafka Producer API allows applications to send streams of data to topics in a Kafka cluster.

Looking for a way to implement a custom Kafka producer in your project? This blog post gives you an end-to-end solution for implementing this functionality using the Kafka API.

Introduction

There are two ways to implement a Kafka producer:

  1. Implementing a custom Kafka producer using the Kafka API.
  2. Implementing a Lagom Kafka producer. (Preferable)

Quickstart

1. Import the KafkaProducer and ProducerRecord classes from the producer API.

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }

2. Set up the required configuration for producing a record. You can add this method to the class defined below or to any configuration class.

import java.util.Properties

def setKafkaProperties(): Properties = {
  val kafkaProps: Properties = new Properties()
  kafkaProps.put("bootstrap.servers", "localhost:9092,localhost:9093")
  kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("value.serializer", classOf[SampleMessageSerializer].getName)
  kafkaProps.put("acks", "all")
  kafkaProps.put("retries", "3")
  kafkaProps.put("linger.ms", "5")
  kafkaProps // return the populated properties
}
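A quick note on these settings: acks=all makes the broker confirm a record only after all in-sync replicas have written it, retries=3 lets the producer retry transient send failures, and linger.ms=5 allows up to 5 ms of batching before a send, trading a little latency for better throughput.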

3. Create a class that extends KafkaProducerApi.

import org.slf4j.LoggerFactory

class KafkaProducerImpl extends KafkaProducerApi {
  private val logger = LoggerFactory.getLogger(classOf[KafkaProducerImpl])
  private val producer = new KafkaProducer[String, Message](setKafkaProperties())
}
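The KafkaProducerApi trait and the Message payload aren't shown in full in this post; the sketch below is a minimal assumption of what they might look like, with the member signature inferred from the snippets that follow.

import akka.Done
import scala.concurrent.Future

// Hypothetical payload type; use whatever fields your service needs.
case class Message(id: String, content: String)

// Hypothetical trait; adapt it to your own project's API.
trait KafkaProducerApi {
  def sendMessage(message: Message): Future[Done]
}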

4. Send the message to the Kafka topic. For example, Message here is a case class that I would like to produce to the Kafka topic. By default, the Kafka producer serializes both key and value as strings. If you want to produce a custom case class, you should implement a serializer for that case class, as explained below. Add this method to the class defined above.

def sendMessage(message: Message): Future[Done] = {
  try {
    val recordToBeProduced = new ProducerRecord[String, Message](SampleService.MessageTopic, Constants.Empty, message)
    producer.send(recordToBeProduced)
    logger.info("Message successfully sent!")
  } catch {
    case e: Throwable => logger.error("Failed sending message with error {}", e.getMessage)
  }
  // Note: send() is asynchronous; close the producer on application shutdown,
  // not after every message, or subsequent sends will fail.
  Future.successful(Done)
}
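SampleService.MessageTopic and Constants.Empty above are project-specific placeholders, not library APIs; they could be as simple as:

// Hypothetical helpers assumed by sendMessage above.
object SampleService {
  val MessageTopic: String = "test" // the topic we create in the testing section
}

object Constants {
  val Empty: String = "" // record key when no meaningful key is needed
}

With these in place, producing a message is a one-liner: new KafkaProducerImpl().sendMessage(Message("1", "hello kafka")).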

Writing your custom serializer

The key.serializer and value.serializer settings tell the producer how to turn the key and value objects provided in a ProducerRecord into bytes.

You can use the included ByteArraySerializer or StringSerializer for simple byte-array or string types, but for your custom types you can create a custom serializer with the help of the example below.

import java.nio.charset.StandardCharsets
import com.google.gson.Gson
import org.apache.kafka.common.serialization.Serializer

class SampleMessageSerializer extends Serializer[Message] {
  private val gson: Gson = new Gson()

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to do
  }

  // Convert the Message case class to JSON and return its UTF-8 bytes.
  override def serialize(topic: String, data: Message): Array[Byte] =
    gson.toJson(data).getBytes(StandardCharsets.UTF_8)

  override def close(): Unit = {
    // nothing to do
  }
}
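If you consume these records in your own service (rather than through the console consumer), you'll need the mirror image of this class. Here is a minimal sketch of a matching deserializer, assuming the same Message case class:

import java.nio.charset.StandardCharsets
import com.google.gson.Gson
import org.apache.kafka.common.serialization.Deserializer

class SampleMessageDeserializer extends Deserializer[Message] {
  private val gson: Gson = new Gson()

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to do
  }

  // Parse the UTF-8 JSON bytes back into a Message.
  override def deserialize(topic: String, data: Array[Byte]): Message =
    gson.fromJson(new String(data, StandardCharsets.UTF_8), classOf[Message])

  override def close(): Unit = {
    // nothing to do
  }
}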

Testing ProducerRecord

1. Start ZooKeeper.

sudo bin/zkServer.sh start

2. Start Kafka.

bin/kafka-server-start.sh config/server.properties

3. Create the topic your service produces to.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

4. Run the console consumer to test whether your service is able to produce the desired case class in JSON format.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Thanks for reading my blog. I will cover the implementation of a producer using Lagom in upcoming blog posts. Stay tuned.
