Django celery工作者将实时状态和结果消息发送到前端。

12
在我运行异步任务的Django应用程序中,我希望向用户显示进度、错误等信息。如果有错误,用户应该被重定向到一个页面,需要额外的输入或某些操作来解决问题。从celery工作返回前端的最佳方式是什么?
以下是伪代码的基本结构:
# views.py
from tasks import run_task

def view_task():
    run_task.delay()
    return render(request, 'template.html')

# tasks.py
from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
        handle_error()
    else:
        handle_succes()     

# compute_module
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

我理想的情况是:

  • compute_module.py 模块使用 Python 的本地记录器。出于职责分离的考虑,我希望将日志记录尽可能通用,并使用标准的 Python/Django 记录器。但它们似乎没有被设计为向前端发送消息。
  • celery 任务处理日志,而不是在 stdout 上显示它们,将它们重定向到 pusher
  • 前端 JS 显示和处理消息

可能有标准的方法在 celery worker 和前端之间通信,但我不知道。这种情况可能经常发生,我很惊讶实现起来如此困难。在某种程度上,RabbitMQ 消息队列或 AWS SNS 应该为此而设计。以下是我查看过但感觉都不太好用的资源,也许只是我混淆了。

日志记录:这似乎更多是关于服务器端日志记录,而不是向用户发送消息。

Celery cam 似乎是关于监控任务的管理员,而不是向用户发送消息。

我喜欢 pusher,但我不希望 compute_module.py 处理它。例如,我更愿意在 compute_module.py 内部不进行任何 pusher.com 集成。我可以传递一个已经被实例化的 pusher 对象,使模块只需推送消息,但我仍然希望它是通用的。


在您的情况下,进度报告的位置将是什么?您运行一个任务,它完成或出现错误。如果您运行了一个分解为子任务的任务,您是否可以使用WebWorker将每个子任务的最终输出推送回客户端?我也不太喜欢将Python日志记录作为用户反馈机制 - 我怀疑获得漂亮的输出,特别是对于HTML来说,可能会带来更多麻烦而不值得。 - JL Peyret
3个回答

2

编辑:现在已经转移到django-channels,效果很好,但比以下解决方案更复杂。

之前:

好的,下面是我目前解决它的伪代码。基本上,我使用https://pusher.com/docs/javascript_quick_start,并将实例化对象服务器端传递到compute_module中。一个缺点是pusher消息是短暂的,所以我要在LogPusher中做一些额外的工作来将它们存储在数据库中,这是另一天要做的事情……

另外,在我的实际实现中,我通过$(document).ready()中的$.post() ajax调用触发任务,因为小任务完成得非常快,用户永远看不到推送器消息,因为连接没有建立(回到历史消息问题)。

另外一个可选的路线是我上面没有提到的https://channels.readthedocs.io/en/latest/

[编辑]另一个解决方案是服务器发送事件,它有django实现,没有测试过。但它看起来很适合单向更新,例如从服务器到客户端(与websocket双向相反)。您需要像redis pubsub这样的消息传递系统才能将更新推送到服务器sse路由。

通过pusher从django服务器更新前端:

# views.py
from tasks import run_task

def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')

    
# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event
        
    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    
    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')

            
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}
        

# context_processors.py
# see https://dev59.com/CXRC5IYBdhLWcg3wCMg6
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }

        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>

1
我唯一能够获取实时状态的方法是将一些SQL写入/API调用放入任务本身中。使用任务的返回值进行操作要容易得多,因为你可以编写自定义的任务类。
我不确定在Django中如何实现这一点,但它应该看起来像这样。
class CustomTask(celery.Task):
    def __call__(self, *args, **kwargs):
        self.start_time = time.time()

    def on_success(self, retval, task_id, args, kwargs):
        do_success_stuff()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        do_failure_stuff()

@shared_task(base=CustomTask)
def do_stuff():
    return create_widgets()

完整列表可以在这里找到: http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

好的,这些小部件如何在UI上显示而不需要页面刷新? - Avinash Raj
我相信一定有一个优雅的解决方案,但我会将我的任务写入表格,然后更新状态列。由于你在启动作业时拥有task_id,所以你可以通过一些jQuery魔法来获取新状态。也许像http://www.giantflyingsaucer.com/blog/?p=4310这样的东西。 - lpiner

0

虽然这个链接可能回答了问题,但最好在此处包含答案的必要部分并提供链接以供参考。如果链接页面更改,仅有链接的答案可能会变得无效。-【来自审核】 - Flair

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接