如何在PySpark中广播RDD?

4

在Python中是否可以广播RDD?

我正在阅读《Spark高级分析:大规模数据学习模式》一书,第三章需要广播一个RDD。我试图使用Python而不是Scala来跟随示例。

无论如何,即使是这个简单的示例,我也遇到了错误:

my_list = ["a", "d", "c", "b"]
my_list_rdd = sc.parallelize(my_list)
sc.broadcast(my_list_rdd)

错误为:
"It appears that you are attempting to broadcast an RDD or reference an RDD from an "
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an
action or transformation. RDD transformations and actions can only be invoked by the driver, n
ot inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) i
s invalid because the values transformation and count action cannot be performed inside of the
 rdd1.map transformation. For more information, see SPARK-5063.

我不太理解错误信息中的“动作或转换”指的是什么。
我正在使用 spark-2.1.1-hadoop2.7
重要修改:这本书是正确的。我只是没有读清楚,广播的不是 RDD,而是使用 collectAsMap() 得到的映射版本。
谢谢!
2个回答

5

在Python中广播RDD是否可能?

简而言之 不可能。

当你思考RDD的本质时,你会发现这是不可能的。在RDD中没有任何可以广播的内容。它太过脆弱(可以这么说)。

RDD是一种数据结构,用于描述某些数据集上的分布式计算。通过RDD的特性,您可以描述计算的内容和方式。它是一个抽象的实体。

引用RDD的scaladoc:

表示一个不可变的、分区的元素集合,可以并行操作。每个RDD内部都有五个主要属性:
  • 分区列表
  • 计算每个分片的函数
  • 依赖其他RDD的列表
  • 可选地,用于键值RDD的分区器(例如,指定RDD是哈希分区的)
  • 可选地,计算每个分片的首选位置列表(例如,HDFS文件的块位置)

你几乎无法广播什么(引用SparkContext.broadcast方法的scaladoc):

广播[T](value: T)(implicit arg0: ClassTag[T]): Broadcast[T] 将只读变量广播到集群,并返回一个用于在分布式函数中读取它的 org.apache.spark.broadcast.Broadcast 对象。该变量仅会发送一次到每个集群。
你只能广播实际值,但 RDD 只是值的容器,这些值仅在执行器处理其数据时才可用。
来自广播变量: 广播变量允许程序员将只读变量缓存在每台机器上,而不是随任务一起传输副本。例如,它们可以用于以高效的方式使每个节点具有大型输入数据集的副本。 并且在同一文档中后面写道: 这意味着只有当多个阶段的任务需要相同的数据或者将数据以反序列化形式缓存非常重要时,显式创建广播变量才有用。 然而,您可以使用collect方法收集 RDD 所包含的数据集并进行广播,如下所示:
my_list = ["a", "d", "c", "b"]
my_list_rdd = sc.parallelize(my_list)
sc.broadcast(my_list_rdd.collect) // <-- collect the dataset

在“收集数据集”步骤中,数据集离开RDD空间,变成一个本地可用的集合,即Python值,然后可以进行广播。

1
回答不错,但对于使用大型RDD的新用户来说,始终需要警告使用collect的风险。 - eliasah
收集这个数据集不就等于一开始就不将其作为RDD吗? - rjurney

2
您不能广播RDD。在处理RDD时,您可以向所有执行器节点广播多次使用的值。因此,在代码中广播RDD之前,应该先收集RDD。collect将RDD转换为本地Python对象,可以无问题地进行广播。
sc.broadcast(my_list_rdd.collect())

当你广播一个值时,该值会被序列化并发送到所有执行节点。你的my_list_rdd只是对分布在多个节点上的RDD的引用。将此引用序列化并将其广播到所有工作节点中并没有任何意义。因此,你应该收集RDD的值并广播该值。
有关Spark广播的更多信息,请单击这里
注意:如果你的RDD太大,应用程序可能会遇到OutOfMemory错误。 collect方法会将所有数据拉到驱动程序的内存中,而驱动程序的内存通常不足够大。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接