使用asyncpg连接池与Sanic一起工作

4

我整理了网上的几个例子,得出了这个代码。这段代码可以运行,但我想知道有没有更好的方法来做到这一点?

user.py:


from asyncpg import create_pool
from sanic import Blueprint

bp = Blueprint('dp')

class pg:
    def __init__(self, pg_pool):
        self.pg_pool = pg_pool

    async def fetch(self, sql, *args, **kwargs):
        async with self.pg_pool.acquire() as connection:
            return await connection.fetch(sql, *args, **kwargs)

    async def execute(self, sql, *args, **kwargs):
        async with self.pg_pool.acquire() as connection:
            return await connection.execute(sql, *args, **kwargs)

@bp.listener('before_server_start')
async def init_pg(app, loop):
    """
    Init Postgresql DB.
    """
    bp.pg_pool = await create_pool(
        **app.config.PG_CFG,
        max_inactive_connection_lifetime=60,
        min_size=1,
        max_size=3,
        loop=loop,
    )
    bp.pg = pg(bp.pg_pool)
    print('-------- setup connection pool --------')

现在在webapp.py中使用pg类

webapp.py:


@app.route("/")
async def root(req):
    result = await app.pg.fetch('SELECT * FROM foo')
1个回答

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接