使用响应式扩展一次处理一个事件

3
让我们考虑以下流:
  SomeState state = new SomeState().

 _refreshFiberStream =
    Stream()
    .SubscribeOn(new EventLoopScheduler()) 
    .Select(DoCalc)
    .ObserveOn(DispatcherScheduler.Current)
    .Subscribe(Update);

DoCalc方法会将输入投影并消耗'state',然后将结果输出给Update方法,Update方法会修改'state'。如果有新的事件到来,则应该基于上一个事件更新的'state'进行操作,并根据它进行投影。

我正在寻找一种让事件始终按顺序执行的方法。例如,如果我有三个事件,我希望它们按照DoCalc、Update、DoCalc、Update、DoCalc、Update的顺序执行。

但是我看到的是DoCalc、DoCalc、Update、Update、DoCalc、Update,即它们从未按顺序运行。

在Rx中,是否有一种方法可以强制执行顺序?

1个回答

2
我看到一种需要“有序执行”的需求与“分派到另一个线程”的冲突。我的建议是将“更新(Update)”拆分为两个部分:

  1. 需要按顺序执行的部分(Update)
  2. 需要分派的部分(Dispatch)

然后,您可以按顺序调用Do(Update),并在调度程序上调用Subscribe(Dispatch)

var result =
    Stream()
        .SubscribeOn(new EventLoopScheduler())
        .Select(DoCalc)
        .Do(Update)
        .ObserveOn(DispatcherScheduler.Current)
        .Subscribe(Dispatch);

结果序列如下(“Dispatch n”调用可以在“Update n”之后的任何时间发生):
    选择a
    更新a
    选择b
    更新b
    分派a
    分派b

我猜另一个选择是使用ManualResetEvent,它规定只有在更新发生后,下一个DoCalc才能继续进行。你可以通过在DoCalc中添加ManualResetEvent.WaitOne和在Update中添加ManualResetEvent.Set来实现这一点。
private ManualResetEvent _wait = new ManualResetEvent(true);

private string DoCalc(string input)
{
    _wait.WaitOne();
    Console.WriteLine("Selected {0}", input);
    _wait.Reset();
    return input;
}

private void Update(string input)
{
    Console.WriteLine("Update {0}", input);
    _wait.Set();
}

这种第二种方法“可行”,但是像这样阻塞线程让我感到不安——它似乎与响应式编程背道而驰。当然,同样地,如果可能的话最好避免引入状态。

我看到在使用Rx进行共享状态修改和添加并发之间存在冲突;-) - Lee Campbell
@LeeCampbell 是的,说得好...并发性和共享状态需要以某种方式分离。 - McGarnagle
@McGarnagle 我想避免线程阻塞,因为这不是正确的做法,并且我同意并发和共享状态不应该同时存在。 - skjagini

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接