为什么这个简单的Spark程序没有利用多核心?

7

所以,我正在一个16核的多核系统上运行这个简单程序。我通过以下方式运行它:

spark-submit --master local[*] pi.py

那个程序的代码如下。

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

当我使用 top 命令查看 CPU 使用情况时,只有一个核心被使用。为什么会这样?另外,Spark 文档说默认并行度包含在属性 spark.default.parallelism 中。我如何从我的 Python 程序中读取此属性?

4个回答

27

由于以上方法都对我没有用(可能是因为我没有真正理解它们),这里是我的建议。

我正在使用 spark-submit program.py 命令启动程序,文件中有 sc = SparkContext("local", "Test") 代码。我尝试使用 sc.defaultParallelism 验证 spark 看到的核心数目,结果只有1个。当我将上下文初始化更改为 sc = SparkContext("local[*]", "Test") 后,它变成了16个(我的系统的核心数),我的程序正在使用所有核心。

我对 spark 还很陌生,但我的理解是,local 默认表示使用一个内核,并且因为它在程序内进行设置,所以会覆盖其他设置(肯定会覆盖来自配置文件和环境变量的设置,对于我来说是这样)。


4
它只是有效的。在我看来,作为一个干净的解决方案,比第一个答案更好。 - NullPointer
10
另外,您可以用想要使用的核心数替换 *。将其保留为 * 会默认使用最大数量的本地核心。仅使用 local 将其设置为1个核心。 - anmol
3
太好了!但为什么它没有被提及呢?我的意思是,甚至不直观,对吧?“local[*]”是什么语法?无论如何,如果没有这个答案,我会浪费数小时苦思冥想。 - Ahmadov

6

可能是因为调用sc.parallelize将所有数据放入一个分区中。您可以将分区数指定为parallelize的第二个参数:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)

请注意,这仍然会在驱动程序中使用一个CPU生成1200万个点,然后只将它们分配到16个分区以执行减少步骤。
更好的方法是尝试在分区后尽可能多地完成大部分工作:例如,以下仅在驱动程序上生成一个微小的数组,然后让每个远程任务生成实际的随机数和随后的PI近似值:
part = 16
count = ( sc.parallelize([0] * part, part)
           .flatMap(lambda blah: [sample(p) for p in xrange( N/part)])
           .reduce(lambda a, b: a + b)
       )

最后,(因为我们越懒越好),Spark MLlib 实际上已经自带了一个随机数据生成器,非常适用于并行计算,你可以在这里查看:http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation。所以,也许以下内容接近你想要的(未测试=& gt;可能不起作用,但应该很接近)。

count = ( RandomRDDs.uniformRDD(sc, N, part)
        .zip(RandomRDDs.uniformRDD(sc, N, part))
        .filter (lambda (x, y): x*x + y*y < 1)
        .count()
        )

1
设置分区对我没有起作用。@Ivaylo Petrov提供的答案解决了问题。 - Ahmadov

2

我尝试了@Svend提到的方法,但仍然无法正常工作。

以下方法适用于我:

不要使用local url,例如:

sc = SparkContext("local", "Test App")

使用主URL,如下所示:

sc = SparkContext("spark://your_spark_master_url:port", "Test App")


或者默认情况下,Spark会采用Master,例如sc = SparkContext("Test App")。我已经在我的Cloudera安装的Spark上尝试过了。 - wadhwasahil

1
要更改CPU核心使用量,请在spark-installation-directory/conf中的spark-env.sh文件中设置工作进程要使用的核心数。 这可以通过spark-env.sh文件中的SPARK_EXECUTOR_CORES属性来完成。 默认情况下,该值设置为1。

在spark/conf目录中,我没有看到任何spark-env.sh文件,只有一个spark-evn.sh.template文件,并且该文件中的每一行都被注释掉了。 - MetallicPriest
就是这个。首先将文件重命名为spark-env.sh。然后取消注释并将值设置为16,例如SPARK_EXECUTOR_CORES = 16。 - user93
好的,太棒了,我会尝试的。 - MetallicPriest
通过取消所需行的注释,可以使用相同的文件来设置Spark集群的其他配置属性。 - user93
它仍然只使用1个核心。是否可能在运行时通过在代码内部进行修改来设置此值。我使用Python编写程序。 - MetallicPriest
可以参考这个链接:http://spark.apache.org/docs/1.0.2/configuration.html。可以使用Spark上下文在运行时设置spark.default.parallelism和其他参数。 - user93

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接