将一系列数据流合并成一个最新值的数据流

7

我有一个IObservable<IObservable<T>>,其中每个内部的IObservable<T>是一系列值后面跟随一个OnCompleted事件的流。

我想将其转换为IObservable<IEnumerable<T>>,即由任何未完成的内部流的最新值组成的流。每当一个内部流产生新值(或内部流过期)时,它应该生成一个新的IEnumerable<T>

可以用大理石图表来展示它(我希望这足够全面):

input ---.----.---.----------------
         |    |   '-f-----g-|      
         |    'd------e---------|
         'a--b----c-----|          

result ---a--b-b--c-c-c-e-e-e---[]-
               d  d d e f g        
                    f f            

([] 是一个空的 IEnumerable<T>-| 表示 OnCompleted)

你可以看到它有点类似于 CombineLatest 操作。 我一直在尝试使用 JoinGroupJoin,但都没有成功,但我感到这几乎肯定是正确的方向。

我希望在此操作符中尽可能少地使用状态。

更新

我已经更新了这个问题,不仅包括单值序列 - 产生的 IObservable<IEnumerable<T>> 应该仅包括每个序列的最新值 - 如果一个序列没有产生值,则不应包括它。


显然你需要一些状态在这里。 - Lasse V. Karlsen
@LasseV.Karlsen,你能解释一下为什么吗? - Alex
不使用状态是不可能的,因为按照定义,操作符需要跟踪每个内部序列直到其完成。 - Brandon
昨天你的解决方案可以工作,只需要一些小调整。如果你能重新发布它,我会向你展示这些调整。 - Brandon
@Brandon - 我已经恢复了我认为你想要的解决方案。感谢你的帮助,非常感激。 - Alex
3个回答

3
这是基于您昨天解决方案的版本,根据新的要求进行了微调。基本思路是将一个引用放入您的易腐集合中,然后随着内部序列产生新值来更新该引用的值。
我还修改了以正确跟踪内部订阅并在取消外部可观察对象订阅时取消订阅。
还修改以在任何流产生错误时拆除它们全部。
最后,我修复了一些可能违反Rx准则的竞态条件。如果您的内部可观察对象从不同的线程并发地触发,那么您可能会同时调用obs.OnNext,这是绝对不允许的。因此,我使用相同的锁门控每个内部可观察对象,以防止发生这种情况(请参见Synchronize调用)。请注意,由于这一点,您可能可以使用常规双向链表而不是PerishableCollection,因为现在使用集合的所有代码都在锁内,因此不需要PerishableCollection的线程保证。
// Acts as a reference to the current value stored in the list
private class BoxedValue<T>
{
    public T Value;
    public BoxedValue(T initialValue) { Value = initialValue; }
}

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<BoxedValue<T>>();
        var outerSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(outerSubscription);
        var innerLock = new object();

        outerSubscription.Disposable = source.Subscribe(duration =>
        {
            BoxedValue<T> value = null;
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            var subscription = new SingleAssignmentDisposable();

            subscriptions.Add(subscription);
            subscription.Disposable = duration.Synchronize(innerLock)
                .Subscribe(
                    x =>
                    {
                        if (value == null)
                        {
                            value = new BoxedValue<T>(x);
                            collection.Add(value, lifetime.Lifetime);
                        }
                        else
                        {
                            value.Value = x;
                        }
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                    },
                    obs.OnError, // handle an error in the stream.
                    () => // on complete
                    {
                        if (value != null)
                        {
                            lifetime.Dispose(); // removes the item
                            obs.OnNext(collection.CurrentItems().Select(p => p.Value.Value));
                            subscriptions.Remove(subscription); // remove this subscription
                        }
                    }
            );
        });

        return subscriptions;
    });
}

谢谢这个 - 它很容易理解,尽管我想知道是否可能以不可变的方式完成。如果能够增加线程安全性,再点一个赞! - Alex
小细节:.NET Framework 提供了 System.Runtime.CompilerServices.StrongBox,其实际上与你的 BoxedValue 是相同的。 - Mark Hurd

0

Dave Sexton提供的另一种解决方案Rxx的创建者 - 它使用Rxx.CombineLatest,在实现上似乎与Brandon的解决方案非常相似:

public static IObservable<IEnumerable<T>> CombineLatestEagerly<T>(this IObservable<IObservable<T>> source)
{
  return source
    // Reify completion to force an additional combination:
    .Select(o => o.Select(v => new { Value = v, HasValue = true })
                  .Concat(Observable.Return(new { Value = default(T), HasValue = false })))
    // Merge a completed observable to force combination with the first real inner observable:
    .Merge(Observable.Return(Observable.Return(new { Value = default(T), HasValue = false })))
    .CombineLatest()
    // Filter out completion notifications:
    .Select(l => l.Where(v => v.HasValue).Select(v => v.Value));
}

是的,我本来想提到 Rxx 有一个 CombineLatest 重载函数几乎满足你的需求。对此感到抱歉。由于我的项目中使用了 Rxx,如果我要解决这个问题,我会使用这个解决方案。代码比我的更小,比 Matthew 的更易理解。 - Brandon
我会有点担心,即使内部流已经完成(即使没有被看到),最后一个值仍然会留在集合中。这意味着随着足够多的流,'剩余'项的数量会随着时间的推移而变得相当大。使用你的解决方案,一旦流完全结束,盒装值就会从集合中移除。 - Alex
啊,是的,那是一个问题,我没有意识到它存在。 - Brandon

0

这个解决方案适用于单项流,但不幸的是会在内部流中累积每个项目,直到完成。

public static IObservable<IEnumerable<T>> MergeLatest<T>(this IObservable<IObservable<T>> source)
{
    return Observable.Create<IEnumerable<T>>(obs =>
    {
        var collection = new PerishableCollection<T>();
        return source.Subscribe(duration =>
        {
            var lifetime = new DisposableLifetime(); // essentially a CancellationToken
            duration
                .Subscribe(
                    x => // on initial item
                    {
                        collection.Add(x, lifetime.Lifetime);
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value));
                    },
                    () => // on complete
                    {
                        lifetime.Dispose(); // removes the item
                        obs.OnNext(collection.CurrentItems().Select(p => p.Value));
                    }
            );
        });
    });
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接