运行时错误: 事件循环已关闭 - motor, asyncio

4

我无法运行这个测试,总是出现相同的错误RuntimeError: Event loop is closed

我需要在这段代码中添加什么?

from motor.motor_asyncio import AsyncIOMotorClient
import pytest
import asyncio

client = AsyncIOMotorClient("mongodb://mongo:mongo@192.168.0.11:27017/admin?retryWrites=false")
db = client['app']
aux = db['users']

async def create_user_db(a: dict):
    x = await aux.insert_one(a)
    return x

@pytest.mark.asyncio
async def test_create():
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(form)
    assert res != None

这是错误信息

在此输入图像描述


请查看此答案 - Felipe Buccioni
1个回答

2
在你的例子中,你是在“import”时间打开数据库,但此时我们还没有事件循环。当测试用例运行时,事件循环会被创建。
你可以将数据库定义为fixture,并将其提供给测试函数,例如:
@pytest.fixture
def client():
    return AsyncIOMotorClient("mongodb://localhost:27017/")


@pytest.fixture
def db(client):
    return client['test']


@pytest.fixture
def collection(db):
    return db['test']


async def create_user_db(collection, a: dict):
    x = await collection.insert_one(a)
    return x



@pytest.mark.asyncio
async def test_create(collection):
    form = {'username': 'c3', 'password': 'c3'}
    res = await create_user_db(collection, form)
    assert res != None

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接