Spark on JanusGraph (TinkerPop): A PageRank example

Boxuan Li
4 min read · Sep 4, 2023


PageRank was invented for ranking web pages, but can we compute PageRank scores for academic papers based on their citation relationships? In this post, we will walk through an end-to-end example and run the PageRank algorithm on the DBLP citation network dataset using JanusGraph 1.0.0. JanusGraph is an open-source, distributed graph database. Analytical queries like PageRank computation and community detection can be run via its Spark integration. We will be using Spark in a local setup; if you would like to deploy it on a cluster, make sure you read the previous post, Run analytical queries on JanusGraph 0.6.0 using Spark.

PageRank, developed by Brin and Page at Google, is perhaps the most popular OLAP-oriented graph algorithm. It assigns a centrality value to every vertex in the graph, where centrality is defined recursively: a vertex is central if it is connected to central vertices. In a citation network, a higher PageRank usually means higher popularity.
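For reference, one iteration updates the score of a vertex v according to the standard formulation below, where α is the damping factor (TinkerPop's PageRankVertexProgram defaults to α = 0.85) and N is the number of vertices:

$$\text{PR}(v) = \frac{1-\alpha}{N} + \alpha \sum_{u \to v} \frac{\text{PR}(u)}{\text{outdeg}(u)}$$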

We will be using a DBLP dataset which contains ~5 million papers and ~32 million citation relationships. The dataset and data loader script are available here. We model papers as vertices and citation relationships as edges.
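To make the model concrete, here is a minimal sketch of how one citation could be written to JanusGraph. The "title" property is illustrative (the real schema lives in the loader script linked above), but the "type" property and the "cites" edge label are what the PageRank program below relies on:

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class ModelSketch {
    public static void main(String[] args) {
        // Connect to the local Cassandra-backed JanusGraph instance
        JanusGraph graph = JanusGraphFactory.build()
                .set("storage.backend", "cql")
                .set("storage.hostname", "127.0.0.1")
                .open();
        GraphTraversalSource g = graph.traversal();

        // Papers are vertices; "title" is an illustrative property name
        Vertex citing = g.addV().property("type", "paper").property("title", "Paper A").next();
        Vertex cited = g.addV().property("type", "paper").property("title", "Paper B").next();

        // A citation is a directed "cites" edge from the citing paper to the cited one
        g.addE("cites").from(citing).to(cited).iterate();
        g.tx().commit();
        graph.close();
    }
}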

For demo purposes, we deploy JanusGraph on our local machine backed by a single Cassandra instance. To run the PageRank program (or any other analytical query via Spark), we don't need to launch a JanusGraph server; only the Cassandra instance needs to be running. TinkerPop, the graph computation framework that JanusGraph is built on, has PageRankVertexProgram built in. To start, let's create a Java project with Maven and add the necessary dependencies:

<dependency>
    <groupId>org.apache.tinkerpop</groupId>
    <artifactId>spark-gremlin</artifactId>
    <version>3.7.0</version>
</dependency>
<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>cassandra-hadoop-util</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.janusgraph</groupId>
    <artifactId>janusgraph-hadoop</artifactId>
    <version>1.0.0</version>
</dependency>
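Note that the spark-gremlin version should match the TinkerPop version your JanusGraph release is built on; JanusGraph 1.0.0 is based on TinkerPop 3.7.0, hence the versions above.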

Now let's create a Utils class to hold some common Spark-related configuration:

package io.citegraph.data.spark;

import org.apache.commons.configuration2.BaseConfiguration;
import org.apache.commons.configuration2.Configuration;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.janusgraph.hadoop.formats.cql.CqlInputFormat;

public class Utils {
    public static Configuration getSparkGraphConfig() {
        Configuration sparkGraphConfiguration = new BaseConfiguration();
        // Read the graph from Cassandra (CQL) via JanusGraph's Hadoop input format
        sparkGraphConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, CqlInputFormat.class.getCanonicalName());
        sparkGraphConfiguration.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cql");
        sparkGraphConfiguration.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "127.0.0.1");
        sparkGraphConfiguration.setProperty("janusgraphmr.ioformat.conf.storage.port", 9042);
        sparkGraphConfiguration.setProperty("janusgraphmr.ioformat.conf.storage.cql.keyspace", "janusgraph");
        sparkGraphConfiguration.setProperty("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner");
        sparkGraphConfiguration.setProperty("gremlin.graph", HadoopGraph.class.getCanonicalName());
        // Run Spark locally with as many worker threads as logical cores
        sparkGraphConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[*]");
        // Scratch space for shuffle data; point this at a disk with plenty of room
        sparkGraphConfiguration.setProperty("spark.local.dir", "/Users/admin/workspace/tmp");
        return sparkGraphConfiguration;
    }
}

Please change the hostname and port to match your Cassandra instance. This Utils class can be reused by any Spark analytical program running on your graph.

Now comes the core part: using PageRankVertexProgram to compute the PageRank of each paper in our graph database! We persist the output to disk in GraphSON format, which can be understood not just by JanusGraph but by other Gremlin-enabled graph databases as well. Let's create a PageRankRunner class:

package io.citegraph.data.spark.loader;

import org.apache.commons.configuration2.Configuration;
import org.apache.spark.launcher.SparkLauncher;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

import static io.citegraph.data.spark.Utils.getSparkGraphConfig;
import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_HADOOP_GRAPH_WRITER;
import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_HADOOP_OUTPUT_LOCATION;

/**
 * This Spark application runs the PageRank algorithm to calculate
 * a score for each paper.
 */
public class PageRankRunner {
    public static void main(String[] args) throws Exception {
        Configuration sparkGraphConfiguration = getSparkGraphConfig();
        // Cache the graph RDD in memory, spilling to disk when it doesn't fit
        sparkGraphConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_AND_DISK");
        // Write the resulting graph (with pagerank values) to disk as GraphSON
        sparkGraphConfiguration.setProperty(GREMLIN_HADOOP_GRAPH_WRITER, GraphSONOutputFormat.class.getCanonicalName());
        sparkGraphConfiguration.setProperty(GREMLIN_HADOOP_OUTPUT_LOCATION, "/Users/admin/workspace/sparkgraph/");
        sparkGraphConfiguration.setProperty(SparkLauncher.EXECUTOR_MEMORY, "1g");
        Graph graph = GraphFactory.open(sparkGraphConfiguration);

        long startTime = System.currentTimeMillis();
        ComputerResult result = graph.compute(SparkGraphComputer.class)
                .vertices(__.has("type", "paper"))          // only load paper vertices
                .edges(__.bothE("cites"))                   // only load citation edges
                .vertexProperties(__.properties("dummy"))   // load no vertex properties (nothing matches "dummy")
                .persist(GraphComputer.Persist.VERTEX_PROPERTIES)
                .program(PageRankVertexProgram.build()
                        .edges(__.outE("cites").asAdmin())  // rank flows along outgoing citations
                        .property("pagerank")               // store the score under this property
                        .iterations(100)
                        .create())
                .submit()
                .get();
        long duration = (System.currentTimeMillis() - startTime) / 1000;
        System.out.println("finished PageRank computation, elapsed time = " + duration + " seconds.");
        graph.close();
    }
}

In the above example, we only care about "paper" vertices, so we filter the graph with .vertices(__.has("type", "paper")). Similarly, we only need cites edges (thus .edges(__.bothE("cites"))) and don't need any vertex property (thus .vertexProperties(__.properties("dummy")), which matches nothing). PageRank flows out of a paper to all the papers it cites (thus PageRankVertexProgram.build().edges(__.outE("cites").asAdmin())). Finally, we run the program for at most 100 iterations, or until it converges.
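As a side note, TinkerPop can express the same computation more compactly with the pageRank() step. The following is a hedged sketch rather than a drop-in replacement for the runner above: the by() and times() modulators configure the same edge traversal, property name, and iteration count, and the traversal prints the ten highest-ranked papers instead of persisting the whole graph:

import org.apache.tinkerpop.gremlin.process.traversal.Order;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

import static io.citegraph.data.spark.Utils.getSparkGraphConfig;

public class PageRankTraversalSketch {
    public static void main(String[] args) throws Exception {
        Graph graph = GraphFactory.open(getSparkGraphConfig());
        // Route OLAP traversals through Spark
        GraphTraversalSource g = graph.traversal().withComputer(SparkGraphComputer.class);

        g.V().has("type", "paper")
                .pageRank()
                .by(__.outE("cites"))    // rank flows along outgoing citations
                .by("pagerank")          // property name to hold the score
                .times(100)              // maximum number of iterations
                .order().by("pagerank", Order.desc)
                .limit(10)
                .elementMap("pagerank")
                .forEachRemaining(System.out::println);
        graph.close();
    }
}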

It takes a few hours to finish the computation on my local laptop, but it would be much faster to run it on a cluster. If we want to load the computed PageRank values back into JanusGraph, we can use this GraphSONVertexPropertyLoader program. Furthermore, we could either use the same approach to compute PageRank for authors, or define an author's PageRank as the sum of the PageRank values of all papers they have written.
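The latter option becomes a plain OLTP traversal once papers carry a pagerank property. A minimal sketch, assuming authorship is modeled as a "writes" edge from author vertices to paper vertices (check the edge label against your schema):

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

import java.util.List;

public class AuthorRankSketch {
    public static void main(String[] args) {
        JanusGraph graph = JanusGraphFactory.build()
                .set("storage.backend", "cql")
                .set("storage.hostname", "127.0.0.1")
                .open();
        GraphTraversalSource g = graph.traversal();

        // For brevity, collect all author ids up front; a real job would batch this
        List<Object> authorIds = g.V().has("type", "author").id().toList();
        for (Object id : authorIds) {
            // Sum the PageRank of every paper the author wrote; skip authors with none
            g.V(id).out("writes").values("pagerank").sum().tryNext()
                    .ifPresent(score -> g.V(id).property("pagerank", score).iterate());
        }
        g.tx().commit();
        graph.close();
    }
}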

If we visualize someone's citation network and size each node by its PageRank value, then Geoffrey Hinton's academic community looks like this:

CiteGraph: visualization of Geoffrey Hinton’s academic community

The above visualization can be found on this open-source online citation network visualizer, powered by JanusGraph. By the way, the academic community is computed by this community detection runner.

Finally, when you run vertex programs using Spark, you might find your local disk usage growing quickly, and you might even hit out-of-disk-space errors. This is because TinkerPop's Spark graph computation follows the Bulk Synchronous Parallel (BSP) paradigm, and each iteration involves a shuffle in Spark. By design, Spark does not purge the intermediate data of each shuffle until the application ends, so that failed stages can be recovered quickly. Since vertex programs tend to have long lineages (e.g., 100 iterations means 100 shuffles), running a vertex program can consume a lot of disk space. A small tip is to have a cron job periodically (say, every hour) purge the shuffle data:

0 * * * * /usr/bin/find <your path to Spark tmp data> -type f ! -name '*rdd*' -cmin +60 -exec rm {} \;
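Here, -cmin +60 matches files changed more than 60 minutes ago, and ! -name '*rdd*' spares cached RDD block files (which Spark names rdd_*), so only stale shuffle files are removed.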

Although we used JanusGraph as an example, the same approach works with other Gremlin-enabled graph databases too, as long as the database supports the spark-gremlin module. Of course, you could also develop a graph algorithm like PageRankVertexProgram yourself. Have fun with big data!
