使用PyMongo将Pandas数据帧插入到mongodb中

62

使用 PyMongo 将 pandas DataFrame 最快地插入到 mongodb 中的方法是什么?

尝试

db.myCollection.insert(df.to_dict())

出现错误

InvalidDocument: 文档必须只有字符串键,该键为Timestamp('2013-11-23 13:31:00', tz=None)


 db.myCollection.insert(df.to_json())

出现了错误

TypeError: 'str'对象不支持项目分配


 db.myCollection.insert({id: df.to_json()})

出现了错误

InvalidDocument: 文档必须只有字符串键,该键是 <内置函数id>


df

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 150 entries, 2013-11-23 13:31:26 to 2013-11-23 13:24:07
Data columns (total 3 columns):
amount    150  non-null values
price     150  non-null values
tid       150  non-null values
dtypes: float64(2), int64(1)

1
你之后想做什么?你想每个记录一个文档还是每个数据框架一个文档? - alko
每个Mongo记录都将具有字段dateamountpricetidtid应该是一个唯一的字段。 - Nyxynyx
2
你可以通过以下方式将数据框转换为字典列表:records = json.loads(df.to_json(orient='records')),结果将会是这样的:[{'c1': 1, 'c2': 1},{'c1': 2, 'c2': 2},{'c1': 3, 'c2': 3}],然后只需使用 db.coll.insert_many(records) 即可。另外,使用 df.to_dict('recoreds') 可能会遇到 Type error 的问题。 - Ferris
13个回答

76

这是最快的方法。使用pymongo 3中的 insert_many 方法和 pandas 中 to_dict 方法的'records'参数。

db.collection.insert_many(df.to_dict('records'))

2
这是我认为最好的想法,尽管我不认为语法适用于原始用例。基本问题在于mongo需要字符串键,而您的df具有时间戳索引。您需要使用传递给to_dict()的参数,使mongo中的键成为日期以外的其他内容。我经常遇到的一个用例是,您实际上希望df中的每一行都是具有额外“日期”字段的记录。 - Marshall Farrier
你应该更正代码片段以包含集合。 - hui chen
这个并没有保留任何数据类型,对吧?例如 {'numfield': NumberLong("16797951")} - NealWalters

48

我怀疑没有既快速简单的方法。如果您不担心数据转换,可以进行以下操作:

>>> import json
>>> df = pd.DataFrame.from_dict({'A': {1: datetime.datetime.now()}})
>>> df
                           A
1 2013-11-23 21:14:34.118531

>>> records = json.loads(df.T.to_json()).values()
>>> db.myCollection.insert(records)

但是如果你试图将数据加载回来,你会得到以下提示:

>>> df = read_mongo(db, 'myCollection')
>>> df
                     A
0  1385241274118531000
>>> df.dtypes
A    int64
dtype: object

所以你需要将列'A'转换回datetime类型,并将DataFrame中所有不是intfloatstr的字段进行转换。以这个示例为例:

>>> df['A'] = pd.to_datetime(df['A'])
>>> df
                           A
0 2013-11-23 21:14:34.118531

10
db.myCollection.insert(records) 应该替换为 db.myCollection.insert_many(records),参考警告信息://anaconda/bin/ipython:1: DeprecationWarning: insert is deprecated. Use insert_one or insert_many instead. #!/bin/bash //anaconda/bin/python.app。请注意,不要改变原意,只需使翻译更加通俗易懂。 - Femto Trader

9

odo 可以使用它来完成

odo(df, db.myCollection)

2
我确实喜欢 odo,但是当Mongo uri具有非alpha用户名、密码时,它会失败得很惨。除了使用未经身份验证的mongo之外,我不会推荐它用于任何其他目的。 - armundle
1
我认为odo的开发最近已经停滞或延迟了,大约是在2019年左右。 - wordsforthewise

5

我认为这个问题中有很棒的想法。在我的情况下,我花费了更多时间来处理大型数据框的移动。在这种情况下,pandas倾向于允许您使用chunksize选项(例如在pandas.DataFrame.to_sql中)。因此,我认为我可以通过添加我在这个方向上使用的函数来做出贡献。

