There are already several questions on the Aurelius mailing list as well as here on Stack Overflow about specific problems with configuring Titan to work with Spark. What is missing, in my opinion, is a high-level description of a simple setup that uses Titan and Spark.
What I am looking for is a fairly minimal setup that uses recommended settings. For Cassandra, for example, the replication factor should be 3 and a dedicated datacenter should be used for analytics.
From the information I found in the documentation of Spark, Titan, and Cassandra, such a minimal setup could look like this:
- Real-time processing DC: 3 Nodes with Titan + Cassandra (RF: 3)
- Analytics DC: 1 Spark master + 3 Spark slaves with Cassandra (RF: 3)
Some questions I have about that setup and Titan + Spark in general:
- Is that setup correct?
- Should Titan also be installed on the 3 Spark slave nodes and / or the Spark master?
- Is there another setup that you would use instead?
- Will the Spark slaves only read data from the analytics DC and ideally even from Cassandra on the same node?
Maybe someone can even share a config file that supports such a setup (or a better one).
So I just tried it out and set up a simple Spark cluster to work with Titan (and Cassandra as the storage backend) and here is what I came up with:
High-Level Overview
I concentrate only on the analytics side of the cluster here, so I leave out the real-time processing nodes.
A Spark cluster consists of one (or more) masters and multiple slaves (workers). Since the slaves do the actual processing, they need access to the data they work on. Therefore Cassandra is installed on the workers and holds the graph data from Titan.
Jobs are sent from the Titan nodes to the Spark master, which distributes them to its workers. Therefore, Titan basically only communicates with the Spark master.
HDFS is only needed because TinkerPop stores intermediate results in it. Note that this changed in TinkerPop 3.2.0.
Installation
HDFS
I just followed a tutorial I found here. There are only two things to keep in mind here for Titan:
- Choose a compatible Hadoop version; for Titan 1.0.0, this is 1.2.1.
- TaskTrackers and JobTrackers from Hadoop are not needed, as we only want the HDFS and not MapReduce.
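As a rough sketch of the second point, assuming a Hadoop 1.2.1 layout where the start scripts live in bin/ (the install path below is a hypothetical example, not something from my setup notes), starting only the HDFS daemons could look like this:

```shell
#!/usr/bin/env bash
# Assumption: HADOOP_HOME points at an extracted Hadoop 1.2.1 directory.
HADOOP_HOME="${HADOOP_HOME:-/opt/hadoop-1.2.1}"   # hypothetical install path

# start-dfs.sh launches only the HDFS daemons (NameNode, SecondaryNameNode, DataNodes).
# start-mapred.sh (JobTracker and TaskTrackers) is deliberately not called.
HDFS_START_SCRIPT="$HADOOP_HOME/bin/start-dfs.sh"

if [ -x "$HDFS_START_SCRIPT" ]; then
    "$HDFS_START_SCRIPT"
else
    echo "start-dfs.sh not found under $HADOOP_HOME; adjust HADOOP_HOME" >&2
fi
```

This avoids start-all.sh, which would also bring up the MapReduce daemons.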
Spark
Again, the version has to be compatible, which is also 1.2.1 for Titan 1.0.0. Installation basically means extracting the archive with a compiled version. Finally, you can configure Spark to use your HDFS by exporting HADOOP_CONF_DIR, which should point to the conf directory of Hadoop.
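A minimal sketch of that export, assuming it is placed in Spark's conf/spark-env.sh on each node and that Hadoop is installed under the hypothetical path shown:

```shell
# conf/spark-env.sh (sourced by Spark's launch scripts)
# Assumption: Hadoop 1.2.1 is extracted here; adjust to your install location.
HADOOP_HOME="${HADOOP_HOME:-/opt/hadoop-1.2.1}"   # hypothetical install path

# Point Spark at the directory that contains core-site.xml and friends.
export HADOOP_CONF_DIR="$HADOOP_HOME/conf"
```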
Configuration of Titan
You also need a HADOOP_CONF_DIR on the Titan node from which you want to start OLAP jobs. It needs to contain a core-site.xml file that specifies the NameNode:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://COORDINATOR:54310</value>
<description>The name of the default file system. A URI whose
scheme and authority determine the FileSystem implementation. The
uri's scheme determines the config property (fs.SCHEME.impl) naming
the FileSystem implementation class. The uri's authority is used to
determine the host, port, etc. for a filesystem.</description>
</property>
</configuration>
Add the HADOOP_CONF_DIR to your CLASSPATH and TinkerPop should be able to access the HDFS. The TinkerPop documentation contains more information about that and how to check whether HDFS is configured correctly.
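On the Titan node, this could be sketched as follows before starting the Gremlin console. The directory is a hypothetical example; it only needs to contain the core-site.xml shown above.

```shell
#!/usr/bin/env bash
# Assumption: the directory holding core-site.xml on the Titan node.
export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-/opt/hadoop-1.2.1/conf}"   # hypothetical path

# Prepend the conf directory to CLASSPATH, keeping any existing entries.
export CLASSPATH="$HADOOP_CONF_DIR${CLASSPATH:+:$CLASSPATH}"

# Quick sanity check before launching the console.
if [ -f "$HADOOP_CONF_DIR/core-site.xml" ]; then
    echo "Found core-site.xml in $HADOOP_CONF_DIR"
else
    echo "Warning: no core-site.xml in $HADOOP_CONF_DIR" >&2
fi
```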
Finally, a config file that worked for me:
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
#
# Titan Cassandra InputFormat configuration
#
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=WORKER1,WORKER2,WORKER3
titanmr.ioformat.conf.storage.port=9160
titanmr.ioformat.conf.storage.keyspace=titan
titanmr.ioformat.cf-name=edgestore
#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
cassandra.input.keyspace=titan
cassandra.input.predicate=0c00020b0001000000000b000200000000020003000800047fffffff0000
cassandra.input.columnfamily=edgestore
cassandra.range.batch.size=2147483647
#
# SparkGraphComputer Configuration
#
spark.master=spark://COORDINATOR:7077
spark.serializer=org.apache.spark.serializer.KryoSerializer
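Note that the keyspace and column family each appear twice in this file, once for Titan's input format and once for Cassandra's. A small check like the following can catch a mismatch before a job is submitted. This is only a sketch: the helper names get_prop and check_consistent and the default file name are made up for illustration and are not part of Titan or TinkerPop.

```shell
#!/usr/bin/env bash
# Hypothetical location of the properties file shown above.
PROPS_FILE="${PROPS_FILE:-conf/hadoop-graph/read-cassandra.properties}"

# Print the value of an exact key from a key=value properties file.
# $1 = file, $2 = key. Comment lines never match because their first field is not the key.
get_prop() {
    awk -F'=' -v key="$2" '$1 == key { sub(/^[^=]*=/, ""); print; exit }' "$1"
}

# Succeed only if the Titan and Cassandra settings name the same keyspace and column family.
check_consistent() {
    local titan_ks cass_ks titan_cf cass_cf
    titan_ks=$(get_prop "$1" titanmr.ioformat.conf.storage.keyspace)
    cass_ks=$(get_prop "$1" cassandra.input.keyspace)
    titan_cf=$(get_prop "$1" titanmr.ioformat.cf-name)
    cass_cf=$(get_prop "$1" cassandra.input.columnfamily)
    [ -n "$titan_ks" ] && [ "$titan_ks" = "$cass_ks" ] &&
        [ -n "$titan_cf" ] && [ "$titan_cf" = "$cass_cf" ]
}

if [ -f "$PROPS_FILE" ]; then
    if check_consistent "$PROPS_FILE"; then
        echo "keyspace and column family settings agree"
    else
        echo "keyspace or column family settings differ in $PROPS_FILE" >&2
    fi
fi
```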
Answers
This leads to the following answers:
Is that setup correct?
It seems to be. At least it works with this setup.
Should Titan also be installed on the 3 Spark slave nodes and / or the Spark master?
Since it isn't required, I wouldn't do that. I prefer to keep the Spark nodes separate from the Titan servers that users can access.
Is there another setup that you would use instead?
I would be happy to hear from someone else who has a different setup.
Will the Spark slaves only read data from the analytics DC and ideally even from Cassandra on the same node?
Since the Cassandra nodes (from the analytics DC) are explicitly configured, the Spark slaves shouldn't be able to pull data from completely different nodes. But I am still not sure about the second part. Maybe someone else can provide more insight here?