我有一个Django应用程序,调用了一个异步任务对查询集进行处理(使用Celery)。该任务接收查询集并执行一系列操作,这些操作可能基于其中的对象而需要非常长的时间。对象可以在查询集之间共享,因此用户可以在包含已在运行的对象的查询集上提交任务,新任务只应在尚未运行的对象上执行,并等待所有对象完成后才返回结果。
我的解释有点混乱,所以请想象以下代码:
我的解释有点混乱,所以请想象以下代码:
from time import sleep
import redis
from celery.task import Task
from someapp.models import InterestingModel
from someapp.longtime import i_take_a_while
class LongRunningTask(Task):
def run(self, process_id, *args, **kwargs):
_queryset = InterestingModel.objects.filter(process__id=process_id)
r = redis.Redis()
p = r.pipeline()
run_check_sets = ('run_check', 'objects_already_running')
# There must be a better way to do this:
for o in _queryset.values_list('pk', flat=True):
p.sadd('run_check')
p.sdiff(run_check_sets) # Objects that need to be run
p.sunion(run_check_sets) # Objects that we need to wait for
p.sunionstore('objects_already_running',run_check_sets)
p.delete('run_check')
redis_result = p.execute()
objects_to_run = redis_result[-3]
objects_to_wait_for = redis_result[-2]
if objects_to_run:
i_take_a_while(objects_to_run)
p = r.pipeline()
for o in objects_to_run:
p.srem('objects_already_running', o)
p.execute()
while objects_to_wait_for:
p = r.pipeline()
for o in objects_to_wait_for:
p.sismember('objects_already_running',o)
redis_result = p.execute()
objects_to_wait_for = [objects_to_wait_for[i] for i, member in enumerate(redis_result) if member]
# Probably need to add some sort of timeout here or in redis
sleep(30)
我对Redis非常陌生,所以我的主要问题是是否有更有效的方法来操作Redis以达到相同的结果。更广泛地说,我想知道Redis是否是处理这个问题的必要/正确方法。似乎应该有一种更好的方式来与Django模型交互。最后,我想知道这段代码是否线程安全。是否有人能够打破我的逻辑?
任何评论都将不胜感激。