Pyspark and local variables inside UDFs

8

当我定义一个本地变量,比如一个复杂对象的大列表,并在pyspark中的UDF中使用它时会发生什么?让我用这个作为例子:

huge_list = [<object_1>, <object_2>, ..., <object_n>]

@udf
def some_function(a, b):
    l = []
    for obj in huge_list:
        l.append(a.operation(obj))
    return l

df2 = df.withColumn('foo', some_function(col('a'), col('b')))

是否自动广播?还是节点每次与主节点通信以获取其数据?使用这种方法会有什么性能损失?还有更好的方法吗?(考虑到每次应用UDF时从头开始构建huge_list会更糟)

1个回答

2
查看代码,可以看到以下发生的情况:每个UDF都会调用一次此函数,将可调用对象序列化为CloudPickleSerializer 通过此函数。 它还具有比较已序列化的可调用对象大小与硬编码阈值(1Mb)的逻辑。 如果大小大于1Mb,则广播序列化命令,并选择使用pyspark.broadcast.Broadcast类型的对象进行序列化(因为该对象基本上是引用,因此其序列化值显然非常短)。读取序列化可调用对象的位置似乎是在这里。我理解的是,对于每个新任务执行,执行者都会从头开始创建一个python进程。 对于每个使用的UDF,它将获取序列化命令并反序列化它,或者(对于广播)需要从JVM获取广播值并反序列化它。

就我所了解的而言,如果在此处创建pyspark.broadcast.Broadcast对象,则所有执行者都将为该executor创建的所有python worker.py进程保留其值。

因此,如果您想回答某些函数是否会被广播,可以重复pyspark执行的相同操作,并查看序列化对象是否大于1Mb,例如:

from pyspark.serializers import CloudPickleSerializer
ser = CloudPickleSerializer()
x = [i**2 for i in range(10**5)]
v = ser.dumps(lambda : x)
print(len(v)) # 607434 - less than 1Mb, won't be broadcast

关于替代方法,我认为除了每次调用udf函数时创建新对象的方法(已经解释过这种方法太昂贵),唯一的替代方法是创建一个模块,在导入时创建所需的对象。在这种情况下,该对象将为每个任务执行创建一次。因此,这基本上给了你选择:(a)通过CloudPickleSerializer每个任务执行一次反序列化对象,如果您只允许udf函数捕获它;或者(b)通过导入模块每个任务执行一次创建对象。哪种方法更快是一个单独的问题 - 但我想答案可能取决于所涉及的对象。在每种情况下,似乎都很容易测量。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接