def write_df_to_mongoDB(  my_df,\
                          database_name = 'mydatabasename' ,\
                          collection_name = 'mycollectionname',
                          server = 'localhost',\
                          mongodb_port = 27017,\
                          chunk_size = 100):
    #"""
    #This function take a list and create a collection in MongoDB (you should
    #provide the database name, collection, port to connect to the remoete database,
    #server of the remote database, local port to tunnel to the other machine)
    #
    #---------------------------------------------------------------------------
    #Parameters / Input
    #    my_list: the list to send to MongoDB
    #    database_name:  database name
    #
    #    collection_name: collection name (to create)
    #    server: the server of where the MongoDB database is hosted
    #        Example: server = 'XXX.XXX.XX.XX'
    #    this_machine_port: local machine port.
    #        For example: this_machine_port = '27017'
    #    remote_port: the port where the database is operating
    #        For example: remote_port = '27017'
    #    chunk_size: The number of items of the list that will be send at the
    #        some time to the database. Default is 100.
    #
    #Output
    #    When finished will print "Done"
    #----------------------------------------------------------------------------
    #FUTURE modifications.
    #1. Write to SQL
    #2. Write to csv
    #----------------------------------------------------------------------------
    #30/11/2017: Rafael Valero-Fernandez. Documentation
    #"""



    #To connect
    # import os
    # import pandas as pd
    # import pymongo
    # from pymongo import MongoClient

    client = MongoClient('localhost',int(mongodb_port))
    db = client[database_name]
    collection = db[collection_name]
    # To write
    collection.delete_many({})  # Destroy the collection
    #aux_df=aux_df.drop_duplicates(subset=None, keep='last') # To avoid repetitions
    my_list = my_df.to_dict('records')
    l =  len(my_list)
    ran = range(l)
    steps=ran[chunk_size::chunk_size]
    steps.extend([l])

    # Inser chunks of the dataframe
    i = 0
    for j in steps:
        print j
        collection.insert_many(my_list[i:j]) # fill de collection
        i = j

    print('Done')
    return

这真的很有用,谢谢。您可能需要使用当前输入更新Args(输入)部分。 - ximiki
属性错误:'range'对象没有'extend'属性。 - Aakash Basu

4
如果你的数据框中存在缺失数据(即 None、nan),且你不想在文档中出现空键值:db.insert_many(df.dropna().to_dict(orient="records")) 将会插入空键值。如果你不想在文档中出现空键值,你可以使用修改版的 pandas .to_dict(orient="records") 代码如下:
from pandas.core.common import _maybe_box_datetimelike
my_list = [dict((k, _maybe_box_datetimelike(v)) for k, v in zip(df.columns, row) if v != None and v == v) for row in df.values]
db.insert_many(my_list)

在这里,我添加了检查以确保值不是Nonenan,然后再将其放入行字典中,代码为if v != None and v == v。现在您的.insert_many将只包括文档中具有值的键(而没有null数据类型)。


这是一个很好的方法,因为当上传dataframe到mongodb时需要处理空值,而且这种方法比“DataFrame.to_dict()”更快。顺便说一下,使用 columns = list(df.columns) 然后 {k: _maybe_box_datetimelike(v) for k, v in zip(columns, row) if v != None and v == v} for row in df.values] 更快。 - Woods Chen

4
我使用以下代码将数据框架插入到数据库中的集合中。
df.reset_index(inplace=True)
data_dict = df.to_dict("records")
myCollection.insert_many(data_dict)

问题是,如果我的data_dict有100万行数据,是更好的做一个单独的insertMany呢?还是应该分批进行insertMany? - lesolorzanov

2
这个怎么样:
db.myCollection.insert({id: df.to_json()})

id将是该df的唯一字符串标识符。


谢谢,我收到了错误信息 InvalidDocument: documents must have only string keys, key was <built-in function id> - Nyxynyx
你必须自己生成那个 ID。 - PasteBT
这个id和mongo文档中的通常_.id一样吗?如果是的话,它看起来像是一个随机哈希值,我该如何生成它呢? - Nyxynyx
对于@Nyxynyx而言,它会失败,因为id是Python中的内置函数,不建议覆盖。您可以通过使用id(df)生成一个简单的测试ID,但由于对象ID在会话之间不是持久的,这可能会根据您的使用方式导致问题。虽然用于测试是有效的。 - erb
我遇到了“达到最大递归层数”的错误。通过使用sys.setrecursionlimit(1000000)进行修复。 - Gabriel Fair

1

只需创建字符串键!

import json
dfData = json.dumps(df.to_dict('records'))
savaData = {'_id': 'a8e42ed79f9dae1cefe8781760231ec0', 'df': dfData}
res = client.insert_one(savaData)

##### load dfData
data = client.find_one({'_id': 'a8e42ed79f9dae1cefe8781760231ec0'}).get('df')
dfData = json.loads(data)
df = pd.DataFrame.from_dict(dfData)

1
如果您想一次发送多个,请使用以下方法:
db.myCollection.insert_many(df.apply(lambda x: x.to_dict(), axis=1).to_list())

0

对于upsert操作,这个方法可行。

for r in df2.to_dict(orient="records"):
    db['utest-pd'].update_one({'a':r['a']},{'$set':r})

它一次只处理一条记录,但似乎 upsert_many 不能处理不同记录的多个过滤值。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接