Neo4j with Spark 2.4.0

Sep 29, 2019

Using Spark for inserting data into and reading data from the Neo4j graph database

Libraries

For the Neo4j Spark connector, we will use the latest release that supports Spark 2.4, from https://github.com/neo4j-contrib/neo4j-spark-connector/releases:

compile(group: 'graphframes', name: 'graphframes', version: "0.7.0-spark2.4")
compile(group: 'neo4j-contrib', name: 'neo4j-spark-connector', version: "2.4.0-M6")

Download and install Neo4j Desktop: https://neo4j.com/download/

Creating a Spark-Session

When building the Spark session, it is necessary to set the Bolt URL, the Neo4j database username, and the password:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.neo4j.spark._
import scala.collection.JavaConverters._ // needed for the .asJava conversions used below

object Neo4jSparkExperiment {

  def main(args: Array[String]): Unit = {

    // Silence Spark's verbose logging
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

    // The connector reads its connection settings from the Spark config
    val spark = SparkSession
      .builder().master("local")
      .config(Neo4jConfig.prefix + "url", "bolt://localhost")
      .config(Neo4jConfig.prefix + "user", "neo4j")
      .config(Neo4jConfig.prefix + "password", "admin")
      .appName("SparkSessionNeo4jExample")
      .getOrCreate()
    val sparkContext = spark.sparkContext
  }
}

1- Creating a node

Neo4jDataFrame.execute is used to execute any Cypher query. In the createNodes function, the DataFrame contains all the columns that are required as node properties for building a node in the graph.

nodes is a Scala Tuple2 whose first element is the label of the node and whose second is the list of node properties. The values of these properties are taken from the DataFrame.

val nodes = Tuple2("User", Seq("property1", "property2"))

The third parameter is a map of renamed columns, used when the DataFrame columns have different names than the node properties.
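For example, a hypothetical mapping from DataFrame column names to node property names (the names here are assumptions for illustration):

// Hypothetical: columns "id" and "name" are stored as node properties "userId" and "username"
val renamedColumns = Map("id" -> "userId", "name" -> "username")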


def createNodes(sc: SparkContext, dataFrame: DataFrame, nodes: (String, Seq[String]), renamedColumns: Map[String, String] = Map.empty): Unit = {

  // The first listed column is used as the merge key of the node
  val keyProperty: String = renamedColumns.getOrElse(nodes._2.head, nodes._2.head)

  val createStatement =
    s"""
       | UNWIND {rows} as row
       | MERGE (node:`${nodes._1}` {`$keyProperty` : row.source.`$keyProperty`})
       | SET node = row.node_properties
     """.stripMargin

  // Roughly one partition per 10,000 rows
  val partitions = Math.max(1, (dataFrame.count() / 10000).asInstanceOf[Int])

  val neo4jConfig = Neo4jConfig(sc.getConf)

  dataFrame.repartition(partitions).foreachPartition(rows => {
    val params: AnyRef = rows.map(row => {
      val nodePropertyMap: Map[String, AnyRef] = Map(
        "node_properties" ->
          nodes._2.map(c => (renamedColumns.getOrElse(c, c), row.getAs[AnyRef](c))).toMap.asJava)
      val nodePropertiesMap = nodePropertyMap + ("source" -> Map(keyProperty -> row.getAs[AnyRef](nodes._2.head)).asJava)
      nodePropertiesMap.asJava
    }).asJava
    Neo4jDataFrame.execute(neo4jConfig, createStatement, Map("rows" -> params).asJava, write = true)
  })
}
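As a sketch, a hypothetical call might look like this (the users DataFrame, its columns, and the renaming are assumptions for illustration):

import spark.implicits._

// Hypothetical users DataFrame; "id" is listed first, so it becomes the merge key
val users: DataFrame = Seq((1, "alice"), (2, "bob")).toDF("id", "name")

// Creates/merges (:User {userId, username}) nodes
createNodes(sparkContext, users, ("User", Seq("id", "name")),
  Map("id" -> "userId", "name" -> "username"))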

2- Creating a new relationship between two nodes

In this method, the DataFrame again contains the set of columns used as relationship properties. startNode and endNode are Tuple2 values, in the same form as explained above for node creation.

relationship is also a Scala Tuple2: the first element is the name of the relationship and the second is the list of relationship properties.

val startNode = Tuple2("User", Seq("property1", "property2"))

val relationship = Tuple2("relationship_name", Seq("property1", "property2"))

def setOnCreateRelationship(sc: SparkContext, dataFrame: DataFrame, startNode: (String, Seq[String]), relationship: (String, Seq[String]), endNode: (String, Seq[String])): Unit = {
  Neo4jDataFrame.mergeEdgeList(sc, dataFrame, startNode, relationship, endNode)
}
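As a sketch, a hypothetical call (the purchases DataFrame, its columns, and the labels are assumptions):

import spark.implicits._

// Hypothetical DataFrame: one row per purchase
val purchases: DataFrame = Seq(("alice", "p-100", 2), ("bob", "p-200", 1))
  .toDF("username", "productId", "quantity")

// Merges (:User)-[:BOUGHT]->(:Product), setting "quantity" when the relationship is created
setOnCreateRelationship(sparkContext, purchases,
  ("User", Seq("username")),
  ("BOUGHT", Seq("quantity")),
  ("Product", Seq("productId")))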

Note that mergeEdgeList sets the relationship properties only when the relationship is first created (ON CREATE SET). It will not update the values on a previously created relationship.

If you want to update the properties of a previously created relationship, use the method below. It creates the relationship only if it is not present; otherwise it updates the properties of the existing relationship.

3- Updating a previously created relationship between two nodes

To keep a single relationship between the nodes and overwrite its properties with the new values on every run, we use a Cypher MERGE on the relationship followed by a plain SET. The parameters are the same as in the previous method.

def updateOnCreateRelationship(sc: SparkContext, dataFrame: DataFrame, startNode: (String, Seq[String]), relationship: (String, Seq[String]), endNode: (String, Seq[String])): Unit = {

  // SET (rather than ON CREATE SET) on the relationship updates its properties on every run
  val mergeStatement =
    s"""
      UNWIND {rows} as row
      MERGE (source:`${startNode._1}` {`${startNode._2.head}` : row.source.`${startNode._2.head}`}) ON CREATE SET source += row.source
      MERGE (target:`${endNode._1}` {`${endNode._2.head}` : row.target.`${endNode._2.head}`}) ON CREATE SET target += row.target
      MERGE (source)-[rel:`${relationship._1}`]->(target) SET rel += row.relationship
    """

  val partitions = Math.max(1, (dataFrame.count() / 10000).asInstanceOf[Int])
  val config = Neo4jConfig(sc.getConf)

  dataFrame.repartition(partitions).foreachPartition(rows => {
    val params: AnyRef = rows.map(row =>
      Map(
        "source" -> startNode._2.map(nodeProperty =>
          (nodeProperty, row.getAs[AnyRef](nodeProperty))).toMap.asJava,
        "target" -> endNode._2.map(nodeProperty =>
          (nodeProperty, row.getAs[AnyRef](nodeProperty))).toMap.asJava,
        "relationship" -> relationship._2.map(relationshipProperty =>
          (relationshipProperty, row.getAs[AnyRef](relationshipProperty))).toMap.asJava)
        .asJava).asJava
    Neo4jDataFrame.execute(config, mergeStatement, Map("rows" -> params).asJava, write = true)
  })
}
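Reusing the hypothetical purchases DataFrame from the earlier sketch, running this twice leaves a single BOUGHT relationship holding the latest quantity:

updateOnCreateRelationship(sparkContext, purchases,
  ("User", Seq("username")),
  ("BOUGHT", Seq("quantity")),
  ("Product", Seq("productId")))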

4- Reading data into a DataFrame from Neo4j

To read data from Neo4j, create a Cypher query, for example:

Cypher query: MATCH (u:User)-[rel:Friend]->(f:User) RETURN u.username AS username

def loadDataFrame(sc: SparkContext, query: String): DataFrame = {
  val neo = Neo4j(sc)
  // Run the query in a single partition, fetching rows in batches of 25
  neo.cypher(query).partitions(1).batch(25).loadDataFrame
}
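Putting it together, a minimal sketch of loading the usernames returned by the query above:

val query = "MATCH (u:User)-[rel:Friend]->(f:User) RETURN u.username AS username"
val usersDf: DataFrame = loadDataFrame(sparkContext, query)
usersDf.show()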
