ZIO Kafka


ZIO Kafka is a Scala library for Apache Kafka, the distributed streaming platform for building real-time data pipelines and streaming applications. It provides a high-level, composable API built on top of the Kafka Consumer and Producer APIs, allowing developers to write type-safe, functional code for working with Kafka. It is part of the ZIO ecosystem, a library for building concurrent, fault-tolerant, and high-performance systems in Scala.

This API lets developers work with Kafka in a more declarative and expressive way than the raw Java clients.

One of the main features of ZIO Kafka is its support for composable stream processing, which allows developers to build complex data pipelines by composing smaller, reusable stream processing stages.

Additionally, ZIO Kafka provides support for advanced features like topic partitioning, offset management, and consumer groups, and its pluggable Serde mechanism makes it possible to work with formats such as Avro backed by a Schema Registry.

ZIO Kafka also provides building blocks for the common use cases, such as reading data from Kafka as a stream and writing data to Kafka as a sink, which can then be merged, joined, and transformed with ZIO Streams operators, as sketched below.
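To make that composability concrete, here is a minimal sketch (not from the original project; the topic name "input-topic" is a placeholder) of a pipeline assembled from ordinary ZIO Streams combinators:

import zio.kafka.consumer.{Consumer, Subscription}
import zio.kafka.serde.Serde

// Each stage below is an ordinary ZStream combinator, so stages can be
// extracted into functions, reused, and recombined freely.
val pipeline =
  Consumer
    .subscribeAnd(Subscription.topics("input-topic")) // placeholder topic
    .plainStream(Serde.string, Serde.string)
    .map(_.value)       // extract the record payload
    .filter(_.nonEmpty) // a reusable filtering stage
    .map(_.toUpperCase) // a reusable transformation stage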

Producer and Consumer:

To write producers and consumers using the ZIO Kafka library, we have two choices:

  1. Using ZIO Workflows
  2. Using ZIO Streams Workflows

Let's try the first option:

In order to write a producer and a consumer, we first need a way to turn our data into bytes on the wire and back again. This is where a Serde comes in.

Serializer-Deserializer:

Serde (short for “Serializer-Deserializer”) is a concept that is used in Kafka to handle the serialization and deserialization of messages. In ZIO Kafka, the Serde trait is used to define a serializer and deserializer for keys and values in a Kafka topic.

ZIO Kafka provides default implementations of Serde in zio.kafka.serde.Serde, which can be used to serialize and deserialize simple types such as String, Int, and Long. However, in case you need to handle complex types, you can define your own Serde implementation. Let's understand this through an example.

Below are the imports that need to be added, followed by the Serde definition –

import kafka.zio.shared.model.FinalInfo
import zio.ZIO
import zio.json.{DecoderOps, EncoderOps}
import zio.kafka.serde.Serde

object FinalInfoSerde {
  // Keys are plain Ints, so the built-in Int Serde suffices.
  val key: Serde[Any, Int] = Serde.int

  // Values are FinalInfo, carried as JSON on top of the built-in String Serde.
  val value: Serde[Any, FinalInfo] = Serde.string.inmapM[Any, FinalInfo](str =>
    ZIO.fromEither(str.fromJson[FinalInfo])
      .mapError(new RuntimeException(_))
  )(finalInfo => ZIO.succeed(finalInfo.toJson))
}

Also, we need to define a JSON encoder and decoder for FinalInfo; zio-json can derive both:

import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder}

implicit val encoder: JsonEncoder[FinalInfo] =
  DeriveJsonEncoder.gen[FinalInfo]

implicit val decoder: JsonDecoder[FinalInfo] =
  DeriveJsonDecoder.gen[FinalInfo]
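The blog does not show the FinalInfo model itself. A minimal sketch of how it might look, with the codecs placed in its companion object (the fields are hypothetical placeholders; the real model lives in kafka.zio.shared.model):

import zio.json.{DeriveJsonDecoder, DeriveJsonEncoder, JsonDecoder, JsonEncoder}

// Hypothetical fields; the actual FinalInfo in kafka.zio.shared.model may differ.
final case class FinalInfo(booking: String, status: String)

object FinalInfo {
  implicit val encoder: JsonEncoder[FinalInfo] = DeriveJsonEncoder.gen[FinalInfo]
  implicit val decoder: JsonDecoder[FinalInfo] = DeriveJsonDecoder.gen[FinalInfo]
}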

Creating a Producer:

ZIO Kafka offers several ways to produce data to Kafka topics. In this example, we will be using the TicketInfoConsumerProducer.produce helper defined below.

Below are the imports that need to be added to run the Producer, followed by the helper –

import kafka.zio.shared.model.FinalInfo
import kafka.zio.shared.util.{BookingSerde, FinalInfoSerde}
import kafka.zio.shared.KafkaTopics
import org.apache.kafka.clients.producer.RecordMetadata
import zio.{RIO, ZIOAppDefault}
import zio.kafka.consumer.Subscription
import zio.kafka.producer.Producer

object TicketInfoConsumerProducer extends ZIOAppDefault {

  // Produces a single record to the given topic using our custom Serdes.
  private def produce(
      topic: String,
      key: Int,
      value: FinalInfo
  ): RIO[Any with Producer, RecordMetadata] =
    Producer.produce[Any, Int, FinalInfo](
      topic = topic,
      key = key,
      value = value,
      keySerializer = FinalInfoSerde.key,
      valueSerializer = FinalInfoSerde.value
    )
}

We have now created a helper function that takes a topic name, a key, and a value, and returns a ZIO workflow that, when run, produces a record to the specified topic.
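For example, running the helper once from inside the object could look like this (a sketch; the FinalInfo value is a hypothetical placeholder, and the producer layer is the one defined in the next section):

import zio.{Task, ZIO}

// Produce one record and log the metadata Kafka returned for it.
val produceOnce: Task[Unit] =
  produce(KafkaTopics.KAFKA_TOPIC_POST_CONFIRMED_TICKET, 1, FinalInfo("booking-42", "success"))
    .flatMap(md => ZIO.logInfo(s"Produced to ${md.topic}@${md.partition}/${md.offset}"))
    .provide(KafkaProdConsLayer.producer)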

Producer Layer:

The producer layer is the ZLayer that builds the Kafka Producer responsible for sending messages to the cluster. Let's understand this with an example. Here we create a producer layer that backs the produce helper shown above.

  // Builds a Producer from the bootstrap-server settings, managed as a scoped
  // layer so it is closed automatically when the application shuts down.
  // Note: not private, so the application that runs the stream can provide it.
  val producer: ZLayer[Any, Throwable, Producer] =
    ZLayer.scoped(
      Producer.make(
        ProducerSettings(KafkaMeta.BOOSTRAP_SERVERS)
      )
    )

Once you have a producer, you can use the produce method to send messages to a topic. One overload takes a ProducerRecord, which bundles the topic name, key, and value of the message; the helper above uses the overload that takes them as separate arguments.
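A sketch of the ProducerRecord variant, reusing the hypothetical FinalInfo value from earlier (the overload takes the record plus the key and value serializers):

import org.apache.kafka.clients.producer.ProducerRecord
import zio.kafka.producer.Producer

// Build the record explicitly, then hand it to the producer with our Serdes.
val record = new ProducerRecord(
  KafkaTopics.KAFKA_TOPIC_POST_CONFIRMED_TICKET, // topic
  1,                                             // key
  FinalInfo("booking-42", "success")             // hypothetical value
)
val send = Producer.produce(record, FinalInfoSerde.key, FinalInfoSerde.value)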

Creating a Consumer:

ZIO Kafka likewise offers several ways to consume data from Kafka topics, including first-class support for ZIO Streams.

Below are the imports that need to be added to run the Consumer, followed by the consuming code –

import kafka.zio.shared.model.FinalInfo
import kafka.zio.shared.util.{BookingSerde, FinalInfoSerde}
import kafka.zio.shared.KafkaTopics
import org.apache.kafka.clients.producer.RecordMetadata
import zio.{RIO, ZIOAppDefault}
import zio.kafka.consumer.{Consumer, Subscription}
import zio.kafka.producer.Producer

def run = {
  val finalInfoStream =
    Consumer
      .subscribeAnd(Subscription.topics(KafkaTopics.KAFKA_TOPIC_BOOKING_REQUEST))
      .plainStream(BookingSerde.key, BookingSerde.value)
      // For every booking consumed, produce a confirmation to the next topic.
      .tap { comRec =>
        val fi = FinalInfo(comRec.value, "success")
        produce(KafkaTopics.KAFKA_TOPIC_POST_CONFIRMED_TICKET, 1, fi)
      }
      // Commit offsets in batches once the records have been processed.
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)

  // The stream both reads and writes, so it needs the Consumer and the Producer.
  finalInfoStream.runDrain
    .provide(KafkaProdConsLayer.consumerLayer, KafkaProdConsLayer.producer)
}

This combined code contains some producer logic as well: we consume data from the booking-request topic and, after some computation, produce the result to a different topic, committing the offsets once the records have been processed.

Consumer Layer:

The consumer layer in ZIO Kafka is responsible for consuming messages from a Kafka topic. Let's understand this with an example. Here we create a consumer layer, analogous to the producer layer above.

  // Builds a Consumer with the bootstrap servers and a consumer group,
  // managed as a scoped layer so it is closed automatically on shutdown.
  val consumerLayer: ZLayer[Any, Throwable, Consumer] =
    ZLayer.scoped(
      Consumer.make(
        ConsumerSettings(KafkaMeta.BOOSTRAP_SERVERS)
          .withGroupId(KafkaMeta.CONSUMER_GROUP)
      )
    )
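Since the streaming pipeline needs both services, the two layers can also be combined into one inside the same object and provided in one go (a small sketch using ZLayer's ++ composition; the kafkaLayer name is ours):

// Combine the producer and consumer layers horizontally: the resulting
// layer builds both services and exposes them together.
val kafkaLayer: ZLayer[Any, Throwable, Producer with Consumer] =
  producer ++ consumerLayer

With this in place, the run method could end with .provide(KafkaProdConsLayer.kafkaLayer) instead of listing both layers separately.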

Conclusion:

Overall, ZIO Kafka is a powerful library for working with Apache Kafka in Scala. It provides a functional and type-safe API that makes it easy to build robust and high-performance streaming applications.

If you want to add anything, or you see any point differently, drop me a comment. I will be happy to discuss it. For more blogs, visit Knoldus Blogs.

Written by 

Rituraj Khare is a Software Consultant at Knoldus Software LLP. An avid Scala programmer and Big Data engineer, he has experience with a tech stack including Scala, Spark, Kafka, Python, unit testing, Git, Jenkins, and Grafana.
