异步LINQ——非惰性?多线程?

3

我有以下代码:

var things = await GetDataFromApi(cancellationToken);

var builder = new StringBuilder(JsonSerializer.Serialize(things));

await things
    .GroupBy(x => x.Category)
    .ToAsyncEnumerable()
    .SelectManyAwaitWithCancellation(async (category, ct) =>
    {
        var thingsWithColors = await _colorsApiClient.GetColorsFor(category.Select(thing => thing.Name).ToList(), ct);
        
        return category
            .Select(thing => ChooseBestColor(thingsWithColors))
            .ToAsyncEnumerable();
    })
    .ForEachAsync(thingAndColor =>
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId); // prints different IDs
        builder.Replace(thingAndColor.Thing, $"{thingAndColor.Color} {thingAndColor.Thing}");
    }, cancellationToken);

它使用了System.Linq.Async,我觉得很难理解。在“传统”的同步LINQ中,只有在调用ToList()ToArray()时才会执行整个操作。在上面的示例中,没有这样的调用,但是lambda表达式仍然被执行。它是如何工作的?

我担心的另一个问题是关于多线程的。我听说过很多次async!= multithreading。那么,为什么Console.WriteLine(Thread.CurrentThread.ManagedThreadId);会输出各种ID呢?其中一些ID被多次打印,但总体上输出了大约5个线程ID。我的代码没有显式地创建任何线程。所有都是异步等待。

StringBuilder不支持多线程,我想了解上面的实现是否有效。

请忽略我的代码算法,它并不真正重要,这只是一个示例。重要的是使用System.Async.Linq。


1
在异步方法中,第一个 await 之后的代码 可能 在不同的线程上运行。 - Klaus Gütter
因此,我的代码不安全,因为StringBuilder不支持多线程? - mnj
或许即使有不同的线程,它们也从未并行,这种情况下使用 StringBuilder 会没问题吗? - mnj
@PanagiotisKanavos 是的,我知道代码很奇怪,我只是在探索 System.Linq.Async。感谢您的参与。 - mnj
@mnj,你还没有解释你想做什么。创建一个数据流块管道很容易,从一个API调用获取项目列表,每个项目并行执行一个API调用,然后将结果组合并将所有内容写入文件。您可以使用接收/返回通道或IAsyncEnumerable的函数来完成相同的操作,但在内部使用Parallel.ForEachAsync并行进行调用。LINQ Async可用于过滤或转换,但不能用于实际并发执行任务。 - Panagiotis Kanavos
显示剩余5条评论
2个回答

5

ForEachAsync 的效果类似于 ToList/ToArray,因为它强制评估整个列表。

默认情况下,await 后的任何内容都将继续在同一 执行上下文 中运行,这意味着如果代码运行在 UI 线程上,则将继续在 UI 线程上运行。如果它运行在后台线程上,它将继续在后台线程上运行,但不一定是 同一个 后台线程。

然而,您的代码中没有任何应该以 并行方式 运行的部分。这并不一定意味着它是线程安全的,可能需要一些内存屏障来确保数据正确地刷新,但我会假设这些屏障是由框架代码本身发出的。


3

System.Async.Linq 以及整个 dotnet/reactive 存储库目前是一个半废弃的项目。 GitHub上的问题已经积累了,官方几乎一年没有回复。除了每种方法顶部的源代码中的XML文档之外,没有发布任何文档。您无法真正使用此库而不学习源代码,通常很容易做到这一点,因为代码很短,易读且实际上并没有做太多事情。此库提供的功能类似于在System.Linq中找到的功能,主要区别在于输入是IAsyncEnumerable<T>而不是IEnumerable<T>,委托可以返回包装在ValueTask<T>中的值。

除了一些运算符(如 Merge仅有的其中一个重载)之外,System.Async.Linq 不会引入并发。异步操作是一个接一个地调用,然后在调用下一个操作之前等待它们。 SelectManyAwaitWithCancellation 运算符不是例外情况。对于每个元素,依次调用 selector ,并枚举其生成的IAsyncEnumerable<TResult>,其值一个接一个地产生。因此,不太可能创建线程安全问题。

ForEachAsync 操作符仅是标准的await foreach循环的替代品,在 C# 8 之前语言对 await foreach 的支持不完善时包含在库中。我建议不要使用此操作符,因为它与新的Parallel.ForEachAsync API 相似,可能会造成混淆。以下是 ForEachAsync 操作符源代码中的内容:
// REVIEW: Once we have C# 8.0 language support, we may want to do away with these
//         methods. An open question is how to provide support for cancellation,
//         which could be offered through WithCancellation on the source. If we still
//         want to keep these methods, they may be a candidate for
//         System.Interactive.Async if we consider them to be non-standard
//         (i.e. IEnumerable<T> doesn't have a ForEach extension method either).

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接