对于“无限”重复任务,我应该使用哪种OTP行为?

21

我希望在Phoenix应用程序中反复运行相同的操作序列,而不会在工作进程中发生任何故障时使整个Web应用程序崩溃,但我真的不知道是否应该使用GenServer、Elixir的Tasks、代理或者其他一些我还没有想到的东西。

当我启动我的Phoenix应用程序时,一个工作进程也应该启动,定期从串行连接中提取一些值,通过Phoenix通道广播这些值,收集它们,直到达到@save_interval,然后计算中位数,通过另一个通道广播该中位数并将其写入InfluxDB。目前我有类似以下的代码(基本上可以工作):

def do_your_thing(serial_pid) do
  Stream.interval(@interval_live)
    |> get_new_values_from_serial(serial_pid)
    |> broadcast!("live-channel:#{@name}")
    |> Enum.take(div(@interval_save, @interval_live))
    |> calculate_medians()
    |> broadcast!("update-channel:#{@name}")
    |> write_to_database()

  do_your_thing(serial_pid) # repeat
end

我才刚开始理解OTP相关的内容,希望有人能够帮助我朝着正确的方向前进。

1个回答

33

你应该使用一个 GenServer,在 x 秒后给自己发送消息(以下示例中是 60 秒):

defmodule MyApp.Worker do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    schedule_work()
    {:ok, []}
  end

  def handle_info(:work, state) do
    state = do_work(state)
    schedule_work()
    {:noreply, state}
  end

  defp do_work(state) do
    # Do your work here and return state
  end

  defp schedule_work do
    Process.send_after(self(), :work, 60_000)
  end
end

3
为什么不使用定期运行的无限循环Task(可能由Stream.interval或Stream.repeatedly驱动)来完成其任务?如果这只是一个定期拉取并将所拉取的数据进一步转发到系统,那么它并不真正需要成为GenServer,对吧。Task仍然符合OTP规范,并且对我来说似乎更直观地完成了工作。 - sasajuric
4
原因是 Task 无法接收系统消息。我们希望让 Stream 和其它相关应用知道这一点,但这在版本1.0中还未实现,可能要到1.3才会实现。 - José Valim
1
此外,您可能希望您的主管能够重新启动执行定期工作的工作进程,而不仅仅是让无限流在发生故障时崩溃当前进程。 - Letseatlunch
José:是的,那是个好观点。@Letseatlunch 我不认为这是一个问题。监管者可以启动/停止/重新启动任务。对于这些工人来说,使用 :brutal_kill 关闭选项可能是有意义的,因为他们可能无法处理关闭消息。 - sasajuric
3
在Elixir 1.3中有更好的方法来做这件事吗? - knite
1
谢谢,何塞!我很高兴看到这个问题得到了最好的答案来源。 - DCameronMauch

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接