递归/扩展在响应式扩展中的含义

4
我正试图构建一个Rx流水线,其工作方式如下:
1. 我编写了一个函数,该函数接收一个IObservable,提供有关公司信息的档案。 2. 我查询各种数据源以查找可能相关的公司档案,全部并行处理。我将它们合并成单个公司档案的IObservable。 3. 当我获取这些可能相关的档案时,我将它们与我已经观察过的档案进行比较,如果它们的相关性>80%且与我已经观察过的任何档案不同,则认为它们是匹配的。 4. 我想将匹配的公司重新输入步骤1,以便我可以搜索与这些新匹配档案相关的数据。
我使用一些已知的好档案来启动这个过程。
最终,没有更多的匹配档案可以被看到,因此进程结束。
我在编写这个程序时遇到了麻烦。如果我使用Subject允许管道的尾端将其档案发送到工作流程的开头,那么没有人会调用OnCompleted,我就永远不知道进程何时结束。如果我改为使用递归开发,由于我尝试使用其自身的返回值来调用函数,所以似乎总是出现堆栈溢出问题。
有谁能帮我解决如何以一种方式完成此任务,以便我可以确定该过程何时结束?

如果我要使用硬件术语,您是否正在寻找多路复用器-解复用器? - JerKimball
@JerKimball在这个情境下,我不确定你的意思是什么。 - David Pfeffer
嗯...让我试着重新表述一下:您有多个来源,希望将它们混合成一个流,然后从该单个流中“解开”事件,使其返回到相应的多个流侧? - JerKimball
@JerKimball 不,我不需要“取消合并”已合并的流。我需要将已合并的流重新输入到生成它的Rx工作流的开头。这是一个递归过程。 - David Pfeffer
1个回答

8
听起来您想要这样的数据流程:
seed profiles --> source --> get related --> output
                     ^                    |
                     |                    v
                     -<--- transform <-----

这似乎是一个解决通用问题比解决特定问题更容易的情况,因此我将建议提供一个通用的“反馈”函数,应该能够给你需要的构建块:

编辑:修复了函数以使其完整

IObservable<TResult> Feedback<T, TResult>(this IObservable<T> seed,
                                          Func<T, IObservable<TResult>> produce,
                                          Func<TResult, IObservable<T>> feed)
    {
        return Observable.Create<TResult>(
                obs =>
                {
                    var ret = new CompositeDisposable();
                    Action<IDisposable> partComplete = 
                        d =>
                        {
                            ret.Remove(d);
                            if (ret.Count == 0) obs.OnCompleted();
                        };
                    Action<IObservable<T>, Action<T>> ssub =
                        (o, n) =>
                        {
                            var disp = new SingleAssignmentDisposable();
                            ret.Add(disp);
                            disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
                        };
                    Action<IObservable<TResult>, Action<TResult>> rsub =
                        (o, n) =>
                        {
                            var disp = new SingleAssignmentDisposable();
                            ret.Add(disp);
                            disp.Disposable = o.Subscribe(n, obs.OnError, () => partComplete(disp));
                        };

                    Action<T> recurse = null;
                    recurse = s =>
                              {
                                  rsub(produce(s),
                                       r => 
                                       {
                                           obs.OnNext(r);
                                           ssub(feed(r), recurse);
                                       });
                              };

                    ssub(seed, recurse);
                    return ret;
                });
    }

在你的情况中,TTResult看起来是一样的,因此feed将成为恒等函数。produce将用于实现步骤2和3的函数。
以下是我使用该功能测试的一些示例代码:
void Main()
{
    var seed = new int[] { 1, 2, 3, 4, 5, 6 };
    var found = new HashSet<int>();
    var mults = seed.ToObservable()
                    .Feedback(i =>
                              {
                                  return Observable.Range(0, 5)
                                         .Select(r => r * i)
                                         .TakeWhile(v => v < 100)
                                         .Where(v => found.Add(v));
                              },
                              i => Observable.Return(i));

    using (var disp = mults.Dump())
    {
        Console.WriteLine("Press any key to stop");
        Console.ReadKey();
    }
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}

static IDisposable Dump<T>(this IObservable<T> source)
{
    return source.Subscribe(item => Console.WriteLine(item),
                            ex => Console.WriteLine("Error occurred in dump observable: " + ex.ToString()),
                            () => Console.WriteLine("Dump completed"));
}

这非常接近,但是我还需要知道进程“停止”的时间。由于每次递归时,我都会过滤并发送比我收到的结果更少的结果,最终该过程将没有项目发送回开始步骤,并且该过程完成。我需要能够在不使用任意超时的情况下检测到这一点。 - David Pfeffer
@DavidPfeffer 我甚至没有考虑过完成的问题。原始版本的问题当然是前向和后向可观察对象都在等待另一个完成才能完成。我已经编辑了另一个版本,跟踪未完成的可观察对象,并在没有剩余时完成。尽管需要进行彻底的测试,但可能存在竞争条件,但我的基本测试仍然有效。 - Gideon Engelberth
更接近了!我最后剩下的问题是因为没有建立流水线,而是一个函数被递归调用,所以我不能使用有状态的运算符。例如,在您的图表中,“转换”步骤无法明确包含“Distinct”,因为它只会对单个结果批处理执行操作,而不是整个过程。 - David Pfeffer
@DavidPfeffer 这个示例程序展示了一种实现的方式。我不确定它有多普适,但你可以将集合管理封装成一个函数,例如 Func<T, IObservable<R>> DistinctForAll<T, R>(Func<T, IObservable<R>> selector) - Gideon Engelberth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接