Everything You Need To Know About ZIO Fiber And Its Operations Explained.

Reading Time: 3 minutes

Introduction

To perform an effect without blocking the current process, we can use fibers in ZIO Library, which are a lightweight concurrency mechanism.

In the ZIO library, we can fork any IO[E, A] to immediately yield a UIO[Fiber[E, A]]. The provide Fiber can be use to join the fiber, which will resume on the production of the fiber’s value, or to interrupt the fiber, which immediately terminates the fiber and safely releases all resources acquired by the fiber in the ZIO library.

val example =
  for {
    fiber1   <- exampleData(data).fork  // IO[E, example]
    fiber2   <- validateData(data).fork // IO[E, Boolean]
    // Do other stuff
    valid    <- fiber2.join
    _        <- if (!valid) fiber1.interrupt
                else IO.unit
    example <- fiber1.join
  } yield example

Operations

fork and join

When we need to start a fiber, we must fork an effect, which gives us a fiber. It is similar to starting a Java thread or adding a new thread to a Java thread pool; it is the same concept. Also, joining is a way of waiting for that fiber to compute its value. We will wait until it is complete.

import zio._
import zio.console._
import zio.clock._
import zio.duration._
for {
  fiber <- (sleep(3.seconds) *>
    putStrLn("Hello, after 3 second") *>
    ZIO.succeed(10)).fork
  _ <- putStrLn(s"Hello, to ZIO Library!")
  res <- fiber.join
  _ <- putStrLn(s"Our fiber succeeded with $res")
} yield ()

fork0

This more powerful variant of fork, called fork0, allows the specification of supervisors that will be pass non-recoverable errors from the forked fiber, including those that occur in finalizers. This supervisor needs to be specified. If not, the parent fiber supervisor will be use, recursively, up to the root handler, which can be particular in Runtime (the default supervisor simply prints the stack trace).

forkDaemon

Using ZIO#forkDaemon, the effect is fork into a new fiber attached to the global scope. This means that the forked fiber will continue to run when the previous fiber that executed the returned effect terminates.

We have three effects in the following example: inner, outer, and myApp. Using ZIO#forkDaemon, the outer effect forks the inner effect. In the myApp effect, the inner fiber is fork by using the ZIO#fork method and interrupted after three seconds. Fork in global scope, the inner effect won’t be interrupted and will continue to do its job:

val inner = putStrLn("Inner job is running.")
  .delay(1.seconds)
  .forever
  .onInterrupt(putStrLn("Inner job interrupted.").orDie)

val outer = (
  for {
    f <- inner.forkDaemon
    _ <- putStrLn("Outer job is running.").delay(1.seconds).forever
    _ <- f.join
  } yield ()
).onInterrupt(putStrLn("Outer job interrupted.").orDie)

val myApp = for {
  fiber <- outer.fork
  _     <- fiber.interrupt.delay(3.seconds)
  _     <- ZIO.never
} yield ()

interrupt

We can simply call interrupt whenever we wish to remove our fiber. Until all fiber finalizers have been execute and the interrupt operation has been completed or interrupted, the interrupt operation will not resume. This provides a means of building programs without leaks of resources.

await

You can call await on fiber to check whether it succeeded or failed. As soon as we call await, that fiber will be wait for to terminate, and we will be given back its value as an Exit. A success or failure value will be return.

import zio.console._
import zio.random._
for {
  booleanValue <- nextBoolean
  fiber <- (if (booleanValue) ZIO.succeed(10) else ZIO.fail("The boolean was false")).fork
  exitValue <- fiber.await
  _ <- exitValue match {
    case Exit.Success(value) => putStrLn(s"Fiber is succeeded with $value")
    case Exit.Failure(cause) => putStrLn(s"Fiber is failed")
  }
} yield ()

In ZIO library the await is similar to join but lower level than join. Whenever we call join, if the underlying fiber fails, we’ll encounter the same error when trying to join it.

Parallelism

The zipPar method can be use to execute actions in parallel:

def largeCompute(m1: Matrix, m2: Matrix, v: Matrix): UIO[Matrix] =
  for {
    t <- computeInverse(m1).zipPar(computeInverse(m2))
    (i1, i2) = t
    r <- applyMatrices(i1, i2, v)
  } yield r

Racing

In ZIO fiber two IO actions can be race, which means they will execute in parallel, and the value of the first action that completes successfully will be return.

result(100) race result(200)

In zio fiber, the race combinator is resource-safe, which means that if one of the two actions returns a value, the other one will interrupt, to prevent wasting resources.

Conclusion

By using ZIO we have managed to unearth only a fraction of its potential – by using ZIO Fibers within effects. Out of the box parallelism, immutability, referential transparency, and wrapped side effect managing are what has made this example painless and really very enjoyable to write.

For more about ZIO library visit : https://zio.dev

Written by 

Pappu Bhardwaj is a Software Intern at Knoldus Inc. in Noida. He has completed his B.Tech from the KIET Group of Institution, Ghaziabad. He is recognized as a good team player, a dedicated and responsible professional, and a technology enthusiast. He is a quick learner & curious to learn new technologies. He is passionate about Competetive Programming. He loves to play cricket and wildlife photography.

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading