在异步 Rust 中,Future 如何确保只调用最新的 Waker?

7

这个问题的目的是为了了解Rust中Futures应该如何工作。我没有一个具体不起作用的代码片段,而是对于如何编写Futures以满足其约定存在问题。

假设你正在编写一个Future。poll()的约定允许调用者每次传入一个不同的Waker,而Future只会调用最新的那个。

假设这个Future隐藏了后台由工作线程完成的实际工作。如果工作线程在创建Future时所传递的工作尚未完成,那么Future应该返回Pending并且克隆和存储Waker。然后,Future在一段时间内什么都不做,并且是工作线程调用Waker。这将导致再次对Future进行轮询。

假设当Future再次被轮询时,它判断工作线程到目前为止所做的还不够,所以它再次返回Pending,并且在这第二次调用中也克隆和存储新的Waker。

由于实际调用唤醒器的是工作线程,因此poll()实现必须在后台线程中更新唤醒器。然而,我不清楚poll()实现如何以无竞争的方式进行更新——但现在问题变得棘手了,因为细节取决于poll()合同中的细节,我认为这些细节甚至没有被准确地指定。

可能发生的一件事是,在poll()运行时,就在它想要将新的唤醒器交给后台工作者之前,工作者决定该调用唤醒器。现在它调用旧的唤醒器,即使poll()已经收到了新的唤醒器,这可能违反合同(我认为这并没有指定)。因此,人们可能认为,在poll()完成之前,工作者可以调用旧的唤醒器,但这仅在工作者允许在poll()运行时调用任一唤醒器时才有效,这也没有提到,并且只需要在poll()返回后调用新的唤醒器。

最后,即使这个假设依赖于另一个未指定的事情:在多线程上下文中,“在 poll() 返回之后”的确切含义,通常必须诉诸于特别定义的术语,例如 happens-after,以定义一个事件发生在另一个事件之后。

考虑到所有这些,poll() 实现如何履行其仅调用最新的 Waker 的契约?是否有一份文件详细说明了似乎缺失于 poll() 文档中的所有细节?


我认为你可以实际做的最好的方法是:将唤醒器存储在互斥锁中。在poll函数中,将互斥锁作为第一操作进行锁定。 - PitaJ
2个回答

4

关于以无竞争的方式更新唤醒器,我的建议是使用futures::task::AtomicWaker,它可以原子地存储和唤醒唤醒器。

需要注意的是,唤醒唤醒器并不意味着未来还有更多的工作要做,唤醒器可以间歇性地被唤醒(例如,如果另一个未来准备好继续执行)。

考虑一个简单的未来,它具有一个AtomicWaker和一个“待完成工作”的标志,该标志在未来和工作线程之间共享。如果在轮询时未设置“待完成工作”标志,则未来将简单地返回Poll::Pending

为了正确性,我们只需要两件事:

  1. 工作线程必须在唤醒唤醒器之前设置“待完成工作”标志。
  2. 未来必须在检查“待完成工作”标志之前注册新的唤醒器。

这意味着无论工作线程和poll方法以哪种顺序执行操作,行为都将是正确的:

  1. 如果没有调用poll方法,那么工作线程将设置标志并唤醒唤醒者,唤醒者将轮询未来并发现有需要完成的任务。

  2. 如果调用了poll方法,并且在注册新的唤醒者之前唤醒了旧的唤醒者,那么未来将发现有需要完成的任务,并且可以忽略旧的唤醒者。(这可能导致不必要地唤醒旧的唤醒者,但这没问题。)

  3. 如果调用了poll方法,并且在注册新的唤醒者之后唤醒了旧的唤醒者,那么未来可能会或可能不会发现有需要完成的任务。如果发现了,就可以像第2点一样忽略唤醒者;如果没有发现,则唤醒者将确保再次轮询未来,就像第1点一样。

在这三种情况下,当工作线程标记未来已准备好继续工作并唤醒唤醒者时,未来将被轮询,直到完成更多的工作,即使这可能需要多次轮询。


我认为两个答案实际上描述的是同一个解决方案,但这个更明确地指出了它们,既在引用AtomicWaker上,又在注册新Waker后如何检查标志(因为在实践中,可能是poll()而不是后台工作者来进行操作,然后poll()可以执行必要的结果步骤,而自己不需要唤醒)。 - Martin Geisse

1

我同意文档应该更清楚地说明需要什么。然而,实际上,你只需要确保新的唤醒器被调用,而不一定是旧的唤醒器没有被调用。也就是说,唤醒两个唤醒器或者只唤醒最近的一个都可以,因此你不需要有任何特定的线程事件时间关系。

为什么我们可以得出这个结论?因为以下两个事实:

  1. 没有规定在未唤醒时轮询未来是不允许的。(例如,每当将轮询未来的责任移交给新的/不同的执行器任务时,必须发生这种情况,因为唤醒器需要提供给未来。)因此,没有未来可以知道它是被虚假轮询还是有用轮询。
  2. Future是不透明的。它们应该在“未来可以取得进展时调用唤醒器”,但从外部无法判断它们是否实际上取得了进展。因此,未来的执行器无法知道给定的唤醒和轮询是有用的还是虚假的。
将这两个事实结合起来,我们可以得出这样的结论:错误地调用Waker::wake()的唯一后果就是浪费了一点CPU时间。
因此,在您的工作线程情况下,重要的是,如果后台线程被交付一个新的唤醒器,并且它相信结果已经准备好了,那么它应该立即调用该唤醒器(而不是假设轮询会发生,因为它调用了先前的唤醒器)。它可能在变为就绪状态的同时也唤醒了先前过时的唤醒器,但这并不重要。

这确实有所帮助,但我认为对于仍然拥有旧Waker并调用它的工作者来说可能不适用。尽管如此还是有帮助的。 - Martin Geisse
@MartinGeisse 正如我在最后一段所说,工作者有责任在听到新唤醒器的消息后立即调用它。 - Kevin Reid
啊,我完全误解了。现在我明白了。非常感谢! - Martin Geisse

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接