使用响应式扩展重新排序事件

11

我正在尝试对来自不同线程的无序事件进行重新排序。

是否可以创建一个反应式扩展查询以匹配这些弹珠图:

s1          1   2       3   4

s2          1   3   2       4

result      1       2   3   4

并且...

s1          1   2   3   4

s2          4   3   2   1

result                  1234

即:只按版本号顺序发布结果。

我最接近的方法是使用连接操作,每次s1计时时打开一个窗口,并且仅在s2到达相同数字时关闭该窗口。

像这样:

var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
        publishedEvents.Scan(0, (i, o) => i + 1),
        expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
        _ => Observable.Never<Unit>(),
        (@event, expectedVersion) => new {@event,expectedVersion})
    .Where(x => x.expectedVersion == x.@event.Version)
    .Select(x => x.@event)
    .Subscribe(Persist);

但对于图2将不起作用。当s2以数字2进行滴答声时,组2将完成,因此先于1。

有意义吗?可以用Rx做到吗?是否应该这样做?

编辑:我想它就像重叠的窗口,后面的窗口在所有前面的窗口关闭之前不能关闭。而前面的窗口在窗口编号匹配事件版本号之前不会关闭。

编辑2:

现在我有类似这样的东西,但它并不是我所期望的反应式、函数式、线程安全的LINQ启示(请暂时忽略我的事件是JObjects):

var orderedEvents = Observable.Create<JObject>(observer =>
{
    var nextVersionExpected = 1;
    var previousEvents = new List<JObject>();
    return events
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(@event =>
        {
            previousEvents.Add(@event);

            var version = (long) @event["Version"];
            if (version != nextVersionExpected) return;

            foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
            {
                if ((long) previousEvent["Version"] != nextVersionExpected)
                    break;

                observer.OnNext(previousEvent);
                previousEvents.Remove(previousEvent);
                nextVersionExpected++;
            }
        });
});
1个回答

12

介绍

解决这个问题的关键在于排序。无论你怎么看,都需要某种形式的缓冲。虽然毫无疑问一些复杂的操作符组合可以实现这一点,但我认为这是一个很好的例子,说明Observable.Create是一个不错的选择。

泛化解决方案

我已经努力将我的方法推广到接受任何类型的排序键。为了做到这一点,我希望能够提供以下内容:

  • 用于获取事件键的键选择器函数,类型为Func<TSource,TKey>
  • 类型为TKey的初始键
  • 用于获取序列中下一个键的函数,类型为Func<TKey,TKey>
  • 用于从源流中配对的事件生成结果的结果选择器,类型为Func<TSource,TSource,TSource>

由于我只是在测试中使用基于1的整数序列来满足这些条件,因此它们满足:

  • keySelector:i => i
  • firstKey:1
  • nextKeyFunc:k => k+1
  • resultSelector:(left,right) => left

排序

这是我的Sort尝试。它将事件缓冲到字典中,并尽快刷新到订阅者:

public static IObservable<TSource> Sort<TSource, TKey>
    (this IObservable<TSource> source,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc)
{
    return Observable.Create<TSource>(o =>
    {
        var nextKey = firstKey;
        var buffer = new Dictionary<TKey, TSource>();
        return source.Subscribe(i =>
        {
            if (keySelector(i).Equals(nextKey))
            {
                nextKey = nextKeyFunc(nextKey);
                o.OnNext(i);
                TSource nextValue;
                while (buffer.TryGetValue(nextKey, out nextValue))
                {
                    buffer.Remove(nextKey);
                    o.OnNext(nextValue);
                    nextKey = nextKeyFunc(nextKey);
                }
            }
            else buffer.Add(keySelector(i), i);
        });
    });
}

我必须说这是一个相当天真的实现。在过去的生产代码中,我已经使用特定的错误处理、固定大小的缓冲区和超时来防止资源泄漏。然而,它对于这个例子来说已经足够了。:)

搞定这个(抱歉!),我们现在可以考虑处理多个流。

