Using the Neo4j Spark Connector with Databricks

May 11, 2020 - Neo4j Projects - 3 minute read

In this blog post I show how to set up Apache Spark (in Databricks cloud) to communicate with a Neo4j Aura causal cluster. I’ll be using a free Databricks Community Cloud account to make my life easier.

About the Neo4j Spark Connector

The Neo4j Spark connector is a community-developed Scala library that integrates Neo4j with Spark. With just a few lines of Scala, the connector loads your Neo4j data into Spark DataFrames, GraphFrames, GraphX graphs, and RDDs for further processing.

Bonus - If you’re running a Neo4j cluster, it allows for distributed read operations from the cluster members to speed up your data loading. The current version supports Neo4j 3.5, but support for 4.0 is on the way.
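
To give a sense of the API before we dive in, here's a minimal sketch of the DataFrame loading path; the User label and name property are placeholders, and the connection setup is covered below:

%scala
import org.neo4j.spark._

// Wraps the Spark context; connection settings come from the Spark config
val neo = Neo4j(sc)

// Run a Cypher query and load the result into a Spark DataFrame
val df = neo.cypher("MATCH (u:User) RETURN u.name AS name").loadDataFrame
df.show(5)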

Setting up Databricks

For my experiment I’ll be using the free Databricks Community Edition. You can sign up for it here.

We’re going to create a cluster and configure it to use Neo4j. First, set up a new cluster:


[Screenshot: creating a new cluster in Databricks]


Next, grab the latest release of the Neo4j Spark connector from GitHub, as well as the latest GraphFrames release.

Install the two libraries by clicking the ‘Install New’ button on the cluster’s ‘Libraries’ tab.


[Screenshot: installing the connector and GraphFrames libraries on the cluster]


Optional - if you want to visualize some graphs with Python later, install the networkx library under Libraries --> Install New --> PyPi --> networkx.

Configuration

Next up, we’re setting up the configuration so that the Neo4j Spark Connector knows where to connect. I’m using a Neo4j Aura instance, which runs Neo4j 3.5. Since Aura runs a three-machine causal cluster, we’ll be using the bolt+routing:// protocol in the configuration. If you’re connecting to a single instance, you’ll need the regular bolt:// protocol.

In Neo4j 4.0, both of these have been replaced by the neo4j:// protocol.
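
For a single instance, only the URL line in the configuration below would change, for example (placeholder host):

spark.neo4j.bolt.url bolt://localhost:7687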

[Screenshot: the Spark config in the cluster settings]

Copy-pastable version (swap in the password and connection URL of your own instance):

spark.neo4j.bolt.encryption true
spark.databricks.delta.preview.enabled true
spark.neo4j.bolt.password change
spark.neo4j.bolt.user neo4j
spark.neo4j.bolt.url bolt+routing://f1337.databases.neo4j.io:7687

And we’re done! Restart the cluster and let’s get to the actual coding.

Using the Spark Connector

Next up, we’re going to read from the Neo4j graph into a GraphFrame. When doing this, Spark expects two Cypher queries:

  • Selecting the nodes, returning id, the Neo4j ID of the node.
  • Selecting the relationships, returning three properties: src, value, dst.

To make use of the multiple machines on the Spark cluster, we have to do two things:

  • Tell Spark how many partitions to use, and what the batch size is: .partitions(3).batch(25)
  • Embed in our Cypher how the results can be broken up into batches. This can be done by placing SKIP {_skip} LIMIT {_limit} at the end of the query. The Spark connector will insert these parameters dynamically for you to break up your results.

Here’s what my loading code looks like:

%scala

import org.neo4j.spark._
import org.graphframes._

// Wraps the Spark context; connection settings come from the Spark config
val neo = Neo4j(sc)

// Nodes: the Neo4j ID of every User node, fetched in batches
val nodesQuery = """MATCH (u:User)
                    RETURN id(u) AS id SKIP {_skip} LIMIT {_limit}
                 """

// Relationships: src, value and dst for every WRITES relationship, fetched in batches
val relsQuery = """MATCH (u:User)
                   WITH u SKIP {_skip} LIMIT {_limit}
                   MATCH (u)-[r:WRITES]->(m:Message)
                   RETURN id(u) AS src, type(r) AS value, id(m) AS dst
                """

// Load everything into a GraphFrame, using 3 partitions and 25 records per batch
val graph = neo.nodes(nodesQuery, Map.empty)
               .rels(relsQuery, Map.empty)
               .partitions(3)
               .batch(25)
               .loadGraphFrame

The first thing you might notice is that the connector only works with Scala. Databricks, however, allows you to mix Python and Scala code in the same notebook, so we’ll still be able to do some graph analysis with Python.

To do this, we create temporary views of the graph’s vertices and edges:

%scala
// Expose the vertices and edges as temp views so Python cells can read them
graph.vertices.createOrReplaceTempView("vertices")
graph.edges.createOrReplaceTempView("edges")

These views are shared across the Spark cluster and can be accessed from both Scala and Python. To demonstrate, I’ve built a simple graph visualization using NetworkX:

%python
import networkx as nx
import matplotlib.pyplot as plt

# Pull the first 25 edges into a pandas DataFrame
edges_df = spark.table("edges").limit(25).toPandas()

# Build a NetworkX graph from the src/dst columns and draw it
G = nx.from_pandas_edgelist(edges_df, source='src', target='dst')
nx.draw(G, with_labels=True, pos=nx.spring_layout(G))
display(plt.gcf())

[Screenshot: the resulting graph visualization]
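
If you want to go further than visualization, the GraphFrame we loaded also exposes the GraphFrames algorithms directly from Scala. As a minimal sketch, here’s what running PageRank on it could look like (the damping and iteration values are arbitrary):

%scala
import org.apache.spark.sql.functions.desc

// Run PageRank on the GraphFrame loaded earlier and list the highest-ranked nodes
val ranks = graph.pageRank.resetProbability(0.15).maxIter(10).run()
ranks.vertices.orderBy(desc("pagerank")).show(10)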

That’s all there is to it! This should provide a helpful start to your graph analysis on Spark. If there’s anything missing here, or you have any follow-up questions, please reach out to me.

Happy coding!
