Django多进程和数据库连接

97

背景:

我正在开发一个项目,使用Django和Postgres数据库。我们还使用了mod_wsgi,因为我的一些网络搜索提到了它。在Web表单提交时,Django视图启动一个需要大量时间(超过用户想等待的时间)的作业,因此我们通过后台的系统调用启动作业。现在正在运行的作业需要能够读取和写入数据库。由于这个作业花费的时间很长,我们使用多进程并行运行部分作业。

问题:

顶层脚本具有数据库连接,当它生成子进程时,似乎父级的连接对子进程可用。然后会出现有关必须在查询之前调用SET TRANSACTION ISOLATION LEVEL的异常。研究表明,这是由于尝试在多个进程中使用相同的数据库连接所致。我找到的一个线程建议在子进程开始时调用connection.close(),这样Django将在需要新连接时自动创建一个新连接,因此每个子进程将具有唯一的连接 - 即不共享。但是这对我没有起作用,因为在子进程中调用connection.close()会导致父进程抱怨连接丢失。

其他发现:

我读到的一些东西似乎表明你实际上不能这样做,而且多进程、mod_wsgi和Django不能很好地协同工作。我想这似乎很难相信。

有人建议使用celery,可能是一个长期的解决方案,但目前我无法安装celery,因为还需要一些批准过程。

在SO和其他地方找到了几个关于持久数据库连接的参考资料,我认为这是一个不同的问题。

还发现了关于psycopg2.pool和pgpool以及某种关于bouncer的内容。诚然,我没有完全理解我读到的大部分内容,但它当然没有引起我的注意,因为那不是我要找的东西。

当前的“解决方法”:

目前,我已经恢复到逐个运行事物的方式,这样可以工作,但速度比我想要的慢。

任何建议如何使用多进程并行运行?如果我可以让父进程和两个子进程都有独立的数据库连接,似乎事情会变得很好,但我无法获得这种行为。谢谢,对于长度很抱歉!

还可以参考这个讨论 - djvg
12个回答

82

多进程会复制连接对象,因为它会 fork 出一个新的进程,并复制父进程的所有文件描述符。话虽如此,与 SQL 服务器的连接只是一个文件,在 Linux 下可以在 /proc//fd/... 查看。任何打开的文件都将在 fork 的进程之间共享。您可以在这里了解更多有关 forking 的信息。

我的解决方案很简单,就是在启动进程之前关闭数据库连接,每个进程在需要时重新创建自己的连接(在 Django 1.4 中测试通过):

from django import db
db.connections.close_all()
def db_worker():      
    some_paralell_code()
Process(target = db_worker,args = ())

Pgbouncer/pgpool并不涉及多进程处理中的线程连接。它更像是一种解决方案,可以在高负载下加速与PostgreSQL的连接,避免了在每个请求中关闭连接所带来的问题。

更新:

为完全消除数据库连接问题,只需将所有与数据库相关的逻辑移动到db_worker中 - 我想将QueryDict作为参数传递...更好的想法是简单地传递ids列表...请参阅QueryDict和 values_list('id', flat=True),并别忘了将其转换为列表!在传递给db_worker之前进行list(QueryDict)转换。通过这样做,我们就可以避免复制模型数据库连接。

def db_worker(models_ids):        
    obj = PartModelWorkerClass(model_ids) # here You do Model.objects.filter(id__in = model_ids)
    obj.run()


model_ids = Model.objects.all().values_list('id', flat=True)
model_ids = list(model_ids) # cast to list
process_count = 5
delta = (len(model_ids) / process_count) + 1

# do all the db stuff here ...

# here you can close db connection
from django import db
db.connections.close_all()

for it in range(0:process_count):
    Process(target = db_worker,args = (model_ids[it*delta:(it+1)*delta]))   

1
多进程复制连接对象之间的原因是因为它分叉进程,因此复制了父进程的所有文件描述符。话虽如此,与mysql服务器的连接只是一个文件,您可以在Linux下看到它位于/proc/<PID>/fd/...。据我所知,任何打开的文件都将在分叉的进程之间共享。https://dev59.com/0G855IYBdhLWcg3wjlCL - vlad-ardelean
1
这也适用于线程吗?例如,在主线程中关闭数据库连接,然后在每个线程中访问数据库,每个线程都会获得自己的连接吗? - James Lin
1
你应该使用 django.db.connections.close_all() 来一次性关闭所有连接。 - Denis Malinovsky
谢谢。我还是有疑问——如果这是一个文件,并且多个进程正在使用同一个文件,它们不应该共享同一个连接吗?因此,如果一个进程打开了一个连接,Django 将在另一个进程中使用该连接。 - Dejell
1
嗯...这里有一段非常有趣的对话,来自django的人们:https://code.djangoproject.com/ticket/20562 或许它能为这个话题提供一些启示?基本上,连接“不能被fork”...每个进程都应该有自己的连接。 - lechup
显示剩余3条评论

20

使用多个数据库时,您应该关闭所有连接。

from django import db
for connection_name in db.connections.databases:
    db.connections[connection_name].close()

编辑

请使用与 @lechup 提到的相同方法关闭所有连接(不确定从哪个 Django 版本开始添加此方法):

from django import db
db.connections.close_all()

9
这只是多次调用db.close_connection。 - ibz
2
我不认为在任何地方不使用别名或信息的情况下,这可以工作。 - RemcoGerlich
这个...不能工作。 @Mounir,如果dbclose_connection()支持,你应该修改它以在for循环体中使用aliasinfo - 0atman

8

对于Python 3和Django 1.9,以下是我使用的方法:

