Batch loading neo4j

2019-09-02 18:25发布

I am batch loading a neo4j graph using py2neo using this script:

batch = neo4j.WriteBatch(graph)
counter = 0
for each in ans:
    n1 = graph.merge_one("Page", "url", each[0])
#     batch.create(n1)
    counter +=1
    for linkvalue in each[6]:
        try:
            text,link = linkvalue.split('!__!')
            n2 = graph.merge_one("Page", "url", link)
#             batch.create(n2)
            counter+=1
            rel = Relationship(n1,'LINKS',n2, anchor_text=text)
            batch.create(rel)

        except (KeyboardInterrupt, SystemExit):
            print 'fail'
            raise

    if counter > 900:
        counter = 0
        batch.submit()
        print 'submit'
        batch = neo4j.WriteBatch(graph)

The merge_one's both make a call to the graph, which I believe is slowing down my algorithm. I commented out the batch.create() because they were recreating the nodes. Is there a way to do this function but save it until I batch.submit() to speed up the process?

I am handling about 50,000 nodes and 1,000,000 relationships.

标签: neo4j py2neo
1条回答
太酷不给撩
2楼-- · 2019-09-02 18:39

You need to append statements to the WriteBatch and then run the batch once it reaches some number of statements.

Here's an example:

import json
from py2neo.neo4j import CypherQuery, GraphDatabaseService, WriteBatch
from py2neo import neo4j

db = neo4j.GraphDatabaseService()

business_index_query = CypherQuery(db, "CREATE INDEX ON :Business(id)")
business_index_query.execute()

category_index_query = CypherQuery(db, "CREATE INDEX ON :Category(name)")
category_index_query.execute()

create_business_query = '''
    CREATE (b:Business {id: {business_id}, name: {name}, lat:{latitude}, 
    lon:{longitude}, stars: {stars}, review_count: {review_count}})
'''

merge_category_query = '''
    MATCH (b:Business {id: {business_id}})
    MERGE (c:Category {name: {category}})
    CREATE UNIQUE (c)<-[:IS_IN]-(b)
'''

print "Beginning business batch"
with open('data/yelp_academic_dataset_business.json', 'r') as f:
    business_batch = WriteBatch(db)
    count = 0
    for b in (json.loads(l) for l in f):
        business_batch.append_cypher(create_business_query, b)
        count += 1
        if count >= 10000:
            business_batch.run()
            business_batch.clear()
            count = 0
    if count > 0:
       business_batch.run()

print "Beginning category batch"
with open('data/yelp_academic_dataset_business.json', 'r') as f:
    category_batch = WriteBatch(db)
    count = 0
    for b in (json.loads(l) for l in f):
        for c in b['categories']:
            category_batch.append_cypher(merge_category_query, {'business_id': b['business_id'], 'category': c})
            count += 1
            if count >= 10000:
                category_batch.run()
                category_batch.clear()
                count = 0
    if count > 0:
        category_batch.run()

Note that this example uses only Cypher statements and appends each statement to the WriteBatch. Also this example is using two different WriteBatch instances.

查看更多
登录 后发表回答