订阅响应式扩展事件

3

我正在多个线程中使用UDP接收消息。每次接收后,我会通过 MessageReceived.OnNext(message) 来触发消息。

由于我在使用多个线程,所以触发的消息是无序的,这是一个问题。

如何按照消息计数器对消息进行顺序触发呢?(假设有一个 message.counter 属性)

需要考虑到通信中可能会丢失一些消息(假设如果在 X 条消息后出现了一条消息计数器不连续的空缺,那么下一条消息是空缺的,则我会触发下一条消息)

立即触发消息(如果接收到下一条计数器)。

2个回答

6

在说明检测丢失信息的要求时,您没有考虑到最后一条消息可能不会到达。我添加了一个timeoutDuration,如果在规定时间内未收到任何消息,则清除缓冲的消息 - 您可能需要将此视为错误,请参阅注释以了解如何处理。

我将通过定义具有以下签名的扩展方法来解决此问题:

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    TimeSpan timeoutDuration = new TimeSpan(),
    int gapTolerance = 0)
  • source 是未排序消息的流
  • keySelector 是一个从消息中提取int键的函数。我假设第一个寻找的键是0;如果需要,可以修改。
  • timeoutDuration 如上所述,如果省略,则没有超时
  • tolerance 是等待无序消息时保留的最大消息数量。传递0以保留任意数量的消息
  • scheduler 是用于超时的调度程序,供测试目的使用。如果未给出,默认值将被使用。

步骤

这里我将逐行讲解。完整的实现在下面重复显示。

分配默认调度程序

首先,如果没有提供调度程序,我们必须分配一个默认调度程序:

scheduler = scheduler ?? Scheduler.Default;

排列超时

如果请求了超时,我们将使用一份副本替换源代码。如果在timeoutDuration中没有收到消息,它将简单地终止并发送OnCompleted

if(timeoutDuration != TimeSpan.Zero)
    source = source.Timeout(
        timeoutDuration,
        Observable.Empty<TSource>(),
        scheduler);

如果您想发送一个TimeoutException,只需删除Timeout的第二个参数 - 空流,选择执行此操作的重载。请注意,我们可以安全地与所有订阅者共享此内容,因此将其定位在Observable.Create之外的位置。
创建订阅处理程序
我们使用Observable.Create来构建我们的流。作为Create参数的Lambda函数在每次发生订阅时被调用,我们会向它传递调用观察者(o)。Create返回我们的IObservable<T>,因此我们在此处返回它。
return Observable.Create<TSource>(o => { ...

初始化一些变量

我们将跟踪下一个预期的键值nextKey ,并创建一个 SortedDictionary 来保存无序的消息,直到可以发送它们。

int nextKey = 0;  
var buffer = new SortedDictionary<int, TSource>();

订阅消息源并处理消息

现在我们可以订阅消息流(可能应用超时)。首先,我们介绍OnNext处理程序。下一条消息被分配给x

return source.Subscribe(x => { ...

我们调用keySelector函数从消息中提取键:
var key = keySelector(x);

如果该消息具有旧密钥(因为它超出了我们的消息顺序容忍度),我们将只是放弃它并完成此消息(您可能希望采取不同的行动):
// drop stale keys
if(key < nextKey) return;

否则,我们可能会有预期的键,这种情况下,我们可以增加 nextKey 并发送消息:
if(key == nextKey)
{
    nextKey++;
    o.OnNext(x);                    
}

或者,我们可能会有一条未按顺序的未来消息,这种情况下我们必须将其添加到我们的缓冲区。如果我们这样做,我们还必须确保我们的缓冲区没有超过存储未按顺序消息的容忍度 - 在这种情况下,我们还将nextKey提升到缓冲区中第一个键,因为它是一个SortedDictionary,所以方便地是下一个最低的键:

else if(key > nextKey)
{
    buffer.Add(key, x);
    if(gapTolerance != 0 && buffer.Count > gapTolerance)
        nextKey = buffer.First().Key;
}

无论以上结果如何,我们需要清空缓冲区中现在已经准备好的任何键。我们使用一个辅助方法来完成这个任务。请注意,它会调整nextKey,因此我们必须小心地通过引用传递它。我们只需循环读取、删除和发送消息,只要键跟随彼此,每次增加nextKey

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}

处理错误

接下来我们提供一个OnError处理程序 - 这将通过任何错误传递,包括如果您选择这种方法的Timeout异常。

刷新缓冲区

最后,我们必须处理OnCompleted。在这里,我选择清空缓冲区 - 如果无序消息阻塞了消息并且从未到达,则这是必要的。这就是为什么我们需要一个超时时间:

() => {
    // empty buffer on completion
    foreach(var item in buffer)
        o.OnNext(item.Value);                
    o.OnCompleted();
});

完整实现

以下是完整的实现。

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    int gapTolerance = 0,
    TimeSpan timeoutDuration = new TimeSpan(),
    IScheduler scheduler = null)
{       
    scheduler = scheduler ?? Scheduler.Default;

    if(timeoutDuration != TimeSpan.Zero)
        source = source.Timeout(
            timeoutDuration,
            Observable.Empty<TSource>(),
            scheduler);

    return Observable.Create<TSource>(o => {
        int nextKey = 0;  
        var buffer = new SortedDictionary<int, TSource>();

        return source.Subscribe(x => {
            var key = keySelector(x);

            // drop stale keys
            if(key < nextKey) return;

            if(key == nextKey)
            {
                nextKey++;
                o.OnNext(x);                    
            }
            else if(key > nextKey)
            {
                buffer.Add(key, x);
                if(gapTolerance != 0 && buffer.Count > gapTolerance)
                    nextKey = buffer.First().Key;
            }
            SendNextConsecutiveKeys(ref nextKey, o, buffer);
        },
        o.OnError,
        () => {
            // empty buffer on completion
            foreach(var item in buffer)
                o.OnNext(item.Value);                
            o.OnCompleted();
        });
    });
}

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}

