使用上下文管理器为contextvars.Context提供上下文。

4

我正在尝试在我的数据库框架中管理事务(我使用基于 pymongo 的 umongo 连接 MongoDB)。

要使用事务,必须沿着整个调用链传递一个 session 关键字参数。我想提供一个上下文管理器来隔离事务。只有在调用链末尾的函数需要知道 session 对象。

我了解到了上下文变量并且已经接近一些内容,但还没有完全理解。

我想要的是:

with Transaction():
    # Do stuff
    d = MyDocument.find_one()
    d.attr = 12
    d.commit()

这是我目前为止想到的:

s = ContextVar('session', default=None)

class Transaction(AbstractContextManager):

    def __init__(self):
        self.ctx = copy_context()
        # Create a new DB session
        session = db.create_session()
        # Set session in context
        self.ctx.run(s.set, session)

    def __exit__(self, *args, **kwargs):
        pass

    # Adding a run method for convenience
    def run(self, func, *args, **kwargs):
        self.ctx.run(func, *args, **kwargs)

def func():
    d = MyDocument.find_one()
    d.attr = 12
    d.commit()

with Transaction() as t:
    t.run(func)

但是我没有漂亮的上下文管理器语法。上下文管理器的重点在于说“在那个上下文中运行放置在其中的所有内容”。

我上面所写的并不比仅仅使用函数更好:

def run_transaction(func, *args, **kwargs):
    ctx = copy_context()
    session = 12
    ctx.run(s.set, session)
    ctx.run(func)

run_transaction(func)

我是否走错了方向?

我是否误用了上下文变量?

还有其他实现我所需求的方法吗?


基本上,我想要像上下文管理器一样打开一个上下文。

session = ContextVar('session', default=None)

with copy_context() as ctx:
    session = db.create_session()
    # Do stuff
    d = MyDocument.find_one()
    d.attr = 12
    d.commit()

我会将此嵌入到一个Transaction上下文管理器中,以管理会话内容并仅在用户代码中保留对d的操作。


Jerome,你有时间测试这个了吗? 你采用了什么最终方法? - Haidar Zeineddine
我从未深入研究过,但我认为被接受的答案是正确(而且不错)的方式。 - Jérôme
1
我向CPython打开了一个拉取请求,以添加上下文管理器支持:https://github.com/python/cpython/pull/99634 - Richard Hansen
1个回答

4
你可以使用contextmanager来创建会话和事务,并将会话存储在ContextVar中,以便其他函数使用。

from contextlib import contextmanager
from contextvars import ContextVar
import argparse
import pymongo


SESSION = ContextVar("session", default=None)


@contextmanager
def transaction(client):
    with client.start_session() as session:
        with session.start_transaction():
            t = SESSION.set(session)
            try:
                yield
            finally:
                SESSION.reset(t)


def insert1(client):
    client.test.txtest1.insert_one({"data": "insert1"}, session=SESSION.get())


def insert2(client):
    client.test.txtest2.insert_one({"data": "insert2"}, session=SESSION.get())


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", default="mongodb://localhost:27017")
    args = parser.parse_args()

    client = pymongo.MongoClient(args.url)

    # Create and lear collections, collections must be created outside the transaction
    insert1(client)
    client.test.txtest1.delete_many({})
    insert2(client)
    client.test.txtest2.delete_many({})

    with transaction(client):
        insert1(client)
        insert2(client)

    for doc in client.test.txtest1.find({}):
        print(doc)
    for doc in client.test.txtest2.find({}):
        print(doc)


if __name__ == "__main__":
    main()

1
你难道不需要在某个时候使用 copy_contextrun 吗? - Jérôme
不,除非你正在进行一些高级操作并手动管理所有这些。 asynciofutures等都会在内部根据需要使用copy_contextrun,因此您不需要自己处理。 - rectalogic
1
我想我可能想太多了。一旦我有时间尝试,我会回来点赞/接受答案的。谢谢。 - Jérôme
我没有时间尝试它,但现在阅读它并更好地理解它,它完全有道理。接受这个答案。谢谢。 - Jérôme

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接