Using mongoengine with multiprocessing - how do you close mongoengine connections?

4
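No matter what I try, I keep hitting the "MongoClient opened before fork" warning about not forking active mongo connections when I use multiprocessing on a mongoengine database. The standard mongo advice seems to be to connect to the database only from within the child processes. I think what I'm doing should be functionally equivalent, because I close the database before using multiprocessing, yet I still hit the problem.
Related questions, which either lack a minimal example or have solutions that don't apply, are here, here, and, specifically for the flask/celery case, here.
Minimal example code: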
from mongoengine import connect, Document, StringField, ListField, ReferenceField
from pathos.multiprocessing import ProcessingPool


class Base(Document):
    key = StringField(primary_key=True)
    name = StringField()
    parent = ReferenceField('Parent', required=True)

class Parent(Document):
    key = StringField(primary_key=True)
    name = StringField()
    bases = ListField(ReferenceField('Base'))


def remove_base(key):
    db = connect('mydb')
    mongo_b = Base.objects().get(key=key)
    mongo_b.parent.update(pull__bases=mongo_b)
    mongo_b.delete()


### setup
db = connect('mydb', connect=False)

Base(key='b1', name='test', parent='p1').save()
Base(key='b2', name='test', parent='p1').save()
Base(key='b3', name='test2', parent='p1').save()

p=Parent(key='p1', name='parent').save()
p.update(add_to_set__bases='b1')
p.update(add_to_set__bases='b2')
p.update(add_to_set__bases='b3')

### find objects we want to delete
my_base_objects = Base.objects(name='test')
keys = [b.key for b in my_base_objects]
del my_base_objects

# close db to avoid problems?!
db.close()
del db

# parallel map removing base objects and references from the db
# warning generated here
pp = ProcessingPool(2)
pp.map(remove_base, keys)
2 Answers

2

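Okay, I figured it out. Mongoengine caches connections to the database in several places. If you remove them manually, the problem goes away. Add the following import: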

from mongoengine import connection

Then add:

connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}

Base._collection = None
Parent._collection = None

Adding these lines to the "# close db" section seems to solve the problem.

Complete code:

from mongoengine import connect, Document, StringField, ListField, ReferenceField, connection
from pathos.multiprocessing import ProcessingPool


class Base(Document):
    key = StringField(primary_key=True)
    name = StringField()
    parent = ReferenceField('Parent', required=True)

class Parent(Document):
    key = StringField(primary_key=True)
    name = StringField()
    bases = ListField(ReferenceField('Base'))


def remove_base(key):
    db = connect('mydb', connect=False)
    mongo_b = Base.objects().get(key=key)
    mongo_b.parent.update(pull__bases=mongo_b)
    mongo_b.delete()

def setup():
    Base(key='b1', name='test', parent='p1').save()
    Base(key='b2', name='test', parent='p1').save()
    Base(key='b3', name='test2', parent='p1').save()

    p=Parent(key='p1', name='parent').save()
    p.update(add_to_set__bases='b1')
    p.update(add_to_set__bases='b2')
    p.update(add_to_set__bases='b3')

db = connect('mydb', connect=False)
setup()
### find objects we want to delete
my_base_objects = Base.objects(name='test')
keys = [b.key for b in my_base_objects]
del my_base_objects


### close db to avoid problems?!
db.close()
db = None

connection._connections = {}
connection._connection_settings = {}
connection._dbs = {}

Base._collection = None
Parent._collection = None

### parallel map removing base objects from the db
pp = ProcessingPool(2)
pp.map(remove_base, keys)

1
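This was improved recently: as of MongoEngine>=0.18.0, the methods disconnect() and disconnect_all() should be used to disconnect one or all existing connections, respectively (changelog 0.18.0).
See the official documentation.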

@clyde, (mongoengine contributor here) please accept this answer so that users use the proper way of disconnecting. - bagerard
