响应式扩展 BufferWithPredicate

3

Rx有BufferWithTimeBufferWithCountBufferWithTimeOrCount方法,我想编写一个名为BufferWithPredicate的方法,它将如下所示:

public IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)

基本上,除非谓词返回 false ,否则将向现有缓冲区添加新项目;如果谓词返回 false,则会返回缓冲区并开始新的缓冲区。谓词将下一个项目和到目前为止的缓冲区作为参数。
如何实现这一点?
1个回答

3
这应该能满足您的需求。我使用Observable.Defer,以便它也可以与冷的可观察对象一起使用:
public static class MyObservableExtensions
{
    public static IObservable<IList<T>> BufferWithPredicate<T>(this IObservable<T> input, Func<T, IList<T>, bool> predicate)
    {
        return Observable.Defer(() =>
            {
                var result = new Subject<IList<T>>();
                var list = new List<T>();
                input.Subscribe(item =>
                    {
                        if (predicate(item, list))
                        {
                            list.Add(item);
                        }
                        else
                        {
                            result.OnNext(list);
                            list = new List<T>();
                            list.Add(item);
                        }
                    }, 
                    () => result.OnNext(list));
                return result;
            });
    }
}

使用方法:

var observable = new[] { 2, 4, 6, 8, 10, 12, 13, 14, 15 }.ToObservable();
var result = observable.BufferWithPredicate((item, list) => item % 2 == 0);
result.Subscribe(l => Console.WriteLine("New list arrived. Count = {0}", l.Count));

输出:

"New list arrived. Count = 6" 
"New list arrived. Count = 3"

啊哈,我不知道Observable有一些不是IObservable扩展的方法!我的一个问题是:它是否会泄漏订阅,因为它订阅了输入但从未返回可处理对象,允许你销毁订阅? - Martin
不行。这会返回一个延迟的可观察对象 - 如果你不订阅,它将超出范围并被垃圾回收。如果你订阅了,正常的规则适用:调用返回对象上的Dispose方法,或者等待OnCompleted事件触发;无论哪种方式都能确保没有内存泄漏。 - Judah Gabriel Himango

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接