在DataFrame中用None/null值替换空字符串

38
我有一个Spark 1.5.0 DataFrame,其中一列中混合了null和空字符串。我想将所有列中的空字符串转换为null(在Python中为None)。由于DataFrame可能有数百列,因此我试图避免对每个列进行硬编码操作。
请参见下面我的尝试,结果出现错误。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

## Create a test DataFrame
testDF = sqlContext.createDataFrame([Row(col1='foo', col2=1), Row(col1='', col2=2), Row(col1=None, col2='')])
testDF.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |    |   2|
## |null|null|
## +----+----+

## Try to replace an empty string with None/null
testDF.replace('', None).show()
## ValueError: value should be a float, int, long, string, list, or tuple

## A string value of null (obviously) doesn't work...
testDF.replace('', 'null').na.drop(subset='col1').show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |null|   2|
## +----+----+

@palsch,不,它不返回列表。它返回一个DataFrame。我更新了问题,并附上了Spark文档的链接。 - dnlbrky
2
@palsch 这不是一般的Python问题!Spark DataFrames是分布式数据结构,通常用于允许在大数据上进行重型数据分析。因此,您的解决方案并不适合。 - eliasah
1
@eliasah 说实话,使用Pythonic的lambda x: None if not x else x并用udf包装起来就可以很好地工作 :) - zero323
1
@zero323 但他要求原帖回传一个列表... - eliasah
哪个答案最有效率? - GadaaDhaariGeek
9个回答

50

就是这么简单:

from pyspark.sql.functions import col, when

def blank_as_null(x):
    return when(col(x) != "", col(x)).otherwise(None)

dfWithEmptyReplaced = testDF.withColumn("col1", blank_as_null("col1"))

dfWithEmptyReplaced.show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## |null|   2|
## |null|null|
## +----+----+

dfWithEmptyReplaced.na.drop().show()
## +----+----+
## |col1|col2|
## +----+----+
## | foo|   1|
## +----+----+

如果您想填充多列,例如可以缩小:

to_convert = set([...]) # Some set of columns

reduce(lambda df, x: df.withColumn(x, blank_as_null(x)), to_convert, testDF)

或者使用推导式:

exprs = [
    blank_as_null(x).alias(x) if x in to_convert else x for x in testDF.columns]

testDF.select(*exprs)

如果您想专门操作字符串字段,请查看答案,作者是罗宾-洛克斯利


谢谢@zero323。您的答案能否自动高效地处理多列?也许列出所有列名,为每个列生成类似于您答案的代码,然后评估代码? - dnlbrky
我看不出为什么你不能这样做。DataFrames是惰性求值的,其余部分只是标准的Python代码。你可以在编辑中找到一些选项。 - zero323
我会接受这个答案,但你能否请先添加@RobinLoxley的那一部分呢?或者,如果你不介意,我可以编辑你的回答。 - dnlbrky
@dnlbrky 这是不公平的。 - zero323
3
.otherwise(None)这个语句是不必要的。对于没有匹配条件的情况,总是返回None(请参见https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.when)。 - asmaier
显示剩余2条评论

27

使用内置方法来实现此功能是正确的方式,因为UDFs不太高效。

df = df.withColumn('myCol', when(col('myCol') == '', None).otherwise(col('myCol')))

我在这里遇到了“str”不可调用的错误。有什么想法吗? - Matt W.
检查你的括号。 - bloodrootfc
我从这里直接复制了。 - Matt W.
我刚刚测试了代码,它是有效的。错误很可能是在数据框操作的其他地方引入的,并且只有在像collect()或show()这样的“操作”之后才会引发错误。如果您不包括我的代码并运行df.show(),是否会出现相同的错误? - bloodrootfc
2
这绝对是正确的解决方案,使用内置函数可以在Spark端进行大量优化。Python UDF非常昂贵,因为Spark执行器(无论您是否使用pyspark)始终在JVM上运行,需要将每一行(确切地说是一批行)序列化,通过套接字发送到子Python进程,评估您的Python函数,序列化结果并从套接字读取回来。 - qwwqwwq

8

在zero323和soulmachine的回答基础上,简单地添加以下内容。将其用于所有StringType字段的转换。

from pyspark.sql.types import StringType
string_fields = []
for i, f in enumerate(test_df.schema.fields):
    if isinstance(f.dataType, StringType):
        string_fields.append(f.name)

enumerate的目的是什么?我的意思是,我知道它的作用,但使用它而不是for field in test_df.schema.fields:有什么原因吗? - hlongmore

7

我的解决方案比我迄今所见的所有方案都要好得多,可以处理您想要的任意字段,如下是一个小函数:

  // Replace empty Strings with null values
  private def setEmptyToNull(df: DataFrame): DataFrame = {
    val exprs = df.schema.map { f =>
      f.dataType match {
        case StringType => when(length(col(f.name)) === 0, lit(null: String).cast(StringType)).otherwise(col(f.name)).as(f.name)
        case _ => col(f.name)
      }
    }

    df.select(exprs: _*)
  }

您可以轻松地在Python中重写上面的函数。

我从@liancheng那里学到了这个技巧。


2
如果您正在使用Python,可以检查以下内容。

+----+-----+----+
|  id| name| age|
+----+-----+----+
|null|name1|  50|
|   2|     |    |
|    |name3|null|
+----+-----+----+

def convertToNull(dfa):
   for i in dfa.columns:
    dfa = dfa.withColumn(i , when(col(i) == '', None ).otherwise(col(i)))
  return dfa

convertToNull(dfa).show()

+----+-----+----+
|  id| name| age|
+----+-----+----+
|null|name1|  50|
|   2| null|null|
|null|name3|null|
+----+-----+----+

0

我会在@zero323solution中添加trim以处理多个空格的情况:

def blank_as_null(x):
    return when(trim(col(x)) != "", col(x))

0

感谢 @zero323 , @Tomerikoo 和 @Robin Loxley
可直接使用的函数:

def convert_blank_to_null(df, cols=None):
    from pyspark.sql.functions import col, when, trim
    from pyspark.sql.types import StringType

    def blank_as_null(x):
        return when(trim(col(x)) == "", None).otherwise(col(x))
    # Don't know how to parallel
    for f in (df.select(cols) if cols else df).schema.fields:
        if isinstance(f.dataType, StringType):
            df = df.withColumn(f.name, blank_as_null(f.name))
    return df

0

这可以帮助我清理数值。

对于所有列:

address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in address_df.columns]).distinct()
address_sanitize_df.show()

针对特定列:
sanitize_cols=["address_line2","zip4"]
address_sanitize_df = address_df.select([when(col(c) == "", None).otherwise(col(c)).alias(c) for c in sanitize_cols])
address_sanitize_df.show()

-2

这是soulmachine解决方案的另一个版本,但我认为你不能像这样轻松地将其翻译成Python:

def emptyStringsToNone(df: DataFrame): DataFrame = {
  df.schema.foldLeft(df)(
    (current, field) =>
      field.dataType match {
        case DataTypes.StringType =>
          current.withColumn(
            field.name,
            when(length(col(field.name)) === 0, lit(null: String)).otherwise(col(field.name))
          )
        case _ => current
      }
  )
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接