Kafka lets us publish and subscribe to streams of records, and those records can be of any type: JSON, String, POJO, and so on. Kafka lets us create our own serializers and deserializers so that we can transmit different data types through it. In this blog I will demonstrate how to create a custom serializer and deserializer, but first let’s understand what serialization is and why we serialize.
Serialization and Deserialization
Serialization is the process of converting an object into a stream of bytes that can be transmitted. Kafka stores and transmits these byte arrays in its topics.
Deserialization, as the name suggests, does the opposite: it converts a byte array back into the desired data type. Kafka provides serializers and deserializers for a few data types out of the box: String, Long, Double, Integer, Bytes, etc.
Check all pre-built (de)serializers:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/package-summary.html
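To make the two definitions concrete, here is a minimal sketch of a round trip in plain Java, with no Kafka involved. The class `RoundTripSketch` and its methods are hypothetical names used only for illustration, and the byte layout (a length-prefixed string followed by a 4-byte integer) is just one possible choice.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of Kafka: object -> bytes -> object.
class RoundTripSketch {

    // Serialization: write the two fields into a byte array.
    static byte[] toBytes(String name, int age) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(name); // 2-byte length prefix, then the string bytes
            out.writeInt(age);  // 4 bytes, big-endian
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Deserialization: read the fields back in the same order they were written.
    static String fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String name = in.readUTF();
            int age = in.readInt();
            return "User(" + name + ", " + age + ")";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes("Alice", 30);
        System.out.println(bytes.length + " bytes -> " + fromBytes(bytes));
    }
}
```

The important point is that both sides must agree on the byte layout; a Kafka serializer and deserializer pair encodes exactly that agreement.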
Now that we know what serialization is and why we serialize objects, let’s move on to the implementation.
Implementation
To create a serializer class we need to implement the org.apache.kafka.common.serialization.Serializer interface, and similarly, to create a deserializer class we need to implement the org.apache.kafka.common.serialization.Deserializer interface.
Both the serializer and deserializer interfaces consist of three methods:
- configure : This method is called at startup with the configuration.
- serialize/deserialize : This method does the actual serialization or deserialization.
- close : This method is called when the Kafka session is to be closed.
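The order in which these three methods are called can be sketched as follows. To keep the example runnable without the Kafka client library, `SketchSerializer` is a hypothetical stand-in that mirrors the three methods, and the property names `sketch.key.encoding` and `sketch.value.encoding` are made up for this illustration; they are not Kafka settings.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in mirroring the three methods described above.
interface SketchSerializer<T> {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}

class ConfigurableStringSerializer implements SketchSerializer<String> {
    private Charset charset = StandardCharsets.UTF_8;

    // Called once at startup; isKey says whether keys or values are serialized.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Made-up property names, used only in this sketch.
        Object encoding = configs.get(isKey ? "sketch.key.encoding" : "sketch.value.encoding");
        if (encoding instanceof String) {
            charset = Charset.forName((String) encoding);
        }
    }

    // Called for every record.
    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(charset);
    }

    // Called once on shutdown; nothing to release here.
    @Override
    public void close() { }
}

class LifecycleSketch {
    // Runs configure -> serialize -> close for a single value.
    static byte[] run(String encoding, String value) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("sketch.value.encoding", encoding);
        ConfigurableStringSerializer serializer = new ConfigurableStringSerializer();
        serializer.configure(configs, false);
        try {
            return serializer.serialize("MyTopic", value);
        } finally {
            serializer.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("UTF-8", "A").length + " byte(s) in UTF-8");
        System.out.println(run("UTF-16BE", "A").length + " byte(s) in UTF-16BE");
    }
}
```

In a real application the producer or consumer makes these calls for us; we only supply the class name in the configuration.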
Serializer Interface
public interface Serializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}
Deserializer Interface
public interface Deserializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data);
    void close();
}
Let’s start with an example:
Dependencies I’ve used:
- Kafka – 0.10.1.1
- FasterXML jackson – 2.8.6
User.java
public class User {
    private String name;
    private int age;

    // No-argument constructor, needed by Jackson during deserialization.
    public User() { }

    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() { return this.name; }

    public int getAge() { return this.age; }

    @Override
    public String toString() {
        return "User(" + name + ", " + age + ")";
    }
}
UserSerializer.java
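import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UserSerializer implements Serializer<User> {
    // One mapper is reused for every record.
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, User user) {
        try {
            // Jackson writes the JSON as UTF-8 bytes.
            return objectMapper.writeValueAsBytes(user);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void close() { }
}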
UserDeserializer.java
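import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;

public class UserDeserializer implements Deserializer<User> {
    // One mapper is reused for every record.
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public User deserialize(String topic, byte[] data) {
        try {
            // Parse the JSON bytes back into a User.
            return mapper.readValue(data, User.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void close() { }
}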
Now all that’s left is to use this serializer and deserializer.
To use the serializer above, we need to register this property:
props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
Using this property, the producer will be:
try (Producer<String, User> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<String, User>("MyTopic", user));
    System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
    e.printStackTrace();
}
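The producer needs a few more settings than value.serializer before it can start. A minimal sketch of a complete props object might look like this; the broker address localhost:9092 and the choice of Kafka's built-in StringSerializer for keys are assumptions, and `ProducerConfigSketch` is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper that builds the producer configuration.
class ProducerConfigSketch {
    static Properties producerProps() {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        // Keys are plain strings here, so Kafka's built-in serializer is enough.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values use the custom serializer from this post.
        props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```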
Similarly, for the deserializer we need to register this property:
props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
And the consumer will be:
try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        ConsumerRecords<String, User> messages = consumer.poll(100);
        for (ConsumerRecord<String, User> message : messages) {
            System.out.println("Message received " + message.value().toString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
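As with the producer, the consumer needs more than value.deserializer to start, and subscribing to a topic also requires a consumer group. Here is a minimal sketch of the consumer props; the broker address, the group name user-consumer-group, and the helper name `ConsumerConfigSketch` are assumptions made for illustration.

```java
import java.util.Properties;

// Hypothetical helper that builds the consumer configuration.
class ConsumerConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Assumed broker address; replace with your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        // subscribe() needs a consumer group; this name is made up.
        props.put("group.id", "user-consumer-group");
        // Keys are plain strings, so Kafka's built-in deserializer is enough.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values use the custom deserializer from this post.
        props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```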
References:
https://kafka.apache.org/
If the only way to serialize / deserialize objects is to have a custom serializer/deserializer for each class, then this is not a solution for large-scale applications. I think that if every class carries its own name as a data member within itself, then a generic serializer/deserializer should be possible: first read the name of the class from the byte stream, then use it to cast the result to the correct type.
Or serialize objects into JSON with Jackson.
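For what it's worth, Java's built-in object serialization already works roughly the way the first comment describes: the stream records the class of each object, so a single pair of methods can handle any class that implements java.io.Serializable. The sketch below is only an illustration of that idea, not code from the post, and `GenericSketch` is a hypothetical name. Keep in mind that deserializing Java-serialized bytes from an untrusted source is unsafe, and that only Java consumers can read this format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical generic helper built on Java's own object serialization.
class GenericSketch {

    // Works for any Serializable object; the class description is written into the stream.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Reads the class description back from the stream and rebuilds the object.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object text = deserialize(serialize("hello"));
        Object number = deserialize(serialize(42));
        System.out.println(text.getClass().getSimpleName() + " and " + number.getClass().getSimpleName());
    }
}
```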