“It’s Not Whether You Get Knocked Down, It’s Whether You Get Up.”
– Inspirational Quote By Vince Lombardi
Kafka Producer API allows applications to send streams of data to topics in the Kafka cluster.
Looking for a way to implement a custom Kafka Producer in your project? This blog post gives you an end-to-end solution for implementing this functionality using the Kafka API.
Introduction
There are two ways to implement a Kafka Producer:
- Implementing a Custom Kafka Producer using the Kafka API.
- Implementing a Lagom Kafka Producer. (Preferable)
QUICKSTART
1. Import KafkaProducer and ProducerRecord from the producer API.

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord }
2. Set up the required configuration for producing a record. You can add this method to the class defined below or to any configuration class.

import java.util.Properties

override def setKafkaProperties(): Properties = {
  val kafkaProps: Properties = new Properties()
  kafkaProps.put("bootstrap.servers", "localhost:9092,localhost:9093")
  kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // fully-qualified class name of the custom serializer defined below
  kafkaProps.put("value.serializer", ".SampleMessageSerializer")
  kafkaProps.put("acks", "all")
  kafkaProps.put("retries", "3")
  kafkaProps.put("linger.ms", "5")
  kafkaProps // return the populated configuration
}
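The configuration step only needs java.util.Properties, so it can be sketched and checked without a running Kafka cluster. A minimal sketch, assuming placeholder broker addresses and string serializers:

```scala
import java.util.Properties

// Build producer configuration; broker addresses here are placeholders.
def buildKafkaProps(bootstrapServers: String): Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("acks", "all")    // wait for all in-sync replicas to acknowledge
  props.put("retries", "3")   // retry transient send failures up to 3 times
  props.put("linger.ms", "5") // batch records for up to 5 ms before sending
  props
}

val props = buildKafkaProps("localhost:9092,localhost:9093")
```

Factoring the configuration into a function like this makes it easy to swap broker addresses between environments.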
3. Create a class that extends KafkaProducerApi.

class KafkaProducerImpl extends KafkaProducerApi {
  private val producer = new KafkaProducer[String, Message](setKafkaProperties())
}
4. Send the message to a Kafka topic. For example, Message is a case class that I would like to produce to the Kafka topic. By default, Kafka serializers handle String keys and values. If you want a custom case class to be produced, you should implement a serializer for that case class, as explained below. Add this method to the class defined above.
def sendMessage(recordToSend: Message): Future[Done] = {
  try {
    val recordToBeProduced = new ProducerRecord(SampleService.MessageTopic, Constants.Empty, recordToSend)
    // send is asynchronous; inspect the returned future or pass a callback to confirm delivery
    val _ = producer.send(recordToBeProduced)
    logger.info("Message successfully sent!")
  } catch {
    case e: Throwable => logger.error("Failed sending message with error {}", e.getMessage)
  }
  Future.successful(Done)
}

// Close the producer once, when the service shuts down, not after every send.
def close(): Unit = producer.close()
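The success/failure flow of sendMessage can be sketched without a Kafka dependency by wrapping a stand-in send function in scala.util.Try (sendToBroker below is hypothetical, standing in for producer.send):

```scala
import scala.util.{Try, Success, Failure}

// Hypothetical stand-in for producer.send; fails on an empty record.
def sendToBroker(record: String): String =
  if (record.isEmpty) throw new IllegalArgumentException("empty record") else "ok"

// Mirrors the try/catch branches of sendMessage above.
def sendMessage(record: String): String =
  Try(sendToBroker(record)) match {
    case Success(_) => "Message successfully sent!"
    case Failure(e) => s"Failed sending message with error ${e.getMessage}"
  }

val ok = sendMessage("hello")
val bad = sendMessage("")
```

Using Try (or the real producer's callback) keeps a failed send from crashing the caller while still surfacing the error for logging.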
Writing your custom serializer
By default, the key.serializer and value.serializer settings instruct the producer how to turn the key and value objects the user provides with their ProducerRecord into bytes. You can use the included ByteArraySerializer or StringSerializer for simple string or byte types. For your custom types, you can create a custom serializer with the help of the example below.
class SampleMessageSerializer extends Serializer[Message] {
  private val gson: Gson = new Gson()

  override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
    // nothing to do
  }

  override def serialize(topic: String, data: Message): Array[Byte] = {
    gson.toJson(data).getBytes
  }

  override def close(): Unit = {
    // nothing to do
  }
}
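Since Gson and Kafka's Serializer interface are external dependencies, here is a minimal self-contained sketch of the same idea, assuming a local stand-in for the Serializer trait and a hand-rolled JSON encoding in place of Gson:

```scala
// Local stand-in mirroring org.apache.kafka.common.serialization.Serializer.
trait Serializer[T] {
  def serialize(topic: String, data: T): Array[Byte]
}

// Hypothetical case class standing in for the blog's Message type.
case class Message(id: Int, body: String)

// Hand-rolled JSON for illustration only; real code should use a JSON library like Gson.
class SampleMessageSerializer extends Serializer[Message] {
  override def serialize(topic: String, data: Message): Array[Byte] =
    s"""{"id":${data.id},"body":"${data.body}"}""".getBytes("UTF-8")
}

val bytes = new SampleMessageSerializer().serialize("test", Message(1, "hello"))
val json = new String(bytes, "UTF-8")
```

The key point is the contract: serialize must turn every value of your type into bytes that the consumer side can decode back.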
Testing ProducerRecord
1. Start ZooKeeper.

sudo bin/zkServer.sh start
2. Start Kafka.
bin/kafka-server-start.sh config/server.properties
3. Create the topic that your service produces to.
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
4. Run the consumer to test whether your service is able to produce the desired case class in JSON format.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Thanks for reading my blog. I will cover the implementation of a producer using Lagom in upcoming blog posts. Stay tuned.
