Sidekiq:确保队列中的所有作业都是唯一的

13

我有一些更新触发器,它们将任务推入Sidekiq队列。所以在某些情况下,可能会有多个任务处理相同的对象。

有几个唯一性插件("中间件"唯一作业),它们没有被很好地记录,但它们似乎更像是限流器,用于防止重复处理;我想要的是一个限流器,它可以防止创建重复的作业。这样,对象总是以最新状态进行处理。是否有此类插件或技术?


更新:我没有时间制作中间件,但我最终得到了一个相关的清理函数来确保队列是唯一的:https://gist.github.com/mahemoff/bf419c568c525f0af903


不是在恶意挑衅,但 Sidekiq 的一个假设是任务具有幂等性,这正是你抱怨的问题所在。 - engineerDave
我不担心重复的工作会导致一些不必要的后果;我担心的是性能。相同的工作意味着浪费周期。例如,如果一个对象被更改并添加到队列中的作业,然后在作业仍在队列中时再次更改对象,则执行两个相同的作业没有意义。 - mahemoff
你是凭直觉感觉这是一个优化问题,还是需要基准测试来验证性能瓶颈?由于Sidekiq以非阻塞的方式并发运行任务,任务会并行执行,开销很小。查找独特的作业可能会消耗更多的周期,或者会导致阻塞操作,这将比在线程中执行一些重复操作慢。无论如何,你永远不知道,直到你进行了基准测试。无论哪种方式,祝你好运! - engineerDave
谢谢Dave!当你说“little overhead”时,你指的是Sidekiq的工作量,但如果任务本身需要大量的网络活动和繁重的工作,那么节省的成本就会非常巨大。我的意思是,这些任务被推迟的原因是有道理的,其中一些可能非常繁重。 - mahemoff
抱歉如果有任何混淆,我所说的“little overhead”是指低内存占用和非阻塞,在这个背景操作中。 - engineerDave
4个回答

8

那么,简单的客户端中间件怎么样?

module Sidekiq
  class UniqueMiddleware

    def call(worker_class, msg, queue_name, redis_pool)
      if msg["unique"]
        queue = Sidekiq::Queue.new(queue_name)
        queue.each do |job|
          if job.klass == msg['class'] && job.args == msg['args']
            return false
          end
        end
      end

      yield

    end
  end
end

只需注册它

  Sidekiq.configure_client do |config|
    config.client_middleware do |chain|
      chain.add Sidekiq::UniqueMiddleware
    end
  end

然后在工作中,当需要设置时,请在sidekiq_options中设置unique: true


1
在我看来,这不是一个好的解决方案。它的时间复杂度为O(n),而拥有后台作业处理器的整个目的就是不延迟主线程的执行。然而,这个中间件可能会成为性能瓶颈,具体取决于你的队列有多大。 - Hamed
完全同意。这只是一个小队列的示例。任何大型队列都需要使用基于作业参数的哈希查找,而不是这种迭代方法。 - Paté

3

我的建议是根据某些选择条件搜索之前安排的作业并删除,然后再安排新的。当我想要特定对象和/或其方法的单个预定作业时,这对我很有用。

在此背景下的一些示例方法:

 find_jobs_for_object_by_method(klass, method)

  jobs = Sidekiq::ScheduledSet.new

  jobs.select { |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method
  }

end

##
# delete job(s) specific to a particular class,method,particular record
# will only remove djs on an object for that method
#
def self.delete_jobs_for_object_by_method(klass, method, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        job_method == method  &&
        args[0] == id
  end.map(&:delete)

end

##
# delete job(s) specific to a particular class and particular record
# will remove any djs on that Object
#
def self.delete_jobs_for_object(klass, id)

  jobs = Sidekiq::ScheduledSet.new
  jobs.select do |job|
    job.klass == 'Sidekiq::Extensions::DelayedClass' &&
        ((job_klass, job_method, args) = YAML.load(job.args[0])) &&
        job_klass == klass &&
        args[0] == id
  end.map(&:delete)

end

谢谢,这不是完整的答案,但我认为这是接近解决此问题的最佳策略。 - mahemoff

3

我在问题中提到了这一点。 - mahemoff
它有非常完善的文档,详细说明了如何做你想要的事情。 - Freddy Wetson
在我看来,它似乎是在处理时间而不是入队时间检查唯一性。但文件并没有很清楚地表明。 - mahemoff
我们使用的是sidekiq-unique-jobs 2.7.0,它在入队时起作用。只需在您的worker的sidekiq_options中设置unique_job_expiration,并将其设置为作业的平均执行时间的倍数即可。例如:您的作业每分钟调度一次,需要20秒才能完成,则使用sidekiq_options queue: unique, unique_job_expiration: 40。这样,如果Sidekiq在这40秒内尝试重新排队作业,它将不会产生任何影响。 - aledalgrande
1
@aledalgrande 谢谢,但基于时间的过期并不是我需要的。我想要一种方法,在 Sidekiq 尝试入队时检查作业是否存在,如果存在,则什么也不做;但如果不存在,则将其入队。我猜测一个插件可能需要维护所有作业的哈希表以有效地完成它。 - mahemoff
显示剩余2条评论

0
也许你可以使用Queue Classic,它将作业排队到Postgres数据库中(以非常开放的方式),因此可以在执行之前进行扩展(开源)以检查唯一性。

2
Sidekiq非常开放且易于扩展,说实话。 - mahemoff

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接