我正在使用pyspark,使用spark-csv将一个大型csv文件加载到dataframe中,作为预处理步骤,我需要对包含json字符串的一个列中可用数据应用各种操作。这将返回X个值,每个值都需要存储在自己单独的列中。
该功能将在UDF中实现,但是我不确定如何从UDF返回值列表并将其提供给各个列。以下是一个简单的示例:
(...)
from pyspark.sql.functions import udf
def udf_test(n):
return [n/2, n%2]
test_udf=udf(udf_test)
df.select('amount','trans_date').withColumn("test", test_udf("amount")).show(4)
这会产生以下结果:
+------+----------+--------------------+
|amount|trans_date| test|
+------+----------+--------------------+
| 28.0|2016-02-07| [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows
现在这个示例中,UDF返回的两个值被视为字符串,如何将它们存储在不同的列中是最佳的方法?
df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()
root
|-- amount: float (nullable = true)
|-- trans_date: string (nullable = true)
|-- test: string (nullable = true)
foobars.select("foobar.*")
来代替逐个列出每一列的名字。该方法不仅简洁,而且效果相同。 - paultdf.select("x", test_udf("y").alias("foobar")).select("x", "foobar.*")
- mjvfrom pyspark.sql.types import StructType, StructField, FloatType
- alvaro nortes