如何从Rx Subscribe调用异步函数?

33
我希望能够在Rx订阅中调用异步函数。例如,像这样:
public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

为了正确捕获异常,需要做些什么?


1
我刚刚发现我可以把async放在第一位。但不幸的是,这并没有解决我面临的问题。我将发布一个更明确的例子。 - Martin Komischke
可能是重复的问题:有没有一种方法可以将观察者订阅为异步 - Zache
4个回答

45

Ana Betts的答案在大多数情况下都有效,但如果你想在等待异步函数完成时阻止流,则需要像这样做:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();
或:
Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();

2
这是另一个使用案例,虽然非常有趣。 - Martin Komischke
你必须非常小心地使用这种方法。如果你的异步处理程序抛出异常,订阅将被终止,你的处理程序将停止接收事件。基本上,如果你需要对可观察事件进行异步处理,你必须将它们排队,并由单独的后台任务消耗。没有其他办法 - 我是通过艰难的方式学到的。 - C-F
相关讨论:https://github.com/dotnet/reactive/issues/459 - undefined

21

请将此更改为:

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();

订阅操作并未将异步操作保留在Observable中。


2
谢谢保罗的建议,非常有趣。你有什么可以让我进一步了解的资料吗? - Martin Komischke
1
对于使用此解决方案的其他人,请注意,如果您需要保持顺序,只需将SelectMany替换为Select,然后使用.Concat(与reijerh's答案类似)。 - bokibeg
2
这将产生一个未完成任务的可观测序列。接下来你该怎么做?你的“订阅者”什么也不做。所有的任务都悬而未决,无法控制异常/终止。这不是一个解决方案。 - C-F

19

不要将async方法传递给Subscribe,因为这会创建一个async void方法。尽量避免使用async void

在你的情况下,我认为你想要对序列中的每个元素调用async方法,然后缓存所有结果。在这种情况下,使用SelectMany调用每个元素的async方法,使用Replay进行缓存(加上Connect来启动):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

我将Results属性更改为从Trigger方法返回,我认为这样更有意义,因此现在测试看起来像这样:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}

1
那是一个很棒的解决方案。谢谢Stephen。 - Martin Komischke
如果您想捕获从RunAsync抛出的异常,该怎么办?我得到的是: obs.SelectMany(x => Observable.FromAsync(() => RunAsync()).Catch(Observable.Empty<string>()))有更好的方法吗? - Victor Grigoriu
@VictorGrigoriu:我建议您提出自己的问题,并使用“system.reactive”标签进行标记。 - Stephen Cleary
1
此解决方案不保留结果的顺序(以防读者试图实现该目标)。要保留顺序,请查看下面的答案。 - bboyle1234
1
这是@Victor Grigoriu的后续问题(附有答案):https://dev59.com/eYnca4cB1Zd3GeqP9liU - Reyhn

1

在reijerh的答案基础上,我创建了一个扩展方法。

public static IDisposable SubscribeAsync<TResult>(this IObservable<TResult> source, Func<Task> action) =>
            source.Select(_ => Observable.FromAsync(action))
                .Concat()
                .Subscribe();

如果我理解正确,它应该会阻塞直到异步任务完成。但是它允许您调用SubscribeAsync并传递您的任务。在我看来,这使得它更易读。
WhenSomethingHappened
    .SubscribeAsync(async () => { 
        await DoFirstAsyncThing(); 
        await DoSecondAsyncThing(); 
    });


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接