为什么在这种情况下使用AsParallel()比foreach慢?

3
我正在从这种格式的Excel中提取数据:
 产品1 | 未命名列2 | 产品2 | 未命名列4 | 产品3 | 未命名列6 |
-------------------------------------------------- -------------
 @1foo | 1.10 | @1foo | 0.3 | @1foo | 0.3
 @2foo | 1.00 | @2foo | 2 | @2foo |
 @3foo | 1.52 | @3foo | 2.53 | @3foo |
 @4foo | 1.47 | | | @4foo | 1.31
 @5foo | 1.49 | | | @5foo | 1.31
该文件使用了所有255个字段。使用dapper-dot-net,我通过以下代码获取数据
IEnumerable<IDictionary<string, object>> excelDataRaw =
                conn.Query(string.Format("select * from {0}", table)).Cast<IDictionary<string, object>>();

我将这些数据传递给这些测试方法。这些数据以IDictionary的IEnumerable形式返回,其中每个键都是产品,每个值都是一个IDictionary,其中每个键都是来自产品列的值,相应的值是未命名列中该产品值的右侧的值。

var excelDataRefined = new List<IDictionary<string, IDictionary<string, decimal>>>();
excelDataRefined.Add(new Dictionary<string, IDictionary<string, decimal>>());
excelDataRefined[0].Add( "product", new Dictionary<string, decimal>());
excelDataRefined[0]["product"].Add("@1foo", 1.1m);

方法如下:
private static Dictionary<string, IDictionary<string, decimal>> Benchmark_foreach(IEnumerable<IDictionary<string, object>> excelDataRaw)
{
    Console.WriteLine("1. Using foreach");
    var watch = new Stopwatch();
    watch.Start();

    List<string> headers = excelDataRaw.Select(dictionary => dictionary.Keys).First().ToList();
    bool isEven = false;
    List<string> products = headers.Where(h => isEven = !isEven).ToList();
    var dates = new List<IEnumerable<object>>();
    var prices = new List<IEnumerable<object>>();

    foreach (string field in headers)
    {
        string product1 = field;
        if (headers.IndexOf(field) % 2 == 0)
        {
            dates.Add(
                excelDataRaw.AsParallel().AsOrdered().Select(col => col[product1]).Where(row => row != null));
        }

        if (headers.IndexOf(field) % 2 == 1)
        {
            prices.Add(
                excelDataRaw.AsParallel().AsOrdered().Select(col => col[product1] ?? 0m).Take(dates.Last().Count()));
        }
    }

    watch.Stop();
    Console.WriteLine("Rearange the data in: {0}s", watch.Elapsed.TotalSeconds);
    watch.Restart();

    var excelDataRefined = new Dictionary<string, IDictionary<string, decimal>>();
    foreach (IEnumerable<object> datelist in dates)
    {
        decimal num;
        IEnumerable<object> datelist1 = datelist;
        IEnumerable<object> pricelist =
            prices[dates.IndexOf(datelist1)].Select(value => value ?? 0m).Where(
                content => decimal.TryParse(content.ToString(), out num));
        Dictionary<string, decimal> dict =
            datelist1.Zip(pricelist, (k, v) => new { k, v }).ToDictionary(
                x => (string)x.k, x => decimal.Parse(x.v.ToString()));

        if (!excelDataRefined.ContainsKey(products[dates.IndexOf(datelist1)]))
        {
            excelDataRefined.Add(products[dates.IndexOf(datelist1)], dict);
        }
    }

    watch.Stop();
    Console.WriteLine("Zipped the data in: {0}s", watch.Elapsed.TotalSeconds);

    return excelDataRefined;
}

private static Dictionary<string, IDictionary<string, decimal>> Benchmark_AsParallel(IEnumerable<IDictionary<string, object>> excelDataRaw)
{
    Console.WriteLine("2. Using AsParallel().AsOrdered().ForAll");
    var watch = new Stopwatch();
    watch.Start();

    List<string> headers = excelDataRaw.Select(dictionary => dictionary.Keys).First().ToList();
    bool isEven = false;
    List<string> products = headers.Where(h => isEven = !isEven).ToList();
    var dates = new List<IEnumerable<object>>();
    var prices = new List<IEnumerable<object>>();

    headers.AsParallel().AsOrdered().ForAll(
        field =>
        dates.Add(
            excelDataRaw.AsParallel().AsOrdered().TakeWhile(x => headers.IndexOf(field) % 2 == 0).Select(
                col => col[field]).Where(row => row != null).ToList()));
    headers.AsParallel().AsOrdered().ForAll(
        field =>
        prices.Add(
            excelDataRaw.AsParallel().AsOrdered().TakeWhile(x => headers.IndexOf(field) % 2 == 1).Select(
                col => col[field] ?? 0m).Take(256).ToList()));
    dates.RemoveAll(x => x.Count() == 0);
    prices.RemoveAll(x => x.Count() == 0);

    watch.Stop();
    Console.WriteLine("Rearange the data in: {0}s", watch.Elapsed.TotalSeconds);
    watch.Restart();

    var excelDataRefined = new Dictionary<string, IDictionary<string, decimal>>();
    foreach (IEnumerable<object> datelist in dates)
    {
        decimal num;
        IEnumerable<object> datelist1 = datelist;
        IEnumerable<object> pricelist =
            prices[dates.IndexOf(datelist1)].Select(value => value ?? 0m).Where(
                content => decimal.TryParse(content.ToString(), out num));
        Dictionary<string, decimal> dict =
            datelist1.Zip(pricelist, (k, v) => new { k, v }).ToDictionary(
                x => (string)x.k, x => decimal.Parse(x.v.ToString()));

        if (!excelDataRefined.ContainsKey(products[dates.IndexOf(datelist1)]))
        {
            excelDataRefined.Add(products[dates.IndexOf(datelist1)], dict);
        }
    }

    watch.Stop();
    Console.WriteLine("Zipped the data in: {0}s", watch.Elapsed.TotalSeconds);

    return excelDataRefined;
}

