PySpark删除行

28

如何从PySpark的RDD中删除行?特别是第一行,因为我的数据集中通常包含列名。通过查阅API,我似乎找不到一种简单的方法来完成此操作。当然,我可以通过Bash/HDFS来完成这个任务,但我想知道是否可以在PySpark内部完成。


1
使用 filter 过滤掉不良行。 - aaronman
如果您只想删除第一行怎么办?为了论证而言,假设我们不能使用行向量x的任何信息,即无法执行“lambda x:(使用x的某些条件)”。 - Jack
请查看我的答案,可能更接近您所寻找的内容。 - aaronman
6个回答

23

针对PySpark:

根据@maasg的说法,您可以这样做:

header = rdd.first()
rdd.filter(lambda line: line != header)

但从技术角度看,这并不完全正确,因为您可能会排除包含数据的行以及标题。不过,这对我来说似乎有效:

def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)

同样地:

rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])

我是Spark的新手,无法理智地评论哪种方法最快。


你可以解释一下这段代码的含义吗(我是JS开发者)?return iter(list(itr)[1:] if itr_index == 0 else itr)?1) - iter 接受一个 (object[, sentinel]),所以我猜测 iter 接受一个 itr 可迭代对象列表(行),然后使用Python的 range 运算符,从第二个索引(基于0)开始向下迭代,直到 itr_index == 0,否则保持返回 itr 行?我问这个问题是因为我正在使用相同的东西,但是字段的初始行没有出现,而是数据的第一行变成了字段。 - user3871
1
iter可能会让问题变得混淆。如果rdd.mapParitionsWithIndex返回分区的索引和分区数据作为列表,那么它只是itr[1:] if itr_index == 0 else itr-即如果它是第一个分区(即itr_index == 0),则排除第一行(即标题),如果不是第一个分区(即没有标题),则返回整个分区。iterlist之所以出现在这里,是因为它实际上使用的是可迭代对象而不是列表。顺便说一句,我相信有比iter(list(itr)[1:])更有效的方法。 - user4081921

20

据我所知,没有简单的方法可以做到这一点。

不过,以下方法应该可以解决问题:

val header = data.first
val rows = data.filter(line => line != header)

这是合理的。谢谢! - Jack
应该不是data.first吗?data.take(1)将返回一个长度为1的Array[T]。 - Bar
如果您的列表中有重复项,那么这将失败。 - Sebastian Hojas
1
@SebastianHojas 一旦您的数据中有了标题,接下来,如果您正在读取具有相同标题的多个文件,则此操作将起作用。 基本上,它不是删除第一行,而是删除任何看起来像第一行的行。 - Carlos Bribiescas

5

假设您正在使用Python 3,使用PySpark(Python API)实现此目标的简单方法如下:

noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()

Python3 不允许元组解包。因此,使用 lambda tup: tup[1] >0 更好。 - H S Rathore
1
感谢 @HSRathore,我刚刚更新了代码片段,以避免在 Python 3 中出现混淆。 - noleto

3
我做了一些与各种解决方案的分析,并得到以下结果:
集群配置
集群
- 集群1: 4核心16GB - 集群2: 4核心16GB - 集群3: 4核心16GB - 集群4: 2核心8GB
数据
7百万行,4列
#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)

#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)

#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
     if(index==0):
          for subIndex,item in enumerate(iterator):
               if subIndex > 0:
                    yield item
     else:
          yield iterator

data=data.mapPartitionsWithIndex(dropFirstRow)

我认为解决方案3是最具可扩展性的。


1

个人认为最简单的方法是使用过滤器来去除这些内容。但根据您的评论,我有另一种方法。将RDD粘合在一起,使每个分区都成为一个数组(我假设每个分区有1个文件,并且每个文件的顶部都有问题的行),然后跳过第一个元素(这是使用scala api实现的)。

data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index

请记住,RDD的一个重要特征是它们是不可变的,因此自然而然地删除一行是一件棘手的事情

更新: 更好的解决方案。
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
与glom相同,但不会产生将所有内容放入数组的开销,因为在这种情况下,x是一个迭代器


1
我已经测试过了Spark2.1。假设您想在不知道文件列数的情况下删除前14行。
sc = spark.sparkContext
lines = sc.textFile("s3://location_of_csv")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

withColumn是一个df函数。因此,在上述情况中使用的RDD样式下面将无法工作。

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)

parts = parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0]) 因为parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])不会直接修改原有数据。 - H S Rathore
这对我有用。 注意筛选参数的略微更改。 谢谢!parts.withColumn("index",monotonically_increasing_id()).filter(col('index') > 14) - adm-gis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接