测试工具

如果您在控制台应用程序中包含nuget rx-testing,则以下内容将为您提供一个测试工具以进行操作:

public static void Main()
{
    var tests = new Tests();
    tests.Test();
}

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();

        var xs = scheduler.CreateColdObservable(
            OnNext(100, 0),
            OnNext(200, 2),
            OnNext(300, 1),
            OnNext(400, 4),
            OnNext(500, 5),
            OnNext(600, 3),
            OnNext(700, 7),
            OnNext(800, 8),
            OnNext(900, 9),            
            OnNext(1000, 6),
            OnNext(1100, 12),
            OnCompleted(1200, 0));

        //var results = scheduler.CreateObserver<int>();

        xs.Sort(
            keySelector: x => x,
            gapTolerance: 2,
            timeoutDuration: TimeSpan.FromTicks(200),
            scheduler: scheduler).Subscribe(Console.WriteLine);

        scheduler.Start();
    }
}

总结

这里有各种有趣的替代方法。我选择了这种主要的命令式方法,因为我认为它最容易理解 - 但是可能有些花哨的分组技巧可以用来做到这一点。我知道关于Rx一个始终如一的事情 - 总会有很多种方式来解决问题!

在这里,我也不完全满意超时的想法 - 在生产系统中,我希望实现一些检查连接的手段,例如心跳或类似的方法。我没有深入探讨这一点,因为显然它将是应用程序特定的。此外,心跳已经在这些论坛和其他地方被讨论过(例如,在我的博客上)。


我曾考虑创建一个 OrderByUntil 变体,以类似的方式解决这个问题,但最终它对我来说似乎有些奇怪。你认为值得实现还是只是一般情况下对一个糟糕问题的不错解决方案? - Dave Sexton
看到你的回答了!大部分是后者 :) - James World

4
强烈建议在需要可靠排序时使用TCP - 这就是它的用途;否则,您将被迫与UDP玩猜谜游戏,有时您会猜错。
例如,假设您按此顺序接收以下数据报:[A,B,D]
当您接收D时,在推送D之前应等待C多长时间?
无论您选择多长时间,都可能出现以下情况:
1.如果C在传输过程中丢失,因此永远不会到达怎么办?
2.如果您选择的持续时间太短,导致您推动了D但随后收到了C,那该怎么办?
也许您可以选择启发式最佳运行时间,但为什么不直接使用TCP呢?
附注:
MessageReceived.OnNext 暗示您正在使用 Subject,这可能是不必要的。考虑将异步 UdpClient 方法直接转换为 observables,或通过编写 async iterator 通过 Observable.Create(async (observer, cancel) => { ... }) 进行转换。

绝对同意 Dave 关于 TCP 的观点 - 但是排序问题很有趣,它偶尔也有有效的应用 - 我在一个自定义的 Tibco 消息方案中遇到了这个问题,其中消息从多台机器上收集。+1 Dave - James World
我完全同意这很有趣,特别是因为 Rx 没有提供任何直接解决这个问题的 out-of-the-box 解决方案。虽然我想像下面这样的东西会起作用:datagrams.Buffer(time).Scan(...).Select(...) - Dave Sexton
尽管最后一个查询意味着所有通知都会延迟,但似乎Observable.Create实际上更简单。 - Dave Sexton
此外,不要忘记 WCF 的许多功能可以在这里提供帮助。例如:http://msdn.microsoft.com/en-us/magazine/cc163648.aspx。 - James World

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接