Intercepting Nutch Crawl Flow with a Scala Plugin


Apache Nutch is an open source web search project. One of the interesting things it can be used for is web crawling. What makes Nutch particularly interesting is that it provides several extension points through which we can plug in our custom functionality. Some of the existing extension points can be found here. It supports a plugin system based on the same model used in Eclipse.

For one of our subprojects, which required crawling a few websites, we decided to inject our own Scala plugin into the flow to carry out our specific logic. For starters, here is an overview of the Nutch crawl mechanism

As you would notice, in order to begin the crawl, we need to populate the crawldb, after which the fetchlist is generated. On the basis of this fetchlist, a segment for the crawl is prepared and new links are injected back into the crawldb. This process continues until the desired depth is reached or the crawldb runs out of links.
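Under the hood, the crawl command is essentially a loop over the individual Nutch jobs, which also makes it clear where a parse filter slots in. Roughly (the directory names below are placeholders), one pass of the cycle looks like this:

bin/nutch inject crawl/crawldb urls                  # seed the crawldb with the start URLs
bin/nutch generate crawl/crawldb crawl/segments      # generate a fetchlist (a new segment)
s1=`ls -d crawl/segments/2* | tail -1`               # pick up the segment that was just created
bin/nutch fetch $s1                                  # fetch the pages in the segment
bin/nutch parse $s1                                  # parse them -- this is where an HtmlParseFilter runs
bin/nutch updatedb crawl/crawldb $s1                 # feed the newly discovered links back into the crawldb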

For more information on how to get started with Nutch, refer to the tutorial.

What we were required to do, as a part of the crawl cycle, specifically between steps 4 and 5 in the diagram above, was to update our database (MongoDB) with some information gathered during the crawl. For that we decided to inject a Scala plugin into the crawl cycle.

In order to compile the Scala plugin project, we need to pull in some dependencies through SBT. Our build.sbt looks like this

name := "MyAggregator"
version := "1.0"
scalaVersion := "2.9.1"
resolvers += "Central Repo" at "http://repo1.maven.org/maven2"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += "Akka io" at "http://akka.io/repository"
libraryDependencies += "org.scalatest" %% "scalatest" % "1.6.1"
libraryDependencies += "org.specs2" %% "specs2" % "1.8.2"
libraryDependencies += "junit" % "junit" % "4.9"
libraryDependencies += "org.apache.nutch" % "nutch" % "1.4"
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.0.0"
libraryDependencies += "xml-apis" % "xml-apis" % "1.3.04"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.6.4"
libraryDependencies += "com.mongodb.casbah" %% "casbah" % "2.1.5-1"
libraryDependencies += "com.novus" %% "salat-core" % "0.0.8-SNAPSHOT"
seq(assemblySettings: _*)
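Note that seq(assemblySettings: _*) assumes the sbt-assembly plugin is available to the build. A minimal project/plugins.sbt for the sbt 0.11.x line would look roughly like this (the plugin version is an assumption; pick the one matching your sbt release):

// project/plugins.sbt -- brings in the sbt-assembly plugin used by build.sbt
resolvers += Resolver.url("sbt-plugin-releases",
  url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.1")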

The code for our parse filter looks like this

import org.apache.hadoop.conf.Configuration
import org.apache.nutch.parse.HTMLMetaTags
import org.apache.nutch.parse.HtmlParseFilter
import org.apache.nutch.parse.ParseResult
import org.apache.nutch.protocol.Content
import org.apache.nutch.util.NodeWalker
import org.w3c.dom.DocumentFragment
import org.w3c.dom.Node

class KDParseFilter extends HtmlParseFilter {
  var conf: Configuration = null
  var doc: DocumentFragment = null
  var headings: Array[String] = Array[String]()

  def filter(content: Content, parseResult: ParseResult, metaTags: HTMLMetaTags, doc: DocumentFragment): ParseResult = {
    /* keep a handle on the current document so that getElement can walk it */
    this.doc = doc
    /* filter only required results */
    parseResult.filter
    val parse = parseResult.get(content.getUrl())
    val text = parse.getText()
    /* send the text to an actor for processing so the crawl is not blocked */
    (new TextManager).start ! text

    parseResult
  }

  def setConf(conf: Configuration) {
    this.conf = conf
    headings = conf.getStrings("headings")
  }

  def getConf: Configuration = this.conf

  def getElement(element: String): String = {
    val walker = new NodeWalker(doc)
    while (walker.hasNext) {
      val currentNode = walker.nextNode
      if (currentNode.getNodeType() == Node.ELEMENT_NODE) {
        if (element.equalsIgnoreCase(currentNode.getNodeName)) {
          return getNodeValue(currentNode)
        }
      }
    }
    return null
  }

  def getNodeValue(node: Node): String = {
    val buffer = new StringBuffer
    val children = node.getChildNodes
    // 'until' avoids reading one element past the end of the node list
    for (i <- 0 until children.getLength) {
      if (children.item(i).getNodeType == Node.TEXT_NODE) {
        buffer.append(children.item(i).getNodeValue)
      }
    }
    buffer.toString
  }
}

As you would notice, we extend the HtmlParseFilter and inject our logic there. As soon as we get the parsed text, we send it as a message to a Scala actor which does the further processing. This way we do not hold the crawl cycle hostage to our processing and can easily delegate it asynchronously to an actor.

Let us look at the actor implementation,

import scala.actors.Actor
import com.novus.salat.annotations.raw.Key
import com.novus.salat.dao.SalatDAO
import com.novus.salat._
import com.novus.salat.global._
import com.novus.salat.annotations._
import com.novus.salat.dao._
import com.mongodb.casbah.Imports._
import com.mongodb.casbah.MongoConnection

class TextManager extends Actor {

  var events: List[Event] = List()

  def act() {
    react {
      case text: String =>
        // persist every event found in the parsed text
        parseTextForEvents(text).getOrElse(List()).foreach(Event.createEvent)
      case _ =>
        println("^^^^^ received a message but not valid")
    }
  }

  def parseTextForEvents(text: String): Option[List[Event]] = {
    val cleanedText = text.replaceAll("\"", "")
    cleanedText.indexOf("London Olympics ") match {
      case -1 => None
      case index =>
        val starter = index + 43
        // parsing mechanism
        Option(events)
    }
  }

}

case class Event(@Key("_id") id: ObjectId, name: String)
object Event {
  def createEvent(event: Event) {
    EventDAO.insert(event)
  }
}

object EventDAO extends SalatDAO[Event, ObjectId](collection = MongoConnection()("london")("event"))
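As a quick, hypothetical usage example (the event name below is made up), persisting a single event through the companion object is a one-liner:

// hypothetical usage: store one event document in the "event" collection of the "london" database
Event.createEvent(Event(new ObjectId, "100m final"))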

Our Scala actor picks up the processing part and puts the desired parsed information into MongoDB. Once we have the code compiling, it is time to package it. For that we used sbt-assembly. This helped us get a jar of the project which can be deployed directly in Nutch.

Nutch requires a corresponding plugin.xml file to register the plugin. Our plugin.xml looks like this

<plugin id="MyAggregator" name="My Parse Filter" version="1.0.0" provider-name="nutch.org">
	<runtime>
		<library name="myaggregator.jar">
			<export name="*" />
		</library>
	</runtime>
	<requires>
		<import plugin="nutch-extensionpoints" />
	</requires>
	<extension id="org.apache.nutch.parse.headings" name="Nutch Headings Parse Filter" point="org.apache.nutch.parse.HtmlParseFilter">
		<implementation id="KDParseFilter"
			class="com.knoldus.KDParseFilter">
		</implementation>
	</extension>
</plugin>

Here we are telling Nutch which extension point we are extending and where the implementation lives.
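With the assembly jar and this plugin.xml in place, the plugin is deployed by dropping both into a directory named after the plugin id under Nutch's plugins folder. A rough sketch of the steps (the paths and the generated jar name are illustrative; the jar must end up with the name declared in plugin.xml):

sbt assembly                                              # build the fat jar under target/
mkdir -p $NUTCH_HOME/runtime/local/plugins/MyAggregator
cp target/MyAggregator-assembly-1.0.jar $NUTCH_HOME/runtime/local/plugins/MyAggregator/myaggregator.jar
cp plugin.xml $NUTCH_HOME/runtime/local/plugins/MyAggregator/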

The next step is to make Nutch aware of the plugin. For this, change the following setting in ../runtime/local/conf/nutch-site.xml so that our plugin gets included as well


<property>
  <name>plugin.includes</name>
  <value>MyAggregator|protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|scoring-opic|urlnormalizer-(pass|regex|basic)</value>
</property>
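This property goes inside the <configuration> element of nutch-site.xml, and Nutch also refuses to fetch anything unless an agent name is configured. A minimal nutch-site.xml would therefore look roughly like this (the agent name is just a placeholder; choose your own):

<?xml version="1.0"?>
<configuration>
  <property>
    <name>http.agent.name</name>
    <value>MyAggregatorCrawler</value> <!-- any non-empty agent name; Nutch aborts the fetch without one -->
  </property>
  <property>
    <name>plugin.includes</name>
    <value>MyAggregator|protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|scoring-opic|urlnormalizer-(pass|regex|basic)</value>
  </property>
</configuration>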

Once these steps are done, Nutch knows that it has to call this plugin as well as part of its crawl cycle. When we execute Nutch with something like

bin/nutch crawl urls -dir vikas -depth 2 -topN 3

you would be able to see the records present in MongoDB as a part of the crawl process. Thus, it is easy to intercept and extend the Nutch cycle with your custom plugins. You just need to get lucky by finding your way through some sketchy documentation 😉
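To sanity-check the results, you can query the collection the DAO writes to straight from a Scala REPL or a small script (this just reads back what the actor inserted; the database and collection names come from the EventDAO definition above):

import com.mongodb.casbah.MongoConnection

// print every event document stored during the crawl
MongoConnection()("london")("event").find().foreach(println)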

About Vikas Hazrati

Vikas is the Founding Partner @ Knoldus which is a group of software industry veterans who have joined hands to add value to the art of software development. Knoldus does niche Reactive and Big Data product development on Scala, Spark and Functional Java. Knoldus has a strong focus on software craftsmanship which ensures high-quality software development. It partners with the best in the industry like Lightbend (Scala Ecosystem), Databricks (Spark Ecosystem), Confluent (Kafka) and Datastax (Cassandra). To know more, send a mail to hello@knoldus.com or visit www.knoldus.com

2 Responses to Intercepting Nutch Crawl Flow with a Scala Plugin

  1. Mayank Bairagi says:

    It is really helpful. Thanks

  2. Mayank Bairagi says:

    Hello Mr. Vikas,
    I have a problem while using Nutch. When I try to lunch nutch crawler using any command eg. (bin/nutch crawl urls -dir crawl -depth 3 -topN 5). It throws an error: Exception in thread “main” java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Astar\mapred\staging\Astar-2124359786\.staging to 0700. Would you please help me?
    I am using Windows 7.

    Thanks
