对于那些急于阅读的读者:这是一个正在进行中的工作,我在此过程中寻求帮助。请不要根据我的临时数据来判断工具的好坏,因为我在尝试获取更好的结果时它们可能会改变。
我们正在决定构建一种用于分析共模拟输出的工具的架构。
作为该过程的一部分,我被要求编写基准测试工具,并获取几个分布式处理框架的速度数据。
我测试的框架有:Apache Spark、Apache Flink、Hazelcast Jet。并对比了纯Java。
我的测试用例是一个简单的“这里有一个Pojos列表,其中pojo的一个字段是double值。找到最小值”。简单、直接,希望可以高度可比。
四个测试中有三个使用简单比较器,第四个(flink)使用的是基本上与比较器相同的缩小器。分析函数看起来像这样:
I tested this extensively, varying the size of the test list as well as the allocated resources. And the results amazed me. The BEST results can be seen below (all numbers in ms, 1 million pojos, 10 tests each):
- instances: time taken to declare and initialize the instance of the frameworks. - list: time taken to parse/transfer the List to the frameworks "list". - process: time taken to process the data to retrieve the minimum value. - overall: total time taken for each test from start to end.
Outcome:
最有趣的部分是:
但是即使在单线程上使用,也会有两个数量级的差异吗?如果分布式,就有三个数量级的差异?有人能看到我在所有三个分布式过程中犯的错误吗?我预计某些因素应该小于10,因此使用更多硬件进行扩展将是一种选择。
那么有没有办法将这些框架的开销减少到,嗯,可能是x9而不是x999?
我知道我知道,我使用的测试数据太小了,但即使将其扩大,我也没有看到开销与性能之间的减少。而且它大约是我们需要分析的数据批次的大小(每个模拟0.1M-1M个对象/秒)。因此,欢迎您帮助找到我的错误。 :D
更新Spark:
经过更彻底的Spark测试后,我仍然不感到满意。设置如下:
在64个核心,480 GB RAM作业的单台机器上运行Java客户端 主节点和7个从节点位于单独的机架上,每个机架32个核心,20 GB
结果:无论抛出多少硬件,将任务聚集起来,使用 Spark 处理列表中的每一百万个 pojos 都需要 5-6 秒。而 Java 则只需要 5-30 毫秒处理相同数量的内容,基本上相当于一个 200-1,000 倍的差异。
有没有人有建议如何加速 Spark 处理这样一个简单的作业?
更新哈兹尔:
现在我开始感到印象深刻了。虽然我仍在与一些奇怪的问题作斗争,但至少 Hazelcast Jet 似乎明白如果可能的话可以在本地处理本地数据。它只有 100%(因素 x2)的开销,这是完全可以接受的。
1,000 万个对象
我们正在决定构建一种用于分析共模拟输出的工具的架构。
作为该过程的一部分,我被要求编写基准测试工具,并获取几个分布式处理框架的速度数据。
我测试的框架有:Apache Spark、Apache Flink、Hazelcast Jet。并对比了纯Java。
我的测试用例是一个简单的“这里有一个Pojos列表,其中pojo的一个字段是double值。找到最小值”。简单、直接,希望可以高度可比。
四个测试中有三个使用简单比较器,第四个(flink)使用的是基本上与比较器相同的缩小器。分析函数看起来像这样:
Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();
Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();
Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();
Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();
I tested this extensively, varying the size of the test list as well as the allocated resources. And the results amazed me. The BEST results can be seen below (all numbers in ms, 1 million pojos, 10 tests each):
- instances: time taken to declare and initialize the instance of the frameworks. - list: time taken to parse/transfer the List to the frameworks "list". - process: time taken to process the data to retrieve the minimum value. - overall: total time taken for each test from start to end.
Outcome:
java:
Instances:
List:
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16,
Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16,
spark:
Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52,
List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0,
Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882,
Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943,
hazel:
Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799,
List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427,
Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769,
Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007
flink:
Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0,
List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24,
Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586,
Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610,
最有趣的部分是:
- 所有最佳结果都来自纯本地测试(一个实例)
- 任何使用分布式机制(附加节点等)的测试,仍然比单线程慢一个数量级(例如,如果分布式,则Spark慢2.5倍)。
但是即使在单线程上使用,也会有两个数量级的差异吗?如果分布式,就有三个数量级的差异?有人能看到我在所有三个分布式过程中犯的错误吗?我预计某些因素应该小于10,因此使用更多硬件进行扩展将是一种选择。
那么有没有办法将这些框架的开销减少到,嗯,可能是x9而不是x999?
我知道我知道,我使用的测试数据太小了,但即使将其扩大,我也没有看到开销与性能之间的减少。而且它大约是我们需要分析的数据批次的大小(每个模拟0.1M-1M个对象/秒)。因此,欢迎您帮助找到我的错误。 :D
更新Spark:
经过更彻底的Spark测试后,我仍然不感到满意。设置如下:
在64个核心,480 GB RAM作业的单台机器上运行Java客户端 主节点和7个从节点位于单独的机架上,每个机架32个核心,20 GB
1 mio objects, 256 tasks, 64 cpus local[*]
java:
Instances:
List:
Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29,
spark:
Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106,
List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1,
Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051,
10 mio objects, 256 tasks, 64 cpus local[*]
java:
Instances:
List:
Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66,
spark:
Instances: 20345,
List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1,
Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516
1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
java:
Instances:
List:
Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53,
spark:
Instances: 4631,
List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1,
Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972
1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
java:
Instances:
List:
Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21,
spark:
Instances: 4513,
List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1,
Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335
10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64,
spark:
Instances: 20181,
List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1,
Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320
1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4,
spark:
Instances: 4612,
List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1,
Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422
2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34,
spark:
Instances: 6316,
List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1,
Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095
10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89,
spark:
Instances: 21192,
List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1,
Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845
结果:无论抛出多少硬件,将任务聚集起来,使用 Spark 处理列表中的每一百万个 pojos 都需要 5-6 秒。而 Java 则只需要 5-30 毫秒处理相同数量的内容,基本上相当于一个 200-1,000 倍的差异。
有没有人有建议如何加速 Spark 处理这样一个简单的作业?
更新哈兹尔:
现在我开始感到印象深刻了。虽然我仍在与一些奇怪的问题作斗争,但至少 Hazelcast Jet 似乎明白如果可能的话可以在本地处理本地数据。它只有 100%(因素 x2)的开销,这是完全可以接受的。
1,000 万个对象
java:
Instances:
List: 68987,
Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88,
hazel:
Instances: 6136,
List: 97225,
Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140
更新 Flink:
目前已经将 Flink 从基准测试中删除,因为它在不提供很好结果的同时带来了太多麻烦。
编辑:整个基准测试可以在以下链接中找到:https://github.com/anderschbe/clusterbench
Spark 的集群设置使用 spark-2.1.0-bin-hadoop2.7,与开箱即用相同。只需在 spark_env.sh 中进行一处小修改:SPARK_NO_DAEMONIZE=true
让它在集群上正常运行所需唯一的更改是将 SparcProc 第25行中的 "localhost" 替换为 "spark://I_cant_give_you_my_cluster_IP.doo"