Celery+Django -- 使用Django消息框架进行状态轮询任务并报告成功或失败

6
在我的Django项目中,我使用Celery(以及其他一些东西),有一个Celery任务会在后台上传文件到数据库。我使用轮询来跟踪上传进度,并显示上传的进度条。以下是一些细节描述上传过程的代码片段:
views.py:
from .tasks import upload_task
...

upload_task.delay(datapoints, user, description) # datapoints is a list of dictionaries, user and description are simple strings

tasks.py:

from taskman.celery import app, DBTask # taskman is the name of the Django app that has celery.py
from celery import task, current_task

@task(base=DBTask)
def upload_task(datapoints, user, description):
    from utils.db.databaseinserter import insertIntoDatabase
    for count in insertIntoDatabase(datapoints, user, description):
        percent_completion = int(100 * (float(count) / float(len(datapoints))))
        current_task.update_state(state='PROGRESS', meta={'percent':percent_completion})

databaseinserter.py:

def insertIntoDatabase(datapoints, user, description):
    # iterate through the datapoints and upload them one by one
    # at the end of an iteration, yield the number of datapoints completed so far

上传代码运行正常,进度条也正常工作。但我不确定如何发送Django消息来告诉用户上传完成(或在出现错误时发送Django消息通知用户错误)。当上传开始时,我在views.py中执行以下操作:

from django.contrib import messages
...

messages.info(request, "Upload is in progress")

当上传成功时,我想要执行类似以下操作:
messages.info(request, "Upload successful!")

由于Celery任务是“fire and forget”,因此我无法在views.py中执行此操作。是否可以在celery.py中执行此操作?在我的celery.py中的DBTask类中,我定义了on_successon_failure,那么我能否从那里发送Django消息?

此外,虽然我的轮询技术在技术上可行,但目前并不理想。当前轮询的方式是无论是否正在进行任务,它都会无休止地检查任务。它会迅速地淹没服务器控制台日志,并且可能对性能产生负面影响。我对编写轮询代码比较新,因此我不太确定最佳实践以及如何仅在需要时轮询。处理常量轮询和服务器日志阻塞的最佳方法是什么?以下是我的轮询代码。

views.py:

def poll_state(request):
    data = 'Failure'
    if request.is_ajax():
        if 'task_id' in request.POST.keys() and request.POST['task_id']:
            task_id = request.POST['task_id']
            task = AsyncResult(task_id)
            data = task.result or task.state
            if data == 'SUCCESS' or data == 'FAILURE': # not sure what to do here; what I want is to exit the function early if the current task is already completed
                return HttpResponse({}, content_type='application/json')
        else:
            data ='No task_id in the request'
            logger.info('No task_id in the request')
    else:
        data = 'Not an ajax request'
        logger.info('Not an ajax request')

    json_data = json.dumps(data)
    return HttpResponse(json_data, content_type='application/json')

相应的jQuery代码如下:

{% if task_id %}
    jQuery(document).ready(function() {
        var PollState = function(task_id) {
            jQuery.ajax({
                url: "poll_state",
                type: "POST",
                data: "task_id=" + task_id,
            }).done(function(task) {
                if (task.percent) {
                    jQuery('.bar').css({'width': task.percent + '%'});
                    jQuery('.bar').html(task.percent + '%');
                }
                else {
                    jQuery('.status').html(task);
                };
                PollState(task_id);
            });
        }
        PollState('{{ task_id }}');
    })
{% endif %}

(这两个代码片段主要来自之前关于Django + Celery进度条的StackOverflow问题。)
2个回答

1
最简单的减少日志记录和开销的方法是在下一次PollState调用上设置超时。当前函数的写法会立即再次轮询。可以使用如下简单的方式:
setTimeout(function () { PollState(task_id); }, 5000);

这将大大减少您的日志问题和开销。
关于您的Django消息问题,您需要使用某种处理方式将这些已完成的任务取出来。一种方法是使用一个Notification模型或类似的模型,然后添加一个中间件来获取未读通知并将它们注入到消息框架中。

我从未为Django项目编写过自己的中间件,因此我有点不确定该如何处理。在高层次上,我知道我想要将完成消息存储在redis中(我正在使用celery的代理和后端),然后弹出未读消息并在网页上显示完成消息,但我有点不知道该如何编写。您能否提供一些粗略的示例代码,以指导我正确的方向? - Dan K

0
感谢Josh K提供使用setTimeout的提示。不幸的是,我从未弄清中间件方法,因此我选择了更简单的方法,在poll_state中发送HttpResponse,如下所示:
if data == "SUCCESS":
    return HttpResponse(json.dumps({"message":"Upload successful!", "state":"SUCCESS"}, content_type='application/json'))
elif data == "FAILURE":
    return HttpResponse(json.dumps({"message":"Error in upload", "state":"FAILURE"}, content_type='application/json'))

该意图是根据接收到的JSON,简单地呈现成功或错误消息。 现在存在新问题,但这些属于不同的问题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接