从Spark DataFrame获取特定行

47

在Scala Spark数据框中,是否有 df.filter($"column" === lit("value")).take(101).lastOption.getOrElse(Seq.fill(df.columns.length)("null")).apply(colIndex) 的替代方法?我想要从Spark数据框的某一列中选择特定的行。

例如,在上面的R等效代码中选择第100行。


2
可能是如何从sparkContext读取特定行的重复问题。 - Daniel Darabos
5
这是关于DataFrames的,而如何从sparkContext读取特定行则是关于RDDs的。 - Josiah Yoder
9个回答

29
首先,您必须了解DataFrames是分布式的,这意味着您不能以典型的过程式方式访问它们,您必须先进行分析。虽然您在询问关于Scala的问题,但我建议您阅读Pyspark文档,因为它比其他任何文档都有更多的示例。
不过,继续我的解释,我将使用一些RDD API方法,因为所有DataFrame都有一个RDD属性。请看下面的示例,并注意如何获取第二行记录。
df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
myIndex = 1
values = (df.rdd.zipWithIndex()
            .filter(lambda ((l, v), i): i == myIndex)
            .map(lambda ((l,v), i): (l, v))
            .collect())

print(values[0])
# (u'b', 2)
希望有人提供更简单的解决方案。

你的链接已经失效了。正确的链接应该是:https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html?highlight=dataframe#pyspark.sql.DataFrame - MyrionSC2

20

这就是我在Scala中实现同样功能的方法。我不确定它是否比正确答案更高效,但它需要编写的代码更少。

val parquetFileDF = sqlContext.read.parquet("myParquetFule.parquet")

val myRow7th = parquetFileDF.rdd.take(7).last

1
输出结果会根据数据被分成多少个节点而改变吗? - bshelt141
1
程序相关内容的翻译如下,请仅返回翻译后的文本:顺序不保证,因此每次运行输出可能会有所不同。 - Juh_

16
在PySpark中,如果您的数据集很小(可以适应驱动程序内存),您可以执行:

df.collect()[n]

其中df是DataFrame对象,n是所需的行。 获取该行后,您可以使用以下代码row.myColumnrow["myColumn"]来获取内容,如API文档中所述。


9
下面的getrows()函数应该会获取你想要的特定行。
为了完整起见,我已经写下了完整的代码以便复制产生输出。
# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('scratch').getOrCreate()

# Create the dataframe
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])

# Function to get rows at `rownums`
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])

# Get rows at positions 0 and 2.
getrows(df, rownums=[0, 2]).collect()

# Output:
#> [(Row(letter='a', name=1)), (Row(letter='c', name=3))]

6

这个在PySpark里对我有效。

df.select("column").collect()[0][0]

2

如果您的工作机器内存足够,可以使用 Scala 方法:

val arr = df.select("column").rdd.collect
println(arr(100))

如果数据框架的模式未知,而您知道"column"字段的实际类型(例如double),则可以按照以下方式获取arr

val arr = df.select($"column".cast("Double")).as[Double].rdd.collect

2
你可以使用以下一行代码来实现:
最初的回答
val arr = df.select("column").collect()(99)

2
更像是:.collect()[1] [0],以防有人需要帮助。 - Fay007

2

当您想从数据框中获取日期列的最大值时,只需获取该值而不是对象类型或行对象信息,可以参考以下代码。

表格名称为"mytable"

max_date = df.select(max('date_col')).first()[0]

2020-06-26
而不是 Row(max(reference_week)=datetime.date(2020, 6, 26))


-2
以下是使用Java-Spark的方法,1)添加一个顺序递增列。2)使用Id选择行号。3)删除该列。
import static org.apache.spark.sql.functions.*;
..

ds = ds.withColumn("rownum", functions.monotonically_increasing_id());
ds = ds.filter(col("rownum").equalTo(99));
ds = ds.drop("rownum");

注意:monotonically_increasing_id从0开始;


1
monotonically_increasing_id - 生成的ID保证单调递增且唯一,但不是连续的。 - Gowrav

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接