Python中最佳的数据库连接池解决方案是什么?

41

我已经开发了一些类似DAO的自定义类,以满足我的项目对服务端进程的要求,该进程不在任何框架内运行。

这个解决方案很好,但每次有新请求时,都会通过MySQLdb.connect打开一个新连接。

在Python中,切换到使用连接池的最佳“插入式”解决方案是什么?我想象中的解决方案类似于Java的commons DBCP解决方案。

该进程长时间运行,并且有许多需要发出请求的线程,但并非同时...具体而言,在简短的输出结果之前,它们会做很多工作。

编辑后添加: 经过更多搜索,我发现了anitpool.py,看起来还不错,但由于我相对较新于Python,我想确保我没有错过更明显/更习惯/更好的解决方法。

9个回答

21

在MySQL中?

我认为不必费心去使用连接池。它们经常成为问题的源头,并且对于MySQL,它们不会带来您所期望的性能优势。这条路可能需要花费很多精力才能跟进——政治上——因为在这个领域有太多的最佳实践和教科书用语关于连接池的优点。

连接池只是无状态应用程序(例如HTTP协议)后Web时代与有状态长时间批处理应用程序前Web时代之间的桥梁。由于在前Web数据库中连接非常昂贵(因为过去没有人过于在意连接需要多长时间才能建立),因此后Web应用程序设计了这种连接池方案,以便每次命中不会在RDBMS上产生巨大的处理开销。

由于MySQL更像是一个后Web时代的关系型数据库,连接非常轻量级且快速。我编写了许多高流量的Web应用程序,根本不使用MySQL的连接池。

只要没有政治障碍需要克服,这是您可以受益于不使用的复杂情况。


35
此答案发布8年后,连接池仍然是相关的。如果您运行一个具有大量流量的Web应用程序,无论其是否为无状态应用程序,都可能轻易遭遇“连接过多”的限制。连接池将通过等待空闲连接而不是硬性失败来帮助减轻此问题。此外,如果您想要水平扩展您的应用服务器,您的数据库可能不会驻留在同一台机器上。在这种情况下,您很可能希望通过HTTPS连接到它,这会产生显着的开销。连接池也将在此方面提供帮助。 - Joe
1
@Joe,您可能意思是TLS / SSL,而不是HTTPS。 - Stanislav Bashkyrtsev

18

封装你的连接类。

限制连接数。 返回未使用的连接。 拦截关闭以释放连接。

更新: 我在dbpool.py中添加了类似以下内容:

import sqlalchemy.pool as pool
import MySQLdb as mysql
mysql = pool.manage(mysql)

2
克里斯,肯定有人已经构建了这个功能吧?最坏的情况是我可以自己写,但显然这应该是一个相当普遍的需求,适用于那些不使用现有ORM/框架的人,我相信已经有其他人创建了一个经过时间验证的解决方案。 - John
我以前用Oracle做过类似的事情,我觉得总共涉及不到50行代码。 基本上,使用ID、字典、存储连接、存储使用状态等。 非常简单? - Chris
5
根据这种逻辑,我也应该自己开始实现哈希表和列表。 - Martin Konecny

16

在我看来,“更明显/更惯用/更好的解决方案”是使用现有的ORM,而不是发明类似DAO的类。

在我的看法中,ORM比“原始”的SQL连接更受欢迎。为什么?因为Python是面向对象的,从SQL行到对象的映射是绝对必要的。很少有使用情况是你处理不需要映射到Python对象的SQL行。

我认为SQLAlchemySQLObject(以及相关的连接池)是更具Pythonic风格的解决方案。

将连接池作为一个独立特性并不常见,因为没有对象映射的纯SQL(不包括对象映射)在那些需要连接池受益的复杂和长时间运行的进程类型中并不流行。是的,纯SQL被使用,但它通常在更简单或更受控制的应用程序中使用,连接池并没有提供帮助。

我认为你可能有两个选择:

  1. 修改你的类以使用SQLAlchemy或SQLObject。虽然一开始可能会觉得痛苦(所有的工作都白费了),但你应该能够利用所有的设计和思考。这只是一个采用广泛使用的ORM和连接池解决方案的练习。
  2. 使用你提出的算法,编写自己的简单连接池——一个用于循环连接的简单集合或列表。

a) DAO/Repository 不反对 ORM。实际上,无论您使用 ORM 还是 DB-API,都应该创建这样的层级。b) DB 池必须存在于 ORM 之外——只是 Python 社区不了解更好的方法。实际上,如果编写正确,两种类型的项目都应该使用它们——无论是否使用 ORM。 - Stanislav Bashkyrtsev

9

虽然是老帖子了,但对于一般性的对象池(连接或其他昂贵的对象),我使用类似以下的东西:

