在PySpark数据框中选择列

42
我正在寻找一种在 PySpark 中选择数据框列的方法。对于第一行,我知道可以使用 df.first(),但是不确定如何选择列,因为它们没有列名。我有 5 列,并希望逐个循环遍历每一列。
+--+---+---+---+---+---+---+
|_1| _2| _3| _4| _5| _6| _7|
+--+---+---+---+---+---+---+
|1 |0.0|0.0|0.0|1.0|0.0|0.0|
|2 |1.0|0.0|0.0|0.0|0.0|0.0|
|3 |0.0|0.0|1.0|0.0|0.0|0.0|
6个回答

73

试着像这样做:

df.select([c for c in df.columns if c in ['_2','_4','_5']]).show()

我不想硬编码,因为我将不得不为数百列重复这样做。所以我想要循环遍历列并进行一些分析。 - Nivi
@Nivi,我已经更新了我的答案 - 这是你想要的吗? - MaxU - stand with Ukraine
啊!那是我一直在使用的简单方法。现在我突然忘了,谢谢Max :) - Nivi
1
@rishijain,https://stackoverflow.com/posts/46813599/revisions ;-) 原因 - MaxU - stand with Ukraine
1
这帮助了我。 - abdoulsn

35

前两列和5行

 df.select(df.columns[:2]).take(5)

26
您可以使用数组并在选择器中拆包它:
cols = ['_2','_4','_5']
df.select(*cols).show()

1
这个解决方案解决了我的问题,但是 * 运算符是什么意思? - yeliabsalohcin
3
@yeliabsalohcin * 运算符用于解包数组。Pyspark 的 select 函数不接受数组。 - Shadowtrooper
3
如果您的列名中有特殊字符,例如'.',则需要将每个字符串用反引号'`'括起来。请注意细节。 - Jas
你能帮我实现这个吗?选择包含'.'的多列名字? - charlie_boy
是的 @charlie_boy,对于这种情况,您可以使用列表推导式过滤列名:cols = [x for x in columns if "." in x]。这里,columns 是一个包含您的列名的列表。此外,请注意您的列名中的“.”字符,它必须用反引号括起来。 - Shadowtrooper

7
方法select接受一个包含列名(字符串)或表达式(Column)的列表作为参数。要选择列,您可以使用以下方法:
- 列名(字符串):
df.select('col_1','col_2','col_3')

-- 列对象:

import pyspark.sql.functions as F

df.select(F.col('col_1'), F.col('col_2'), F.col('col_3'))

# or

df.select(df.col_1, df.col_2, df.col_3)

# or

df.select(df['col_1'], df['col_2'], df['col_3'])

-- 一列列名或列对象的列表:
df.select(*['col_1','col_2','col_3'])

#or

df.select(*[F.col('col_1'), F.col('col_2'), F.col('col_3')])

#or 

df.select(*[df.col_1, df.col_2, df.col_3])

星号运算符*可以省略,因为它的作用是与其他不接受列表作为参数的函数(如drop)保持一致。

5
使用 df.schema.names:
spark.version
# u'2.2.0'

df = spark.createDataFrame([("foo", 1), ("bar", 2)])
df.show()
# +---+---+ 
# | _1| _2|
# +---+---+
# |foo|  1| 
# |bar|  2|
# +---+---+

df.schema.names
# ['_1', '_2']

for i in df.schema.names:
  # df_new = df.withColumn(i, [do-something])
  print i
# _1
# _2

4

我对 ss.csv 数据集中的一些列感兴趣:

ss_ = spark.read.csv("ss.csv", header= True, 
                      inferSchema = True)
ss_.columns

['Reporting Area', 'MMWR Year', 'MMWR Week', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Current week', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Current week, flag', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Previous 52 weeks Med', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Previous 52 weeks Med, flag', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Previous 52 weeks Max', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Previous 52 weeks Max, flag', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Cum 2018', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Cum 2018, flag', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Cum 2017', 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Cum 2017, flag', 'Shiga toxin-producing Escherichia coli, Current week', 'Shiga toxin-producing Escherichia coli, Current week, flag', 'Shiga toxin-producing Escherichia coli, Previous 52 weeks Med', 'Shiga toxin-producing Escherichia coli, Previous 52 weeks Med, flag', 'Shiga toxin-producing Escherichia coli, Previous 52 weeks Max', 'Shiga toxin-producing Escherichia coli, Previous 52 weeks Max, flag', 'Shiga toxin-producing Escherichia coli, Cum 2018', 'Shiga toxin-producing Escherichia coli, Cum 2018, flag', 'Shiga toxin-producing Escherichia coli, Cum 2017', 'Shiga toxin-producing Escherichia coli, Cum 2017, flag', 'Shigellosis, Current week', 'Shigellosis, Current week, flag', 'Shigellosis, Previous 52 weeks Med', 'Shigellosis, Previous 52 weeks Med, flag', 'Shigellosis, Previous 52 weeks Max', 'Shigellosis, Previous 52 weeks Max, flag', 'Shigellosis, Cum 2018', 'Shigellosis, Cum 2018, flag', 'Shigellosis, Cum 2017', 'Shigellosis, Cum 2017, flag']

但我只需要一点:

columns_lambda = lambda k: k.endswith(', Current week') or k == 'Reporting Area' or k == 'MMWR Year' or  k == 'MMWR Week'

过滤器返回所需列的列表,对该列表进行求值:
sss = filter(columns_lambda, ss_.columns)
to_keep = list(sss)

所需列的列表被拆分为数据帧(dataframe)选择函数的参数,该函数返回仅包含列表中列的数据集:

dfss = ss_.select(*to_keep)
dfss.columns

结果:
['Reporting Area',
 'MMWR Year',
 'MMWR Week',
 'Salmonellosis (excluding Paratyphoid fever andTyphoid fever)†, Current week',
 'Shiga toxin-producing Escherichia coli, Current week',
 'Shigellosis, Current week']

df.select()有一个互补对应的函数:http://spark.apache.org/docs/2.4.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.drop

可以使用该函数来删除列。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接