如何在SelectMany语句中处理来自异步方法的异常

6

我正在尝试使用Rx异步处理一些任务,例如:

var list = Enumerable.Range(0, 100)
    .ToObservable()
    .SelectMany(x => Observable.Start(() => {
        Console.WriteLine("Processing {0} ...", x);

        Thread.Sleep(100 * x % 3);

        if (x > 90) {
            Console.WriteLine("Procesing exception {0} > 90", x);
            throw new Exception("Value too large");
        }
        Console.WriteLine("Processing {0} completed.", x);
        return x;
    }))
    .Subscribe(
        x => { Console.WriteLine("Next [{0}]", x); },
        e => {
            Console.WriteLine("Exception:");
            Console.WriteLine(e.Message);
        },
        () => { Console.WriteLine("Complete"); }
    );

我对这段代码的问题在于异常没有传递给订阅者。经过多次尝试后,我放弃了并决定问这个简单的问题:
如何处理在SelectMany语句中的异步方法引发的异常?
为了明确,最终实现是一个同步函数调用,可能会抛出异常。目标是将其传递给订阅者,以便进一步处理(在特定情况下,将向用户显示消息)。
编辑
我将我的发现移到了答案中,以便我可以将此问题标记为已回答。个人上不同意自己回答...但有时候别无选择,请见谅。

1
这能回答你的问题吗? - user981225
并不完全是这样,因为这会抑制异常,但如果没有更好的方法,包装的想法可能会有用。然而,我不确定在我的情况下包装是否有效,因为我正在处理多个异步和并行调用...但我会进行调查,谢谢。 - AxelEckenberger
@user981225,感谢您提供的有价值的信息,但答案相当简单,请查看编辑。 - AxelEckenberger
你可以在哪里获取实现源代码? - Matthew Finlay
使用反射工具,如Reflector(商业版)或justDecompile(免费、闭源),甚至可以使用更多替代方案 - AxelEckenberger
2个回答

3

使用 Materialize 把你的 OnError / OnCompleted 消息转化为通知。

例如:

observable.SelectMany(x => Observable.Start(fn).Materialize())

这将把错误 / 完成信息包装成通知,以便在稍后的实际订阅点中进行处理,而不是在 SelectMany 中终止错误。

这对于大多数异步调用操作非常有用,因为该方法要么失败,要么完成。


+1 如果你有一连串的操作并想将通知推送到公共错误处理程序,Materialize可能会很有趣。这是一个不错的选择,然而正如所述...遗憾的是问题出在电脑前的人,不能正确使用工具... :-) - AxelEckenberger

2
答案 实际上,代码是正常工作的。然而,调试器会在异常处中断,因为异步操作仍在后台执行 - 至少那些在第一个异常发生时已经开始的操作。这让我很困惑!如果你不使用调试器运行代码,异常将被吞噬。所以我想问题确实出现在电脑前面 :-)
关于Observable.Start仍需要一些澄清,我认为(并且正确)实现应该实际上已经包含了一些错误处理...请参见背景。 背景 Observable.Start是一个方便的方法,它使用Observable.ToAsync方法将函数/动作转换为异步操作。如果你查看该方法的实现,你会发现它已经处理/转发了异常。
public static Func<IObservable<TResult>> ToAsync<TResult>(this Func<TResult> function, IScheduler scheduler) {
    if (function != null) {
        if (scheduler != null) {
            return () => {
                AsyncSubject<TResult> asyncSubject = new AsyncSubject<TResult>();
                scheduler.Schedule(() => {
                    TResult result = default(TResult);
                    try {
                        result = function();
                    } catch (Exception exception1) {
                        Exception exception = exception1;
                        asyncSubject.OnError(exception);
                        return;
                    }
                    asyncSubject.OnNext(result);
                    asyncSubject.OnCompleted();
                });
                return asyncSubject.AsObservable<TResult>();
            };
        } else {
            throw new ArgumentNullException("scheduler");
        }
    } else {
        throw new ArgumentNullException("function");
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接