Load Spark RDD to Neo4j in Python

Posted 2019-04-17 12:18

I am working on a project where I use Spark for data processing. My data is now processed and I need to load it into Neo4j. After loading it into Neo4j, I will use it to showcase the results.

I want the whole implementation to be done in Python, but I couldn't find any library or example online. Can you please help with links, libraries, or an example?

My RDD is a pair RDD, and I have to create a relationship for every tuple.
Pair RDD:

Key   Value
Jack  [a,b,c]

For simplicity, I transformed the RDD to

 Key  value
 Jack  a
 Jack  b
 Jack  c

Then I have to create relationships between

 Jack->a    
 Jack->b
 Jack->c
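
The flattening step described above can be sketched in plain Python as follows. The helper name flatten_pairs is hypothetical and the sample data is the Jack example from this post; on an actual pair RDD, flatMapValues should perform the same expansion.

```python
def flatten_pairs(pairs):
    """Expand (key, [v1, v2, ...]) into one (key, value) tuple per value."""
    return [(key, value) for key, values in pairs for value in values]


# Sample data taken from the example in the question.
sample = [("Jack", ["a", "b", "c"])]
edges = flatten_pairs(sample)

# Rough PySpark equivalent (assumes `userhobbies` is the pair RDD):
#   flat = userhobbies.flatMapValues(lambda values: values)
```

Each resulting (key, value) tuple corresponds to one relationship to create.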

Based on William's answer, I am able to load a list directly. But this data throws a Cypher error.

I tried this:

def writeBatch(b):
    print("writing batch of " + str(len(b)))
    session = driver.session()
    session.run('UNWIND {batch} AS elt MERGE (n:user1 {user: elt[0]})', {'batch': b})
    session.close()

def write2neo(v):
    batch_d.append(v)
    for hobby in v[1]:
        batch_d.append([v[0],hobby])

    global processed
    processed += 1
    if len(batch) >= 500 or processed >= max:
        writeBatch(batch)
        batch[:] = []


max = userhobbies.count()
userhobbies.foreach(write2neo)

b is a list of lists. Each unwound elt is a two-element list: elt[0] is the key and elt[1] is the value.
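
A minimal sketch of how such a list of two-element lists could drive relationship creation, assuming a hypothetical Hobby label with a name property and a hypothetical HAS_HOBBY relationship type (only the user1 label and user property come from the code above). The helpers to_rows and write_edges are illustrative names. The sketch converts every pair to a plain list of strings before sending it; whether non-list items in the batch are behind the error reported in this post is not established here.

```python
# Hobby, name and HAS_HOBBY are assumed names; user1/user come from the question.
# "$rows" is the current Cypher parameter syntax; the snippets in this thread
# use the older "{rows}" form, which older servers expect.
MERGE_EDGES = (
    "UNWIND $rows AS row "
    "MERGE (u:user1 {user: row[0]}) "
    "MERGE (h:Hobby {name: row[1]}) "
    "MERGE (u)-[:HAS_HOBBY]->(h)"
)


def to_rows(pairs):
    """Convert (key, value) pairs into plain [str, str] lists."""
    return [[str(key), str(value)] for key, value in pairs]


def write_edges(session, pairs):
    """Run the MERGE query once for the whole batch.

    `session` is assumed to be an open driver session exposing run(query, params).
    """
    session.run(MERGE_EDGES, {"rows": to_rows(pairs)})
```

Using MERGE for both nodes and the relationship keeps the write idempotent, so re-running a batch should not duplicate users, hobbies, or edges.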

Error

ValueError: Structure signature must be a single byte value

Thanks in advance.

1 answer
劫难
Reply #2 · 2019-04-17 12:55

You can do a foreach on your RDD. For example:

from neo4j.v1 import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("",""), encrypted=False)
from pyspark import SparkContext

sc = SparkContext()
dt = sc.parallelize(range(1, 5))

def write2neo(v):
    session = driver.session()
    session.run("CREATE (n:Node {value: {v} })", {'v': v})
    session.close()


dt.foreach(write2neo)

I would, however, improve the function to batch the writes; this simple snippet works as a basic implementation.

UPDATE WITH EXAMPLE OF BATCHING WRITES

sc = SparkContext()
batch = []
max = None
processed = 0

def writeBatch(b):
    print("writing batch of " + str(len(b)))
    session = driver.session()
    session.run('UNWIND {batch} AS elt CREATE (n:Node {v: elt})', {'batch': b})
    session.close()

def write2neo(v):
    batch.append(v)
    global processed
    processed += 1
    if len(batch) >= 500 or processed >= max:
        writeBatch(batch)
        batch[:] = []

dt = sc.parallelize(range(1, 2136))
max = dt.count()
dt.foreach(write2neo)

Which results in:

16/09/15 12:25:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
writing batch of 500
writing batch of 500
writing batch of 500
writing batch of 500
writing batch of 135
16/09/15 12:25:47 INFO PythonRunner: Times: total = 279, boot = -103, init = 245, finish = 137
16/09/15 12:25:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1301 bytes result sent to driver
16/09/15 12:25:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 294 ms on localhost (1/1)
16/09/15 12:25:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/09/15 12:25:47 INFO DAGScheduler: ResultStage 1 (foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36) finished in 0.295 s
16/09/15 12:25:47 INFO DAGScheduler: Job 1 finished: foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36, took 0.308263 s
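
The batching above appears to rely on a global counter and a prior count(), which fits the single-task run shown in the log. A minimal alternative sketch, assuming batches are formed per partition instead: the helpers chunked and write_partition are hypothetical names, and write_batch stands for any callable that accepts a list, such as the writeBatch function above.

```python
from itertools import islice


def chunked(iterable, size=500):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def write_partition(rows, write_batch, size=500):
    """Send one partition's rows to `write_batch` in chunks.

    Returns the number of batches written.
    """
    count = 0
    for chunk in chunked(rows, size):
        write_batch(chunk)
        count += 1
    return count


# Local dry run with the same range as the example above; it records
# batch sizes instead of writing to a database.
sizes = []
write_partition(range(1, 2136), lambda b: sizes.append(len(b)))
```

With Spark, this could be wired up as dt.foreachPartition(lambda rows: write_partition(rows, writeBatch)). Because each partition flushes its own final partial batch, no global state or up-front count is needed.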