private static Dictionary<string, IDictionary<string, decimal>> Benchmark_ForEach(IEnumerable<IDictionary<string, object>> excelDataRaw)
{
    Console.WriteLine("3. Using ForEach");
    var watch = new Stopwatch();
    watch.Start();

    List<string> headers = excelDataRaw.Select(dictionary => dictionary.Keys).First().ToList();
    bool isEven = false;
    List<string> products = headers.Where(h => isEven = !isEven).ToList();
    var dates = new List<IEnumerable<object>>();
    var prices = new List<IEnumerable<object>>();

    headers.ForEach(
        field =>
        dates.Add(
            excelDataRaw.TakeWhile(x => headers.IndexOf(field) % 2 == 0).Select(col => col[field]).Where(
                row => row != null).ToList()));
    headers.ForEach(
        field =>
        prices.Add(
            excelDataRaw.TakeWhile(x => headers.IndexOf(field) % 2 == 1).Select(col => col[field] ?? 0m).
            Take(256).ToList()));
    dates.RemoveAll(x => x.Count() == 0);
    prices.RemoveAll(x => x.Count() == 0);

    watch.Stop();
    Console.WriteLine("Rearange the data in: {0}s", watch.Elapsed.TotalSeconds);
    watch.Restart();

    var excelDataRefined = new Dictionary<string, IDictionary<string, decimal>>();
    foreach (IEnumerable<object> datelist in dates)
    {
        decimal num;
        IEnumerable<object> datelist1 = datelist;
        IEnumerable<object> pricelist =
            prices[dates.IndexOf(datelist1)].Select(value => value ?? 0m).Where(
                content => decimal.TryParse(content.ToString(), out num));
        Dictionary<string, decimal> dict =
            datelist1.Zip(pricelist, (k, v) => new { k, v }).ToDictionary(
                x => (string)x.k, x => decimal.Parse(x.v.ToString()));

        if (!excelDataRefined.ContainsKey(products[dates.IndexOf(datelist1)]))
        {
            excelDataRefined.Add(products[dates.IndexOf(datelist1)], dict);
        }
    }

    watch.Stop();
    Console.WriteLine("Zipped the data in: {0}s", watch.Elapsed.TotalSeconds);

    return excelDataRefined;
}
  • 使用Benchmark_foreach需要约3.5秒来重新排列数据,3秒来压缩数据。
  • 使用Benchmark_AsParallel需要约12秒来重新排列数据,0.005秒来压缩数据。
  • 使用Benchmark_ForEach需要约16秒来重新排列数据,0.005秒来压缩数据。

为什么会出现这种情况?我原本以为AsParallel会是最快的,因为它可以并行执行,而不是顺序执行。我该如何优化它呢?


1.) 测试 if .....(field) % 2 == 1 是不必要的,可以用 'else' 替换,因为 % 2 只会产生零或一。 2.) 您正在使用的 'IEnumerable' 有多大? - docmanhattan
excelDataRaw.Count() 返回 65535 个元素(Excel 文件中的最大行数)。excelDataRaw.SelectMany(x=>x.Values).Count() 返回 16645890 个元素,其中有很多空值我无法忽略。 - mrt181
它基本上是一个由254列和65535行组成的网格,其中每个网格点都附有其列头作为键。 - mrt181
3个回答

6
为了进行并行计算,您必须拥有多个处理器或核心,否则您只是在线程池中排队等待CPU的任务。即使在单核机器上使用AsParallel也是顺序执行加上线程池和线程上下文切换的开销。即使在双核机器上,您可能也无法使用两个核心,因为同一台机器上运行着许多其他程序。
真正有用的只有当您有长时间运行的阻塞操作(I/O)时,操作系统可以挂起阻塞线程并让另一个线程运行时,.AsParallel() 才变得有用。

是的,公司电脑只有两个内核,我猜我的输入数据太小了,所以使用AsParallel会增加太多开销。 - mrt181
16
如果您的计算机有多个核心,且受限于CPU而非IO,则AsParallel是最佳选择。如果您受限于IO,则最好使用异步策略。 - Jim Wooley
2
你应该使用异步来避免阻塞工作线程,即为每个线程创建任务链,但仍需要使用AsParallel来扇出工作线程,以便它们可以从池中获取线程来启动这些任务链。 - Arne Claassen

3
创建额外的线程和管理每个线程的工作负载会带来开销。 如果您有限制的工作量,那么创建额外的线程、在线程之间进行任务切换、工作窃取和重新分配等操作可能会抵消在第一次并行化工作时获得的收益。 您可能需要对应用程序进行分析,以确定单个进程运行时是否真正受到CPU限制。 如果不是,则最好保持单线程,并将瓶颈转变为难以并行化的IO操作。
另外还有一些建议:使用AsOrdered和TakeWhile会导致性能下降,因为它们都需要返回到源线程进行同步。 考虑在不需要排序的情况下进行分析,并查看是否提供了任何性能改进。
此外,考虑使用ConcurrentDictionary而不是标准通用字典,以避免添加项目时出现并发问题。

0
在Benchmark_AsParallel和Benchmark_ForEach中,您在Benchmark_foreach n中执行2n个操作。

我测量了ForEach和AsParallel方法中每个重新排列操作之间的时间。第一个操作占用了所有时间,而第二个操作只需要0.05秒。 - mrt181

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接