Handling Large Data File Using Scala and Akka


We needed to handle large data files, reaching gigabytes in size, in a Scala-based Akka application of ours. We are interested in reading data from the file and then operating on it. In our application a single line in the file forms the unit of data to be worked upon, which means we operate on the file line by line.

These are the requirements:

  • We cannot read the entire file into memory
  • A line in the file is the unit of data to be processed
  • It is OK to lose a bit of line data during processing
  • We need to implement it in a way that is optimal

An example snippet of the big data file is shown below. The file can potentially reach gigabytes in size and more, and we need to process it.

.........
<tag1 ID='whatever'><tag2 arg='a'><tag3> Text here <tag4>More text</tag4></tag3></tag2></tag1>
<tag1 ID='whatever'><tag2 arg='a'><tag3> Text here <tag4>More text</tag4></tag3></tag2></tag1>
<tag1 ID='whatever'><tag2 arg='a'><tag3> Text here <tag4>More text</tag4></tag3></tag2></tag1>
.........

We decided to use the RandomAccessFile API in Java. It provides the capability to seek to a particular location in a file and read bytes from that location onwards. Two API calls in particular are important, as shown below.

randomAccessFile.seek(seekIndex) // seek to a particular byte position in the file
randomAccessFile.read(byteArray) // read bytes into byteArray; returns the number of bytes actually read

But we need to figure out where to seek, i.e. the seekIndex in the listing above, and the size of byteArray. Most importantly, we need to figure out how to split the bytes we read into lines.
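
Before wiring this into the application, here is a minimal self-contained sketch of the seek-and-read part on its own. The object name, path, offset and block size are made up purely for illustration:

import java.io.RandomAccessFile

object SeekReadSketch extends App {
  // Illustrative values only: the path and block size are assumptions
  val path = "src/main/resources/bigdata.txt"
  val blockSize = 1 * 1024 * 1024

  val randomAccessFile = new RandomAccessFile(path, "r")
  try {
    val byteArray = new Array[Byte](blockSize)
    randomAccessFile.seek(0)                          // jump to a byte offset in the file
    val bytesRead = randomAccessFile.read(byteArray)  // fills byteArray, returns bytes actually read (-1 at EOF)
    println("Read " + bytesRead + " bytes from offset 0")
  } finally {
    randomAccessFile.close()
  }
}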

The way to handle seekIndex is to know the number of bytes in the big data file and then divide it into chunks of the size we want to read in one go. There is a defaultBlockSize constant, and we divide the entire file into chunks of this defaultBlockSize. The seekIndex is then a multiple of defaultBlockSize. The code listing for computing the number of chunks is shown below.

...
val defaultBlockSize = 1 * 1024 * 1024
...

private def getNumberOfChunks(bigDataFilePath: String): Int = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      (randomAccessFile.length / defaultBlockSize).toInt
    } finally {
      randomAccessFile.close
    }
  }

If there is a 40 MB file then numberOfChunks will be 40. This means we will read a 1 MB chunk of bytes forty times and operate on each chunk. To operate on each line we have to extract the lines from the byte buffer we read. In this case we would like to do a simple operation on each extracted line; for simplicity's sake, we do a word count on each line extracted from the chunk of data read from the file.

...
val defaultBlockSize = 1 * 1024 * 1024
...
for (i <- 1 to getNumberOfChunks(bigdataFilePath)) {
    readLines(bigdataFilePath, i)
}

private def readLines(bigDataFilePath: String, chunkIndex: Int): Array[String] = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      // seek to the start of this chunk (toLong avoids Int overflow for files beyond 2 GB)
      val seek = (chunkIndex - 1).toLong * BigDataProcessor.defaultBlockSize
      randomAccessFile.seek(seek)
      // fill the reusable 1 MB buffer from that offset
      randomAccessFile.read(byteBuffer)
      // split the chunk into lines; partial lines at the chunk boundaries are lost,
      // which is acceptable per the requirements
      val rawString = new String(byteBuffer)
      rawString.split(System.getProperty("line.separator"))
    } finally {
      randomAccessFile.close
    }
  }

private def getNumberOfChunks(bigDataFilePath: String): Int = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      (randomAccessFile.length / defaultBlockSize).toInt
    } finally {
      randomAccessFile.close
    }
  }
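
Note that the integer division in getNumberOfChunks silently drops any trailing partial block, which is fine given the requirement that losing a bit of data is acceptable. If the tail mattered, a possible variation (not part of the original code) would round the chunk count up instead:

private def getNumberOfChunksIncludingTail(bigDataFilePath: String): Int = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      // ceiling division: a trailing partial block counts as one more chunk
      ((randomAccessFile.length + defaultBlockSize - 1) / defaultBlockSize).toInt
    } finally {
      randomAccessFile.close
    }
  }

With this variant, readLines would also need to use the number of bytes actually returned by read, so that the last, shorter chunk does not pick up stale bytes from the reused buffer.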

Now putting it all together, this is the complete code listing:

package com.meetu.akka.bigdata

import java.io.RandomAccessFile

object BigDataProcessor extends App {

  val bigdataFilePath = "src/main/resources/bigdata.txt"
  val defaultBlockSize = 1 * 1024 * 1024
  val byteBuffer = new Array[Byte](defaultBlockSize)

  val startTime = System.currentTimeMillis

  for (i <- 1 to getNumberOfChunks(bigdataFilePath)) {
    val lines = readLines(bigdataFilePath, i)
    getWordCount(lines)
  }

  val endTime = System.currentTimeMillis
  println("Total time in millis: " + (endTime - startTime))

  def getWordCount(lines: Array[String]) = {
    // the per-line counts are discarded; this simply serves as a representative workload
    lines foreach {
      line => line.split(" +").length
    }
  }

  private def readLines(bigDataFilePath: String, chunkIndex: Int): Array[String] = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      val seek = (chunkIndex - 1).toLong * BigDataProcessor.defaultBlockSize
      randomAccessFile.seek(seek)
      randomAccessFile.read(byteBuffer)
      val rawString = new String(byteBuffer)
      rawString.split(System.getProperty("line.separator"))
    } finally {
      randomAccessFile.close
    }
  }

  private def getNumberOfChunks(bigDataFilePath: String): Int = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      (randomAccessFile.length / defaultBlockSize).toInt
    } finally {
      randomAccessFile.close
    }
  }
}

Above is an implementation that operates on a large file by extracting lines from each chunk of data read from the file and doing a simple word count on every line. It meets the first three requirements outlined at the beginning, but not the last one of making it optimal. Now, let's use some Akka code to speed up our application.

Akka uses actors, and with actors we can process the chunks in parallel to speed things up. Let's use a FileWorker actor to read data from the file and operate on it. We need to pass the FileWorker the big data file path, the chunk index and the total number of chunks.

Let’s look at the code listing for the FileWorker actor and the BigDataMessage case class.

case class BigDataMessage(bigdataFilePath: String, chunkIndex: Int, totalChunks: Int)

class FileWorker extends Actor {
  val byteBuffer = new Array[Byte](BigDataProcessor.defaultBlockSize)
  def receive = {
    case BigDataMessage(bigDataFilePath, chunkIndex, totalChunks) =>
      val lines = readLines(bigDataFilePath, chunkIndex)
      getWordCount(lines)
  }
  
  def getWordCount(lines: Array[String]) = {
    lines foreach {
      line => line.split(" +").length
    }
  }

  private def readLines(bigDataFilePath: String, chunkIndex: Int): Array[String] = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      val seek = (chunkIndex - 1).toLong * BigDataProcessor.defaultBlockSize
      randomAccessFile.seek(seek)
      randomAccessFile.read(byteBuffer)
      val rawString = new String(byteBuffer)
      rawString.split(System.getProperty("line.separator"))
    } finally {
      randomAccessFile.close
    }
  }
}

When the FileWorker actor receives a BigDataMessage, it reads the lines and then does a word count on each extracted line. The code for reading lines and doing the word count is the same as in the previous version.

To measure performance, i.e. the time elapsed, we also need a Diagnostics actor. It calculates the total time elapsed for processing the big data file. Here is the code listing for the Diagnostics actor.

class Diagnostics extends Actor {
  var startTime = 0L
  def receive = {
    case BigDataMessage(bigDataFilePath, chunkIndex, totalChunks) =>
      if(chunkIndex == 1) startTime = System.currentTimeMillis
      if(chunkIndex == totalChunks) println("Total Time: " + (System.currentTimeMillis - startTime))
  }
}
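
Because the workers process chunks in parallel, the chunk whose index equals totalChunks is not guaranteed to be the last one to finish, so the measurement above is approximate. A possible variation (not in the original post) counts completions instead of relying on the index:

class CountingDiagnostics extends Actor {
  var startTime = 0L
  var completed = 0
  def receive = {
    case BigDataMessage(_, _, totalChunks) =>
      if (completed == 0) startTime = System.currentTimeMillis
      completed += 1
      // every chunk reports back exactly once, so this fires on the chunk that truly finishes last
      if (completed == totalChunks) println("Total Time: " + (System.currentTimeMillis - startTime))
  }
}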

To execute the big data processing we now need a BigDataProcessor. It is responsible for starting the FileWorker actors, the Diagnostics actor and a cyclic router, and for sending messages of the BigDataMessage case class to the router.

Below is the code listing for the BigDataProcessor.

object BigDataProcessor extends App {

  val bigdataFilePath = "src/main/resources/bigdata.txt"
  val defaultBlockSize = 1 * 1024 * 1024

  val diagnostics = actorOf[Diagnostics].start
  distributeMessages

  private def distributeMessages = {
    val workerCount = Runtime.getRuntime.availableProcessors
    val workers = Vector.fill(workerCount * 2)(actorOf[FileWorker].start)
    val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
    val totalChunks = totalMessages(bigdataFilePath)
    for (i <- 1 to totalChunks) {
      router ! BigDataMessage(bigdataFilePath, i, totalChunks)
    }
  }

  private def totalMessages(bigDataFilePath: String): Int = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      (randomAccessFile.length / defaultBlockSize).toInt
    } finally {
      randomAccessFile.close
    }
  }
}

Combining the pieces, here is the complete code listing for this application.

package com.meetu.akka.bigdata

import java.io.RandomAccessFile

import akka.actor.Actor.actorOf
import akka.actor.Actor
import akka.routing.{ CyclicIterator, Routing }

object BigDataProcessor extends App {

  val bigdataFilePath = "src/main/resources/bigdata.txt"
  val defaultBlockSize = 1 * 1024 * 1024
  val diagnostics = actorOf[Diagnostics].start
  distributeMessages

  private def distributeMessages = {
    val workerCount = Runtime.getRuntime.availableProcessors
    val workers = Vector.fill(workerCount * 2)(actorOf[FileWorker].start)
    val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
    val totalChunks = totalMessages(bigdataFilePath)
    for (i <- 1 to totalChunks) {
      router ! BigDataMessage(bigdataFilePath, i, totalChunks)
    }
  }

  private def totalMessages(bigDataFilePath: String): Int = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      (randomAccessFile.length / defaultBlockSize).toInt
    } finally {
      randomAccessFile.close
    }
  }
}

class FileWorker extends Actor {
  val byteBuffer = new Array[Byte](BigDataProcessor.defaultBlockSize)
  def receive = {
    case BigDataMessage(bigDataFilePath, chunkIndex, totalChunks) =>
      val lines = readLines(bigDataFilePath, chunkIndex)
      getWordCount(lines)
      BigDataProcessor.diagnostics ! BigDataMessage(bigDataFilePath, chunkIndex, totalChunks)
  }

  def getWordCount(lines: Array[String]) = {
    lines foreach {
      line => line.split(" +").length
    }
  }

  private def readLines(bigDataFilePath: String, chunkIndex: Int): Array[String] = {
    val randomAccessFile = new RandomAccessFile(bigDataFilePath, "r")
    try {
      val seek = (chunkIndex - 1).toLong * BigDataProcessor.defaultBlockSize
      randomAccessFile.seek(seek)
      randomAccessFile.read(byteBuffer)
      val rawString = new String(byteBuffer)
      rawString.split(System.getProperty("line.separator"))
    } finally {
      randomAccessFile.close
    }
  }
}

class Diagnostics extends Actor {
  var startTime = 0L
  def receive = {
    case BigDataMessage(bigDataFilePath, chunkIndex, totalChunks) =>
      if (chunkIndex == 1) startTime = System.currentTimeMillis
      if (chunkIndex == totalChunks) println("Total Time: " + (System.currentTimeMillis - startTime))
  }
}

case class BigDataMessage(bigdataFilePath: String, chunkIndex: Int, totalChunks: Int)

To execute the application, run both versions as a Scala application either from Eclipse or from the terminal. The codebase is also on my GitHub. It is an sbt-based project; after you have downloaded the code, execute the following command in the terminal:

sbt "run-main com.meetu.akka.bigdata.BigDataProcessor"

When you run both versions you will notice that, as the big data file grows, the Akka-based version starts to outperform the standalone version. There is a LameBigDataProcessor as well; you can run it with the following command in the terminal:

sbt "run-main com.meetu.akka.bigdata.LameBigDataProcessor"

Here is my run of both versions; compare their elapsed timings, which are in milliseconds.

Akka's massively scalable architecture, using actors to achieve parallelism, enables us to scale up the system by making optimal use of the cores on a single box.


Responses to Handling Large Data File Using Scala and Akka

  1. Sander Mak says:

    Isn’t doing blocking I/O inside actors considered a bad practice (see for example http://stackoverflow.com/questions/9485039/blocking-io-in-akka)? How about scanning over the file sequentially, while sending a message for each chunk read (or even line read) to the Actor pool to be processed?

    I think you could end up with the best of both worlds: a single thread do sequential I/O on a large file, while asynchronously and concurrently processing the data read.

  2. Meetu Maltiar says:

    Hi Sander Mak,

    Thanks for your suggestion of doing a sequential read and passing the data around; in fact that was our first approach as well. In our situation the file resides on a shared disk and we would have had to move large chunks of data over the network. We noticed massive degradation of performance as the message payload size increased, and hence stayed away from it.

    Regards,
    Meetu Maltiar

  3. mega says:

    Thank you for sharing this great topic. Sir, I have a question which might not sound OK to ask, but I am really wondering how to write this code in Java. I have never used Scala or Akka before; I am a fresh student learning Java, and I am currently facing this kind of problem where I want to load a huge file to be parsed and inserted into an SQL database in a shared disk environment. Please try to help if you can. Thank you.
