Celery如何将命令行参数传递给任务?

3

我正在努力将额外的命令行参数传递给celery任务。我可以在bootstep中设置所需属性,但是当直接从任务中访问相同属性时,该属性为空(我猜它被覆盖了)。

class Arguments(bootsteps.Step):
  def __init__(self, worker, environment, **options):
    ArgumentTask.args = {'environment': environment}

    # this works
    print ArgumentTask.args

这里是自定义任务。

class ArgumentTask(Task):
  abstract = True

  _args = {}

  @property
  def args(self):
    return self._args

  @args.setter
  def args(self, value):
    self._args.update(value)

并实际任务

@celery.task(base = ArgumentTask, bind = True, name = 'jobs.send')
def send(self):
  # this prints empty dictionary
  print self.args

我需要使用额外的持久化层,例如持久化对象吗?还是我漏掉了一些非常明显的东西?

类似问题

1个回答

3

看起来似乎不可能。原因是您的任务可以被队列中的任何消费者在任何地方使用,每个消费者都有不同的命令行参数,因此其处理过程不应取决于工作者配置。

如果您的问题是管理环境dev/prod,则我们在项目中管理它的方法如下:

每个环境都被禁锢在它自己的venv中,并具有配置,以便项目自我意识其环境(在我们的情况下,只是配置中的db链接更改)。每个环境都有它自己的队列和celery工作者,使用以下命令启动:

/path/venv/bin/celery worker -A async.myapp --workdir /path -E -n celery-name@server -Ofair

希望对你有所帮助。

如果你真的想深入研究,每个任务都可以访问.control,这允许在celery上启动控制操作(如某些监视)。但我没有在那里找到任何有用的东西。


我猜这意味着在任务之间持久化对象是不可能的?例如,如果我在自定义任务中设置了某个属性,然后执行一系列任务,它们将不会设置该属性。是这样吗? - realshadow
同样的规则适用于任何你想要从一个任务跳转到另一个任务的情况,必须将其串行化并与下一个要执行的任务一起作为参数放入队列中,或者必须将其持久化在数据库中。我处理这种情况的方式是在我的对象中持久化元数据(在mongodb中),并添加标签(“任务X开始/结束”)。 - Benoît Latinier
我很担心这个...我需要重新考虑一些事情。谢谢。 - realshadow

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接