import multiprocessing
import django
django.setup() # Must call setup

def db_worker():
    for name, info in django.db.connections.databases.items(): # Close the DB connections
        django.db.connection.close()
    # Execute parallel code here

if __name__ == '__main__':
    multiprocessing.Process(target=db_worker)

请注意,如果没有django.setup(),我就无法使其工作。我猜测需要重新初始化某些内容以进行多进程操作。


谢谢!这对我有用,对于较新版本的Django来说,现在可能应该是被接受的答案。 - krischan
Django的方式是创建管理命令而不是创建独立的包装脚本。如果您不使用管理命令,则需要使用Django的setup - lechup
3
你的for循环实际上没有对db.connections.databases.items()做任何操作 - 它只是多次关闭连接。只要在调用工作函数时调用db.connections.close_all()就可以正常工作。 - tao_oat

6

在顺序运行Django 测试用例 时,我遇到了“关闭连接”问题。除了测试以外,还有另一个进程在测试执行期间故意修改数据库。这个进程是在每个测试用例的setUp()中启动的。

一个简单的解决方法是从TestCase继承我的测试类改为继承TransactionTestCase。这确保了数据库实际上已被写入,并且其他进程对数据具有最新的视图。


运行在Linux上很好,但似乎在Windows上无法工作。 - bluppfisk

4

解决问题的另一种方法是在派生的进程内使用以下方式初始化到数据库的新连接:

from django.db import connection    
connection.connect()

1

(不是一个很好的解决方案,但可能是一个可行的变通方法)

如果您无法使用celery,也许您可以实现自己的队列系统,基本上是将任务添加到某个任务表中,并有一个定期的cron作业来提取并处理它们?(通过管理命令)


可能 - 希望避免那种复杂度,但如果这是唯一的解决方案,那么我可能不得不走这条路 - 感谢您的建议。 Celery是最好的答案吗? 如果是这样,我可能能够推动它,但需要一段时间。 我将Celery与分布式处理相联系,而不是在一台机器上进行并行处理,但也许这只是我缺乏经验的原因。 - daroo
2
Celery非常适合处理请求-响应周期之外的任何处理需求。 - second
如果任务不急迫,轮询是可以的。但是如果需求稍有变化,你就必须重新编写所有内容。 - nurettin

1
如果您只需要I/O并行而不是处理并行,可以通过将进程切换为线程来避免此问题。替换
from multiprocessing import Process

使用

from threading import Thread

Thread 对象与 Process 具有相同的接口。


1
一种可能的方法是使用多进程spawn子进程创建方法,这不会将django的DB连接细节复制到子进程中。子进程需要从头开始引导,但可以自由地创建/关闭自己的django DB连接。 在调用代码中:
import multiprocessing
from myworker import work_one_item # <-- Your worker method

...

# Uses connection A
list_of_items = djago_db_call_one()

# 'spawn' starts new python processes
with multiprocessing.get_context('spawn').Pool() as pool:
    # work_one_item will create own DB connection
    parallel_results = pool.map(work_one_item, list_of_items)

# Continues to use connection A
another_db_call(parallel_results) 

在我的worker.py文件中:

import django. # <-\
django.setup() # <-- needed if you'll make DB calls in worker

def work_one_item(item):
   try:
      # This will create a new DB connection
      return len(MyDjangoModel.objects.all())

   except Exception as ex:
      return ex

请注意,如果您在TestCase中运行调用代码,则模拟将不会传播到子进程中(需要重新应用它们)。

1

嘿,我遇到了这个问题,并通过执行以下操作解决了它(我们正在实现一个有限任务系统)

task.py

from django.db import connection

def as_task(fn):
    """  this is a decorator that handles task duties, like setting up loggers, reporting on status...etc """ 
    connection.close()  #  this is where i kill the database connection VERY IMPORTANT
    # This will force django to open a new unique connection, since on linux at least
    # Connections do not fare well when forked 
    #...etc

ScheduledJob.py

from django.db import connection

def run_task(request, job_id):
    """ Just a simple view that when hit with a specific job id kicks of said job """ 
    # your logic goes here
    # ...
    processor = multiprocessing.Queue()
    multiprocessing.Process(
        target=call_command,  # all of our tasks are setup as management commands in django
        args=[
            job_info.management_command,
        ],
        kwargs= {
            'web_processor': processor,
        }.items() + vars(options).items()).start()

result = processor.get(timeout=10)  # wait to get a response on a successful init
# Result is a tuple of [TRUE|FALSE,<ErrorMessage>]
if not result[0]:
    raise Exception(result[1])
else:
   # THE VERY VERY IMPORTANT PART HERE, notice that up to this point we haven't touched the db again, but now we absolutely have to call connection.close()
   connection.close()
   # we do some database accessing here to get the most recently updated job id in the database

坦白地说,为了避免多个同时用户引起的竞态条件,最好在进程分叉后尽快调用database.close()。尽管如此,在你有机会刷新数据库之前,仍然有可能出现其他用户在某个地方完全向db发出请求的情况。
老实说,更安全、更明智的做法可能是让你的分叉不直接调用该命令,而是调用操作系统上的一个脚本,使生成的任务在自己的django shell中运行!

我采用了您的想法,在fork内部而不是之前关闭,以制作一个装饰器,将其添加到我的工作函数中。 - Rebs

1
如果您也在使用连接池,以下方法对我们有用,即在分叉后强制关闭连接。之前似乎没有帮助。
from django.db import connections
from django.db.utils import DEFAULT_DB_ALIAS

connections[DEFAULT_DB_ALIAS].dispose()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接