Take a deep dive into Kafka – Producer API


I am going to start a series of blogs on the Kafka APIs. This blog is a part of that series.

In this blog, we are going to learn about the Producer API. If you are new to Kafka, I recommend that you first get a basic idea of Kafka from kafka-quickstart.

There are many reasons an application might need to write messages to Kafka: recording metrics, storing log messages, buffering information before writing to a database, recording data coming from sensors, and much more.

Producer Flow


We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value. Optionally, we can also specify a key and/or a partition. Once we send the ProducerRecord, the first thing the producer does is serialize the key and value objects to byte arrays so they can be sent over the network. Next, the data is sent to a partitioner. If we specified a partition in the ProducerRecord, the partitioner doesn’t do anything and simply returns the partition we specified. If we didn’t, the partitioner chooses a partition for us, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will also be sent to the same topic and partition.
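The serialize-then-partition step described above can be sketched in plain Java, without the Kafka client library. This is only an illustration under stated assumptions: the names PartitionSketch, serialize and choosePartition are hypothetical, the key is assumed to be non-null, and Arrays.hashCode is a stand-in for the murmur2 hash that Kafka's default partitioner applies to the serialized key. What it shows is the shape of the decision: an explicit partition wins, otherwise the key's bytes decide, so equal keys always map to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of the producer's partition choice; not Kafka's actual code.
final class PartitionSketch {

    // Serialize a String key to bytes, as a string serializer would (UTF-8).
    static byte[] serialize(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // explicitPartition may be null, meaning "let the partitioner decide".
    static int choosePartition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // the partition set on the record is used as-is
        }
        // Stand-in hash (assumption): Kafka's default partitioner uses murmur2 instead.
        int hash = Arrays.hashCode(serialize(key));
        return Math.floorMod(hash, numPartitions); // always in [0, numPartitions)
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(2, "Precision Products", 6));
        System.out.println(choosePartition(null, "Precision Products", 6));
    }
}
```

Because the partition depends only on the key's bytes and the partition count, adding partitions to a topic later changes where a given key lands.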

A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers. When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.
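How many times the producer retries, and what counts as a successful write, is driven by producer configuration. The sketch below collects such settings in a plain java.util.Properties object. The keys acks, retries and retry.backoff.ms are real producer configuration names, but the values shown and the class name RetryConfigSketch are illustrative assumptions, not recommendations.

```java
import java.util.Properties;

// Hypothetical helper that gathers retry-related producer settings.
final class RetryConfigSketch {

    static Properties retrySettings() {
        Properties props = new Properties();
        // Wait for all in-sync replicas to acknowledge before a write counts as successful.
        props.put("acks", "all");
        // Retry a failed send up to three times before reporting the error (example value).
        props.put("retries", "3");
        // Pause 100 ms between retry attempts (example value).
        props.put("retry.backoff.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        retrySettings().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These entries would be added to the same Properties object that is passed to the producer's constructor.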

Constructing a Kafka Producer

The first step in writing messages to Kafka is to create a producer object with the properties you want to pass to the producer. A Kafka producer has three mandatory properties:

bootstrap.servers
List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster. This list doesn’t need to include all brokers since the producer will get more information after the initial connection.

key.serializer
Name of a class that will be used to serialize the keys of the records we will produce to Kafka. Kafka brokers expect byte arrays as keys and values of messages. key.serializer should be set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface. The producer will use this class to serialize the key object to a byte array.

value.serializer
Name of a class that will be used to serialize the values of the records we will produce to Kafka. The same way you set key.serializer to the name of a class that will serialize the message key object to a byte array, you set value.serializer to a class that will serialize the message value object.

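Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(kafkaProps);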
  • We start with a Properties object.
  • Since we plan on using strings for message key and value, we use the built-in StringSerializer.
  • Here we create a new producer by setting the appropriate key and value types and passing the Properties object.
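To make the idea of "serializing a key object to a byte array" more concrete, here is a rough sketch of what a serializer for a non-string key might do. Everything in it is an assumption for illustration: SketchSerializer is a local stand-in that only mirrors the shape of the serialize(topic, data) method on Kafka's Serializer interface so that the example compiles without the Kafka client library, and CustomerKey with its byte layout is a made-up type.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-in (assumption) with the same shape as Kafka's Serializer#serialize(topic, data),
// declared locally so the sketch compiles without the Kafka client library.
interface SketchSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Hypothetical key type: a customer id plus a name.
final class CustomerKey {
    final int id;
    final String name;

    CustomerKey(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

final class CustomerKeySerializer implements SketchSerializer<CustomerKey> {
    @Override
    public byte[] serialize(String topic, CustomerKey data) {
        if (data == null) {
            return null; // a null key stays null
        }
        byte[] name = data.name.getBytes(StandardCharsets.UTF_8);
        // Layout: 4-byte id, 4-byte name length, then the UTF-8 name bytes.
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length);
        buffer.putInt(data.id);
        buffer.putInt(name.length);
        buffer.put(name);
        return buffer.array();
    }
}
```

In a real project the class would implement org.apache.kafka.common.serialization.Serializer instead, and its fully qualified name would be the value of key.serializer.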

Start Sending Messages

There are three primary ways to send messages:

Fire-and-forget
We send a message to the server and don’t really care if it arrives successfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, some messages may get lost using this method.

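ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record); // the returned Future is ignored: fire-and-forget
} catch (Exception e) {
    e.printStackTrace(); // errors raised before the message is sent, e.g. serialization failures
}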

Synchronous send
We send a message, the send() method returns a Future object, and we use get() to wait on the future and see if the send() was successful or not.


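ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get(); // block until the broker responds
} catch (Exception e) {
    e.printStackTrace(); // the send failed, before or after reaching the broker
}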

Here, we are using Future.get() to wait for a reply from Kafka.
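The way errors surface from Future.get() is worth seeing in isolation. The sketch below uses only the JDK: fakeSend is a hypothetical stand-in for producer.send() that returns a Future which either completes with an offset or fails, and SyncSendSketch and sendAndWait are made-up names. The part that carries over to real producer code is the exception handling: get() wraps the underlying failure in an ExecutionException, so the actual cause has to be read with getCause().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SyncSendSketch {

    // Hypothetical stand-in for producer.send(): returns a Future that either
    // completes with an offset or fails with the given error.
    static Future<Long> fakeSend(long offset, Exception failure) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (failure != null) {
            future.completeExceptionally(failure);
        } else {
            future.complete(offset);
        }
        return future;
    }

    // Blocks on the Future and reports the outcome as a string.
    static String sendAndWait(long offset, Exception failure) {
        try {
            long written = fakeSend(offset, failure).get(); // waits for the result
            return "written at offset " + written;
        } catch (ExecutionException e) {
            // get() wraps the underlying failure; the real cause is getCause().
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }
}
```

Catching ExecutionException and InterruptedException separately, rather than a bare Exception, makes it easier to decide which failures are worth retrying.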

Asynchronous send
We call the send() method with a callback function, which gets triggered when the producer receives a response from the Kafka broker.

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace(); // the send failed
        }
    }
}

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());

To use callbacks, you need a class that implements the org.apache.kafka.clients.producer.Callback interface, which has a single function: onCompletion(). If Kafka returned an error, onCompletion() is called with a non-null exception.
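The non-blocking nature of the callback style can be sketched with the JDK alone. In the example below, SketchCallback is a local stand-in that only mirrors the shape of onCompletion() (a result plus an exception that is null on success), fakeSend is a hypothetical replacement for producer.send(record, callback), and the broker responses are simulated with CompletableFuture. It shows that both sends return before any response exists, and that each callback fires only once its response arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncSendSketch {

    // Stand-in (assumption) shaped like Callback#onCompletion: on success the
    // exception is null, on failure it describes what went wrong.
    interface SketchCallback {
        void onCompletion(Long offset, Exception e);
    }

    // Hypothetical stand-in for producer.send(record, callback): returns at once
    // and invokes the callback when the simulated broker response arrives.
    static void fakeSend(CompletableFuture<Long> brokerResponse, SketchCallback callback) {
        brokerResponse.whenComplete((offset, error) -> {
            if (error != null) {
                callback.onCompletion(null, new Exception(error.getMessage()));
            } else {
                callback.onCompletion(offset, null);
            }
        });
    }

    // Sends two records, then completes one response and fails the other;
    // returns the lines logged by the callback.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SketchCallback callback = (offset, e) ->
            log.add(e != null ? "error: " + e.getMessage() : "ok at offset " + offset);

        CompletableFuture<Long> first = new CompletableFuture<>();
        CompletableFuture<Long> second = new CompletableFuture<>();
        fakeSend(first, callback);  // returns immediately, nothing logged yet
        fakeSend(second, callback);

        first.complete(7L);
        second.completeExceptionally(new IllegalStateException("broker unavailable"));
        return log;
    }
}
```

With the real producer, the callback runs on the producer's own I/O thread, so it should stay fast and avoid blocking work.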

Thank you for sticking to the end. If you like this blog, please show your appreciation by giving it a thumbs up. Share this blog and give me suggestions on how I can improve my future posts to suit your needs. Follow me to get updates on different technologies. For any queries, feel free to contact me at jashan.goyal@knoldus.com.


Written by 

Jashan Goyal is a Trainee Software Consultant at Knoldus Software LLP. He completed his B.Tech. at Lovely Professional University, Jalandhar. He has good knowledge of C, C++, Java, Scala and Node.js. He is also interested in the Internet of Things. As a fresher, he always tries to explore different types of software and tools.