使用delayed_job进行轮询

33

我有一个流程,通常需要几秒钟才能完成,因此我正在尝试使用delayed_job来异步处理它。作业本身运行良好,我的问题是如何轮询作业以查找它是否完成。

我可以通过将其分配给变量来从delayed_job获取id:

job = Available.delay.dosomething(:var => 1234)

+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| id   | priority | attempts | handler    | last_error | run_at      | locked_at | failed_at | locked_by | created_at | updated_at  |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+
| 4037 | 0        | 0        | --- !ru... |            | 2011-04-... |           |           |           | 2011-04... | 2011-04-... |
+------+----------+----------+------------+------------+-------------+-----------+-----------+-----------+------------+-------------+

但是,一旦它完成了任务,就会删除它,并且搜索已完成的记录会返回错误:

@job=Delayed::Job.find(4037)

ActiveRecord::RecordNotFound: Couldn't find Delayed::Backend::ActiveRecord::Job with ID=4037

@job= Delayed::Job.exists?(params[:id])

我应该去更改这个,或者推迟删除完整记录吗?我不确定如何获取它的状态通知。还是轮询死记录作为完成的证明可以吗?是否有其他人遇到类似的情况?


我面临的另一个问题或障碍是,如果不这样做,它会占用我的服务器。我向数据库询问是否存在某些带有日期的数据,如果不存在或已过期,则获取新数据,但将其作为单独的作业执行时,我使用AJAX并轮询直到完成...然后再次运行查询以获取新数据。这样做可以使其更快,但也更加复杂。 - holden
使用类似于Resque和Redis这样的东西,基本上缓存返回的对象,省去了额外的数据库往返,使轮询更快,这样做是否更有意义?我从未接触过Redis或Resque,所以想请教一下。 - holden
6个回答

48

让我们从API开始。我想要像以下这样的东西。

@available.working? # => true or false, so we know it's running
@available.finished? # => true or false, so we know it's finished (already ran)

现在让我们来编写这份工作。

class AwesomeJob < Struct.new(:options)

  def perform
    do_something_with(options[:var])
  end

end

到目前为止一切都很好。我们有一个任务。现在让我们编写逻辑将其加入队列。由于Available是负责此作业的模型,让我们教它如何启动此作业。

class Available < ActiveRecord::Base

  def start_working!
    Delayed::Job.enqueue(AwesomeJob.new(options))
  end

  def working?
    # not sure what to put here yet
  end

  def finished?
    # not sure what to put here yet
  end

end
那么我们如何知道工作是否正在运行?有几种方法,但在Rails中,当我的模型创建某些东西时,通常会与该东西相关联。如何关联呢?使用数据库中的ID。让我们在Available模型上添加一个job_id
顺便说一下,我们怎么知道工作没在运行是因为它已经完成了,还是因为它还没有开始呢?一种方法是实际检查工作所做的事情。如果它创建了一个文件,请检查文件是否存在。如果它计算了一个值,请检查结果是否被写入。但有些工作不那么容易检查,因为它们的工作可能没有明显的可验证结果。对于这种情况,您可以在模型中使用标志或时间戳。假设这是我们的情况,让我们添加一个job_finished_at时间戳来区分尚未运行的工作和已经完成的工作。
class AddJobIdToAvailable < ActiveRecord::Migration
  def self.up
    add_column :available, :job_id, :integer
    add_column :available, :job_finished_at, :datetime
  end

  def self.down
    remove_column :available, :job_id
    remove_column :available, :job_finished_at
  end
end

好的,现在让我们修改start_working!方法,使得在我们入队任务时能够立即将Available与其工作关联起来。

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options))
  update_attribute(:job_id, job.id)
end

太好了。在这一点上我本可以写belongs_to :job,但我们实际上并不需要。

现在我们知道如何编写working?方法,所以很容易。

def working?
  job_id.present?
end

但是我们如何标记工作已完成呢?没有人比工作本身更了解工作是否已完成。因此,让我们将available_id作为选项之一传递给工作,并在工作中使用它。为此,我们需要修改start_working!方法以传递该ID。

