如何在数据库中使“龙卷风请求”具有原子性

4
我有一个使用Tornado异步框架编写的Python应用。当收到HTTP请求时,将调用此方法:
@classmethod
def my_method(cls, my_arg1):

    # Do some Database Transaction #1
    x = get_val_from_db_table1(id=1, 'x')
    y = get_val_from_db_table2(id=7, 'y')
    x += x + (2 * y) 

    # Do some Database Transaction #2
    set_val_in_db_table1(id=1, 'x', x)

    return True

三个数据库操作之间是相互关联的。由于这是一个并发应用程序,因此可能会同时发生多个这样的HTTP调用,并同时访问同一个数据库。
为了确保数据完整性,重要的是在该方法中调用这三个数据库操作时,没有其他进程在这些数据库行之间进行读写。
我该如何确保这个方法具有数据库的原子性?Tornado有这方面的装饰器吗?
2个回答

3

同步数据库访问

您没有说明如何访问数据库。如果您在get_val_from_db_table1和相关函数(例如使用pymysql)中具有同步的DB访问,并且my_method是阻塞的(不返回控制到IO循环),则会阻止服务器(这对服务器的性能和响应能力有影响),但有效地序列化客户端,只有一个可以执行my_method。因此,在数据一致性方面,您不需要做任何事情,但通常这是一种不良设计。在短期内,您可以通过@xyres的解决方案解决这两个问题(代价是记住线程安全问题,因为大多数Tornado功能不是线程安全的)。

异步数据库访问

如果在 `get_val_from_db_table1` 等函数中使用了异步的数据库访问(例如使用 tornado-mysql),那么可以使用 tornado.locks.Lock。以下是一个示例:
from tornado import web, gen, locks, ioloop


_lock = locks.Lock()

def synchronised(coro):
    async def wrapper(*args, **kwargs):  
        async with _lock:
            return await coro(*args, **kwargs)

    return wrapper


class MainHandler(web.RequestHandler):

    async def get(self):
        result = await self.my_method('foo')
        self.write(result)

    @classmethod
    @synchronised
    async def my_method(cls, arg):
        # db access
        await gen.sleep(0.5)
        return 'data set for {}'.format(arg)


if __name__ == '__main__':
    app = web.Application([('/', MainHandler)])
    app.listen(8080)
    ioloop.IOLoop.current().start()

请注意,上述内容是关于普通的单进程 Tornado 应用程序所说的。如果您使用 tornado.process.fork_processes,那么您只能使用 multiprocessing.Lock

2

由于您想依次运行这三个数据库操作,函数my_method必须是非异步的

但这也意味着my_method将阻塞服务器。您肯定不希望出现这种情况。我能想到的一种方法是在另一个线程中运行此函数。这不会阻塞服务器,并且在操作运行时将继续接受新请求。而且,由于它将是非异步的,因此db原子性得到保证。

以下是相关代码,可供您开始使用:

import concurrent.futures

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
# Don't set `max_workers` more than 1, because then multiple 
# threads will be able to perform db operations

class MyHandler(...):
    @gen.coroutine
    def get(self):

        yield executor.submit(MyHandler.my_method, my_arg1)
        # above, `yield` is used to wait for 
        # db operations to finish
        # if you don't want to wait and return
        # a response immediately remove the 
        # `yield` keyword

        self.write('Done')

    @classmethod
    def my_method(cls, my_arg1):
        # do db stuff ...
        return True

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接