Sidekiq列出所有工作[排队+运行中]

42

有没有一种方法可以获取当前在队列中和正在运行的所有作业列表?基本上,我想知道给定类别的作业是否已经存在,我不想插入我的其他作业。我已经看到了其他选项,但我想以这种方式做。

我可以在这里看到如何获取队列中作业列表。

queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
  job.klass # => 'MyWorker'
end
据我所了解,这将不包括处理/运行作业。 有没有办法获取它们?

据我所了解,这将不包括处理/运行作业。有没有办法获取它们?


当您使用sidekiq/api检查工作程序时,您可以看到工作程序正在处理什么 https://github.com/mperham/sidekiq/wiki/API#workers - Cristian Bica
4个回答

68

如果您想从控制台列出所有当前正在运行的作业,请尝试以下方法

workers = Sidekiq::Workers.new
workers.each do |_process_id, _thread_id, work|
  p work
end

一个 work 是一个哈希。

列出所有队列数据。

queues = Sidekiq::Queue.all
queues.each do |queue|
  queue.each do |job|
    p job.klass, job.args, job.jid
  end
end

如果要针对特定队列进行更改,请使用Sidekiq::Queue.new('queue_name')

同样地,您可以使用Sidekiq::ScheduledSet.new获取所有已计划的作业


9
这个回答可能已经过时了:我收到了 NoMethodError: undefined method klass'`。 这个错误信息可能是由于调用了一个不存在的方法 "klass" 所致。 - american-ninja-warrior
job.klass 在2020年仍然可用。请查看https://github.com/mperham/sidekiq/blob/6c94a9dd9f3ccbbd107c6ad2336c26a305020a25/lib/sidekiq/api.rb#L189。 - Haseeb A
看起来调用 job.jid 会生成以下错误:NoMethodError: undefined method jid' for #Sidekiq::Queue:0x00007ff144076178`。这是因为我们正在尝试从队列中获取作业 ID 吗? - LewlSauce

10

正在运行的任务:

Sidekiq::Workers.new.each do |_process_id, _thread_id, work|
  p work
end

所有队列中排队的作业:

Sidekiq::Queue.all.each do |queue|
  # p queue.name, queue.size
  queue.each do |job|
    p job.klass, job.args
  end
end

5
稍作澄清:Sidekiq::Queue.all 只会输出正在进行的作业,而不是已计划的作业。如所接受的答案所述,您需要使用 Sidekiq::ScheduledSet.new 来获取已计划的作业。 - wondersz1
2
@sharq,你能否考虑删除这个答案,因为它与被接受的答案相同吗? - Ulysse BN

4
假设您在将任务加入 Sidekiq 时将散列作为参数传递。
args = {
  "student_id": 1,
  "student_name": "Michael Moore"
    }

YourWorker.perform_in(1.second,args)

然后你可以从你的应用程序中按照以下方式检索它

      ss = Sidekiq::ScheduledSet.new
      student_id_list = ss.map{|job| job['args'].first["student_id"]}

1
我正在应用程序作业中使用这个功能,以检查队列中是否已经有相同名称/参数的作业,并防止重复排队。

apps/jobs/application_job.rb

class ApplicationJob < ActiveJob::Base
    

    # Check if there is the same job already queued
    around_enqueue do |job, block|
        
        existing_queued_jobs = list_queued_jobs(job.class, job.queue_name, job.arguments)
        if existing_queued_jobs.size == 0
            block.call # this will enqueue your job
        else
            puts "JOB not enqueue because already queued (#{job.class}, #{job.queue_name}, #{job.arguments})"
        end
    end


    def list_queued_jobs(job_class, queue_name,  arguments)
        found_jobs = []
        queues = Sidekiq::Queue.all
        queues.each do |queue|
            queue.each do |job|
                job.args.each do |arg|
                     found_jobs << job if arg['job_class'].to_s == job_class.to_s && arg['queue_name'] == queue_name && arg['arguments'] == arguments
                end
            end
        end
        return found_jobs
    end
   


end

这可能非常低效。您可能希望查看此宝石代替:https://github.com/mhenrixon/sidekiq-unique-jobs - slhck

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接