在celery任务内部如何取消一个任务?

3
我知道我可以使用return,但我想知道是否还有其他方法,特别是对于帮助方法而言,在这些方法中,return None会强制调用者在每次调用时添加样板检查。
我发现了InvalidTaskError,但没有真正的文档 - 这是一个内部事务吗?提出这个问题是否合适?
我正在寻找类似于self.retry()self.abort(),但没有看到任何东西。
以下是我将使用它的示例。
def helper(task, arg):
    if unrecoverable_problems(arg):
        # abort the task
        raise InvalidTaskError()

@task(bind=True)
task_a(self, arg):
    helper(task=self, arg=arg)
    do_a(arg)

@task(bind=True)
task_b(self, arg):
    helper(task=self, arg=arg)
    do_b(arg)
1个回答

7
在进一步调查之后,我发现了一个使用 Reject 的示例;
(引用自文档页面)

The task may raise Reject to reject the task message using AMQPs basic_reject method. This will not have any effect unless Task.acks_late is enabled.

Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example RabbitMQ supports the concept of Dead Letter Exchanges where a queue can be configured to use a dead letter exchange that rejected messages are redelivered to.

Reject can also be used to requeue messages, but please be very careful when using this as it can easily result in an infinite message loop.

Example using reject when a task causes an out of memory condition:

import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
    file = get_file(path)
    try:
        renderer.render_scene(file)

    # if the file is too big to fit in memory
    # we reject it so that it's redelivered to the dead letter exchange
    # and we can manually inspect the situation.
    except MemoryError as exc:
        raise Reject(exc, requeue=False)
    except OSError as exc:
        if exc.errno == errno.ENOMEM:
            raise Reject(exc, requeue=False)

    # For any other error we retry after 10 seconds.
    except Exception as exc:
        raise self.retry(exc, countdown=10)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接