在Spark中从数据框的列值中删除空格

9

I have a data frame (business_df) of schema:

|-- business_id: string (nullable = true)
|-- categories: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- city: string (nullable = true)
|-- full_address: string (nullable = true)
|-- hours: struct (nullable = true)
|-- name: string (nullable = true)

我希望创建一个新的数据框 (new_df),使得'name'列中的值不包含任何空格。

我的代码如下:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

udf = UserDefinedFunction(lambda x: x.replace(' ', ''), StringType())
new_df = business_df.select(*[udf(column).alias(name) if column == name else column for column in business_df.columns])
new_df.registerTempTable("vegas")
new_df.printSchema()
vegas_business = sqlContext.sql("SELECT stars, name from vegas limit 10").collect()

我一直收到这个错误:
NameError: 全局名称 'replace' 未定义
这段代码有什么问题?

有什么问题吗?就我所知,它无法用于重现问题 :) 您确定这是您使用的代码吗?您报告的错误表明您在某个地方使用了无限制的“replace”。这种方法也相当低效,但这完全是另一回事。 - zero323
@zero323,你会怎么做呢?我也会创建一些UDF的 :( - Alberto Bonsanto
@AlbertoBonsanto 这取决于逻辑。不清楚 OP 是否只想要空格(这是发生的情况)或空行。无论如何,都可以轻松地完成,而无需使用 Python 批处理作业。 - zero323
5个回答

29

虽然您提供的代码无法重现所描述的问题,但使用Python UDFs来处理像这样简单的任务是相当低效的。如果您只想从文本中删除空格,请使用regexp_replace

from pyspark.sql.functions import regexp_replace, col

df = sc.parallelize([
    (1, "foo bar"), (2, "foobar "), (3, "   ")
]).toDF(["k", "v"])

df.select(regexp_replace(col("v"), " ", ""))

如果您想规范化空行,请使用trim

from pyspark.sql.functions import trim

df.select(trim(col("v")))

如果你想保留开头和结尾的空格,你可以调整regexp_replace

df.select(regexp_replace(col("v"), "^\s+$", ""))

我想做的与您之前回答有关 SQL select 语句和绑定变量的帖子相关。对于这个问题,设置与之前相同:我有一个餐厅列表及其所在城市,例如[[Le Bernadin,曼哈顿][...,...]],我想循环遍历并从包含上述模式数据的数据框中检索匹配的餐厅。然而,由于空格会产生错误,因此我想从查询的数据框中消除这些空格,以便可以比较单词字符串,例如“LeBernadin”。 - Iz M
正则表达式的解决方案似乎更好,但是当我运行它时,df模式被替换为正则表达式表达式(root | -- regexp_replace(name,[\ s'():^],):string(nullable = true))。我有点困惑,不知道这是为什么? - Iz M
因为您没有使用别名。如果您想要一个特定的名称,请使用withColumn或alias。 - zero323

5
这是一个删除字符串中所有空格的函数:

import pyspark.sql.functions as F

def remove_all_whitespace(col):
    return F.regexp_replace(col, "\\s+", "")

您可以像这样使用该函数:

actual_df = source_df.withColumn(
    "words_without_whitespace",
    quinn.remove_all_whitespace(col("words"))
)

remove_all_whitespace函数在quinn库中定义。quinn还定义了single_spaceanti_trim方法来管理空格。PySpark定义了ltrimrtrimtrim方法来管理空格。


就像我说的那样,我使用 regex_replace 的经验并不好,因为它太慢了!使用 rdd.map 可以获得更好的性能。 - Andre Carneiro
@AndreCarneiro,你能分享一下使用rdd.map的修改后代码吗?我正在尝试对英国邮政编码进行一些检查,但正则表达式处理时间非常长。 - E B
@AndreCarneiro - 我认为你的代码不会比 regexp_replace 更快。原生的Spark函数可以被编译器看到,因此它们可以在执行计划中进行优化。也许我的Spark理解有误,但我不这么认为 :) - Powers
@EB 嗯,如果你从未尝试过,你永远不会知道!如果你想的话,可以放心地进行“基准测试”!我会在有时间的时候去做。目前为止,这解决了我的问题!所以,对我来说已经足够好了! - Andre Carneiro

4
正如@zero323所说,很可能是您在某个地方重叠了replace函数。我测试了您的代码,它完美地工作了。
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = sqlContext.createDataFrame([("aaa 111",), ("bbb 222",), ("ccc 333",)], ["names"])
spaceDeleteUDF = udf(lambda s: s.replace(" ", ""), StringType())
df.withColumn("names", spaceDeleteUDF("names")).show()

#+------+
#| names|
#+------+
#|aaa111|
#|bbb222|
#|ccc333|
#+------+

1

正如 @Powers 所展示的,有一种非常好的、易于阅读的函数可以通过名为 quinn 的包来删除空格。您可以在这里找到它: https://github.com/MrPowers/quinn 如果在 Data Bricks 工作区上工作,以下是安装它的说明: https://docs.databricks.com/libraries.html

下面再次演示它的工作原理:

#import library 
import quinn

#create an example dataframe
df = sc.parallelize([
    (1, "foo bar"), (2, "foobar "), (3, "   ")
]).toDF(["k", "v"])

#function call to remove whitespace. Note, withColumn will replace column v if it already exists
df = df.withColumn(
    "v",
    quinn.remove_all_whitespace(col("v"))
)

输出结果: 在此输入图片描述

0

我认为使用regexp_replace的解决方案即使对于少量数据来说也太慢了!所以我尝试找到另一种方法,我认为我找到了!

虽然不是很美观,有点幼稚,但速度很快!你觉得呢?

def normalizeSpace(df,colName):

  # Left and right trim
  df = df.withColumn(colName,ltrim(df[colName]))
  df = df.withColumn(colName,rtrim(df[colName]))

  #This is faster than regexp_replace function!
  def normalize(row,colName):
      data = row.asDict()
      text = data[colName]
      spaceCount = 0;
      Words = []
      word = ''

      for char in text:
          if char != ' ':
              word += char
          elif word == '' and char == ' ':
              continue
          else:
              Words.append(word)
              word = ''

      if len(Words) > 0:
          data[colName] = ' '.join(Words)

      return Row(**data)

      df = df.rdd.map(lambda row:
                     normalize(row,colName)
                 ).toDF()
      return df
schema = StructType([StructField('name',StringType())])
rows = [Row(name='  dvd player samsung   hdmi hdmi 160W reais    de potencia 
bivolt   ')]
df = spark.createDataFrame(rows, schema)
df = normalizeSpace(df,'name')
df.show(df.count(),False)

这将打印

+---------------------------------------------------+
|name                                               |
+---------------------------------------------------+
|dvd player samsung hdmi hdmi 160W reais de potencia|
+---------------------------------------------------+

@Andrei Carneiro,如果我理解你的函数正确的话,你只需要在字符串中添加一个空格,并替换任何超过1个空格的内容,只要它不是前导或尾随空格,是吗? - E B
是的!但是同样的原则也可以应用!但我意识到regexp_replace是解决这个问题的最佳方案。我想我被我的VM愚弄了。类似于:df = df.withColumn(colName,regexp_replace( df[colName] r"\s+","") - Andre Carneiro

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接