def pool(ctor, limit=None):
    local_pool = multiprocessing.Queue()
    n = multiprocesing.Value('i', 0)
    @contextlib.contextmanager
    def pooled(ctor=ctor, lpool=local_pool, n=n):
        # block iff at limit
        try: i = lpool.get(limit and n.value >= limit)
        except multiprocessing.queues.Empty:
            n.value += 1
            i = ctor()
        yield i
        lpool.put(i)
    return pooled

有一个惰性构造函数,它具有可选限制,并且应该普遍适用于我能想到的任何用例。当然,这假设你确实需要无论什么资源的池化,对于许多现代SQL类似的情况可能不需要。用法:

# in main:
my_pool = pool(lambda: do_something())
# in thread:
with my_pool() as my_obj:
    my_obj.do_something()

这意味着,如果需要的话(有些服务器不会显式关闭连接对象),任何对象构造函数创建的对象都必须具有适当的析构函数。


1
你忘了两件事情:1. yield i 可能会引发异常,因此你应该用 try...except 包装它。2. lpool.put(i) 可能会返回错误状态的对象(比如带有打开事务的数据库连接)。 - socketpair
异常产生应该由上下文管理器来处理。无论上下文如何退出(异常或其他情况),函数的其余部分都将运行。但是,如果您正在对数据库进行有状态的操作,则在函数的后续部分处理它是一个好主意。 - metaperture
实际上,使用Chris编辑后的帖子中的池对象可能更好,但对于那些想要学习如何实现池的人来说,我认为这是一个不错的例子。 - metaperture

3
回复旧主题,但据我上次查看,MySQL的驱动程序提供连接池作为其一部分。
你可以在以下网址上查看它们:

https://dev.mysql.com/doc/connector-python/en/connector-python-connection-pooling.html

从TFA中,假设您想要显式地打开一个连接池(正如OP所述):
dbconfig = {  "database": "test", "user":"joe" }
cnxpool = mysql.connector.pooling.MySQLConnectionPool(pool_name = "mypool",pool_size = 3, **dbconfig)

然后,通过调用 get_connection() 函数从池中请求连接。
cnx1 = cnxpool.get_connection()
cnx2 = cnxpool.get_connection()

3

2

如果你的应用程序决定开始使用多线程,那么自己制作连接池是一个糟糕的主意。为多线程应用程序制作连接池比为单线程应用程序制作连接池要复杂得多,在这种情况下,可以使用 PySQLPool 等工具。

如果你想要性能,使用 ORM 也是一个糟糕的主意。

如果你将处理大型/重型数据库,并且必须同时处理许多查询、插入、更新和删除操作,那么你需要性能,这意味着你需要编写自定义 SQL 来优化查找和锁定时间。而 ORM 往往不具备这种灵活性。

所以基本上,是的,你可以制作自己的连接池并使用 ORM,但只有当你确定不需要我刚才描述的任何东西时才能这样做。


1
使用 DBUtils,简单可靠。
pip install DBUtils

0

我为OpenSearch做了这件事,你可以参考一下。

    from opensearchpy import OpenSearch

    def get_connection():
                    connection = None
                    try:
                        connection = OpenSearch(
                            hosts=[{'host': settings.OPEN_SEARCH_HOST, 'port': settings.OPEN_SEARCH_PORT}],
                            http_compress=True,
                            http_auth=(settings.OPEN_SEARCH_USER, settings.OPEN_SEARCH_PASSWORD),
                            use_ssl=True,
                            verify_certs=True,
                            ssl_assert_hostname=False,
                            ssl_show_warn=False,
                        )
                    except Exception as error:
                        print("Error: Connection not established {}".format(error))
                    else:
                        print("Connection established")
                    return connection

    class OpenSearchClient(object):
        connection_pool = []
        connection_in_use = []

        def __init__(self):
            if OpenSearchClient.connection_pool:
                pass
            else:

                OpenSearchClient.connection_pool = [get_connection() for i in range(0, settings.CONNECTION_POOL_SIZE)]

        def search_data(self, query="", index_name=settings.OPEN_SEARCH_INDEX):
            available_cursor = OpenSearchClient.connection_pool.pop(0)
            OpenSearchClient.connection_in_use.append(available_cursor)
            response = available_cursor.search(body=query, index=index_name)
            available_cursor.close()
            OpenSearchClient.connection_pool.append(available_cursor)
            OpenSearchClient.connection_in_use.pop(-1)
            return response

你的回答可以通过提供更多支持信息来改进。请编辑以添加进一步的细节,例如引用或文档,以便他人可以确认你的答案是正确的。您可以在帮助中心中找到有关如何编写良好答案的更多信息。 - dsillman2000

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接