有没有Ix.NET (System.Interactive)的示例代码?

7

我有一个异步方法,比如说:

public async Task<T> GetAsync()
{

}

并且将从以下位置被调用:

public async Task<IEnumerable<T>> GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        yield return result;
    }
}

上述语法不是有效的,但我实际上是在寻找异步生成器。我知道它可以通过Observable进行处理。我尝试过Rx.NET并且它在某种程度上可以工作。但是,我试图避免它给代码库带来的复杂性,更重要的是上述要求仍然本质上不是一个反应性系统(我们仍然是基于拉取的)。例如,我只需要在一定时间内监听传入的异步流,并且我必须从消费者端停止生产者(不仅仅是取消订阅消费者)。
我可以像这样反转方法签名:
public IEnumerable<Task<T>> GetAllAsync()

但这使得在不阻塞的情况下进行LINQ操作有些棘手。我希望它既是非阻塞的,又不会将整个内容加载到内存中。这个库:AsyncEnumerable正好符合我的需求,但如何使用Ix.NET实现相同的效果?我认为它们的目的是相同的。

换句话说,当处理await时,我如何利用Ix.NET生成一个IAsyncEnumerable?比如:

public async IAsyncEnumerable GetAllAsync()
{
    foreach (var item in something)
    {
        var result = await GetAsync();
        return // what?
    }
}

我在这里找到了一个Ix.NET的扩展库,可以帮助我解决这个问题:https://github.com/CXuesong/AsyncEnumerableExtensions - nawfal
这个链接 https://weblogs.asp.net/dixin/linq-to-objects-interactive-extensions-ix 也回答了它。 - nawfal
这个链接也很有帮助:https://stu.dev/iasyncenumerable-introduction/ - nawfal
1个回答

5

使用来自NuGet的System.Linq.Async 4.0.0,现在您可以使用 SelectAwait

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something
            .ToAsyncEnumerable()
            .SelectAwait(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

(已过时)

使用来自 NuGet 的 System.Interactive.Async 3.2.0,这个怎么样?目前 Select() 不支持异步 lambda,你需要自己实现。

更好的异步支持 - Task based 的 AsyncEnumerable 重载

class Program
{
    static void Main(string[] args)
    {
        Task.Run(async () =>
            await GetAllAsync().ForEachAsync((x) => Console.WriteLine(x)));

        Thread.Sleep(4000);
    }

    static IAsyncEnumerable<string> GetAllAsync()
    {
        var something = new[] { 1, 2, 3 };

        return something.SelectAsync(async (x) => await GetAsync(x));
    }

    static async Task<string> GetAsync(int item)
    {
        await Task.Delay(1000); // heavy
        return "got " + item;
    }
}

static class AsyncEnumerableExtensions
{
    public static IAsyncEnumerable<TResult> SelectAsync<T, TResult>(this IEnumerable<T> enumerable, Func<T, Task<TResult>> selector)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var enumerator = enumerable.GetEnumerator();
            var current = default(TResult);
            return AsyncEnumerable.CreateEnumerator(async c =>
                {
                    var moveNext = enumerator.MoveNext();
                    current = moveNext
                        ? await selector(enumerator.Current).ConfigureAwait(false)
                        : default(TResult);
                    return moveNext;
                },
                () => current,
                () => enumerator.Dispose());
        });
    }
}

扩展方法是从此示例中引用的。 https://github.com/maca88/AsyncGenerator/issues/94#issuecomment-385286972

这个例子在 System.Interactive.Async 的 v4 版本中无法编译。 - nwsmith

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接