Introduction to ZIO Kafka


In this blog, we discuss how to read messages from Kafka and process them using the zio-kafka library.

Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data, enabling you to pass messages from one endpoint to another.

ZIO is a zero-dependency library for asynchronous and concurrent programming in Scala; at its core, it is a functional effect system.

The zio-kafka library provides a purely functional, streams-based interface to the Kafka client. It integrates effortlessly with ZIO and ZIO Streams.

This blog requires basic knowledge of ZIO and Kafka.

Library Dependencies

Add the zio-kafka dependency to your project's build.sbt file:

libraryDependencies ++= Seq(
  "dev.zio" %% "zio-kafka" % "0.15.0"
)

docker-compose.yml Configuration

To start an Apache Kafka server, we first need to start a Zookeeper server.

We can declare this dependency in a docker-compose.yml file, which ensures that the Zookeeper server always starts before the Kafka server and stops after it.

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

In this setup, our Zookeeper server is listening on port 2181.

The kafka service is exposed to host applications through port 29092, but it actually runs on port 9092 within the container network, as configured by the KAFKA_ADVERTISED_LISTENERS property.

Let’s start the Kafka server by using the docker-compose command:

$ docker-compose up -d
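
Optionally, we can create the topic we'll use later before running the apps. The name kafkaTopic matches the code below; the single partition and replication factor of 1 are just minimal choices for this one-broker setup:

$ docker-compose exec kafka kafka-topics --create --topic kafkaTopic \
    --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1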

Consumer Application

First, we’ll create the Kafka Consumer as a ZLayer, and provide it to our consumer stream in our main application.

Let's understand the ZioKafkaConsumer app. First, we create the needed configuration in managedConsumer, and from it we build the Kafka consumer as a managed resource: each consumer owns an internal connection pool for talking to the broker, and we don't want to leak that pool in case of failure.

val managedConsumer = Consumer.make(
  ConsumerSettings(List("localhost:29092")) // the PLAINTEXT_HOST port advertised to the host in docker-compose.yml
    .withGroupId("zio-group"))
val consumer = ZLayer.fromManaged(managedConsumer)

We use ZManaged[R, E, A], a data structure that encapsulates the acquisition and release of a resource of type A using an environment R, and that may fail with an error of type E.

After that, we build the consumer ZLayer from managedConsumer.
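
To make the acquire/release idea concrete, here is a tiny, self-contained sketch with a hypothetical Connection resource (not part of zio-kafka); Consumer.make returns exactly this kind of ZManaged under the hood:

import zio._

// A hypothetical resource with an explicit acquire and release step.
final case class Connection(url: String) {
  def close: UIO[Unit] = UIO.unit
}

// ZManaged.make pairs acquisition with a release action that runs
// even if the code using the resource fails or is interrupted.
val managedConnection: ZManaged[Any, Throwable, Connection] =
  ZManaged.make(Task(Connection("localhost:29092")))(_.close)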

Consuming Messages as a Stream

Now we consume the messages from the Kafka topic using ZStream[R, E, A], which represents an effectful stream that requires an environment R to execute, may fail with an error of type E, and produces values of type A.

To create the stream, we need to subscribe the consumer to a Kafka topic and configure how the key and value bytes are interpreted.

After subscribing to the Kafka topic, we use the plainStream method to interpret the messages.

The plainStream method takes two Serde instances as parameters, the first for the key and the second for the value of a message.

val streams = Consumer.subscribeAnd(Subscription.topics("kafkaTopic"))
    .plainStream(Serde.string, Serde.string)
    .map(cr => (cr.value, cr.offset))
    .tap {
      case (value, _) => zio.console.putStrLn(s"| $value |")
    }
    .map(_._2) // stream of offsets
    .aggregateAsync(Consumer.offsetBatches)

Apache Kafka introduced the concept of a serde, which stands for serializer and deserializer. We supply suitable Serde types for both the keys and the values of the messages when materializing the read messages into a stream.
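
zio-kafka ships built-in instances such as Serde.string and Serde.long, and new ones can be derived from them. As a small sketch (intSerde is our own name, not a library one), the inmap combinator maps a serde in both directions:

import zio.kafka.serde.Serde

// Derive an Int serde from the built-in String serde by mapping
// deserialization (String => Int) and serialization (Int => String).
// Note: toInt can throw on bad input; inmapM allows effectful,
// failure-aware conversions instead.
val intSerde: Serde[Any, Int] = Serde.string.inmap(_.toInt)(_.toString)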

Then we map each CommittableRecord[K, V] into a tuple of the record value and its offset.

Next we use tap, which adds an effect to the consumption of every element of the stream.

We use zio.console.putStrLn to print each value to the console in a functional way.

After that, we apply the Consumer.offsetBatches transducer, which merges the incoming offsets, via the aggregateAsync method.

It aggregates elements of this stream using the provided sink for as long as the downstream operators on the stream are busy.

Next we use the foreach function of ZSink, which applies an effectful function to every element the stream produces; here we commit each offset batch.

val streamEffect = streams.run(ZSink.foreach(offset => offset.commit))

Run Consumer App

Now our consumer is ready to read messages from the Kafka topic, so we define the run method of zio.App, which executes our application after we provide the ZLayer and the effectful action.

override def run(args: List[String]) =
    streamEffect.provideSomeLayer(consumer ++ zio.console.Console.live).exitCode
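
For reference, here is the whole consumer assembled into one runnable app. This is a minimal sketch targeting zio-kafka 0.15.0 on ZIO 1.x; the object name ZioKafkaConsumer follows the one mentioned above, and provideSomeLayer is given an explicit ZEnv type parameter to help inference:

import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde
import zio.stream.ZSink

object ZioKafkaConsumer extends zio.App {

  // Consumer connected via the host-facing listener from docker-compose.yml.
  val consumer = ZLayer.fromManaged(
    Consumer.make(ConsumerSettings(List("localhost:29092")).withGroupId("zio-group")))

  // Subscribe, decode, print each value, then batch and commit offsets.
  val streamEffect =
    Consumer.subscribeAnd(Subscription.topics("kafkaTopic"))
      .plainStream(Serde.string, Serde.string)
      .map(cr => (cr.value, cr.offset))
      .tap { case (value, _) => zio.console.putStrLn(s"| $value |") }
      .map(_._2)
      .aggregateAsync(Consumer.offsetBatches)
      .run(ZSink.foreach(offset => offset.commit))

  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    streamEffect.provideSomeLayer[ZEnv](consumer).exitCode
}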

Producer App

Now we create the producer app for producing messages to a Kafka topic.

Just as we defined managedConsumer in the consumer app, we define managedProducer with all the required configuration.

There is one small difference: besides the ProducerSettings, Producer.make also requires Serde instances for serializing the keys and values of the messages to be produced.

val managedProducer = Producer.make(
  ProducerSettings(List("localhost:29092")), // same PLAINTEXT_HOST port as the consumer
  Serde.string, Serde.string)
val producer: ZLayer[Blocking, Throwable, Producer[Any, String, String]] =
  ZLayer.fromManaged(managedProducer)

After that, we build the producer ZLayer from managedProducer, just as we did for the consumer.

Next, we define the message to be produced using a ProducerRecord.

It creates a record to be sent to Kafka; it requires the topic, the key, and the value to produce.

// ProducerRecord comes from org.apache.kafka.clients.producer
val record = new ProducerRecord("kafkaTopic", "key-1", "abc")

Run the Producer App

Finally, we send the message using the Producer.produce method.

Then we run the producer app by overriding the run method of zio.App, just like in the consumer app.

val producerEffect: ZIO[Producer[Any, String, String], Throwable, RecordMetadata] =
  Producer.produce(record)

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
  producerEffect.provideSomeLayer(producer).exitCode
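
And here is the producer assembled into one runnable app, again a minimal sketch under the same version assumptions; the explicit type parameters on Producer.produce are there to guide inference:

import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.blocking.Blocking
import zio.kafka.producer._
import zio.kafka.serde.Serde

object ZioKafkaProducer extends zio.App {

  // Producer layer with String serdes for both keys and values.
  val producer: ZLayer[Blocking, Throwable, Producer[Any, String, String]] =
    ZLayer.fromManaged(
      Producer.make(ProducerSettings(List("localhost:29092")), Serde.string, Serde.string))

  val record = new ProducerRecord("kafkaTopic", "key-1", "abc")

  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    Producer.produce[Any, String, String](record)
      .provideSomeLayer[ZEnv](producer)
      .exitCode
}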

Note: run the consumer app first and then the producer app, so the consumer picks up the produced message.

The complete source code for this example is available in the accompanying repository.

Conclusion

In this blog, we set up a basic application using zio-kafka for producing and consuming messages, with Kafka running via Docker. We wrote a docker-compose.yml file to start the Zookeeper and Kafka services, then focused on the consumer side, learning how to subscribe to a topic and consume messages as a stream, and finally built a small producer to publish messages.

