Kafka – Sending Object as a message

logo-kafka

Kafka lets us publish and subscribe to streams of records and the records can be of any type, it can be JSON, String, POJO, etc. Kafka gives user the ability to creates our own serializer and deserializer so that we can transmit different data type using it. In this blog I will demonstrate how to create a custom serializer and deserializer but first let’s understand what is serialization and why to serialize?

Serialization and Deserialization

Serialization is the process of converting an object into a stream of bytes and that bytes are used for transmission. Kafka stores and transmit these bytes of array in its queue.

Deserialization as the name suggest does the opposite of serialization where we convert bytes of array into the desired data type. Kafka provides serializer and deserializer for few data types String, Long, Double, Integer, Bytes etc.

Check all pre-build (de) serializers :

https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/package-summary.html

Now I hope you understand what is serialization and why we serialize any object, so let’s begin with its implementation.

Implementation

To create serializer class we need to implement org.apache.kafka.common.serialization.Serializer interface and similarly to create deserializer class we need to implement org.apache.kafka.common.serialization.Deserializer interface.

Both serializer and deserializer interfaces consist of three methods:

  • configure : This method called at startup with configuration.
  • serialize/deserialize : This method is used for serialization and deserialization.
  • close : This method called when Kafka session is to be closed.

Serializer Interface

public interface Serializer extends Closeable {
  void configure(Map<String, ?> var1, boolean var2);

  byte[] serialize(String var1, T var2);

  void close();
}

Deserializer Interface

public interface Deserializer extends Closeable {
  void configure(Map<String, ?> var1, boolean var2);

  T deserialize(String var1, byte[] var2);

  void close();
}

Let’s start with an example:

Dependencies I’ve used:

  • Kafka – 0.10.1.1
  • FasterXML jackson – 2.8.6

User.java

public class User {

  private String name;
  private int age;

  public User() {
  }

  public User(String name, int age) {
    this.name = name;
    this.age = age;
  }

  public String getName() {
    return this.name;
  }

  public int getAge() {
    return this.age;
  }

  @Override public String toString() {
    return "User(" + name + ", " + age + ")";
  }
}

UserSerializer.java

public class UserSerializer implements Serializer {

  @Override public void configure(Map<String, ?> map, boolean b) {

  }

  @Override public byte[] serialize(String arg0, User arg1) {
    byte[] retVal = null;
    ObjectMapper objectMapper = new ObjectMapper();
    try {
      retVal = objectMapper.writeValueAsString(arg1).getBytes();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return retVal;
  }

  @Override public void close() {

  }

}

UserDeserializer.java

public class UserDeserializer implements Deserializer {

  @Override public void close() {

  }

  @Override public void configure(Map<String, ?> arg0, boolean arg1) {

  }

  @Override
  public User deserialize(String arg0, byte[] arg1) {
    ObjectMapper mapper = new ObjectMapper();
    User user = null;
    try {
      user = mapper.readValue(arg1, User.class);
    } catch (Exception e) {

      e.printStackTrace();
    }
    return user;
  }

}

Now what’s left is to use these serializer and deserializer.

To use above serializer we need to register this property:

props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

Using this property, producer will be :

try (Producer<String, User> producer = new KafkaProducer<>(props)) {
   producer.send(new ProducerRecord<String, User>("MyTopic", user));
   System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
   e.printStackTrace();
}

Similarly for deserializer we need to register this property:

props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

And consumer will be :

try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        ConsumerRecords<String, User> messages = consumer.poll(100);
        for (ConsumerRecord<String, User> message : messages) {
          System.out.println("Message received " + message.value().toString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}

References:
https://kafka.apache.org/


KNOLDUS-advt-sticker

Written by 

Prabhat is a Sr. Software Consultant with more than 2 years of experience in C, C++, Java, MySQL, and Scala. His interests are in Cyber Security, Web Development, and New technologies. Prabhat developed software and website on different platforms which include VB6, VB.NET, ASP.NET, PHP, Wordpress, OpenCart, SMF, VBulletin, HTML5, MyBB. Prabhat is focused and result oriented, self-motivated and team-oriented and effective team player.

2 thoughts on “Kafka – Sending Object as a message

  1. If the only way to serialize / deserialize objects is to have a custom serializer/deserializer for each class, then this is not a solution for large scale applications. I think that if every class carries its own name as a data member within itself, then a generic serializer/deseriaizer should be possible – first reading the name of the class from the byte stream and using it to cast the result to the correct type.

Leave a Reply

%d bloggers like this: