从Pyspark UDF调用另一个自定义Python函数

10
假设你有一个文件,我们称之为 udfs.py,其中包含以下内容:
def nested_f(x):
    return x + 1

def main_f(x):
    return nested_f(x) + 1

然后您想将 main_f 函数制作成一个用户自定义函数,并在数据框上运行它:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

如果我们在与两个函数定义的文件(udfs.py)相同的文件中执行此操作,则可以正常工作。但是,尝试从不同的文件(例如main.py)执行此操作会产生错误ModuleNotFoundError: No module named ...
...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

我注意到如果我将nested_f实际嵌套在main_f中,就像这样:

def main_f(x):
    def nested_f(x):
        return x + 1

    return nested_f(x) + 1

一切运行良好。然而,我的目标是将逻辑漂亮地分离在多个函数中,我也可以单独测试这些函数。我认为可以通过使用“spark.sparkContext.addPyFile('...udfs.py')”将“udfs.py”文件(或整个压缩的文件夹)提交给执行程序来解决此问题。但是:
1. 我觉得这有点冗长(特别是如果您需要压缩文件夹等...) 2. 这并不总是容易/可能(例如,“udfs.py”可能正在使用许多其他模块,这些模块随后也需要被提交,导致一定程度的连锁反应...) 3. 使用“addPyFile”存在一些其他不便之处(例如{{link1:autoreload可能停止工作}}等)
因此,问题是:是否有一种方法可以同时完成所有这些操作?
  • 将UDF的逻辑分成几个Python函数
  • 从定义逻辑的文件中使用UDF
  • 不需要使用addPyFile提交任何依赖项

额外加分项是解释为什么这样做可以/为什么不能这样做!


在udfs.py文件中将您的函数注册为UDF。 - eiram_mahera
你试过那个吗?我觉得那行不通。 - Ferrard
@Ferrad:它可以工作。在udfs.py中注册您的UDF,然后在其他模块中导入您注册的UDF。 - eiram_mahera
2个回答

4
对于小型(一个或两个本地文件)依赖项,您可以使用--py-files并枚举它们,对于更大或更多依赖项的情况-最好将其打包为zip或egg文件。
文件udfs.py:
def my_function(*args, **kwargs):
    # code

文件 main.py:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

运行时:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

如果你编写了自己的Python模块,甚至是不需要C编译的第三方模块(比如geoip2),最好创建一个zip或egg文件。

# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is 
pip install -r requirements.txt -t ./src

# If you need add some additionals files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

使用pyspark --master yarn(可能是其他非本地主节点选项)时要小心,使用带有--py-files的pyspark shell:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip')  # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule  # libs.zip/MyModule

编辑 - 如何在没有使用 addPyFile ()--py-files 的情况下在执行器上获取函数的答案:

有必要在各个执行器上拥有一个带有函数的指定文件,并通过 PATH 环境变量可达。 因此,我可能会编写一个 Python 模块,然后在执行器上安装它,并在环境中提供它。


谢谢,这是一个有用的答案,尽管它不完全符合我的要求,似乎 --py-files 只是 addPyFile 的CLI等效版本(https://dev59.com/a1oT5IYBdhLWcg3w4SlQ#38072930)。也许我所问的东西不存在,如果是这样,知道原因会很好! - Ferrard
@Ferrard - 如何在不使用addPyFile()--py-files的情况下在执行器上获取函数的答案: 需要在各个执行器上有一个带有函数的指定文件,并且可以通过PATH环境变量访问。因此,我可能会编写一个Python模块,然后将其安装在执行器上,并在环境中提供该模块。 - Geekmoss

1
也许尝试将您的方法组织在一个类中,如下所示:
class temp_class:
    def nested_f(self, x):
      return x + 1

    def main_f(self, x):
      return self.nested_f(x) + 1

这可能有效!!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接