Hadoop Word Count Program in Scala


You have probably seen the Hadoop word count program in Java, Python, or C/C++, but probably not in Scala. So, let's learn how to build a word count program in Scala.

Submitting a job written in Scala to Hadoop takes a little extra work. Hadoop runs on the JVM and its API is written for Java, so the Scala code has to implement Hadoop's Java interfaces, and the job jar has to bundle the Scala library alongside your own classes.

To write the word count program in Scala, follow these steps.

  • Create a Scala project with sbt, using the version of your choice.
  • Add the hadoop-core dependency to build.sbt from here.
  • Create a Scala object, say WordCount, with a main method in the project.
  • Inside the object, create a class, say Map, that extends the MapReduceBase class and mixes in the Mapper interface.
  • Provide a body for the map function.
  • Inside the object, create another class, say Reduce, that extends the MapReduceBase class and mixes in the Reducer interface.
  • Provide a body for the reduce function.
  • Provide the necessary job configuration in the main method of the object.
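
For the first two steps, a build.sbt along these lines should be enough. This is only a sketch: the project name and the version numbers are assumptions, so pick the Scala and hadoop-core versions that match your own setup.

```scala
// build.sbt: a minimal sketch. The name and all version numbers are assumptions.
name := "hadoop-word-count"

version := "0.1"

scalaVersion := "2.11.8"

// hadoop-core provides the org.apache.hadoop.mapred API used by the program
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
```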

Here is the word count program written in Scala.

import java.io.IOException
import java.util._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._
import org.apache.hadoop.util._

object WordCount {

  class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
    // Reused for every record so that no new Writable is allocated per token.
    private val one = new IntWritable(1)
    private val word = new Text()

    @throws[IOException]
    def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
      // Split on runs of whitespace and skip the empty token that leading whitespace produces.
      value.toString.split("\\s+").filter(_.nonEmpty).foreach { token =>
        word.set(token)
        output.collect(word, one)
      }
    }
  }

  class Reduce extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
    @throws[IOException]
    def reduce(key: Text, values: Iterator[IntWritable], output: OutputCollector[Text, IntWritable], reporter: Reporter): Unit = {
      import scala.collection.JavaConverters._
      // Hadoop reuses the same IntWritable instance for every value, so read each
      // count with get() while iterating instead of buffering the objects in a list.
      val sum = values.asScala.map(_.get()).sum
      output.collect(key, new IntWritable(sum))
    }
  }

  @throws[Exception]
  def main(args: Array[String]): Unit = {
    val conf: JobConf = new JobConf(this.getClass)
    conf.setJobName("WordCountScala")
    conf.setOutputKeyClass(classOf[Text])
    conf.setOutputValueClass(classOf[IntWritable])
    conf.setMapperClass(classOf[Map])
    conf.setCombinerClass(classOf[Reduce])
    conf.setReducerClass(classOf[Reduce])
    conf.setInputFormat(classOf[TextInputFormat])
    conf.setOutputFormat(classOf[TextOutputFormat[Text, IntWritable]])
    FileInputFormat.setInputPaths(conf, new Path(args(0)))
    FileOutputFormat.setOutputPath(conf, new Path(args(1)))
    JobClient.runJob(conf)
  }
}
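
To see what the job computes without a cluster, the same map, group, and reduce steps can be mimicked on plain Scala collections. This is only an illustrative sketch: LocalWordCount, mapPhase, reducePhase, and wordCount are hypothetical names that are not part of Hadoop, and the grouping step merely stands in for Hadoop's shuffle.

```scala
object LocalWordCount {
  // Map phase: emit a (word, 1) pair for every whitespace-separated token in a line.
  def mapPhase(line: String): Seq[(String, Int)] =
    line.split("\\s+").filter(_.nonEmpty).map(token => (token, 1)).toSeq

  // Reduce phase: sum all counts collected for one word.
  def reducePhase(word: String, counts: Seq[Int]): (String, Int) =
    (word, counts.sum)

  // Group the mapped pairs by word (the role of the shuffle), then reduce each group.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(mapPhase)
      .groupBy(_._1)
      .map { case (word, pairs) => reducePhase(word, pairs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("hello hadoop", "hello scala"))
    // Print one "word<TAB>count" line per word, sorted by word.
    counts.toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w\t$c") }
  }
}
```

Running the sketch on a couple of sample lines is a quick way to check the expected counts before submitting the real job.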

So far we have written the program in Scala; now we need to submit this job to Hadoop. To submit it, follow these steps.

  • Add the sbt-assembly plugin to plugin.sbt under the project directory from here.
  • Open a terminal and change directory to the root of the project.
  • In the terminal, run the command sbt clean compile assembly
  • This command builds the jar under the target/scala-<version> folder of the project.
  • Create a directory in HDFS with the following command.
    $HADOOP_HOME/bin/hadoop fs -mkdir input_dir
  • Copy some data into the newly created directory in HDFS with the following command.
    $HADOOP_HOME/bin/hadoop fs -put sample.txt input_dir
  • Now submit the job to Hadoop with the following command.
    $HADOOP_HOME/bin/hadoop jar jar_name.jar input_dir output_dir

The jar in the last command is the one that was built under the target/scala-<version> directory of the project.
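
For the first step in the list above, the plugin file might contain a single line like the one below. The version number is an assumption; use an sbt-assembly release that matches your sbt version.

```scala
// project/plugin.sbt: the plugin version here is an assumption
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
```

sbt loads every .sbt file in the project directory, so the file name itself is a matter of convention.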

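Once the job finishes, you can list the output files with the following command:

$HADOOP_HOME/bin/hadoop fs -ls output_dir/

The word counts themselves are written to the part files in that directory (for example part-00000), which you can print with hadoop fs -cat output_dir/part-00000.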

If you run into problems building the jar with the sbt clean compile assembly command, you need to add a mergeStrategy to build.sbt; you can find related information here.
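
As a rough illustration, a merge strategy for sbt-assembly 0.14.x could look like the sketch below. The rules are assumptions chosen for a small job like this one, not the settings from the original project: discarding everything under META-INF is a blunt choice and can remove files that some libraries need.

```scala
// build.sbt: a sketch of a merge strategy, assuming sbt-assembly 0.14.x
assemblyMergeStrategy in assembly := {
  // Drop duplicate metadata files that several dependency jars ship
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // For any other duplicate path, keep the first copy found
  case _ => MergeStrategy.first
}
```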

Resources:

  • You can find the hadoop-core dependency here.
  • You can find the sbt-assembly plugin here.
  • You can find this project here.

About Akash Sethi

I am Akash Sethi. I am currently working at Knoldus Software LLP.

4 Responses to Hadoop Word Count Program in Scala

  1. Prabhat Kashyap says:

    Reblogged this on Prabhat Kashyap – Scala-Trek.

  2. rbarrdev says:

    This could have been much easier if you had written it using Spark instead of MapReduce, unless this is a proof of concept to show that it's possible. Spark is also easy to set up locally as well as on a cluster.

    Example code, which runs in most if not all IDEs that support Scala; IntelliJ is my preference.

    import org.apache.spark.{SparkConf, SparkContext}

    object TestDriver {
      def main(args: Array[String]): Unit = {
        // build spark configs
        val sparkConf = new SparkConf()
          .setAppName("test")
          .setMaster("local[2]")
          .set("spark.executor.memory", "2g")

        // SparkContext
        val sc = new SparkContext(sparkConf)

        /* Word count in Scala */
        // Read in the RDD from the hdfs files
        val textFile = sc.textFile("hdfs:/path/to/file") // read the file into an RDD[String]
        val counts = textFile.flatMap(line => line.split(" ")) // split each line on spaces
          .map(word => (word, 1)) // build pairs of word and value 1
          .reduceByKey(_ + _) // reduce by word (the key) and sum the values
        counts.saveAsTextFile("hdfs://path/to/file") // write out
      }
    }

  3. Akash Sethi says:

    @rbarrdev Thanks for providing the code. I just wanted to show that a MapReduce program can be written in Scala.
