Kafka – Sending Object as a message



Kafka lets us publish and subscribe to streams of records, and those records can be of any type: JSON, String, POJO, and so on. Kafka also lets us write our own serializer and deserializer, so we can transmit whatever data type we need. In this blog I will demonstrate how to create a custom serializer and deserializer, but first let’s understand what serialization is and why we serialize.

Serialization and Deserialization

Serialization is the process of converting an object into a stream of bytes that can be transmitted. Kafka itself only stores and transmits byte arrays, so every record has to be serialized before it is sent.

Deserialization, as the name suggests, does the opposite: it converts an array of bytes back into the desired data type. Kafka provides serializers and deserializers for a few data types out of the box: String, Long, Double, Integer, Bytes, etc.
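To make the round trip concrete, here is a minimal sketch that uses only the Java standard library. The byte layout (a 4-byte age followed by the UTF-8 name) and the class name RoundTripSketch are assumptions made for illustration; this is not the format Kafka or Jackson uses.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RoundTripSketch {

    // Serialize: pack the age into 4 bytes, followed by the UTF-8 bytes of the name.
    static byte[] serialize(String name, int age) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + nameBytes.length)
                .putInt(age)
                .put(nameBytes)
                .array();
    }

    // Deserialize: read the fields back in the same order they were written.
    static String deserialize(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int age = buffer.getInt();
        byte[] nameBytes = new byte[buffer.remaining()];
        buffer.get(nameBytes);
        return new String(nameBytes, StandardCharsets.UTF_8) + ":" + age;
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("Alice", 30);
        // prints "9 bytes -> Alice:30"
        System.out.println(bytes.length + " bytes -> " + deserialize(bytes));
    }
}
```

Both sides have to agree on the layout, which is why a serializer and its deserializer are normally written as a pair.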

Check all the pre-built (de)serializers:

https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/package-summary.html

Now that we know what serialization is and why we serialize objects, let’s begin with the implementation.

Implementation

To create a serializer class we need to implement the org.apache.kafka.common.serialization.Serializer interface, and similarly, to create a deserializer class we need to implement the org.apache.kafka.common.serialization.Deserializer interface.

Both the Serializer and Deserializer interfaces consist of three methods:

  • configure : Called once at startup with the client configuration; the boolean flag says whether the instance is used for keys or for values.
  • serialize/deserialize : Called for every record to convert it to bytes (serialize) or back from bytes (deserialize).
  • close : Called when the producer or consumer is closed.
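The order in which these three methods are called can be sketched without the Kafka jar. In the snippet below, SketchSerializer is a hypothetical stand-in that mirrors the method signatures of Kafka's Serializer, and the "sketch.encoding" key is an invented configuration name; neither is part of Kafka.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class LifecycleSketch {

    // Hypothetical stand-in mirroring the three methods of Kafka's Serializer<T>.
    interface SketchSerializer<T> {
        void configure(Map<String, ?> configs, boolean isKey);
        byte[] serialize(String topic, T data);
        void close();
    }

    static class TextSerializer implements SketchSerializer<String> {
        private Charset charset = StandardCharsets.UTF_8;
        boolean closed = false;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // "sketch.encoding" is an invented key, used only in this example.
            Object encoding = configs.get("sketch.encoding");
            if (encoding != null) {
                charset = Charset.forName(encoding.toString());
            }
        }

        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(charset);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // configure once, serialize (possibly many times), close once.
    static byte[] run(String encoding, String text) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("sketch.encoding", encoding);
        TextSerializer serializer = new TextSerializer();
        serializer.configure(configs, false);
        byte[] bytes = serializer.serialize("MyTopic", text);
        serializer.close();
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(run("UTF-16BE", "hi").length); // prints 4
    }
}
```

In a real application the Kafka client makes these calls itself; we only supply the class name in the configuration.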

Serializer Interface

public interface Serializer<T> extends Closeable {
  void configure(Map<String, ?> configs, boolean isKey);

  byte[] serialize(String topic, T data);

  void close();
}

Deserializer Interface

public interface Deserializer<T> extends Closeable {
  void configure(Map<String, ?> configs, boolean isKey);

  T deserialize(String topic, byte[] data);

  void close();
}

Let’s start with an example:

Dependencies I’ve used:

  • Kafka – 0.10.1.1
  • FasterXML jackson – 2.8.6

User.java

public class User {

  private String name;
  private int age;

  public User() {
  }

  public User(String name, int age) {
    this.name = name;
    this.age = age;
  }

  public String getName() {
    return this.name;
  }

  public int getAge() {
    return this.age;
  }

  @Override public String toString() {
    return "User(" + name + ", " + age + ")";
  }
}

UserSerializer.java

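import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {

  // ObjectMapper is thread-safe once configured, so one instance is reused.
  private final ObjectMapper objectMapper = new ObjectMapper();

  @Override public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration needed.
  }

  @Override public byte[] serialize(String topic, User user) {
    byte[] retVal = null;
    try {
      // Write the User as JSON; Jackson encodes the bytes as UTF-8.
      retVal = objectMapper.writeValueAsBytes(user);
    } catch (Exception e) {
      // On failure the error is logged and null is returned.
      e.printStackTrace();
    }
    return retVal;
  }

  @Override public void close() {
    // Nothing to release.
  }

}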

UserDeserializer.java

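import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

public class UserDeserializer implements Deserializer<User> {

  // ObjectMapper is thread-safe once configured, so one instance is reused.
  private final ObjectMapper mapper = new ObjectMapper();

  @Override public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration needed.
  }

  @Override
  public User deserialize(String topic, byte[] data) {
    User user = null;
    try {
      // Parse the JSON bytes back into a User.
      user = mapper.readValue(data, User.class);
    } catch (Exception e) {
      // On failure the error is logged and null is returned.
      e.printStackTrace();
    }
    return user;
  }

  @Override public void close() {
    // Nothing to release.
  }

}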

All that is left is to use the serializer and deserializer.

To use the serializer above, we register it in the producer properties:

props.put("value.serializer", "com.knoldus.serializers.UserSerializer");

With this property set, the producer looks like this:

try (Producer<String, User> producer = new KafkaProducer<>(props)) {
   producer.send(new ProducerRecord<String, User>("MyTopic", user));
   System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
   e.printStackTrace();
}
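The producer snippet assumes a props object that already exists. A minimal sketch of how it might be built is shown here; the broker address localhost:9092 and the class name ProducerPropsSketch are assumptions, and StringSerializer is chosen for the key because the producer is typed as Producer<String, User>.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        // Keys are plain strings, so Kafka's built-in serializer is enough.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are User objects, handled by the custom serializer.
        props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("value.serializer"));
    }
}
```

The serializer is referenced by its fully qualified class name, so the class must be on the producer's classpath and have a public no-argument constructor.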

Similarly, for the deserializer we register this property in the consumer configuration:

props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");

And the consumer looks like this:

try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        ConsumerRecords<String, User> messages = consumer.poll(100);
        for (ConsumerRecord<String, User> message : messages) {
          System.out.println("Message received " + message.value().toString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
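The consumer needs a few more settings than the value deserializer alone. The sketch below shows one plausible set; the broker address localhost:9092, the group id user-consumers, and the class name ConsumerPropsSketch are assumptions made for illustration.

```java
import java.util.Properties;

public class ConsumerPropsSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        // subscribe() requires a consumer group; this id is a made-up example.
        props.put("group.id", "user-consumers");
        // Keys are plain strings, so Kafka's built-in deserializer is enough.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values are turned back into User objects by the custom deserializer.
        props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("value.deserializer"));
    }
}
```

The value deserializer must match the serializer used by the producer, otherwise the consumer will fail to parse the bytes it receives.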

References:
https://kafka.apache.org/



About Prabhat Kashyap

Consultant(Engineer) at Knoldus Software LLP