在Spark DataFrame中添加一个空列

95

正如在许多 其他网站上提到的,向现有DataFrame添加新列并不简单。尽管在分布式环境中效率低下,但拥有此功能非常重要,特别是在尝试使用unionAll连接两个DataFrame时。

添加一个null列以便于使用unionAll,最优雅的解决方案是什么?

我的版本如下:

from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())
new_df = old_df.withColumn('new_column', to_none(df_old['any_col_from_old']))
6个回答

189
在这里,你只需要导入StringType并使用litcast即可:
from pyspark.sql.types import StringType
from pyspark.sql.functions import lit

new_df = old_df.withColumn('new_column', lit(None).cast(StringType()))

一个完整的示例:
df = sc.parallelize([row(1, "2"), row(2, "3")]).toDF()
df.printSchema()
# root
#  |-- foo: long (nullable = true)
#  |-- bar: string (nullable = true)

new_df = df.withColumn('new_column', lit(None).cast(StringType()))

new_df.printSchema()
# root
#  |-- foo: long (nullable = true)
#  |-- bar: string (nullable = true)
#  |-- new_column: string (nullable = true)

new_df.show()
# +---+---+----------+
# |foo|bar|new_column|
# +---+---+----------+
# |  1|  2|      null|
# |  2|  3|      null|
# +---+---+----------+

这里可以找到一个Scala版本的等价代码:创建新的带有空/空值字段的Dataframe


如何在有条件的情况下进行操作,如果首先不存在该列呢?我试图使用UDF并将DF传递给它,然后执行new_column not in df.columns检查,但无法使其正常工作。 - Gopala
我也看了它,但我仍然无法将其有条件地合并到 withColumn('blah', where(has_column(df['blah']) == False).... 这种结构中。可能缺少某些语法结构。我想添加一个带有 Null 的列,如果它不存在的话。这个答案做了前者,另一个检查后者。 - Gopala
@Gopala df if has_column(df) else df.withColumn(....) - 没有Spark特定的内容。 - zero323
1
糟糕...我总是搞混Python语法何时适用,何时不适用。例如,在withColumn中不能有条件代码,必须使用UDF。谢谢! - Gopala

12
我建议将lit(None)转换为NullType而不是StringType. 这样,如果我们需要在该列上过滤非空行,只需执行以下操作即可。
df = sc.parallelize([Row(1, "2"), Row(2, "3")]).toDF()

new_df = df.withColumn('new_column', lit(None).cast(NullType()))

new_df.printSchema() 

df_null = new_df.filter(col("new_column").isNull()).show()
df_non_null = new_df.filter(col("new_column").isNotNull()).show()

同时,如果您将数据类型转换为StringType,请注意不要使用带引号的lit("None"),因为在使用col("new_column")进行过滤条件搜索时,它会失败并返回isNull()。请注意保留HTML标记。

2
错误:Parquet 数据源不支持空数据类型。使用 StringType() 可以解决。 - ZygD
根据您的用例,这可能是一个非常糟糕的想法,因为它将列转换为Void类型,因此如果您将其写入某种存储(例如delta格式),除了null之外,无法插入任何内容。 - Chris Ivan

5

没有使用 import StringType 选项

df = df.withColumn('foo', F.lit(None).cast('string'))

完整示例:

from pyspark.sql import functions as F
df = spark.range(1, 3).toDF('c')

df = df.withColumn('foo', F.lit(None).cast('string'))

df.printSchema()
#     root
#      |-- c: long (nullable = false)
#      |-- foo: string (nullable = true)

df.show()
#     +---+----+
#     |  c| foo|
#     +---+----+
#     |  1|null|
#     |  2|null|
#     +---+----+

1

1
df1.selectExpr("school","null as col1").show()

输出:

+--------------------+----+
|              school|col1|
+--------------------+----+
|Shanghai Jiao Ton...|null|
|   Peking University|null|
|Shanghai Jiao Ton...|null|
|    Fudan University|null|
|    Fudan University|null|
| Tsinghua University|null|
|Shanghai Jiao Ton...|null|
| Tsinghua University|null|
| Tsinghua University|null|
|   Peking University|null|

或者在pyspark 2.2+中

df1.pandas_api().assign(new_column=None)

0
为什么不直接使用这个呢?
from pyspark.sql.functions import lit
df.withColumn("column_name", lit("")).show()

这会导致一个“空字符串”(长度为0的字符串)。这与null不同。事实上,很多时候,在准备数据进行分析时,空字符串会被转换为null。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接