我正在多个线程中使用UDP接收消息。每次接收后,我会通过 MessageReceived.OnNext(message)
来触发消息。
由于我在使用多个线程,所以触发的消息是无序的,这是一个问题。
如何按照消息计数器对消息进行顺序触发呢?(假设有一个 message.counter 属性)
需要考虑到通信中可能会丢失一些消息(假设如果在 X 条消息后出现了一条消息计数器不连续的空缺,那么下一条消息是空缺的,则我会触发下一条消息)
立即触发消息(如果接收到下一条计数器)。
我正在多个线程中使用UDP接收消息。每次接收后,我会通过 MessageReceived.OnNext(message)
来触发消息。
由于我在使用多个线程,所以触发的消息是无序的,这是一个问题。
如何按照消息计数器对消息进行顺序触发呢?(假设有一个 message.counter 属性)
需要考虑到通信中可能会丢失一些消息(假设如果在 X 条消息后出现了一条消息计数器不连续的空缺,那么下一条消息是空缺的,则我会触发下一条消息)
立即触发消息(如果接收到下一条计数器)。
在说明检测丢失信息的要求时,您没有考虑到最后一条消息可能不会到达。我添加了一个timeoutDuration
,如果在规定时间内未收到任何消息,则清除缓冲的消息 - 您可能需要将此视为错误,请参阅注释以了解如何处理。
我将通过定义具有以下签名的扩展方法来解决此问题:
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
TimeSpan timeoutDuration = new TimeSpan(),
int gapTolerance = 0)
source
是未排序消息的流keySelector
是一个从消息中提取int
键的函数。我假设第一个寻找的键是0;如果需要,可以修改。timeoutDuration
如上所述,如果省略,则没有超时tolerance
是等待无序消息时保留的最大消息数量。传递0
以保留任意数量的消息scheduler
是用于超时的调度程序,供测试目的使用。如果未给出,默认值将被使用。这里我将逐行讲解。完整的实现在下面重复显示。
首先,如果没有提供调度程序,我们必须分配一个默认调度程序:
scheduler = scheduler ?? Scheduler.Default;
如果请求了超时,我们将使用一份副本替换源代码。如果在timeoutDuration
中没有收到消息,它将简单地终止并发送OnCompleted
。
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
TimeoutException
,只需删除Timeout
的第二个参数 - 空流,选择执行此操作的重载。请注意,我们可以安全地与所有订阅者共享此内容,因此将其定位在Observable.Create
之外的位置。Observable.Create
来构建我们的流。作为Create
参数的Lambda函数在每次发生订阅时被调用,我们会向它传递调用观察者(o
)。Create
返回我们的IObservable<T>
,因此我们在此处返回它。return Observable.Create<TSource>(o => { ...
我们将跟踪下一个预期的键值nextKey
,并创建一个 SortedDictionary
来保存无序的消息,直到可以发送它们。
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
现在我们可以订阅消息流(可能应用超时)。首先,我们介绍OnNext
处理程序。下一条消息被分配给x
:
return source.Subscribe(x => { ...
keySelector
函数从消息中提取键:var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
nextKey
并发送消息:if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
或者,我们可能会有一条未按顺序的未来消息,这种情况下我们必须将其添加到我们的缓冲区。如果我们这样做,我们还必须确保我们的缓冲区没有超过存储未按顺序消息的容忍度 - 在这种情况下,我们还将nextKey
提升到缓冲区中第一个键,因为它是一个SortedDictionary
,所以方便地是下一个最低的键:
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
无论以上结果如何,我们需要清空缓冲区中现在已经准备好的任何键。我们使用一个辅助方法来完成这个任务。请注意,它会调整nextKey
,因此我们必须小心地通过引用传递它。我们只需循环读取、删除和发送消息,只要键跟随彼此,每次增加nextKey
:
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
接下来我们提供一个OnError
处理程序 - 这将通过任何错误传递,包括如果您选择这种方法的Timeout异常。
最后,我们必须处理OnCompleted
。在这里,我选择清空缓冲区 - 如果无序消息阻塞了消息并且从未到达,则这是必要的。这就是为什么我们需要一个超时时间:
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
以下是完整的实现。
public static IObservable<TSource> Sort<TSource>(
this IObservable<TSource> source,
Func<TSource, int> keySelector,
int gapTolerance = 0,
TimeSpan timeoutDuration = new TimeSpan(),
IScheduler scheduler = null)
{
scheduler = scheduler ?? Scheduler.Default;
if(timeoutDuration != TimeSpan.Zero)
source = source.Timeout(
timeoutDuration,
Observable.Empty<TSource>(),
scheduler);
return Observable.Create<TSource>(o => {
int nextKey = 0;
var buffer = new SortedDictionary<int, TSource>();
return source.Subscribe(x => {
var key = keySelector(x);
// drop stale keys
if(key < nextKey) return;
if(key == nextKey)
{
nextKey++;
o.OnNext(x);
}
else if(key > nextKey)
{
buffer.Add(key, x);
if(gapTolerance != 0 && buffer.Count > gapTolerance)
nextKey = buffer.First().Key;
}
SendNextConsecutiveKeys(ref nextKey, o, buffer);
},
o.OnError,
() => {
// empty buffer on completion
foreach(var item in buffer)
o.OnNext(item.Value);
o.OnCompleted();
});
});
}
private static void SendNextConsecutiveKeys<TSource>(
ref int nextKey,
IObserver<TSource> observer,
SortedDictionary<int, TSource> buffer)
{
TSource x;
while(buffer.TryGetValue(nextKey, out x))
{
buffer.Remove(nextKey);
nextKey++;
observer.OnNext(x);
}
}
如果您在控制台应用程序中包含nuget rx-testing
,则以下内容将为您提供一个测试工具以进行操作:
public static void Main()
{
var tests = new Tests();
tests.Test();
}
public class Tests : ReactiveTest
{
public void Test()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable(
OnNext(100, 0),
OnNext(200, 2),
OnNext(300, 1),
OnNext(400, 4),
OnNext(500, 5),
OnNext(600, 3),
OnNext(700, 7),
OnNext(800, 8),
OnNext(900, 9),
OnNext(1000, 6),
OnNext(1100, 12),
OnCompleted(1200, 0));
//var results = scheduler.CreateObserver<int>();
xs.Sort(
keySelector: x => x,
gapTolerance: 2,
timeoutDuration: TimeSpan.FromTicks(200),
scheduler: scheduler).Subscribe(Console.WriteLine);
scheduler.Start();
}
}
这里有各种有趣的替代方法。我选择了这种主要的命令式方法,因为我认为它最容易理解 - 但是可能有些花哨的分组技巧可以用来做到这一点。我知道关于Rx一个始终如一的事情 - 总会有很多种方式来解决问题!
在这里,我也不完全满意超时的想法 - 在生产系统中,我希望实现一些检查连接的手段,例如心跳或类似的方法。我没有深入探讨这一点,因为显然它将是应用程序特定的。此外,心跳已经在这些论坛和其他地方被讨论过(例如,在我的博客上)。
datagrams.Buffer(time).Scan(...).Select(...)
- Dave SextonObservable.Create
实际上更简单。 - Dave Sexton
OrderByUntil
变体,以类似的方式解决这个问题,但最终它对我来说似乎有些奇怪。你认为值得实现还是只是一般情况下对一个糟糕问题的不错解决方案? - Dave Sexton