Django中的线程安全与异步任务和Redis

4
我有一个Django应用程序,调用了一个异步任务对查询集进行处理(使用Celery)。该任务接收查询集并执行一系列操作,这些操作可能基于其中的对象而需要非常长的时间。对象可以在查询集之间共享,因此用户可以在包含已在运行的对象的查询集上提交任务,新任务只应在尚未运行的对象上执行,并等待所有对象完成后才返回结果。
我的解释有点混乱,所以请想象以下代码:
from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while

class LongRunningTask(Task):
    def run(self, process_id, *args, **kwargs):
        _queryset = InterestingModel.objects.filter(process__id=process_id)

        r = redis.Redis()
        p = r.pipeline()
        run_check_sets = ('run_check', 'objects_already_running')

        # There must be a better way to do this:
        for o in _queryset.values_list('pk', flat=True):
            p.sadd('run_check')
        p.sdiff(run_check_sets) # Objects that need to be run
        p.sunion(run_check_sets) # Objects that we need to wait for
        p.sunionstore('objects_already_running',run_check_sets)
        p.delete('run_check')
        redis_result = p.execute()

        objects_to_run = redis_result[-3]
        objects_to_wait_for = redis_result[-2]

        if objects_to_run:
            i_take_a_while(objects_to_run)
            p = r.pipeline()
            for o in objects_to_run:
                p.srem('objects_already_running', o)
            p.execute()

        while objects_to_wait_for:
            p = r.pipeline()
            for o in objects_to_wait_for:
                p.sismember('objects_already_running',o)
            redis_result = p.execute()
            objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
            # Probably need to add some sort of timeout here or in redis
            sleep(30) 

我对Redis非常陌生,所以我的主要问题是是否有更有效的方法来操作Redis以达到相同的结果。更广泛地说,我想知道Redis是否是处理这个问题的必要/正确方法。似乎应该有一种更好的方式来与Django模型交互。最后,我想知道这段代码是否线程安全。是否有人能够打破我的逻辑?

任何评论都将不胜感激。

1个回答

2

你能以稍微不同的架构实现吗?具体来说,我会为每个对象启动任务,然后将有关长时间运行任务的信息存储在某个地方(例如数据库、缓存等)。当每个单独的对象完成时,它会更新长时间运行任务的信息并检查是否已返回所有作业。如果是,则可以运行需要在长时间运行任务完成时运行的任何代码。

这种方法的好处在于在等待其他事情发生时不会占用服务器上的线程。在客户端上,您可以定期检查长时间运行任务的状态,甚至使用完成对象的数量来更新进度条(如果需要)。


我最终确实做到了那件事! - Bacon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接