分布式处理中通常会产生多少开销?

3
对于那些急于阅读的读者:这是一个正在进行中的工作,我在此过程中寻求帮助。请不要根据我的临时数据来判断工具的好坏,因为我在尝试获取更好的结果时它们可能会改变。
我们正在决定构建一种用于分析共模拟输出的工具的架构。
作为该过程的一部分,我被要求编写基准测试工具,并获取几个分布式处理框架的速度数据。
我测试的框架有:Apache Spark、Apache Flink、Hazelcast Jet。并对比了纯Java。
我的测试用例是一个简单的“这里有一个Pojos列表,其中pojo的一个字段是double值。找到最小值”。简单、直接,希望可以高度可比。
四个测试中有三个使用简单比较器,第四个(flink)使用的是基本上与比较器相同的缩小器。分析函数看起来像这样:
Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();

Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();

Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();

Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();

I tested this extensively, varying the size of the test list as well as the allocated resources. And the results amazed me. The BEST results can be seen below (all numbers in ms, 1 million pojos, 10 tests each):
- instances: time taken to declare and initialize the instance of the frameworks. - list: time taken to parse/transfer the List to the frameworks "list". - process: time taken to process the data to retrieve the minimum value. - overall: total time taken for each test from start to end.
Outcome:
java:
Instances: 
List: 
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16, 

spark:
Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52, 
List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0, 
Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882, 
Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943, 

hazel:
Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799, 
List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427, 
Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769, 
Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007

flink:
Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24, 
Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586, 
Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610, 

最有趣的部分是:
  • 所有最佳结果都来自纯本地测试(一个实例)
  • 任何使用分布式机制(附加节点等)的测试,仍然比单线程慢一个数量级(例如,如果分布式,则Spark慢2.5倍)。
现在别误会了,基本逻辑是分布式处理必须比单线程处理每个核心都要慢。
但是即使在单线程上使用,也会有两个数量级的差异吗?如果分布式,就有三个数量级的差异?有人能看到我在所有三个分布式过程中犯的错误吗?我预计某些因素应该小于10,因此使用更多硬件进行扩展将是一种选择。
那么有没有办法将这些框架的开销减少到,嗯,可能是x9而不是x999?
我知道我知道,我使用的测试数据太小了,但即使将其扩大,我也没有看到开销与性能之间的减少。而且它大约是我们需要分析的数据批次的大小(每个模拟0.1M-1M个对象/秒)。因此,欢迎您帮助找到我的错误。 :D
更新Spark:
经过更彻底的Spark测试后,我仍然不感到满意。设置如下:
在64个核心,480 GB RAM作业的单台机器上运行Java客户端 主节点和7个从节点位于单独的机架上,每个机架32个核心,20 GB
    1 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29, 
    spark:
      Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106, 
      List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1, 
      Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051, 

    10 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66, 
    spark:
      Instances: 20345, 
      List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1, 
      Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53, 
    spark:
      Instances: 4631, 
      List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1, 
      Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21, 
    spark:
      Instances: 4513, 
      List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1, 
      Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335

    10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64, 
    spark:
      Instances: 20181, 
      List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1, 
      Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320

    1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
    java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4, 
    spark:
      Instances: 4612, 
      List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 
      Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422

    2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
    java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34, 
    spark:
      Instances: 6316, 
      List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1, 
      Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095

    10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
    java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89, 
    spark:
      Instances: 21192, 
      List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 
      Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845

结果:无论抛出多少硬件,将任务聚集起来,使用 Spark 处理列表中的每一百万个 pojos 都需要 5-6 秒。而 Java 则只需要 5-30 毫秒处理相同数量的内容,基本上相当于一个 200-1,000 倍的差异。
有没有人有建议如何加速 Spark 处理这样一个简单的作业?
更新哈兹尔:
现在我开始感到印象深刻了。虽然我仍在与一些奇怪的问题作斗争,但至少 Hazelcast Jet 似乎明白如果可能的话可以在本地处理本地数据。它只有 100%(因素 x2)的开销,这是完全可以接受的。
1,000 万个对象
java:
   Instances: 
   List: 68987, 
   Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88, 
hazel:
  Instances: 6136, 
  List: 97225, 
  Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140

更新 Flink:

