Morpheus – Cypher for Spark


The Apache Spark ecosystem is experiencing a boom with its newest version 3.0. Among the many exciting features being introduced, Apache has also decided to include a new module for Graph Processing: SparkGraph. This module brings Morpheus, Neo4j’s popular query language Cypher, its accompanying Property Graph Model, and Graph Algorithms to the data science toolbox on Spark.

Until now, Neo4j has provided graph processing for Spark through the Cypher for Apache Spark (CAPS) project, now named Morpheus. It works as a third-party add-on to the Spark ecosystem. With Spark 3.0, however, Morpheus is set to be added as a Spark component under the name SparkGraph.

Before looking directly into the SparkGraph module of Spark 3.0, this blog helps you understand Neo4j’s current offering – Morpheus.

Hello Morpheus

Morpheus extends Apache Spark with Cypher, the industry’s most widely used property graph query language. It builds on the Spark SQL DataFrame API, integrates with standard Spark SQL processing, and also allows integration with GraphX. It can be thought of as SQL for graph processing.

Cypher is based on the property graph model. A property graph is a data model in which data is organized as nodes, relationships, and properties (data stored on the nodes or relationships). Formally, it is a directed, vertex-labeled, edge-labeled multigraph with self-edges, where edges have their own identity.

Property Graph

The above figure represents a property graph where Employee, Company, and City are nodes, each with some properties. The Employee node and the Company node are connected by a :HAS_CEO relationship, which has a property named start_date. Similarly, the Company node and the City node are connected by the :LOCATED_IN relationship.
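To make the model concrete, here is a minimal, library-free Scala sketch of the graph described above. It is not how Morpheus represents graphs internally. The property values, the numeric ids, and the direction of :HAS_CEO (Company to Employee) are assumptions made for illustration; only the labels, the relationship types, and the start_date key come from the figure.

```scala
// Illustrative model of a property graph; names and values are hypothetical.
object PropertyGraphSketch {

  // A node has an identity, a set of labels, and key/value properties.
  final case class Node(id: Long, labels: Set[String], properties: Map[String, Any])

  // A relationship has its own identity, a type, a direction
  // (startId -> endId), and its own properties.
  final case class Relationship(
      id: Long,
      relType: String,
      startId: Long,
      endId: Long,
      properties: Map[String, Any]
  )

  // Assumed sample values; the figure does not specify them.
  val employee: Node = Node(1L, Set("Employee"), Map("name" -> "Alice"))
  val company: Node  = Node(2L, Set("Company"), Map("name" -> "Acme"))
  val city: Node     = Node(3L, Set("City"), Map("name" -> "Springfield"))

  val nodes: Seq[Node] = Seq(employee, company, city)

  // Because every relationship carries its own id, two nodes can be
  // connected by several relationships (a multigraph), and a node can
  // point to itself (a self-edge).
  val relationships: Seq[Relationship] = Seq(
    Relationship(10L, "HAS_CEO", company.id, employee.id, Map("start_date" -> "2019-01-01")),
    Relationship(11L, "LOCATED_IN", company.id, city.id, Map.empty)
  )

  // All relationships that start at the given node.
  def outgoing(nodeId: Long): Seq[Relationship] =
    relationships.filter(_.startId == nodeId)

  def main(args: Array[String]): Unit =
    outgoing(company.id).foreach(r => println(s"${r.relType} -> node ${r.endId}"))
}
```

Keeping the relationship id separate from its endpoints is what gives edges "their own identity" in the definition above.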

Extending Graphs into Spark Job

Consider 3 data sets:

  • Persons.csv: contains information about different actors
id:Int | name             | born
3      | Carrie-Anne Moss | 1967
4      | Hugo Weaving     | 1960
Persons.csv
  • Movies.csv: contains information about different movies
id:Int | title               | tagline
1      | The Matrix          | Welcome to the Real World.
28     | The Matrix Reloaded | Free your mind.
Movies.csv
  • Acted_in.csv: contains an association between an actor id from the persons data set and a movie id from the movies data set
rel_id:Int | START_ID | END_ID
1          | 3        | 28
2          | 4        | 1
Acted_in.csv

Thus, persons and movies are nodes, while acted_in represents the relationship between them. The column named START_ID represents a node from Persons.csv while END_ID represents a node from Movies.csv.
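The way START_ID and END_ID tie the three files together can be sketched in plain Scala, without Spark or Morpheus. The case classes and the object name below are hypothetical, and only the sample rows shown above are used, so this is just an illustration of the join that the graph will later express as a pattern.

```scala
// Library-free sketch: resolve each Acted_in row to an (actor, movie) pair.
object ActedInJoinSketch {

  // Hypothetical row types mirroring the CSV columns.
  final case class Person(id: Int, name: String, born: Int)
  final case class Movie(id: Int, title: String, tagline: String)
  final case class ActedIn(relId: Int, startId: Int, endId: Int)

  // The sample rows from the three data sets.
  val persons: Seq[Person] = Seq(
    Person(3, "Carrie-Anne Moss", 1967),
    Person(4, "Hugo Weaving", 1960)
  )
  val movies: Seq[Movie] = Seq(
    Movie(1, "The Matrix", "Welcome to the Real World."),
    Movie(28, "The Matrix Reloaded", "Free your mind.")
  )
  val actedIn: Seq[ActedIn] = Seq(ActedIn(1, 3, 28), ActedIn(2, 4, 1))

  // Index nodes by id so each relationship can look up its endpoints.
  private val personById: Map[Int, Person] = persons.map(p => p.id -> p).toMap
  private val movieById: Map[Int, Movie]   = movies.map(m => m.id -> m).toMap

  // START_ID points at a person, END_ID points at a movie.
  // Rows with a dangling id are silently dropped.
  val actorMovies: Seq[(String, String)] =
    for {
      rel    <- actedIn
      person <- personById.get(rel.startId).toList
      movie  <- movieById.get(rel.endId).toList
    } yield (person.name, movie.title)

  def main(args: Array[String]): Unit =
    actorMovies.foreach { case (actor, title) => println(s"$actor acted in $title") }
}
```

With a property graph, this lookup logic does not have to be written by hand: the relationship itself carries the direction from the person to the movie.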

Creating Nodes

Now, let’s assume we have a Spark job that creates a property graph from the given data.

First, we create a Morpheus session from the Spark session:

import org.opencypher.morpheus.api.MorpheusSession

// Here, spark is the SparkSession
implicit val morpheus: MorpheusSession = MorpheusSession.create(spark)

To create a property graph for the given data, we read the CSV files into data frames. Once we have the data frames, we convert them into the nodes and relationships of a property graph using an Element Mapping for each node and relationship type involved.

To create an Element Mapping for the node, we use the following code:

val movieNodeMapping = NodeMappingBuilder
      .withSourceIdKey("id:Int")
      .withImpliedLabel("Movies")
      .withPropertyKey(propertyKey = "title", sourcePropertyKey = "title")
      .withPropertyKey(propertyKey = "tagline", sourcePropertyKey = "tagline")
      .build

The Source ID Key in a node mapping is the data frame column that uniquely identifies a row. The ID of every node should be explicitly typed as Long, Integer, or String; internally, Morpheus converts it to Array[Byte]. Each node is tagged with a Label defining its role in our graph domain. The remaining mappings represent the properties of the node. For example, every movie has properties stored in the data frame columns named “title” and “tagline”.

Similarly, we create a mapping for the persons node too.

Once the mappings are created, we can create nodes using the following code:

// moviesDF and personsDF are the respective data frames for the CSVs
val moviesNode = MorpheusElementTable.create(movieNodeMapping, moviesDF)
val personsNode = MorpheusElementTable.create(personNodeMapping, personsDF)

Creating Edges

For creating a mapping for the Relationship, we use the following code:

val actedInMapping = RelationshipMappingBuilder
        .withSourceIdKey("rel_id:Int")
        .withSourceStartNodeKey("START_ID")
        .withSourceEndNodeKey("END_ID")
        .withRelType("ACTED_IN")
        // Assumes actedInDF also has a "role" column (not shown in the sample rows above)
        .withPropertyKey("role", "role")
        .build

The relationship mapping needs a few extra keys. SourceStartNodeKey specifies the data frame column that holds the start node of an edge, and SourceEndNodeKey specifies the column that holds the end node. We also map a Relationship Type, which acts like the label of the edge.

Now, to create the edges of the graph we use the following code:

//actedInDF is the data frame for Acted_in.csv 
val actedInRelation = MorpheusElementTable.create(actedInMapping, actedInDF)

Creating Property Graph

To create the final property graph, we use the following code:

val graph = morpheus.readFrom(personsNode, actedInRelation, moviesNode)

The above code creates a property graph using the nodes and relationships we provided. It would look like this:

Property Graph

Using Cypher

Now that the property graph is ready, we can write SQL-like queries in Cypher to process the graph. One of the most used features of the Cypher language is pattern matching, which lets us translate a whiteboard sketch of a graph almost directly into a graph processing query.

1. To list the names of all actors together with the titles of the movies they acted in, we use a query like this:

// A query spanning several lines needs a triple-quoted string in Scala
val actor_movies = graph.cypher(
        """MATCH (p:Person)-[a:ACTED_IN]->(m:Movies)
          |RETURN p.name AS ACTOR_NAME, m.title AS MOVIE_TITLE""".stripMargin
    )
actor_movies.records.show

The output of the above code is:

╔════════════════════╤══════════════════════════╗
║ ACTOR_NAME         │ MOVIE_TITLE              ║
╠════════════════════╪══════════════════════════╣
║ 'Carrie-Anne Moss' │ 'The Matrix Reloaded'    ║
║ 'Hugo Weaving'     │ 'The Matrix Revolutions' ║
║ 'Gloria Foster'    │ 'The Matrix'             ║
╚════════════════════╧══════════════════════════╝

2. To get the title of the movie in which a particular actor acted, we can use the following query:

val movie = graph.cypher(
        """MATCH (p:Person {name: 'Gloria Foster'})-[a:ACTED_IN]->(m:Movies)
          |RETURN m.title AS MOVIE_TITLE""".stripMargin)
movie.records.show

The above query produces the output:

╔══════════════╗
║ MOVIE_TITLE  ║
╠══════════════╣
║ 'The Matrix' ║
╚══════════════╝

3. To pass parameters to a query, we can use the following code:

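// {actor_name} is a Cypher parameter resolved from the map, so no s-interpolator is needed
val param = CypherValue.CypherMap(("actor_name", "Hugo Weaving"))
val actor = graph.cypher(
        """MATCH (p:Person {name: {actor_name}})-[a:ACTED_IN]->(m:Movies)
          |RETURN m.title AS MOVIE_TITLE""".stripMargin,
        param)
actor.records.show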

The output from the above query is:

╔══════════════════════════╗
║ MOVIE_TITLE              ║
╠══════════════════════════╣
║ 'The Matrix Revolutions' ║
╚══════════════════════════╝

The complete code is available here

Conclusion

Graphs have a plethora of useful applications in recommendations, fraud detection, and research. With the vote to include the property graph model and its Cypher language as a Spark component, SparkGraph lets us extend our Spark analytics and data science workflows. Morpheus also converts a processed property graph back into Spark DataFrames, which allows us to build pipelines with graph processing as one of the stages.

Written by 

Software Consultant with 2+ years of experience, with a strong inclination towards Big Data Analytics and Data Science.