ZIO Streaming Applications, Transformation, and Error Handling

Introduction to Streaming

Why do we need streaming? Consider the following example.

Suppose we have a list of integers, and we want to pick out the prime numbers and then run some further computation on them. In a typical application we would process the list in parallel with ZIO.foreachPar combined with withParallelism (ZIO.foreachParN in ZIO 1).

This approach has two problems:

High latency: we have to wait for the entire list to be processed before the next step can start. As the list grows, so does the latency.

Memory: we have to keep the entire list in memory while we process it, which does not work at all for an infinite stream of data.
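
As a rough sketch of the batch approach described above, assuming ZIO 2: isPrime and furtherComputation are hypothetical helpers introduced only for illustration, and the input list is made up.

```scala
import zio._

object BatchPrimes extends ZIOAppDefault {
  // Hypothetical helper: naive primality test, for illustration only.
  def isPrime(n: Int): Boolean =
    n > 1 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)

  // Hypothetical second step applied to each prime.
  def furtherComputation(n: Int): UIO[Int] = ZIO.succeed(n * n)

  val numbers: List[Int] = (1 to 30).toList

  // Step 1 must finish for the whole list before step 2 can start,
  // and the intermediate lists are held in memory.
  val batch: UIO[List[Int]] =
    for {
      checked <- ZIO.foreachPar(numbers)(n => ZIO.succeed(n -> isPrime(n))).withParallelism(20)
      primes   = checked.collect { case (n, true) => n }
      results <- ZIO.foreachPar(primes)(furtherComputation).withParallelism(20)
    } yield results

  def run = batch.flatMap(rs => Console.printLine(rs.mkString(", ")))
}
```

Both problems are visible here: nothing reaches furtherComputation until every number has been checked, and checked, primes, and results all live in memory at once.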

The solution is ZIO Streams. With a stream we take the same integers one at a time: each element goes through the first step and then straight on to the second, without waiting for the rest of the list to complete. We can also process the stream in parallel using

  ZStream.fromIterable(numbers)
    .mapZIOParUnordered(20)

(mapMParUnordered in ZIO 1). Here up to 20 elements are processed at a time to achieve parallelism.
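
The streaming version might look like the sketch below. It assumes ZIO 2, where the operator is named mapZIOParUnordered; isPrime is a hypothetical helper and squaring stands in for the "further computation".

```scala
import zio._
import zio.stream._

object StreamingPrimes extends ZIOAppDefault {
  // Hypothetical helper: naive primality test, for illustration only.
  def isPrime(n: Int): Boolean =
    n > 1 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)

  val numbers: List[Int] = (1 to 30).toList

  // Each element flows through the whole pipeline on its own; up to 20
  // effects run concurrently and results are emitted as they finish,
  // so the output order is not guaranteed.
  val primeSquares: ZStream[Any, Nothing, Int] =
    ZStream
      .fromIterable(numbers)
      .filter(isPrime)
      .mapZIOParUnordered(20)(n => ZIO.succeed(n * n))

  def run =
    primeSquares.runCollect.flatMap(c => Console.printLine(c.sorted.mkString(", ")))
}
```

If the output order matters, mapZIOPar keeps the input order at the cost of some throughput.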

Creating Stream

There are multiple ways to create a stream. We can create one from an external source such as Kafka, or build one by hand for local testing.

We can create a stream from a range, from an iterable, as an infinite sequence, or from a schedule that emits elements at a fixed interval, which is handy for testing. The following code snippet covers these constructors.

  import zio._
  import zio.stream._

  val finiteStream = ZStream.range(1, 20)

  val iterStream = ZStream.fromIterable(List(1, 2, 3, 4, 5, 6))

  val infiniteStream = ZStream.iterate(1)(_ + 1)

  val scheduledStream = ZStream(1).schedule(Schedule.forever)

  val intervalStream = ZStream.repeatWithSchedule(4, Schedule.spaced(100.millisecond))

  val fromZioStream = ZStream.fromZIO(Console.readLine)
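
Defining a stream does not run it. As a minimal sketch, assuming ZIO 2, the following collects a finite stream and a bounded prefix of an infinite one; the object and value names are made up for illustration.

```scala
import zio._
import zio.stream._

object CreateStreams extends ZIOAppDefault {
  val finiteStream   = ZStream.range(1, 20)      // 1 to 19; the upper bound is exclusive
  val infiniteStream = ZStream.iterate(1)(_ + 1) // 1, 2, 3, ... never ends on its own

  // take(5) bounds the infinite stream so that runCollect can terminate.
  val firstFive: UIO[Chunk[Int]] = infiniteStream.take(5).runCollect

  def run =
    for {
      all  <- finiteStream.runCollect
      five <- firstFive
      _    <- Console.printLine(s"finite: ${all.size} elements, first five: ${five.mkString(", ")}")
    } yield ()
}
```

Calling runCollect on infiniteStream directly would never complete, which is why it is bounded with take first.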

Stream Operations

In this section, we will look at some useful operations we can perform on a ZIO stream.

Tapping

The tap function does not change the elements of the stream, and it does not affect the stream's type. It runs an effect for each element, typically to print a debug message or the current element so that we can track what is going on at a given step.

import zio._
import zio.stream._

object tap extends ZIOAppDefault{
  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = myApp

  val myApp = ZStream.fromIterable(1 to 6)
    .tap(before => Console.printLine(before))
    .map(elem => elem + 100)
    .tap(now => Console.printLine(now))
    .runDrain
}

In the code above, the first tap prints each element as it enters the stream. We then add 100 to each element and use tap again to print the result on the console.

Taking Elements

Suppose we want to take only some elements of a stream: a fixed number from the start, or elements up to some condition. We can use methods such as take, takeWhile, and takeUntil.

stream.take(5)
stream.takeWhile(_ < 5)
stream.takeUntil(_ == 5)
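
The difference between the three is easiest to see on a concrete stream. This is a small sketch assuming ZIO 2; the source stream of 1 to 10 is made up for illustration.

```scala
import zio._
import zio.stream._

object TakingElements extends ZIOAppDefault {
  val stream = ZStream.range(1, 11) // emits 1 to 10

  val firstFive  = stream.take(5).runCollect           // 1, 2, 3, 4, 5
  val whileSmall = stream.takeWhile(_ < 5).runCollect  // 1, 2, 3, 4 (stops before 5)
  val untilFive  = stream.takeUntil(_ == 5).runCollect // 1, 2, 3, 4, 5 (includes the match)

  def run =
    for {
      a <- firstFive
      b <- whileSmall
      c <- untilFive
      _ <- Console.printLine(s"take: $a, takeWhile: $b, takeUntil: $c")
    } yield ()
}
```

Note that takeUntil emits the element that satisfies the predicate, whereas takeWhile stops before the first element that fails it.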

Mapping

map: transforms each element of a stream with a function, producing a new stream.

val intStream = ZStream(1, 2, 3, 4)
intStream.map(_.toString)

mapZIOPar (mapMPar in ZIO 1): similar to mapZIO (mapM in ZIO 1), but evaluates the effects in parallel.
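
A minimal sketch of parallel effectful mapping, assuming ZIO 2, where the operator is named mapZIOPar. The describe function is a hypothetical stand-in for a slow call such as a database or HTTP lookup.

```scala
import zio._
import zio.stream._

object ParallelMapping extends ZIOAppDefault {
  // Hypothetical effectful lookup; the delay simulates a slow call.
  def describe(n: Int): UIO[String] =
    ZIO.succeed(s"item-$n").delay(100.millis)

  // Up to 4 effects run at once; mapZIOPar still emits results in input order.
  val described: ZStream[Any, Nothing, String] =
    ZStream(1, 2, 3, 4).mapZIOPar(4)(describe)

  def run = described.foreach(s => Console.printLine(s))
}
```

Because the four delays overlap, the stream finishes in roughly the time of one call rather than four.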

Filtering

The filter method keeps only the emitted elements that satisfy a predicate:

ZStream.range(1, 11).filter(_ % 2 == 0)

Draining

Sometimes we want to run a stream only for its effects, without emitting any elements. To discard the results in these situations we use the drain method, which removes all output values from the stream:

ZStream(1, 2, 3, 4, 5).tap(k => Console.printLine(k+3)).drain

import zio._
import zio.stream._

object emit extends ZIOAppDefault {
  val s1 = ZStream.fromIterable(1 to 20)
  val s2 = s1.tap(k => Console.printLine(k+3)).runDrain

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = s2
}

The stream does not emit any elements, but because every intermediate step still runs, the tap operation prints each element.
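
The two snippets above use different methods: drain returns another stream that emits nothing, while runDrain runs the stream and returns an effect. The sketch below, assuming ZIO 2, uses a Ref as an illustrative counter to show that a drained stream still executes its effects.

```scala
import zio._
import zio.stream._

object DrainExample extends ZIOAppDefault {
  // Returns the emitted elements and how many times the tap effect ran.
  val program: UIO[(Chunk[Int], Int)] =
    for {
      seen    <- Ref.make(0)
      // The first stream is drained: its tap runs, but 1, 2, 3 are not emitted.
      emitted <- (ZStream(1, 2, 3).tap(_ => seen.update(_ + 1)).drain ++ ZStream(4, 5)).runCollect
      count   <- seen.get
    } yield (emitted, count)

  def run = program.flatMap { case (emitted, count) =>
    Console.printLine(s"emitted: ${emitted.mkString(", ")}; tap ran $count times")
  }
}
```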

Partition

We can split a stream in two based on a boolean condition using the partition method. For example, we can split a stream of numbers into even and odd streams as follows. In ZIO 2, partition returns a scoped effect (in ZIO 1 it returned a ZManaged).

val partitionedStreams: ZIO[Scope, Nothing, (ZStream[Any, Nothing, Int], ZStream[Any, Nothing, Int])] =
  ZStream.fromIterable(0 to 100).partition(_ % 2 == 0)
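
To actually consume the two halves, both streams have to be run while the scope is still open. This is a sketch assuming ZIO 2, where partition returns a scoped ZIO rather than a ZManaged; the object name and the smaller range are made up for illustration.

```scala
import zio._
import zio.stream._

object PartitionExample extends ZIOAppDefault {
  // The two output streams share one upstream, which is released when the
  // scope closes, so both are consumed inside ZIO.scoped.
  val evensAndOdds: UIO[(Chunk[Int], Chunk[Int])] =
    ZIO.scoped {
      ZStream.fromIterable(0 to 10).partition(_ % 2 == 0).flatMap {
        case (evens, odds) =>
          // Consume both sides concurrently so neither buffer blocks the other.
          evens.runCollect.zipPar(odds.runCollect)
      }
    }

  def run = evensAndOdds.flatMap { case (evens, odds) =>
    Console.printLine(s"evens: ${evens.mkString(", ")}; odds: ${odds.mkString(", ")}")
  }
}
```

Running the two sides one after the other can stall once the internal buffer of the unconsumed side fills up, which is why zipPar is used here.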

Grouping

In this section, we will learn how to group elements by chunk size or by time.

We can group elements by chunk size as follows:

ZStream.fromIterable(0 to 8).grouped(3)
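
As a small sketch, assuming ZIO 2, this is what grouped produces when the number of elements is not a multiple of the group size. The range 0 to 9 is chosen only to show the leftover group.

```scala
import zio._
import zio.stream._

object GroupedExample extends ZIOAppDefault {
  // 0 to 9 is ten elements, so the last group holds the single leftover element.
  val groups: UIO[Chunk[Chunk[Int]]] =
    ZStream.fromIterable(0 to 9).grouped(3).runCollect

  def run =
    groups.flatMap(gs => ZIO.foreach(gs)(g => Console.printLine(g.mkString("[", ", ", "]"))))
}
```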

We can also group by time and chunk size together; a group is emitted as soon as either condition is met:

ZStream.fromIterable(0 to 10).repeat(Schedule.spaced(1.seconds)).groupedWithin(30, 10.seconds)

import zio._
import zio.stream._

object grouping extends ZIOAppDefault {
  def run =
    ZStream
      .fromIterable('a' to 'z')
      .schedule(Schedule.spaced(100.millis))
      .groupedWithin(1000, 600.millisecond)
      .map(k => k.zipWithIndex)
      .debug
      .runDrain
}

Error Handling

orElse: A stream may fail, and in that case we may want to recover from the failure by running another stream. We can provide the fallback stream using the orElse method.

val s1 = ZStream(1, 2, 3) ++ ZStream.fail("Oh! Error!") ++ ZStream(4, 5)
val s2 = ZStream(7, 8, 9)

val stream = s1.orElse(s2)
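
Running the fallback makes the behaviour concrete. This is a minimal sketch assuming ZIO 2, with the same two streams wrapped in an illustrative object.

```scala
import zio._
import zio.stream._

object OrElseExample extends ZIOAppDefault {
  val s1 = ZStream(1, 2, 3) ++ ZStream.fail("Oh! Error!") ++ ZStream(4, 5)
  val s2 = ZStream(7, 8, 9)

  // Elements emitted before the failure are kept; 4 and 5 are never reached.
  val recovered: UIO[Chunk[Int]] = s1.orElse(s2).runCollect

  def run = recovered.flatMap(c => Console.printLine(c.mkString(", ")))
}
```

The failed stream is not resumed: once s1 fails, the rest of the output comes from s2.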

catchAll: The catchAll method is more powerful than orElse because it lets us inspect the failure and choose the fallback stream accordingly.

val failingStream = ZStream(1, 2, 3) ++ ZStream.fail("Odd Num!") ++ ZStream(4, 5) ++ ZStream.fail("Even Num!")

val evenStream = ZStream(2, 4, 6, 8)
val oddStream = ZStream(1, 3, 5, 7)

val resultStream = failingStream.catchAll {
  case "Odd Num!"  => oddStream
  case "Even Num!" => evenStream
  case _           => ZStream.empty // no fallback for any other message
}
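
Matching on string messages is fragile, since a typo in a message silently changes which branch runs. As a sketch of one alternative, assuming ZIO 2, the errors can be modelled as a sealed trait so the compiler checks the match. StreamError, OddNum, and EvenNum are hypothetical names introduced here.

```scala
import zio._
import zio.stream._

object CatchAllExample extends ZIOAppDefault {
  // Hypothetical error type replacing the string messages.
  sealed trait StreamError
  case object OddNum  extends StreamError
  case object EvenNum extends StreamError

  val failing: ZStream[Any, StreamError, Int] =
    ZStream(1, 2, 3) ++ ZStream.fail[StreamError](OddNum) ++ ZStream(4, 5)

  // The fallback is chosen from the error value; the result can no longer fail.
  val recovered: ZStream[Any, Nothing, Int] =
    failing.catchAll {
      case OddNum  => ZStream(1, 3, 5, 7)
      case EvenNum => ZStream(2, 4, 6, 8)
    }

  def run = recovered.runCollect.flatMap(c => Console.printLine(c.mkString(", ")))
}
```

Only the first failure is ever seen, because the stream stops there and the fallback takes over.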

onError: If our stream encounters an error, we can run some cleanup or print a failure message on the console for debugging.

val stream =
  (ZStream(1, 2, 3) ++ ZStream.dieMessage("Oh! Boom!") ++ ZStream(4, 5))
    .onError(_ => Console.printLine("Stream application closed! We are doing some cleanup jobs.").orDie)

Retrying: When an operation fails, we can retry it according to a schedule using the retry method. In the following snippet the upload is retried with exponential backoff, starting at 10 seconds.

AWSSinkService
  .uploadService(configReader.S3_BASE_BUCKET, key, chunkOfBytes)
  .tapError(ex => ZIO.succeed(logger.error("Exception : " + ex.getMessage)))
  .retry(zio.Schedule.exponential(10.second))
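
The same retry method is available on a stream itself, where a failure restarts the stream from the beginning. This is a self-contained sketch assuming ZIO 2; the flaky source and the retry policy are hypothetical and chosen so the example finishes quickly.

```scala
import zio._
import zio.stream._

object RetryStream extends ZIOAppDefault {
  // Hypothetical flaky source: fails on the first two attempts, then succeeds.
  def flaky(attempts: Ref[Int]): ZStream[Any, String, Int] =
    ZStream.fromZIO(
      attempts.updateAndGet(_ + 1).flatMap { n =>
        if (n < 3) ZIO.fail(s"attempt $n failed") else ZIO.succeed(n)
      }
    )

  // Retry at most 5 times, waiting 10 ms between attempts.
  val policy = Schedule.recurs(5) && Schedule.spaced(10.millis)

  val program: IO[String, Chunk[Int]] =
    for {
      attempts <- Ref.make(0)
      result   <- flaky(attempts).retry(policy).runCollect
    } yield result

  def run = program.flatMap(c => Console.printLine(c.mkString(", ")))
}
```

Bounding the schedule with recurs matters in practice, because an unbounded exponential schedule keeps retrying a permanently failing source forever.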

Conclusion

In this blog, we learned why we need streaming and what its use cases are, how to create streams for testing, which operations are available on a stream, and how to handle failures with fallbacks and retries.

In the next blog, we will learn how to run a stream inside a microservice, how to merge two streams, and look at some production-level use cases.

Written by 

Just another person who has some good exposure to Data Engineering. Scala | Spark | AKKA | Kafka