如何从PySpark的RDD中删除行?特别是第一行,因为我的数据集中通常包含列名。通过查阅API,我似乎找不到一种简单的方法来完成此操作。当然,我可以通过Bash/HDFS来完成这个任务,但我想知道是否可以在PySpark内部完成。
如何从PySpark的RDD中删除行?特别是第一行,因为我的数据集中通常包含列名。通过查阅API,我似乎找不到一种简单的方法来完成此操作。当然,我可以通过Bash/HDFS来完成这个任务,但我想知道是否可以在PySpark内部完成。
针对PySpark:
根据@maasg的说法,您可以这样做:
header = rdd.first()
rdd.filter(lambda line: line != header)
但从技术角度看,这并不完全正确,因为您可能会排除包含数据的行以及标题。不过,这对我来说似乎有效:
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
rdd.mapPartitionsWithIndex(remove_header)
同样地:
rdd.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda tup: tup[0])
我是Spark的新手,无法理智地评论哪种方法最快。
return iter(list(itr)[1:] if itr_index == 0 else itr)
?1) - iter
接受一个 (object[, sentinel])
,所以我猜测 iter
接受一个 itr
可迭代对象列表(行),然后使用Python的 range
运算符,从第二个索引(基于0)开始向下迭代,直到 itr_index == 0
,否则保持返回 itr
行?我问这个问题是因为我正在使用相同的东西,但是字段的初始行没有出现,而是数据的第一行变成了字段。 - user3871iter
可能会让问题变得混淆。如果rdd.mapParitionsWithIndex
返回分区的索引和分区数据作为列表,那么它只是itr[1:] if itr_index == 0 else itr
-即如果它是第一个分区(即itr_index == 0
),则排除第一行(即标题),如果不是第一个分区(即没有标题),则返回整个分区。iter
和list
之所以出现在这里,是因为它实际上使用的是可迭代对象而不是列表。顺便说一句,我相信有比iter(list(itr)[1:])
更有效的方法。 - user4081921据我所知,没有简单的方法可以做到这一点。
不过,以下方法应该可以解决问题:
val header = data.first
val rows = data.filter(line => line != header)
假设您正在使用Python 3,使用PySpark(Python API)实现此目标的简单方法如下:
noHeaderRDD = rawRDD.zipWithIndex().filter(lambda row_index: row_index[1] > 0).keys()
lambda tup: tup[1] >0
更好。 - H S Rathore#Solution 1
# Time Taken : 40 ms
data=sc.TextFile('file1.txt')
firstRow=data.first()
data=data.filter(lambda row:row != firstRow)
#Solution 2
# Time Taken : 3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
return iter(list(iterator)[1:]) if index==0 else iterator
data=data.mapPartitionsWithIndex(dropFirstRow)
#Solution 3
# Time Taken : 0.3 seconds
data=sc.TextFile('file1.txt')
def dropFirstRow(index,iterator):
if(index==0):
for subIndex,item in enumerate(iterator):
if subIndex > 0:
yield item
else:
yield iterator
data=data.mapPartitionsWithIndex(dropFirstRow)
我认为解决方案3是最具可扩展性的。
个人认为最简单的方法是使用过滤器来去除这些内容。但根据您的评论,我有另一种方法。将RDD粘合在一起,使每个分区都成为一个数组(我假设每个分区有1个文件,并且每个文件的顶部都有问题的行),然后跳过第一个元素(这是使用scala api实现的)。
data.glom().map(x => for (elem <- x.drop(1){/*do stuff*/}) //x is an array so just skip the 0th index
请记住,RDD的一个重要特征是它们是不可变的,因此自然而然地删除一行是一件棘手的事情
更新:
更好的解决方案。
rdd.mapPartions(x => for (elem <- x.drop(1){/*do stuff*/} )
与glom相同,但不会产生将所有内容放入数组的开销,因为在这种情况下,x是一个迭代器
sc = spark.sparkContext
lines = sc.textFile("s3://location_of_csv")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
withColumn是一个df函数。因此,在上述情况中使用的RDD样式下面将无法工作。
parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)
parts = parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
因为parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])
不会直接修改原有数据。 - H S Rathoreparts.withColumn("index",monotonically_increasing_id()).filter(col('index') > 14)
- adm-gis
filter
过滤掉不良行。 - aaronman