def start_working!
  job = Delayed::Job.enqueue(AwesomeJob.new(options.merge(:available_id => id))
  update_attribute(:job_id, job.id)
end

当任务完成时,我们应该将逻辑添加到作业中来更新我们的job_finished_at时间戳。

class AwesomeJob < Struct.new(:options)

  def perform
    available = Available.find(options[:available_id])
    do_something_with(options[:var])

    # Depending on whether you consider an error'ed job to be finished
    # you may want to put this under an ensure. This way the job
    # will be deemed finished even if it error'ed out.
    available.update_attribute(:job_finished_at, Time.current)
  end

end

有了这段代码,我们知道如何编写finished?方法。

def finished?
  job_finished_at.present?
end

我们做完了。现在我们可以轻松地针对 @available.working?@available.finished? 进行轮询。此外,您可以通过检查 @available.job_id 来了解为 Available 创建了哪个具体作业,这非常方便。您可以很容易地使用 belongs_to: job 将其转换为一个真正的关联。


15
我最终使用了Delayed_Job,结合一个after(job)回调函数来填充一个与创建的job ID相同的memcached对象。这样我就可以最小化查询作业状态的数据库操作次数,而是轮询memcached对象。它包含了我需要的整个已完成作业的对象,所以我甚至没有往返请求。我从github的一篇文章中得到了灵感,他们实际上做了类似的事情。
链接:https://github.com/blog/467-smart-js-polling 另外,我使用了一个jquery插件来进行轮询,它轮询的频率较低,并在一定数量的重试后放弃。
链接:https://github.com/jeremyw/jquery-smart-poll 看起来运行得很好。
 def after(job)
    prices = Room.prices.where("space_id = ? AND bookdate BETWEEN ? AND ?", space_id.to_i, date_from, date_to).to_a
    Rails.cache.fetch(job.id) do
      bed = Bed.new(:space_id => space_id, :date_from => date_from, :date_to => date_to, :prices => prices)
    end
  end

13

我认为最好的方法是使用delayed_job中提供的回调函数,包括::success、:error和:after。因此,你可以在模型中使用after回调函数来编写一些代码:

class ToBeDelayed
  def perform
    # do something
  end

  def after(job)
    # do something
  end
end

因为如果你坚持使用 obj.delayed.method,那么你必须猴子补丁 Delayed::PerformableMethod 并在那里添加 after 方法。 依我看来,这比轮询某些值要好得多,这些值甚至可能是后端特定的(例如 ActiveRecord vs. Mongoid)。


回调函数很好地工作了。现在当任务完成或失败时,我可以执行自己的逻辑。谢谢。 - channa ly
这些服务器端回调对于客户端如何确定作业何时完成有什么帮助? - Yarin
1
后续回调可以采取以下两种方式之一:1.设置某些标志,2.客户端将轮询此标志以使其变为真,或使用消息传递系统(如Faye)通过WebSocket向客户端发出信号,表明进程已完成。 - Roman
1
啊,我明白了,我误解了——我以为你的意思是使用回调函数代替轮询。 - Yarin

5
最简单的实现方式是将您的轮询操作更改为类似以下内容的形式:
def poll
  @job = Delayed::Job.find_by_id(params[:job_id])

  if @job.nil?
    # The job has completed and is no longer in the database.
  else
    if @job.last_error.nil?
      # The job is still in the queue and has not been run.
    else
      # The job has encountered an error.
    end
  end
end

为什么这个方法可行?当Delayed::Job从队列中运行一个作业时,如果成功,它会从数据库中删除它。如果作业失败,则记录将保留在队列中以便稍后再次运行,并将设置last_error属性为遇到的错误。利用上述两个功能,您可以检查已删除的记录以查看它们是否成功。
以上方法的好处包括:
  • 您可以获得原始帖子中寻找的轮询效果
  • 使用简单的逻辑分支,您可以向用户提供有关处理作业中出现错误的反馈
您可以通过执行以下操作之类的方式将此功能封装在模型方法中:
# Include this in your initializers somewhere
class Queue < Delayed::Job
  def self.status(id)
    self.find_by_id(id).nil? ? "success" : (job.last_error.nil? ? "queued" : "failure")
  end
end

# Use this method in your poll method like so:
def poll
    status = Queue.status(params[:id])
    if status == "success"
      # Success, notify the user!
    elsif status == "failure"
      # Failure, notify the user!
    end
end

非常简单且看起来有效。这种技术有哪些陷阱或缺点? - Leopd
1
唯一真正的缺点是,似乎检查已删除的记录有点“hackish”,而不是使用回调/观察者系统,这在Rails中是首选。 - Mike Trpcic
你会把job_id存储在哪里,以便页面刷新时不会丢失它? - hoffmanc

1
我建议,如果重要的是要得到作业完成的通知,那么编写一个自定义作业对象并将其排队,而不是依赖于调用Available.delay.dosomething时排队的默认作业。创建一个类似以下的对象:
class DoSomethingAvailableJob

  attr_accessor options

  def initialize(options = {})
    @options = options
  end

  def perform
    Available.dosomething(@options)
    # Do some sort of notification here
    # ...
  end
end

并将其排队:

Delayed::Job.enqueue DoSomethingAvailableJob.new(:var => 1234)

是的,我知道我可以写after、success或error回调函数,但作业在成功时会创建/更新数据库中的50-100条记录,由于我正在轮询它,所以我正在寻找一些查找而不是能够推送东西的方法。如果这样说有意义的话。 - holden
我正在使用ajax请求来确定它是否完成,这就是为什么我不能推送它的原因...除非我使用sockets做了一些疯狂的事情。 - holden
我想的更多是“更新某个标记”,以便您的AJAX请求可以定期轮询它。 - nickgrim

1

你的应用程序中的delayed_jobs表旨在仅提供正在运行和排队作业的状态。它不是一个持久性表,为了性能原因,应该尽可能小。这就是为什么工作在完成后立即被删除。

相反,你应该在你的Available模型中添加字段来表示工作已经完成。由于我通常对作业处理所需的时间感兴趣,所以我添加了开始时间和结束时间字段。然后我的dosomething方法会看起来像这样:

def self.dosomething(model_id)

 model = Model.find(model_id)

  begin
    model.start!

    # do some long work ...

    rescue Exception => e
      # ...
    ensure
      model.finish!
  end
end

开始!和结束!方法只记录当前时间并保存模型。然后我会有一个completed?方法,您的AJAX可以轮询以查看作业是否完成。

def completed?
  return true if start_time and end_time
  return false
end

有很多方法可以做到这一点,但我发现这种方法简单且对我很有效。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接