我该如何查看响应式扩展查询的运行情况?

24

我正在编写一个包含许多算子的复杂反应式扩展查询。如何查看其运行情况?

我提出这个问题并作出回答,因为它经常出现,而且可能有很好的普遍用途。

2个回答

47
您可以在开发 Rx 操作符时随意添加此函数以查看正在发生的情况:
    public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
    {
        opName = opName ?? "IObservable";
        Console.WriteLine("{0}: Observable obtained on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId);

        return Observable.Create<T>(obs =>
        {
            Console.WriteLine("{0}: Subscribed to on Thread: {1}",
                              opName,
                              Thread.CurrentThread.ManagedThreadId);

            try
            {
                var subscription = source
                    .Do(x => Console.WriteLine("{0}: OnNext({1}) on Thread: {2}",
                                                opName,
                                                x,
                                                Thread.CurrentThread.ManagedThreadId),
                        ex => Console.WriteLine("{0}: OnError({1}) on Thread: {2}",
                                                 opName,
                                                 ex,
                                                 Thread.CurrentThread.ManagedThreadId),
                        () => Console.WriteLine("{0}: OnCompleted() on Thread: {1}",
                                                 opName,
                                                 Thread.CurrentThread.ManagedThreadId)
                    )
                    .Subscribe(obs);
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => Console.WriteLine(
                          "{0}: Cleaned up on Thread: {1}",
                          opName,
                          Thread.CurrentThread.ManagedThreadId)));
            }
            finally
            {
                Console.WriteLine("{0}: Subscription completed.", opName);
            }
        });
    }

这是一个使用示例,展示了Range的微妙行为差异:

Observable.Range(0, 1).Spy("Range").Subscribe();

给出输出:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7

但是这个:

Observable.Range(0, 1, Scheduler.Immediate).Spy("Range").Subscribe();

输出结果为:

Range: Observable obtained on Thread: 7
Range: Subscribed to on Thread: 7
Range: OnNext(0) on Thread: 7
Range: OnCompleted() on Thread: 7
Range: Subscription completed.
Range: Cleaned up on Thread: 7

看出区别了吗?

很明显,您可以将此更改为写入日志或调试,或使用预处理器指令对发布版本进行精简的传递订阅等...

您可以在运算符链中应用 Spy。例如:

Observable.Range(0,3).Spy("Range")
          .Scan((acc, i) => acc + i).Spy("Scan").Subscribe();

输出结果为:

Range: Observable obtained on Thread: 7
Scan: Observable obtained on Thread: 7
Scan: Subscribed to on Thread: 7
Range: Subscribed to on Thread: 7
Range: Subscription completed.
Scan: Subscription completed.
Range: OnNext(1) on Thread: 7
Scan: OnNext(1) on Thread: 7
Range: OnNext(2) on Thread: 7
Scan: OnNext(3) on Thread: 7
Range: OnCompleted() on Thread: 7
Scan: OnCompleted() on Thread: 7
Range: Cleaned up on Thread: 7
Scan: Cleaned up on Thread: 7

我相信您能找到适合自己目的的方法来丰富它。


1
如果 Do(x => Console.WriteLine(...)) 不够用,这是一个相当不错的解决方案。 - Bryan Anderson
1
我通过将订阅放入一个字段中并将其包装在CompositeDisposable中返回,同时使用Console.WriteLine记录取消订阅的情况,使代码更加丰富。不错。 - Benjol
1
不错!需要注意的一点是,即使订阅者没有调用Dispose()Create方法也会自动调用你返回的IDisposable对象的Dispose()方法。当订阅者取消订阅或可观察对象终止时,Dispose方法将被调用,以先发生的为准。这样做是为了尽早清理资源,所以请记住,这可能不是取消订阅的结果。 - James World
1
FYI Rxx有许多Trace扩展可以做这种事情。 - Brandon
3
再次回来说这个扩展方法仍然经常“拯救我的生命” :) - Benjol
显示剩余5条评论

8

又过了三年,我仍在使用你的想法。 我的版本现在已经发展成以下几点:

  • 重载以选择日志记录的目标
  • 记录订阅数
  • 记录来自不良订阅者的“下游”异常。

代码:

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName = null)
{
    return Spy(source, opName, Console.WriteLine);
}

public static IObservable<T> Spy<T>(this IObservable<T> source, string opName, 
                                                              Action<string> logger)
{
    opName = opName ?? "IObservable";
    logger($"{opName}: Observable obtained on Thread: {Thread.CurrentThread.ManagedThreadId}");

    var count = 0;
    return Observable.Create<T>(obs =>
    {
        logger($"{opName}: Subscribed to on Thread: {Thread.CurrentThread.ManagedThreadId}");
        try
        {
            var subscription = source
                .Do(x => logger($"{opName}: OnNext({x}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    ex => logger($"{opName}: OnError({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}"),
                    () => logger($"{opName}: OnCompleted() on Thread: {Thread.CurrentThread.ManagedThreadId}")
                )
                .Subscribe(t =>
                {
                    try
                    {
                        obs.OnNext(t);
                    }
                    catch(Exception ex)
                    {
                        logger($"{opName}: Downstream exception ({ex}) on Thread: {Thread.CurrentThread.ManagedThreadId}");
                        throw;
                    }
                }, obs.OnError, obs.OnCompleted);

            return new CompositeDisposable(
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) on Thread: {Thread.CurrentThread.ManagedThreadId}")),
                    subscription,
                    Disposable.Create(() => Interlocked.Decrement(ref count)),
                    Disposable.Create(() => logger($"{opName}: Dispose (Unsubscribe or Observable finished) completed, {count} subscriptions"))
                );
        }
        finally
        {
            Interlocked.Increment(ref count);
            logger($"{opName}: Subscription completed, {count} subscriptions.");
        }
    });
}

很高兴听到它对你有所帮助,还有不错的装饰! - James World

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接