Alpakka: Process big CSV files using Akka Streams


In this big data world, we deal with lots of data, and very often that data arrives as CSV files. Since the data is huge, the CSV files are huge as well, and it is not feasible to load such a file into memory in one go. So here comes the challenge: how should we read those big CSVs?

I faced the same use case in one of my projects: I got an assignment where I needed to process a CSV file and apply some transformations.

The first thing that came to my mind was to load the CSV file into memory and then process it, but as the file was big, this option was not feasible, so I dropped that idea. Then I thought of reading the file line by line. This way, the whole file is never loaded into memory, only one line at a time, so this option looked perfect.

So I decided to go with this approach. Now I needed to decide which technology I should use to read the CSV file line by line.

First I thought of using scala.io.Source, which reads the file and returns an iterator. Using the iterator, we can process the file line by line as follows:

import scala.io.Source

object Application {
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile("/home/rishi/files/emp.csv")
    val lines = source.getLines()
    while (lines.hasNext) {
      println(lines.next())
    }
    source.close() // release the file handle once we are done
  }
}

The above code looks pretty simple and processes the CSV file line by line easily. But it did not fulfil my use case completely. I wanted to access each line of the CSV along with its header, which the above code does not do; to fulfil my use case, I would have needed to build a Map explicitly by taking the CSV's first line as the header. Doing all of this myself would take some effort, so I was looking for a library that would do it for me.
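For context, such a manual version might look roughly like the sketch below (the file path comes from the earlier snippet, and the '~' separator is just an illustrative assumption), zipping the header line with every subsequent line:

import scala.io.Source

object ManualHeaderMapping {
  def main(args: Array[String]): Unit = {
    val source = Source.fromFile("/home/rishi/files/emp.csv")
    val lines = source.getLines()
    val header = lines.next().split('~')            // first line becomes the keys
    lines.foreach { line =>
      val row = header.zip(line.split('~')).toMap   // key every field by its column name
      println(row)
    }
    source.close()
  }
}

This works, but all the header handling, splitting, and error handling is on us.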

So I moved to Alpakka, which uses Akka Streams under the hood and provides simple APIs to process CSV. Akka Streams does the processing in a non-blocking way and supports backpressure. Non-blocking processing was the extra advantage I got with Akka Streams (scala.io.Source processes the file synchronously).

Let’s take a look at the code below.

First, we need to add the following dependencies to the pom.xml file:

<dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-csv_2.12</artifactId>
    <version>2.0.1</version>
</dependency>

<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_2.12</artifactId>
    <version>2.6.6</version>
</dependency>
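If your project uses sbt instead of Maven, the equivalent dependencies (same artifacts and versions) would look roughly like this:

libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-csv" % "2.0.1",
  "com.typesafe.akka"  %% "akka-stream"             % "2.6.6"
)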

Then, create a CsvUtility class which reads the CSV file using Akka Streams:

CsvUtility.scala

import java.io.File
import java.nio.file.{Path, Paths}

import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.{CsvParsing, CsvToMap}
import akka.stream.scaladsl.FileIO

import scala.concurrent.Future

class CsvUtility(implicit val system: ActorSystem) {

  // The Future combinators (map/recover) below need an ExecutionContext; reuse the system's dispatcher
  import system.dispatcher

  def processCsvFile(inputFile: File): Future[Boolean] = {
    val inputFileName = inputFile.getName
    val csvFile: Path = Paths.get(inputFile.getPath)
    val source = FileIO.fromPath(csvFile)

    source
      .via(CsvParsing.lineScanner('~'.toByte))   // the file uses '~' as its field separator
      .via(CsvToMap.toMapAsStrings())            // turn every line into a Map keyed by the header row
      .filter(row => row.getOrElse("dept", "") == "bowler")
      .runForeach(row => println(row))
      .map { _ =>
        println(s"Successfully processed the file $inputFileName")
        true
      }
      .recover {
        case _: Exception =>
          println(s"Error in processing the file $inputFileName")
          false
      }
  }
}

As the CSV file uses a tilde (~) as the separator, we have provided that separator explicitly (the delimiter is a Byte):

CsvParsing.lineScanner('~'.toByte)
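Alpakka also ships predefined Byte constants for the common separators (for example CsvParsing.Comma and CsvParsing.SemiColon, if my reading of the API holds), so the scanner can be configured per file:

// comma is the default
CsvParsing.lineScanner()
// predefined constant for semicolon-separated files
CsvParsing.lineScanner(CsvParsing.SemiColon)
// custom separator such as tilde
CsvParsing.lineScanner('~'.toByte)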

To convert each line into a Map where the key is the CSV header and the value is the corresponding field of the row, we have used the following code:

CsvToMap.toMapAsStrings()
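For the sample file shown later in this post, the first data line would come out of this stage roughly as the following Map, and the filter stage then keeps only the rows whose dept value is bowler:

Map("id" -> "1", "name" -> "Rishi", "dept" -> "bowler")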

Akka Streams returns a Future because it processes the file in a non-blocking way.

Now, create a main class that will start the process.

Application.scala

import java.io.File

import akka.actor.ActorSystem

object Application {

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    // ExecutionContext for the Future callbacks (map/recover) below
    import system.dispatcher

    val file = new File("/home/rishi/files/cricketer.csv")
    new CsvUtility().processCsvFile(file)
      .map { processed =>
        if (processed) println("File successfully processed") else println("File processing failed")
        system.terminate()
      }
      .recover {
        case _: Exception =>
          println("Error in file processing")
          system.terminate()
      }
  }
}

Let’s look at the CSV file.

cricketer.csv:

id~name~dept
1~Rishi~bowler
2~Virat~batsman
3~Yuvi~batsman
4~Zaheer~bowler
5~Bumrah~bowler
6~Shami~bowler
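With this input and the dept == "bowler" filter in CsvUtility, the output should look something like this (the exact ordering of keys inside each Map may differ):

Map(id -> 1, name -> Rishi, dept -> bowler)
Map(id -> 4, name -> Zaheer, dept -> bowler)
Map(id -> 5, name -> Bumrah, dept -> bowler)
Map(id -> 6, name -> Shami, dept -> bowler)
Successfully processed the file cricketer.csv
File successfully processed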

That’s it. It’s quite simple. There are many other ways to use Alpakka for CSV processing, which you can find in the Alpakka documentation linked in the references below.
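As one example of those other capabilities, the same module can also write CSV. Here is a minimal sketch, assuming the CsvFormatting stage from the same akka-stream-alpakka-csv artifact and a hypothetical output path:

import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.alpakka.csv.scaladsl.CsvFormatting
import akka.stream.scaladsl.{FileIO, Source}

object CsvWriteExample {

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    import system.dispatcher

    // Each element is one row; CsvFormatting.format() encodes it as a comma-separated line
    Source(List(List("id", "name", "dept"), List("7", "Ishant", "bowler")))
      .via(CsvFormatting.format())
      .runWith(FileIO.toPath(Paths.get("/tmp/out.csv"))) // hypothetical output path
      .onComplete(_ => system.terminate())
  }
}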

I hope this blog will be useful for everyone.

References: https://doc.akka.io/docs/alpakka/current/data-transformations/csv.html

