我正在尝试对来自不同线程的无序事件进行重新排序。
是否可以创建一个反应式扩展查询以匹配这些弹珠图:
s1 1 2 3 4
s2 1 3 2 4
result 1 2 3 4
并且...
s1 1 2 3 4
s2 4 3 2 1
result 1234
即:只按版本号顺序发布结果。
我最接近的方法是使用连接操作,每次s1计时时打开一个窗口,并且仅在s2到达相同数字时关闭该窗口。
像这样:
var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
publishedEvents.Scan(0, (i, o) => i + 1),
expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
_ => Observable.Never<Unit>(),
(@event, expectedVersion) => new {@event,expectedVersion})
.Where(x => x.expectedVersion == x.@event.Version)
.Select(x => x.@event)
.Subscribe(Persist);
但对于图2将不起作用。当s2以数字2进行滴答声时,组2将完成,因此先于1。
有意义吗?可以用Rx做到吗?是否应该这样做?
编辑:我想它就像重叠的窗口,后面的窗口在所有前面的窗口关闭之前不能关闭。而前面的窗口在窗口编号匹配事件版本号之前不会关闭。
编辑2:
现在我有类似这样的东西,但它并不是我所期望的反应式、函数式、线程安全的LINQ启示(请暂时忽略我的事件是JObjects):
var orderedEvents = Observable.Create<JObject>(observer =>
{
var nextVersionExpected = 1;
var previousEvents = new List<JObject>();
return events
.ObserveOn(Scheduler.CurrentThread)
.Subscribe(@event =>
{
previousEvents.Add(@event);
var version = (long) @event["Version"];
if (version != nextVersionExpected) return;
foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
{
if ((long) previousEvent["Version"] != nextVersionExpected)
break;
observer.OnNext(previousEvent);
previousEvents.Remove(previousEvent);
nextVersionExpected++;
}
});
});
Func<TLeft,TKey>
和Func<TRight,TKey>
键选择器...但我现在已经完成了。 - James WorldSubject
滥用。 :) 你不能同时调用OnNext
。尝试在.events
后面链接一个.Synchronize()
。它是修复不良可观察对象的辅助方法! - James World