使用py2neo从pandas数据框创建Neo4j节点和关系

11

使用py2neo从Neo4j数据库执行cypher查询并在pandas dataframe上获取结果非常简单,代码如下:

>>> from pandas import DataFrame
>>> DataFrame(graph.data("MATCH (a:Person) RETURN a.name, a.born LIMIT 4"))
   a.born              a.name
0    1964        Keanu Reeves
1    1967    Carrie-Anne Moss
2    1961  Laurence Fishburne
3    1960        Hugo Weaving

现在我正在尝试使用 py2neo 将一个 Pandas 数据帧中的一组节点和关系创建(或更好地合并)到 Neo4j 数据库中。假设我有一个像下面这样的数据帧:

LABEL1 LABEL2
p1 n1
p2 n1
p3 n2
p4 n2

标签是列标题,属性是值。我想要复制以下Cypher查询(以第一行为例),对于数据框中的每一行都执行:

标签是列标题,属性是值。我想要复制以下Cypher查询(以第一行为例),对于数据框中的每一行都执行:

query="""
    MATCH (a:Label1 {property:p1))
    MERGE (a)-[r:R_TYPE]->(b:Label2 {property:n1))
"""

我知道可以直接告诉py2neo只运行graph.run(query),或者以同样的方式运行LOAD CSV Cypher脚本,但我想知道是否可以在py2neo中逐行迭代数据框并应用上述查询。

2个回答

14

你可以使用 DataFrame.iterrows() 迭代遍历DataFrame,对每一行执行一个查询,并将行中的值作为参数传递。

for index, row in df.iterrows():
    graph.run('''
      MATCH (a:Label1 {property:$label1})
      MERGE (a)-[r:R_TYPE]->(b:Label2 {property:$label2})
    ''', parameters = {'label1': row['label1'], 'label2': row['label2']})

这将为每一行执行一个事务。我们可以将多个查询批处理成一个事务,以获得更好的性能。

tx = graph.begin()
for index, row in df.iterrows():
    tx.evaluate('''
      MATCH (a:Label1 {property:$label1})
      MERGE (a)-[r:R_TYPE]->(b:Label2 {property:$label2})
    ''', parameters = {'label1': row['label1'], 'label2': row['label2']})
tx.commit()

通常我们可以在一个事务中批处理大约20,000个数据库操作。


我尝试做类似的事情。在tx.commit()时,代码失败了 - AttributeError: 'Graph' object has no attribute 'commit'。问题可能是什么? 以下是最初的导入行 - from py2neo import Graph, graph = Graph("bolt://localhost:7687", user="neo4j", password="xyz") - Syed Md Ismail
当您引用“tx”时,可能忘记了使用“.begin()”。 - Tonca

7

我发现提出的解决方案对我不起作用。即使节点已经存在,上面的代码仍然会创建新的节点。为了确保您不会创建任何重复项,在合并之前建议匹配ab节点:

tx = graph.begin()
for index, row in df.iterrows():
    tx.evaluate('''
       MATCH (a:Label1 {property:$label1}), (b:Label2 {property:$label2})
       MERGE (a)-[r:R_TYPE]->(b)
       ''', parameters = {'label1': row['label1'], 'label2': row['label2']})
tx.commit()

在我的情况下,我不得不同时添加关系属性(如下所示的代码)。 另外,我需要添加500k个以上的关系,因此我预计会遇到Java堆内存错误。 我通过将begin()commit()放置在循环内部来解决问题,因此每个新关系都会创建一个新的事务:

for index, row in df.iterrows():
    tx = graph.begin()
    tx.evaluate('''
       MATCH (a:Label1 {property:$label1}), (b:Label2 {property:$label2})
       MERGE (a)-[r:R_TYPE{property_name:$p}]->(b)
       ''', parameters = {'label1': row['label1'], 'label2': row['label2'], 'p': row['property']})
    tx.commit()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接