在Rails 4.2中如何检查作业是否已经排队。

7
我正在使用Rails 4.2和delayed_job 4.0.6作为我的ActiveJob后端。
我有一个工作,希望只允许在队列中出现一次。该作业需要超过一分钟的时间才能运行。它是由模型上的回调函数排队的。这个回调会比工作完成得多得多。未来不需要再将该工作排队超过一次。
以下是我尝试实现的伪代码。
# app/jobs/refresh_account_cache_job.rb
class RefreshAccountCacheJob < ActiveJob::Base
  def before_enqueue
    skip if job_already_enqueued
  end

  def perform
    Account.write_to_cache
  end

  def job_already_enqueued
    # ?
  end
end

如果在再次调用时作业的实例正在运行,则仍应将其排队等待未来运行。我正在寻找一种方式,使作业最多只能被排队等待1次未来运行。
我认为答案必须是针对delayed_job具体而言,但如果可以推广到ActiveJob那就更好了。

你能用Redis吗?也就是说,你的堆栈中已经安装了它吗? - Coffee Bite
我需要在这个项目中坚持使用Postgres。 - rb-
如果您能够将Redis与Postgres一起使用,那么可能会有一个简单的解决方案。 - Coffee Bite
如果你在考虑Resque或Sidekiq,我不能选择其中任何一种路线。 - rb-
请在当前技术栈中加入 Redis,以便能够在其上执行简单的 get/set 操作。 - Coffee Bite
只添加一次工作的要求听起来像是业务逻辑,而不是工作逻辑。因此,你应该在模型内(或某种服务中)进行验证,而不是在工作逻辑中。无论这是否是一个现实的计划,取决于回调触发的条件... - ABMagil
2个回答

2
这可能不是完全符合要求,但它应该能让您朝着正确的方向前进:
def self.up
  create_table :delayed_jobs, :force => true do |table|
  table.integer  :priority, :default => 0, :null => false
  table.integer  :attempts, :default => 0, :null => false
  table.text     :handler,                 :null => false
  table.text     :last_error
  table.datetime :run_at
  table.datetime :locked_at
  table.datetime :failed_at
  table.string   :locked_by
  table.string   :queue
  table.timestamps
end

你可以在表格中添加一个状态列,然后运行以下查询来获取作业并在执行其他操作之前检查其状态。

Delayed::Job.where(queue: '<YOUR QUEUE>').where(id: params[:id]).status

那么如何设置状态呢?使用延迟作业的success hook。代码如下:

def success(job)
  update_status('success')
end

private

def update_status(status)
  job = Job.find job_id
  job.status = status
  job.save!
end

希望这可以帮到您!

我来试试看。 - rb-

1

我将发布我已经完成的内容作为答案,以获取反馈。这只是我正在测试的一个可能的解决方案。

在该工作中,我检查Delayed::Job列表,以查看当前处理程序是否存在。如果存在,则跳过该作业。

# queue_job.rb
class EnqueueJob < ActiveJob::Base
  queue_as :default

  def already_enqueued?
    Delayed::Job.all.any? do |job|
      job.handler.include?("EnqueueJobHandler")
    end
  end

  def perform
    unless already_enqueued?
      # do stuff
    end
  end
end

到目前为止,它已经成功防止了作业超时排队。不足之处在于我不确定是否能够保持缓存的最新状态。


这个解决方案不可扩展,因为你正在循环遍历所有的工作。你应该使用ActiveRecord来利用数据库,就像上面的解决方案建议的那样:Delayed::Job.where(queue: '<YOUR QUEUE>').any? - Pere Joan Martorell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接