合并结果

第一次尝试

我的第一次尝试是生成一个无序的事件流,该事件流已经看到了所需次数。然后可以对其进行排序。我通过按键分组元素来实现这一点,使用 GroupByUntil 来保持每个组,直到捕获到两个元素。然后,每个组都是相同键的结果流。对于整数事件的简单示例,我可以只取每个组的最后一个元素。但是,我不喜欢这样做,因为在更实际的情况下,每个结果流可能会提供一些有用的内容。出于兴趣的缘故,我包括了代码。请注意,为了使测试可以在此和我的第二次尝试之间共享,我接受一个未使用的 resultSelector 参数:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc
     Func<TSource,TSource,TSource> resultSelector)
{
    return left.Merge(right)
               .GroupByUntil(keySelector, x => x.Take(2).LastAsync())
               .SelectMany(x => x.LastAsync())
               .Sort(keySelector, firstKey, nextKeyFunc);
}

小提示:你可以在 SelectMany 子句上动手脚,决定如何选择结果。这种解决方案比第二次尝试更有优势,在有多个结果流的情况下,更容易看出如何扩展它以选择比如说前三个元组中的前两个到达的结果。

第二次尝试

对于这种方法,我独立地对每个流进行排序,然后将结果一起使用 Zip 函数。这不仅是一个看起来简单得多的操作,而且还更容易以有趣的方式结合每个流的结果。为了使测试与我的第一种方法兼容,我选择了 resultSelector 函数,使用第一个流的事件作为结果,但显然您可以在您的场景中灵活地执行某些有用的操作:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    return Observable.Zip(
        left.Sort(keySelector, firstKey, nextKeyFunc),
        right.Sort(keySelector, firstKey, nextKeyFunc),
        resultSelector);
}

另外:很容易看出这段代码可以扩展到接受任意数量的输入流的更一般情况,但正如前面所提到的,使用 Zip 在给定键位时会变得非常不灵活,直到所有流的结果都准备好了才会阻塞。

测试用例

最后,这是我的测试,回应您的示例场景。要运行这些测试,请导入NuGet包rx-testingnunit并将上述实现放入静态类中:

public class ReorderingEventsTests : ReactiveTest
{
    [Test]
    public void ReorderingTest1()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(400, 3),
            OnNext(500, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(500, 4));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left,right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(300, 2),
            OnNext(400, 3),
            OnNext(500, 4));
    }

    [Test]
    public void ReorderingTest2()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 4),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(400, 1));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left, right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(400, 1),
            OnNext(400, 2),
            OnNext(400, 3),
            OnNext(400, 4));
    }
}

柯里化避免重复

最后附上一条评论:为了避免在代码中重复自己,这里有一种小技巧可以避免我在第二种方法中调用Sort时的重复方式。我没有将其包含在主体中,以免让不熟悉柯里化的读者感到困惑:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
        IObservable<TSource> right,
        Func<TSource, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKeyFunc,
        Func<TSource, TSource, TSource> resultSelector)
{
    Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
        events => events.Sort(keySelector, firstKey, nextKeyFunc);

    return Observable.Zip(
        curriedSort(left),
        curriedSort(right),
        resultSelector);
}

编辑 - 为了更好的清晰度进行了全面改写,并且我提供了一种新的和改进的方法。 - James World
你也可以扩展它,以取代TSource的TLeft和TRight,并具有相应的Func<TLeft,TKey>Func<TRight,TKey>键选择器...但我现在已经完成了。 - James World
1
这是一个非常棒的回答!非常感谢您的努力!我从中学到了很多东西。 - asgerhallas
对于来自不同线程的事件,您是否意味着在使用OrderedCollect()之前只需使用.ObserveOn()是一种安全的方法? - asgerhallas
好的,看起来像是Subject滥用。 :) 你不能同时调用OnNext。尝试在.events后面链接一个.Synchronize()。它是修复不良可观察对象的辅助方法! - James World
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接