如何从Parallel.ForEach中收集返回值?

72
我正在并行调用一个较慢的 Web 服务。一切都很好,直到我意识到需要从服务中获取一些信息。但是我不知道在哪里获取这些值。我无法写入数据库,因为使用 Parallel.ForEach 调用的方法内 HttpContext.Current 似乎为空。
以下是一个示例程序(请想象一个较慢的 Web 服务而不是字符串连接)。
using System;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        WordMaker m = new WordMaker();
        m.MakeIt();
    }
    public class WordMaker
    {
        public void MakeIt()
        {
            string[] words = { "ack", "ook" };
            ParallelLoopResult result = Parallel.ForEach(words, word => AddB(word));
            Console.WriteLine("Where did my results go?");
            Console.ReadKey();
        }
        public string AddB(string word)
        {
            return "b" + word;
        }
    }

}

Parallel.ForEach的不同重载可能是您想要的:http://msdn.microsoft.com/zh-cn/library/dd991486.aspx - Austin Salonen
@PhillipSchmidt:无论如何,不要使用示例中使用的过载... - Austin Salonen
@PhillipSchmidt 我第一条评论中的链接明确处理了 Func<...> 而不是 Action<...> - Austin Salonen
1
@AustinSalonen 你期望我在问愚蠢的问题之前使用我的眼睛吗?天啊。开玩笑的 - 我现在就读它。 - Phillip Schmidt
@AustinSalonen 好吧,那只是在做我建议他做的事情。但很好——我不知道那个。 - Phillip Schmidt
显示剩余2条评论
6个回答

80

你在这里将它丢弃了。

ParallelLoopResult result = Parallel.ForEach(words, word => AddB(word));

你可能想要类似这样的东西:

ParallelLoopResult result = Parallel.ForEach(words, word =>
{
    string result = AddB(word);
    // do something with result
});

如果你想在最后得到某种集合,考虑使用System.Collections.Concurrent下的集合之一,比如ConcurrentBag

ConcurrentBag<string> resultCollection = new ConcurrentBag<string>();
ParallelLoopResult result = Parallel.ForEach(words, word =>
{
    resultCollection.Add(AddB(word));
});

// Do something with the result

48
我认为ParallelLoopResult在这里没有任何用处。+1,不过。 - usr
3
LINQ中的AsParallel()会更好。 - Jorge Aguirre
3
使用ConcurrentBag而不是List可以解决这个问题。 - Russell Horwood
1
@PatrickfromNDependteam,现在你已经将你的想法发布为答案,那些认为它比这个答案更好的人将开始给你的答案点赞,并将其移动到顶部。每个点赞都会让你获得声望积分,所以这是一个双赢的局面。 - Theodor Zoulias
@TheodorZoulias 好的,我明白了,谢谢。 - Patrick from NDepend team
显示剩余7条评论

42

你可以考虑使用IEnumerableAsParallel扩展方法,它会为你处理并发问题,并收集结果。

words.AsParallel().Select(AddB).ToArray()

同步(例如锁或使用锁的并发集合)通常是并发算法的瓶颈。最好尽可能避免同步。我猜测AsParallel使用了更加智能的方式,将单个线程产生的所有项目放入本地非并发集合中,然后在最后组合这些项目。


9
这要好得多。 - Jorge Aguirre
1
当我尝试这个时,选择中的代码似乎没有在不同的线程中运行。但是Parallel.Foreach可以做到。 - Joy George Kunjikkuru
1
你确定你正在使用AsParallel吗?你怎么知道它没有使用更多的线程? - Steves
1
可选地,您也可以在 .AsParallel() 之后加上 .AsOrdered(),以按输入值的顺序接收结果。 - Theodor Zoulias
有没有一种方法可以实现这个,对于一个名为AddB的方法,它需要3个参数,其中只有第一个是IEnumerable? - Dominik
类似于 x => AddB(x, some, other, arguments) 这样的东西吗? - Steves

14

不要使用 ConcurrentBag 收集结果,因为它速度较慢。请改为使用本地锁。

var resultCollection = new List<string>();
object localLockObject = new object();

Parallel.ForEach<string, List<string>>(
      words,
      () => { return new List<string>(); },
      (word, state, localList) =>
      {
         localList.Add(AddB(word));
         return localList;
      },
      (finalResult) => { lock (localLockObject) resultCollection.AddRange(finalResult); }
); 

// Do something with resultCollection here

1
你有任何统计数据表明ConcurrentBag比使用我们自己的对象锁更慢吗?我只是想知道它有多慢,因为它使我的代码看起来比使用对象锁更清晰。 - dinesh ygv
@dineshygv 在我看来,差异微乎其微。https://dev59.com/P741zogBFxS5KdRjtFfw#34016915 - Matas Vaitkevicius
或者干脆不使用任何锁定机制 ;-) - Steves

7
这似乎是安全、快速且简单的操作:
    public string[] MakeIt() {
        string[] words = { "ack", "ook" };
        string[] results = new string[words.Length];
        ParallelLoopResult result =
            Parallel.For(0, words.Length, i => results[i] = AddB(words[i]));
        return results;
    }

这很可能会导致缓存乒乓,但仍然比并发收集要好得多。 - Steves
非常明智的做法!@overlord-zurg - Oguz Karadenizli

6

在集合大小已知的特殊情况下,可以使用数组而不是昂贵的并发集合。每个循环访问其自己的 ouputs 数组中的槽位,因此不存在冲突的风险。作为奖励,输出按照输入的相同顺序存储:

const int NB_WORDS = 1000;
var inputs = new string[NB_WORDS];
for(var i= 0; i < NB_WORDS; i++) { inputs[i] = i.ToString(); }

var outputs = new string[NB_WORDS];

Parallel.For(0, NB_WORDS, index => {
   string word = inputs[index];
   string result = word + word; // Operation on word
   outputs[index] = result; // No need of a concurrent collection to store the result!
});

Debug.Assert(outputs.All(result => !string.IsNullOrEmpty(result)));

使用Parallel.For方法是否比使用Parallel.ForEach枚举Enumerable.Range更简单? - Theodor Zoulias
1
@TheodorZoulias 当然,谢谢。 - Patrick from NDepend team

3
像这样怎么样:
public class WordContainer
{
    public WordContainer(string word)
    {
        Word = word;
    }

    public string Word { get; private set; }
    public string Result { get; set; }
}

public class WordMaker
{
    public void MakeIt()
    {
        string[] words = { "ack", "ook" };
        List<WordContainer> containers = words.Select(w => new WordContainer(w)).ToList();

        Parallel.ForEach(containers, AddB);

        //containers.ForEach(c => Console.WriteLine(c.Result));
        foreach (var container in containers)
        {
            Console.WriteLine(container.Result);
        }

        Console.ReadKey();
    }

    public void AddB(WordContainer container)
    {
        container.Result = "b" + container.Word;
    }
}

我认为除非您需要结果彼此交互(例如计算总和或组合所有单词),否则锁定或并发对象是不必要的。在这种情况下,ForEach 将原始列表整洁地分解,并将每个线程的对象分配给它自己,它可以自由地操纵它想要的对象,而不用担心干扰其他线程。

是的,这对于控制台应用程序可以起作用,但即使对于控制台应用程序,您也可能希望首先将它们聚合到集合中,否则您会在控制台窗口中得到交错的结果。 - MatthewMartin
Console.WriteLine 命令在主线程上同步运行,它将按照原始列表中定义的顺序,在 Parallel.ForEach 完成处理所有列表项并返回后打印结果。如果我从 Parallel.ForEach 中调用 WriteLine,则结果将交错。 - MichaC

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接