有没有办法在 Sidekiq 在执行任务时重启之前运行代码?

3
我有一个每4分钟运行一次的Sidekiq工作任务。
这个任务会检查当前代码块是否正在执行,然后再决定是否要再次执行该代码块。
process = ProcessTime.where("name = 'ad_queue_process'").first

# Return if job is running
return if process.is_running == true

如果Sidekiq在代码块的中途重新启动,更新作业状态的代码将永远不会运行。
# Done running, update the process times and allow it to be ran again
process.update_attributes(is_running: false, last_execution_time: Time.now)

这导致任务从未运行,除非我运行一个更新语句来设置is_running = false

有没有办法在Sidekiq重启之前执行代码?


1
你是如何重新启动Sidekiq的?通常情况下,它会“优雅地”停止/重启,这意味着在重新启动/停止之前会先完成所有正在运行的作业:参考但是如果你在开发中使用CTRL+C,那么它将立即关闭,但这只是在开发环境中。如果你想确保“更新”只有在没有错误时才会提交到数据库(即仅当Sidekiq在作业中间未被CTRL+C关闭时),那么你可以将整个作业包装在一个ProcessTime.transaction do ... end块中。 - Jay-Ar Polidario
1
附注:这无论如何都是错误的方法,容易受到竞态条件的影响。应该在作业完成后使用消息队列确认消息。另一个(低级)选项是使用“Mutex”/“ConditionalVariable”。所有其他解决方案迟早会导致竞争条件和两个作业同时并发执行。 - Aleksei Matiushkin
1
@RickS 噢,我明白了,你正在Heroku中使用Sidekiq。我以前没有在那里使用过它,但我发现关闭中途的原因(可能是你的作业需要>30秒才能运行?)。从这个文档中可以看出(https://github.com/mperham/sidekiq/wiki/Deployment),它说:“请记住,Heroku在进程重启时设置了30秒的硬限制,“-t 25”告诉Sidekiq在开始“强制关闭”程序之前给作业25秒的时间来完成”。 - Jay-Ar Polidario
1
@RickS 尽管如此,继续阅读 Heroku 页面上的内容:在 Heroku 发送 SIGTERM 给您的应用程序之后,它将等待几秒钟,然后发送 SIGKILL 强制关闭它,即使它还没有完成清理。在这个例子中,ensure 块根本没有被调用,程序只是退出:。因此,如果您的作业“挂起”/需要很长时间才能关闭,那么我的“rescue; ensure”解决方案仍然不是完全可靠的,但希望它不会花费太长时间,因为您只是在 ensure 块中执行一个 update;仍然不是100%可靠,例如更新时出现临时数据库超时。 - Jay-Ar Polidario
1
哎呀,我忘记重新抛出异常了,而且我不能再编辑上面的评论了。所以在这里:def perform; # 在此处编写代码...; rescue SignalException => e; raise; ensure; process.update(...); end - Jay-Ar Polidario
显示剩余5条评论
2个回答

5

更新:

  • 感谢 @Aaron 的贡献和我们的讨论(请参阅下面的评论),ensure 块(由 forked worker-threads 执行)只能在主线程强制终止这些 worker-threads 之前运行几个不确定的毫秒,以便主线程在异常堆栈中进行“清理”,避免被 Heroku 发送 SIGKILL。因此,请确保您的 ensure 代码非常快!

TL;DR:

def perform(*args)
  # your code here
ensure
  process.update_attributes(is_running: false, last_execution_time: Time.now)
end

  • 无论方法是否“成功”或引发异常,上述ensure始终被调用。我进行了测试:请参见这个repl代码,然后点击“Run”

  • 换句话说,即使出现SignalException,甚至是信号SIGTERM(优雅关闭信号),但ensure块仍然会被调用,但只有在SIGKILL(强制不可挽救的关闭)除外。您可以通过检查我的repl代码并将Process.kill('TERM',Process.pid)更改为Process.kill('KILL',Process.pid)来验证此行为,然后再次单击“run”(您将注意到puts不会被调用)

  • Heroku文档中看到:

    当Heroku要关闭dyno(对于重启或新部署等)时,它会首先向dyno中的进程发送SIGTERM信号。

    在Heroku向应用程序发送SIGTERM之后,它将等待几秒钟,然后发送SIGKILL以强制其关闭,即使它尚未完成清理。 在此示例中,根本不会调用try块,程序仅退出

    这意味着因为是SIGTERM而不是SIGKILL,将会调用ensure块,只有在关机需要很长时间时才不会调用,可能由于以下原因(我能想到的一些原因):

    • 您的perform代码(或堆栈中的任何ruby代码;甚至是gems)中也救援了SignalException,甚至救援了根Exception类,因为SignalExceptionException的子类),但需要很长时间来清理(即清理到数据库的connections或某些hang住应用程序的I/O操作等)

    • 或者,您上面自己的ensure块需要很长时间。在执行process.update_attributes(...) 时,由于某些原因(例如DB临时挂起/网络延迟或超时),该update可能根本无法成功!并且将耗尽时间,在上面的引述中,SIGTERM后几秒钟后,Heroku将发送SIGKILL强制停止应用程序。