目前已经将 Flink 从基准测试中删除,因为它在不提供很好结果的同时带来了太多麻烦。

编辑:整个基准测试可以在以下链接中找到:https://github.com/anderschbe/clusterbench

Spark 的集群设置使用 spark-2.1.0-bin-hadoop2.7,与开箱即用相同。只需在 spark_env.sh 中进行一处小修改:SPARK_NO_DAEMONIZE=true

让它在集群上正常运行所需唯一的更改是将 SparcProc 第25行中的 "localhost" 替换为 "spark://I_cant_give_you_my_cluster_IP.doo"


您是否有分享代码的方式?例如在GitHub私人仓库上。 - Neil Stevenson
我将在此附上最重要的部分。老实说,这只是基础知识,直接从框架教程/手册中提取的内容。 - Anders Bernard
@AndersBernard,我在我的回答中添加了重要的编辑 - 你正在运行“local”,这意味着只有一个工作线程。 - T. Gawęda
也许表达不够清晰 ;) 当你设置好集群时,请通知我 ;) - T. Gawęda
1
如果您的数据源于单个节点,则首要考虑因素应该是将其分布到网络上的成本与计算成本之间的比较。由于您的测试使用简单的_min_函数,因此这比网络延迟便宜几个数量级。如果实际计算仍然比网络开销便宜,则不会从分布式计算引擎中受益。 - Marko Topolnik
显示剩余3条评论
1个回答

5
当你在集群框架中计算某些内容(如Spark或Flink)时,框架会:
  • 将您的代码序列化
  • 发送资源请求
  • 通过网络发送您的代码
  • 调度执行
  • 等待结果
如您所见,有许多步骤需要执行 - 不仅仅是您的计算!分布式计算只有在以下情况下才有意义:
  • 可以将您的计算切分成小任务并行完成
  • 要处理的数据太多,无法在一台机器上处理,或者在一台机器上处理速度太慢 - 磁盘I/O、项目中的其他特定因素或计算非常特定,需要许多CPU,通常超过一台机器拥有的数量 - 但是数据的一部分计算必须非常耗时
尝试计算10 GB文本文件中单词出现次数的最大值 - 然后Spark和Flink将击败单节点Java。
有时用户代码可能会导致分布式计算变慢。典型错误包括:
  • 用户在具有许多引用的类中编写lambda表达式 - 所有其他类都被序列化,序列化需要很长时间
  • 任务实际上不是并行的 - 它们必须互相等待或必须在大量数据上进行操作
  • 数据分布不均 - 对象可能具有不当的hashCode实现和HashPartitioner导致所有数据都进入一个分区=一个节点
  • 分区数目不正确 - 您可以添加1000台机器,但如果仍然只有4个分区,则最多可以同时完成4个并行任务
  • 网络通信过多 - 在您的情况下这不是问题,但有时用户会执行大量的joinreduce
编辑: 在您的示例中,Spark在local上运行 - 这意味着仅使用1个线程!请至少使用local[*]或其他集群管理器。本回答中列出了各种开销,而且只有一个线程。

nod 我明白这一切。我只是对开销的极端程度感到困惑。正如所说,我们可能需要处理超出一台机器承载能力的数据...但我们不能通过将硬件要求增加1000倍来支付这个事实。增加10倍是可以接受的。 - Anders Bernard
@AndersBernard 分布式计算并不适用于所有用例。稍后我会发布详细信息,解释为什么Spark可能会更慢,请稍等片刻 ;) - T. Gawęda
nod 我也明白。虽然我现在只是在测试一个单一的软件包,但这只是为了准备处理连续的大量软件包,每秒钟可能有数十个需要分析。 - Anders Bernard
几十个?百万级别 - 没问题,数千个 - 没问题,但是几十个听起来数量相当少;) 使用更大量的数据运行测试。如果没有提供更好的结果,则发布您的配置,也许您的配置中有一些错误。 - T. Gawęda
我们正在谈论每秒模拟1-15个包(10-100次模拟)。每个包含有10-200k个对象,每个对象大约120字节。因此,低计算量将总共达到12 Mbyte至36 Gbyte每秒。 - Anders Bernard
@AndersBernard 请发布您的配置和代码 - 或许存在某些特定的错误。 - T. Gawęda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接