使用PyMongo在MongoDB中进行批量插入/更新

9
我该如何使用pymongo/pandas批量更新/插入mongoDb? 我收到的错误为“batch op errors occurred”。 我认为是因为我设置了“_id”,但我想这么做。我的代码第一次运行正常,但第二次运行失败。我想在工作流程中使用pandas。数据包含一个日期时间对象。
使用“upsert = True”和“Update”的语法完全不同。希望能提供一个有效的解决方案,其中可以设置“_id”或“qid”。但是,有Python日期时间对象!
InSQL   = 'SELECT * from  database2.table2 '
sqlOut  = pd.read_sql(InSQL,cxn)
sqlOut['_id'] = "20170101" + ":"+ sqlOut['Var']   

dfOut   = sqlOut.to_json(orient='records',date_format='iso' )
try:
    db["test"].insert_many(json.loads(dfOut))
except Exception as e:  print e

我曾经发布了一个50pt的悬赏,但已经过期并且没有答案。嗯...


Mongo使用string base input,不允许插入任何变量(仅限操作符)请查看此链接json.loads(dfOut)无法将本地变量作为条目插入!主键为YYYYmmDDHHMMSS+计数器值 - dsgdfg
您可以深入了解BulkWriteError的详细信息,更好地了解发生了什么。可能是您个人定义的ID重复或违反了12字节限制。有关详细信息,请参见https://dev59.com/JF0a5IYBdhLWcg3waYGM。 - conner.xyz
_id 第一次起作用的事实告诉我,12 字节限制不是问题所在。是的,_id 或 gid 是重复的,并且需要进行更新。 - Merlin
听起来你的目标是要进行upsert,但当前的示例代码正在进行插入(因此预计连续运行将失败并出现重复的_id键)。你能否更新一下dfOut内容和您尝试的upsert代码的简短示例?另外,您使用的PyMongo版本是什么?我怀疑您实际上想使用带有设置upsert选项的update_many()。您还可以澄清Python日期时间对象和_idqid之间的关注点吗?您是否想将qid用作_id - Stennie
2个回答

7
您之所以会出现错误,是因为在第二次及后续的insert_many调用中,您尝试插入具有与现有文档字段冲突的文档。您推断出这可能是由于您显式设置了_id,这将与集合中现有的_id值发生冲突。
MongoDB 自动创建一个唯一索引来禁止重复值。
在第一次调用(插入文档的初始版本)之后的调用中,您需要更新或替换您的文档。确实有一个“upsert”的概念,它将负责插入集合中不存在的文档以及更新现有文档。
您的选项:
  • Most efficient: pymongo.collection.Collection.bulk_write

    import pymongo
    
    operations = [pymongo.operations.ReplaceOne(
        filter={"_id": doc["_id"]}, 
        replacement=doc,
        upsert=True
        ) for doc in json.loads(dfOut)]
    
    result = db["test"].bulk_write(operations)
    # handle results
    
请注意,其效率还取决于该字段是否在集合中索引,顺便提一下,_id是被索引的。(另请参见pymongo.operations.ReplaceOne) 注意:pymongo.collection.Collection.update_many 似乎不适合您的需求,因为您不是要在给定筛选器的所有匹配项上设置相同的值。

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接