Parallel.ForEachAsync没有等待所有任务完成。

3
以下是示例控制台应用程序,输出如下:

enter image description here

输出每次都不同,但需要在打印结果之前完成所有任务。似乎Parallel.ForEachAsync没有等待所有任务完成。我有什么遗漏吗?
internal class Program
{
    private async static Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        await TestParallel();
        sw.Stop();
        Console.WriteLine("Elapsed={0}", sw.Elapsed);
        Console.ReadLine();
    }

    private static async Task TestParallel()
    {
        var tests = new List<int>() { 1, 2, 3, 4, 5, 6 };
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5,
            CancellationToken = CancellationToken.None };
        var responses = new List<string>();
        await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) =>
        {
            var response = await TestTask(testno);
            responses.Add(response);
        });
        foreach (var response in responses)
        {
            Console.WriteLine(response);
        }
    }
    private static Task<string> TestTask(int testno)
    {
        System.Threading.Thread.Sleep(1000);
        return Task.FromResult($"Test{testno}");
    }
}

1
关于“似乎Parallel.ForEachAsync没有等待所有任务完成”,你为什么这样说? - Jonathan
1
不等待所有任务完成,因为我期望它打印所有结果(顺序无关紧要)测试1、测试2、测试3、测试4、测试5、测试6。 - user3838575
4
List<T> 不是线程安全的。可以尝试使用 ConcurrentBag<T>, 看看是否有帮助。 - John Wu
1
为什么它等了两秒钟呢?使用MaxDOP为5的6个一秒任务需要2秒钟才能完成,是吗? - Caius Jard
2
谢谢@JohnWu,我将 var responses = new List<string>(); 改为 var responses = new ConcurrentBag<string>(); 现在按照我的期望工作了。 - user3838575
显示剩余2条评论
3个回答

2

针对.NET 6之前的版本进行回答。


我认为你的示例有点令人困惑。这是因为你使用了异步回调。大多数情况下,异步用于IO目的。

要么选择:(这将是CPU绑定的,执行一些重计算)

var responses = new List<string>();
var tests = new List<int>() { 1, 2, 3, 4 ,5,6};

Parallel.ForEach(tests, options, (testno) =>
{
    // no async here...
    var response = TestTask(testno);
    // lock the shared resource.
    lock(responses)
        responses.Add(response);
});

foreach (var response in responses)
{
    Console.WriteLine(response);
}

private static string TestTask(int testno)
{
    // calculations done here
    System.Threading.Thread.Sleep(1000);
    return $"Test{testno}";
}

或者选择: (这是IO限制,例如从外部源获取内容)

var tests = new List<int>() { 1, 2, 3, 4 ,5,6};

var tasks = new List<Task<string>>();

// just add the tasks to a list, so you can await them later.
// the first part (till the first await) will be completed synchronous. 
// If any async/await is used, the Task.WhenAll will wait for it. 
// Multiple tasks can be running simultaneously.
foreach(var t in tests)
    tasks.Add(TestTask(t));

await Task.WhenAll(tasks);

foreach (var task in tasks)
{
    // the current thread won't be blocked by calling the .Result here
    // All tasks are already completed.
    Console.WriteLine(task.Result);
}

private static async Task<string> TestTask(int testno)
{
    // Getting information from external resources.
    await Task.Delay(1000);
    return $"Test{testno}";
}

(可能有一些错别字,因为我没有在VS中写)

在这里可能有一些拼写错误,因为我没有在Visual Studio中写作。

这段代码无法编译。Parallel.ForEachAsync需要一个异步委托(Func<TSource, CancellationToken, ValueTask>)。如果你要进行同步工作,最好使用Parallel.ForEach - Theodor Zoulias
我会改的。 - Jeroen van Langen
1
嗯,在阅读了一些资料之后,我发现Parallel.ForEachAsync也用于IO-Bound。这意味着我的答案主要适用于旧框架... 我看到Parallel.ForEachAsync是.NET 6 _(目前在.NET 5中工作)_。 - Jeroen van Langen

1
答案如下 - 将行变量更改为 var responses = new ConcurrentBag();
internal class Program
{
    private async static Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        await TestParallel();
        sw.Stop();
        Console.WriteLine("Elapsed={0}", sw.Elapsed);
        Console.ReadLine();
    }

    private static async Task TestParallel()
    {
        var tests = new List<int>() { 1, 2, 3, 4 ,5,6};
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5, CancellationToken = CancellationToken.None };
        var responses = new ConcurrentBag<string>();
        await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) =>
        {
            var response = await TestTask(testno);
            responses.Add(response);
        });
        foreach (var response in responses)
        {
            Console.WriteLine(response);
        }
    }
    private static Task<string> TestTask(int testno)
    {
        System.Threading.Thread.Sleep(1000);
        return Task.FromResult($"Test{testno}");
    }
}

不错的答案。但是,我强烈建议使用ConcurrentQueue<T>类,而不是使用高度专业化ConcurrentBag<T>类,因为它可以保留添加项的顺序。即使顺序不重要,为什么要选择一个洗牌集合,而不是一个保持顺序的集合呢?如果没有其他原因,这会使调试更加困难。 - Theodor Zoulias
3
@Theodor 不行,这个回答很糟糕,因为它甚至没有回答问题。我不理解为什么会被点赞;我很确定它出现在John Wu的评论之后,甚至没有给出比那个评论更多的信息。如果它解释了为什么在这种情况下使用List是一个坏主意,并且导致显示的结果,我可以继续尝试,但这看起来像是将别人的灵感(抄袭)作为“尝试”发布(对OP没有学习机会,“我该怎么做才能使它工作?”不是问题)。整体表现不佳。 - Caius Jard
1
@CaiusJard 你说得对。我只是想表现得友好一些。:-) 这个答案可以有很大的改进空间。 - Theodor Zoulias

-1

我测试了你的代码。写的代码给出了不同的结果,似乎Parallel.Async没有等待。但真正的问题是List对象。List对象是共享对象,不是线程安全的。

如果你使用ConcurrentBag代替List,那么你会得到一致的结果。 我已经在Progam.cs中更改了代码并进行了测试。

using System.Collections.Concurrent; 
using System.Diagnostics;
Console.WriteLine("Hello, World!");
Stopwatch sw = new Stopwatch();   
sw.Start(); 
var tests = new  List<int>() { 1, 2, 3, 4, 5, 6 }; 
var options = new ParallelOptions {
   MaxDegreeOfParallelism = 2, CancellationToken = CancellationToken.None }; 
var responses = new ConcurrentBag<string>();

await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) => 
{
   var response = await TestTask(testno);
   responses.Add(response);
   Console.WriteLine("Executed... " + response); 
});
Console.WriteLine("Waiting.....");  

await Task.Delay(1000);

foreach(var response in responses) 
{
   Console.WriteLine("After Task Response:" + response); 
}

sw.Stop();
Console.WriteLine("Elapsed={0}", sw.Elapsed);
Console.ReadLine();

static Task<string> TestTask(int testno) 
{   System.Threading.Thread.Sleep(1000);
    return Task.FromResult($"Test{testno}"); 
}

代码混乱,缺乏基本格式。将代码分成不同的部分并澄清每个部分会使答案更加清晰明了。目前来看,这是一段长代码阅读,配以简短的文本摘要。 - M.Nar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接