Serialization in Kafka


Serialization is the process of converting an object into a stream of bytes for transmission. Deserialization, as the name suggests, does the opposite: it converts a byte array back into the desired data type. Apache Kafka stores and transmits messages as byte arrays in its queue.

Kafka offers serializers and deserializers for only a few data types, such as

  • String
  • Long
  • Double
  • Integer
  • Bytes
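As a small illustration of the built-in serializers listed above, the sketch below round-trips a String and a Long through the classes shipped in the kafka-clients library. It assumes kafka-clients is on the classpath; the object name and the topic name "demo-topic" are made up for this example.

```scala
import org.apache.kafka.common.serialization.{LongDeserializer, LongSerializer, StringDeserializer, StringSerializer}

object BuiltInSerdeExample {
  // Hypothetical topic name; these built-in (de)serializers do not use it.
  val topic = "demo-topic"

  // String -> bytes (UTF-8 by default) -> String
  def roundTripString(value: String): String = {
    val bytes = new StringSerializer().serialize(topic, value)
    new StringDeserializer().deserialize(topic, bytes)
  }

  // Long -> 8 bytes -> Long; the Kafka classes work on java.lang.Long, so we box explicitly.
  def roundTripLong(value: Long): Long = {
    val bytes = new LongSerializer().serialize(topic, Long.box(value))
    new LongDeserializer().deserialize(topic, bytes).longValue()
  }

  def main(args: Array[String]): Unit = {
    println(roundTripString("hello"))
    println(roundTripLong(42L))
  }
}
```

Anything outside these built-in types, such as a case class, needs a serializer and deserializer of its own.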

Custom Serializer

Need?

A producer uses a serializer to prepare a message for transmission to the broker. In other words, the serializer tells the producer how to convert the message into a byte array before it is sent. Similarly, the consumer uses a deserializer to convert the byte array back into an object. When the message type is not one of the built-in types, we have to write both ourselves.

Implementation?

To create a custom serializer class, we need to implement the org.apache.kafka.common.serialization.Serializer interface.

Similarly, to create a custom deserializer class, we need to implement the org.apache.kafka.common.serialization.Deserializer interface.

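Each of these interfaces declares three methods. In recent versions of the Kafka client, configure and close have default no-op implementations, so only serialize (or deserialize) strictly has to be implemented; older versions require all three. They are:

  1. Configure: This method is called once at startup with the client configuration and a flag saying whether the instance handles keys or values.
  2. Serialize/deserialize: This method does the actual conversion between the object and the byte array.
  3. Close: This method is called when the producer or consumer that owns the instance is closed.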
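To make the order of these calls concrete, here is a minimal sketch that drives the built-in StringSerializer by hand through configure, serialize and close. In a real application the producer makes these calls itself. The object name, the helper name encodeWith and the topic name are assumptions for this example; "value.serializer.encoding" is the configuration key that StringSerializer reads for values.

```scala
import java.util
import org.apache.kafka.common.serialization.StringSerializer

object LifecycleExample {
  // Hypothetical helper: serializes `value` using the given character encoding.
  def encodeWith(encoding: String, value: String): Array[Byte] = {
    val configs = new util.HashMap[String, AnyRef]()
    configs.put("value.serializer.encoding", encoding)

    val serializer = new StringSerializer()
    serializer.configure(configs, false) // 1. configure: isKey = false, so this instance handles values
    try serializer.serialize("demo-topic", value) // 2. serialize: object -> byte array
    finally serializer.close() // 3. close: release resources (a no-op for StringSerializer)
  }

  def main(args: Array[String]): Unit = {
    println(encodeWith("UTF-8", "hi").length)
    println(encodeWith("UTF-16", "hi").length)
  }
}
```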

Let’s consider an example to implement our own custom SerDe.

Consider a User case class:

case class User(name: String, age: Int, gender: String, nationality: String)

This is what a serializer class looks like:

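import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util
import org.apache.kafka.common.errors.SerializationException
import org.apache.kafka.common.serialization.Serializer

class UserSerializer extends Serializer[User] {
  // This serializer needs no configuration.
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Converts a User into bytes using Java object serialization.
  override def serialize(topic: String, data: User): Array[Byte] = {
    if (data == null) null // pass null values through unchanged
    else {
      try {
        val byteOut = new ByteArrayOutputStream()
        val objOut = new ObjectOutputStream(byteOut)
        objOut.writeObject(data)
        objOut.close() // flushes any buffered bytes into byteOut
        byteOut.toByteArray
      } catch {
        // Keep the original exception as the cause instead of discarding it.
        case ex: Exception => throw new SerializationException("Failed to serialize User", ex)
      }
    }
  }

  override def close(): Unit = {}
}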

And a deserializer class will look like:

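import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.util
import org.apache.kafka.common.serialization.Deserializer

class UserDeserializer extends Deserializer[User] {
  // This deserializer needs no configuration.
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = {}

  // Rebuilds a User from bytes written by UserSerializer.
  override def deserialize(topic: String, data: Array[Byte]): User = {
    if (data == null) null // a null payload has nothing to decode
    else {
      val objIn = new ObjectInputStream(new ByteArrayInputStream(data))
      try objIn.readObject().asInstanceOf[User]
      finally objIn.close() // also closes the wrapped ByteArrayInputStream
    }
  }

  override def close(): Unit = {}
}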
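Before wiring a serializer and deserializer pair into Kafka, it can help to check that a value survives the round trip. The helper below is a sketch of such a check and is not part of the original example; the names RoundTrip, roundTrip and "demo-topic" are made up, and kafka-clients is assumed to be on the classpath. The demo uses the built-in String classes so that the block stands on its own.

```scala
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}

object RoundTrip {
  // Serializes a value and immediately deserializes the resulting bytes.
  def roundTrip[T](serializer: Serializer[T], deserializer: Deserializer[T], value: T): T = {
    val bytes = serializer.serialize("demo-topic", value)
    deserializer.deserialize("demo-topic", bytes)
  }

  def main(args: Array[String]): Unit = {
    // Demo with built-in classes; the same call works for any matching pair.
    val result = roundTrip(new StringSerializer, new StringDeserializer, "hello")
    println(result == "hello")
  }
}
```

With the classes from this post, the equivalent check would be that roundTrip(new UserSerializer, new UserDeserializer, user) == user for some User value. Case classes compare by value, so the comparison should hold if both classes work as intended.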

With the serializer and deserializer implemented, we now need to tell Kafka to use them as our custom SerDe. This is done by registering their fully qualified class names in the configuration properties (the classes here live in the kafka.serde package):

In the producer application:

properties.put("value.serializer", "kafka.serde.UserSerializer")

In the consumer application:

properties.put("value.deserializer", "kafka.serde.UserDeserializer")
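For context, the two settings above usually sit alongside a few other required properties. The sketch below shows one plausible complete configuration. The broker address "localhost:9092", the group id "user-consumers", the choice of String keys and the object name SerdeConfig are all assumptions for illustration, not values from this post.

```scala
import java.util.Properties

object SerdeConfig {
  // Producer side: String keys, User values serialized by the custom class.
  def producerProps(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "kafka.serde.UserSerializer")
    props
  }

  // Consumer side: must mirror the producer's key and value formats.
  def consumerProps(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
    props.put("group.id", "user-consumers") // placeholder consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "kafka.serde.UserDeserializer")
    props
  }
}
```

These Properties objects would then be passed to the KafkaProducer and KafkaConsumer constructors. Kafka instantiates the named classes by reflection, so each one needs a public no-argument constructor.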

A complete implementation can be found here.

