PySpark - 从Dataframe中删除第一行

3
我有一个带头部的.txt文件,我想把它去掉。该文件看起来像这样:
Entry  Per  Account     Description               
 16524  01  3930621977  TXNPUES                     
191675  01  2368183100  OUNHQEX            
191667  01  3714468136  GHAKASC             
191673  01  2632703881  PAHFSAP              
 80495  01  2766389794  XDZANTV                    
 80507  01  4609266335  BWWYEZL                   
 80509  01  1092717420  QJYPKVO                  
 80497  01  3386366766  SOQLCMU                  
191669  01  5905893739  FYIWNKA             
191671  01  2749355876  CBMJTLP 

# Create spark session
spark = SparkSession.builder.master("local").appName("fixed-width"                          )\
                                            .config("spark.some.config.option", "some-value")\
                                            .getOrCreate()

# Read in fixed-width text file into DataFrame
df = spark.read.option("header"     , "true" )\
               .option("inferSchema", "true" )\
               .text(file                    )
df.show()
df.printSchema()

这将返回:

+--------------------+
|               value|
+--------------------+
|Entry  Per  Accou...|
| 16524  01  39306...|
|191675  01  23681...|
|191667  01  37144...|
|191673  01  26327...|
| 80495  01  27663...|
| 80507  01  46092...|
| 80509  01  10927...|
| 80497  01  33863...|
|191669  01  59058...|
|191671  01  27493...|
+--------------------+

root
 |-- value: string (nullable = true)

我可以获取头部信息:

header = df.first()
header

返回以下内容:

Row(value='Entry  Per  GL Account  Description               ')

然后将其拆分成不同的列:

# Take the fixed width file and split into 3 distinct columns
sorted_df = df.select(
    df.value.substr( 1,  6).alias('Entry'      ),
    df.value.substr( 8,  3).alias('Per'        ),
    df.value.substr(12, 11).alias('GL Account' ),
    df.value.substr(24, 11).alias('Description'),
)

sorted_df.show()
sorted_df.printSchema()

这将返回:

+------+---+-----------+-----------+
| Entry|Per| GL Account|Description|
+------+---+-----------+-----------+
|Entry |Per| GL Account| Descriptio|
| 16524| 01| 3930621977| TXNPUES   |
|191675| 01| 2368183100| OUNHQEX   |
|191667| 01| 3714468136| GHAKASC   |
|191673| 01| 2632703881| PAHFSAP   |
| 80495| 01| 2766389794| XDZANTV   |
| 80507| 01| 4609266335| BWWYEZL   |
| 80509| 01| 1092717420| QJYPKVO   |
| 80497| 01| 3386366766| SOQLCMU   |
|191669| 01| 5905893739| FYIWNKA   |
|191671| 01| 2749355876|   CBMJTLP |
+------+---+-----------+-----------+

现在您可以看到,在这里我的数据框的第一行仍然显示为标题。我不确定如何将其删除。 .iloc 不可用,我经常看到这种方法,但它只适用于 RDD。
header = rdd.first()
rdd.filter(lambda line: line != header)

那么有哪些可替代的选择呢?

1个回答

11
你可以选择使用.csv.text.textFile格式来处理这个案例。
使用.csv方法读取文件,这样Spark就可以读取文件头(我们不必过滤掉文件头)。 1.使用 .csv: .csv方法返回df
df=spark.read.option("header","true").csv("path")
df.show(10,False)
#+----------------------------------------------------+
#|Entry  Per  Account     Description                 |
#+----------------------------------------------------+
#| 16524  01  3930621977  TXNPUES                     |
#|191675  01  2368183100  OUNHQEX                     |
#|191667  01  3714468136  GHAKASC                     |
#|191673  01  2632703881  PAHFSAP                     |
#| 80495  01  2766389794  XDZANTV                     |
#| 80507  01  4609266335  BWWYEZL                     |
#| 80509  01  1092717420  QJYPKVO                     |
#| 80497  01  3386366766  SOQLCMU                     |
#|191669  01  5905893739  FYIWNKA                     |
#|191671  01  2749355876  CBMJTLP                     |
#+----------------------------------------------------+

2.使用.text:

.text 返回 df 的文本内容。

#can't read header
df=spark.read.text("path")
#get the header
header=df.first()[0]
#filter the header out from data
df.filter(~col("value").contains(header)).show(10,False)
#+----------------------------------------------------+
#|value                                               |
#+----------------------------------------------------+
#| 16524  01  3930621977  TXNPUES                     |
#|191675  01  2368183100  OUNHQEX                     |
#|191667  01  3714468136  GHAKASC                     |
#|191673  01  2632703881  PAHFSAP                     |
#| 80495  01  2766389794  XDZANTV                     |
#| 80507  01  4609266335  BWWYEZL                     |
#| 80509  01  1092717420  QJYPKVO                     |
#| 80497  01  3386366766  SOQLCMU                     |
#|191669  01  5905893739  FYIWNKA                     |
#|191671  01  2749355876  CBMJTLP                     |
#+----------------------------------------------------+

然后使用。
sorted_df = df.select(
    df.value.substr( 1,  6).alias('Entry'      ),
    df.value.substr( 8,  3).alias('Per'        ),
    df.value.substr(12, 11).alias('GL Account' ),
    df.value.substr(24, 11).alias('Description'),
)

sorted_df.show()
sorted_df.printSchema()

3.使用.textFile

.textFile 方法返回一个 rdd 对象。

#get header into a variable
header=spark.sparkContext.textFile("path").first()

#.textfile and filter out the header
spark.sparkContext.textFile("path").\
filter(lambda l :not str(l).startswith(header)).\
map(lambda x:x.split()).map(lambda x:(str(x[0].strip()),str(x[1].strip()),str(x[2].strip()),str(x[3].strip()))).\
toDF(["Entry","Per","Account","Description"]).\
show()
#+------+---+----------+-----------+
#| Entry|Per|   Account|Description|
#+------+---+----------+-----------+
#| 16524| 01|3930621977|    TXNPUES|
#|191675| 01|2368183100|    OUNHQEX|
#|191667| 01|3714468136|    GHAKASC|
#|191673| 01|2632703881|    PAHFSAP|
#| 80495| 01|2766389794|    XDZANTV|
#| 80507| 01|4609266335|    BWWYEZL|
#| 80509| 01|1092717420|    QJYPKVO|
#| 80497| 01|3386366766|    SOQLCMU|
#|191669| 01|5905893739|    FYIWNKA|
#|191671| 01|2749355876|    CBMJTLP|
#+------+---+----------+-----------+

2
这太棒了!我很感激你提供的多种方法。我发现第二种方法,即使用.text,效果很好。现在我只需要对其应用模式。再次感谢!特别是那行代码: df.filter(~f.col("value").contains(header)) 正是我一直在寻找的。 - Dave Voyles

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接