...这意味着我的解决方案仍然不完全可靠,但在正常情况下应该能够工作。


我正在尝试一些解决这个问题的方案,而这个特定的解决方案似乎很有前途。但我发现一旦Sidekiq在工作线程上引发Sidekiq::Shutdown,它就不会等待线程完成就退出进程。这意味着使用这个解决方案时,在你的ensure块完成和进程退出之间存在竞争。我非常希望这个解决方案能够奏效,但如果没有某种调整,你的ensure块必须非常快才有任何希望完成。我正在努力解决这个问题... - Aaron
然而,所说的8秒限制是Sidekiq的东西,因为发生的情况是(从我理解的来看,如果我错了请纠正我)。begin; Process.kill('TERM', Process.pid); rescue Exception; raise_sidekiq_shutdown_after_8_seconds do # 然后Sidekiq等待作业完成; end; rescue Sidekiq::Shutdown => # 哦不!作业在8秒后没有完成!那么我(Sidekiq)只有2秒钟来进行内部清理(包括将作业推回重试队列); ensure; # 然后最终你的确保代码在这里运行!因此它应该很快!#end - Jay-Ar Polidario
抱歉,之前的消息中有误:“这样堆栈中任何更高层次的Ruby代码都有机会来捕获异常”。另外,附言:我在上面的评论中所说的并没有经过测试!我还没有看过Sidekiq进程的Ruby代码,也没有进行任何调试。但以上所有内容仅基于我的过去观察和假设。因此,我也很想了解事实,并且对你的发现更感兴趣! :) - Jay-Ar Polidario
嗯,这很有趣。Sidekiq::Shutdown 似乎是 Interrupt 的一个子类,因此 ensure 应该仍然在分叉线程中被调用(其中作业正在运行且 ensure 块被放置),只要主线程不立即终止。Sidekiq 文档 看起来建议在第 25 秒等待时间之后还有大约 5 秒的时间通过 Sidekiq::Shutdown 进行清理。但我只是好奇你的具体配置是否会影响这种行为。 - Jay-Ar Polidario
1
我一定会让你知道的!目前我所能想到的最好的办法是通过猴子补丁来等待线程(至少几秒钟)然后再终止进程。但这让我很紧张。谢谢你与我共同思考! - Aaron
显示剩余7条评论

0
处理 Sidekiq 关闭异常。
class SomeWorker
  include Sidekiq::Worker

  sidekiq_options queue: :default

  def perform(params)
    ...

  rescue Sidekiq::Shutdown
    SomeWorker.perform_async(params)
  end
end

嗨,根据这个指南 https://www.rubydoc.info/github/mperham/sidekiq/Sidekiq/Shutdown 上所说,“这是必要的回滚数据库事务,否则 Ruby 的 Thread#kill 将会提交...不要在你的工作进程中 RESCUE 这个错误”,所以我认为这不是最好的解决方案。 - ricks
但是在处理后,您可以重新引发它。 - NeverBe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接