在Python中创建自定义的Spark RDD

5
有没有可能在Python中扩展Spark的RDD以添加自定义运算符?如果不可能,如何包装Scala代码以扩展RDD的类,例如这里的类:http://blog.madhukaraphatak.com/extending-spark-api/ 编辑:我正在尝试创建一个新的RDD,比如PersonRDD,并在PersonRDD上添加一组新的运算符,例如PersonRDD.computeMedianIncome()。根据下面的链接,在Python中做到这一点并不容易。但是,由于这是一个旧线程,我想知道是否有任何新的更新。如果没有,我想使用Scala来完成它,但我不确定如何使用Py4J从Python中调用该类(mail-archives.us.apache.org/mod_mbox/spark-user/201308.mbox/…)
非常感谢任何建议或帮助。
Mandy
2个回答

4

在分布式环境中计算精确中位数需要一些努力,因此假设您想要类似于RDD中所有值的平方。我们称这个方法为squares,并假设它应按以下方式工作:

assert rdd.squares().collect() == rdd.map(lambda x: x * x).collect()

1. 修改 pyspark.RDD 定义:

from pyspark import RDD

def squares(self):
    return self.map(lambda x: x * x)

RDD.squares = squares
rdd = sc.parallelize([1, 2, 3])
assert rdd.squares().collect() == [1, 4, 9]

注意:如果您修改类定义,每个实例都将可以访问squares

2. 创建RDD子类:

class RDDWithSquares(RDD):
    def squares(self):
        return self.map(lambda x: x * x)

rdd = sc.parallelize([1, 2, 3])
rdd.__class__ = RDDWithSquares # WARNING: see a comment below

给一个类分配一个类别是一种不良的hack,因此在实践中应该以适当的方式创建RDD(例如,请参见context.parallelize的实现)。

3. 将方法添加到实例

import types

rdd = sc.parallelize([1, 2, 3])
# Reusing squares function defined above
rdd.squares = types.MethodType(squares, rdd)

免责声明

首先,我没有测试过这些内容足够长的时间来确保没有隐藏的问题。

此外,我认为这并不值得大惊小怪。如果没有静态类型检查,很难找到任何好处,您可以使用函数、柯里化和pipes以更清晰的方式获得类似的结果。

from toolz import pipe
pipe(
    sc.parallelize([1, 2, 3]),
    squares,
    lambda rdd: rdd.collect())

感谢@zero323。我本来希望能像在Scala或Java中那样干净地继承RDD,而不是通过hack解决方案。解决方案1行不通,因为用户可能会在错误的类型上调用新运算符。2对于RDD的子类(例如:newAPIHadoopFile)无效,但对我可能有效...再次感谢您花时间提出解决方案。 - mandy
好的,你必须记住几件事情。在Python中,打字纪律与Scala有很大不同,并且Python RDD没有类型参数化。从Scala的角度来看,每个Python RDD都像是RDD [Any]。因此,您有责任仅调用适用的方法。类似于sc.parallelize(range(3)).groupByKey()这样的东西显然是没有意义的,并且在执行转换时会失败,但在类型级别上没有任何问题。 - zero323
与Scala不同的是,您可以在运行时修改现有类。没有隐式转换地狱,我们知道“显式优于隐式”。如果添加一个方法,类型仍然没有问题。它唯一说明的是,根据实例的状态,调用此方法可能是有效的。从概念上讲,这可能是您可以获得的最接近Scala隐式方法的东西。尽管如此,我认为管道函数调用更安全,更符合Python风格,并且如果想在Spark之上创建DSL,它也同样有效。 - zero323
这非常有道理。显然你在Spark方面的经验比我丰富,所以我会考虑管道示例。 - mandy

0

我遇到了类似的问题,虽然我还没有在我的扩展版本上测试普通RDD的全部功能,但它正在按预期工作。这确实需要一些工作,我不确定这是否是最佳解决方案,但我所做的就是扩展RDD类,通过将它们传递给新类的构造函数重新实现返回新RDD的方法,并向该类添加方法。以下是代码的一小部分:

from pyspark.rdd import RDD, PipelinedRDD

class CustomRDD(RDD):
    def __init__(self, rdd, first=True):
        if first:
            rdd = custom_parser(rdd)
        self._jrdd = rdd._jrdd
        self.is_cached = rdd.is_cached
        self.is_checkpointed = rdd.is_checkpointed
        self.ctx = rdd.ctx
        self._jrdd_deserializer = rdd._jrdd_deserializer
        self._id = rdd._id
        self.partitioner = rdd.partitioner

    def mapPartitionsWithIndex(self, f, preservesPartition=False):
        return CustomRDD(PipelinedRDD(self, f, preservesPartition), False)

    def union(self, other):
        return WebtrendsRDD(super(WebtrendsRDD, self).union(other), False)

    def custom_method(self):
        return CustomRDD(self.filter(lambda x: x.has_property()), False)

mapPartitionsWithIndex方法被许多其他RDD功能调用,因此它涵盖了很多内容,但是还有许多其他方法需要使用自己的构造函数进行包装,以便继续获取自己的CustomRDD,就像我在union中所做的那样。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接