我认为你已经朝着达到预期输出的正确方向努力了。
针对这个问题,我找到了两个可能的解决方案:一个是使用Spark UDF,另一个是使用Pandas UDF。
Spark UDF
from pyspark.sql.functions import udf
@udf('integer')
def predict_udf(*cols):
return int(braodcast_model.value.predict((cols,)))
list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_udf(*list_of_columns))
数据处理函数(Pandas UDF)
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('integer')
def predict_pandas_udf(*cols):
X = pd.concat(cols, axis=1)
return pd.Series(braodcast_model.value.predict(X))
list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_pandas_udf(*list_of_columns))
可重现的例子
在这里,我使用了一个带有 Spark 3.1.2
、pandas==1.2.4
和 pyarrow==4.0.0
的 Databricks 社区集群。
broadcasted_model
是来自 scikit-learn 的简单逻辑回归模型,在乳腺癌数据集上进行训练。
import pandas as pd
import joblib
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from pyspark.sql.functions import udf, pandas_udf
X, y = load_breast_cancer(return_X_y=True, as_frame=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=28)
pipe = make_pipeline(StandardScaler(), LogisticRegression())
path = '/databricks/driver/test_model.joblib'
joblib.dump(model, path)
loaded_model = joblib.load(path)
df = spark.createDataFrame(X_test.sample(50, random_state=42))
sc = spark.sparkContext
braodcast_model = sc.broadcast(loaded_model)
我使用了上述两种方法,你会发现无论哪种方法,输出的df_prediction
都是相同的。