从Spark DataFrame中选择特定列

43

我已将CSV数据加载到Spark DataFrame中。

我需要将此DataFrame切片成两个不同的DataFrame,其中每个都包含原始DataFrame中的一组列。

如何基于列选择子集以创建Spark DataFrame?

8个回答

49

如果你想将你的数据框分成两个不同的数据框,请使用你想要的不同列对它进行两次选择操作。

 val sourceDf = spark.read.csv(...)
 val df1 = sourceDF.select("first column", "second column", "third column")
 val df2 = sourceDF.select("first column", "second column", "third column")

注意,这当然意味着sourceDf将被评估两次,所以如果它可以适应分布式内存,并且您在两个数据帧中使用了大多数列,那么缓存它可能是一个好主意。 如果它有许多您不需要的额外列,则可以首先对其进行选择,以选择您将需要的列,以便将所有这些额外数据存储在内存中。


如何进行缓存?我是Spark的新手。 - A.HADDAD
1
@A.HADDAD,您可以在数据框上调用cache()persist()方法以获取缓存版本。 persist允许您选择缓存级别,而cache仅使用默认的仅内存缓存级别调用persist方法。 - puhlen

11

在Dataframe中,有多种选择子集列的选项(特别是在Scala中)。下面的几行展示了这些选项,其中大部分都在Column的ScalaDocs中有文档记录:

import spark.implicits._
import org.apache.spark.sql.functions.{col, column, expr}

inputDf.select(col("colA"), col("colB"))
inputDf.select(inputDf.col("colA"), inputDf.col("colB"))
inputDf.select(column("colA"), column("colB"))
inputDf.select(expr("colA"), expr("colB"))

// only available in Scala
inputDf.select($"colA", $"colB")
inputDf.select('colA, 'colB) // makes use of Scala's Symbol

// selecting columns based on a given iterable of Strings
val selectedColumns: Seq[Column] = Seq("colA", "colB").map(c => col(c))
inputDf.select(selectedColumns: _*)

// Special cases
col("columnName.field")     // Extracting a struct field
col("`a.column.with.dots`") // Escape `.` in column names.

// select the first or last 2 columns
inputDf.selectExpr(inputDf.columns.take(2): _*)
inputDf.selectExpr(inputDf.columns.takeRight(2): _*)

在Scala中可以使用$,因为它提供了一个隐式类,将字符串转换为列(Column)并使用$方法:

implicit class StringToColumn(val sc : scala.StringContext) extends scala.AnyRef {
  def $(args : scala.Any*) : org.apache.spark.sql.ColumnName = { /* compiled code */ }
}

通常,如果您要从一个DataFrame派生多个DataFrames,则在创建其他DataFrame之前将原始DataFrame persist 可能会提高性能。最后,您可以 unpersist 原始DataFrame。

请记住,列名 不是在编译时解析的,而是在查询执行的分析器阶段与目录中的列名进行比较时才会解析。如果需要更强的类型安全性,则可以创建Dataset

为了完整起见,这里是上述代码尝试的csv:

// csv file:
// colA,colB,colC
// 1,"foo","bar"

val inputDf = spark.read.format("csv").option("header", "true").load(csvFilePath)

// resulting DataFrame schema
root
 |-- colA: string (nullable = true)
 |-- colB: string (nullable = true)
 |-- colC: string (nullable = true)

6
假设我们的父级Dataframe有'n'列,我们可以创建'x'个子DataFrames(在我们的案例中考虑2个)。
子Dataframe的列可以根据需要从任何父Dataframe列中选择。
假设源有10列,我们想将其拆分为包含从父Dataframe引用的列的2个DataFrames。
可以使用select Dataframe API来确定子Dataframe的列。
val parentDF = spark.read.format("csv").load("/path of the CSV file")

val Child1_DF = parentDF.select("col1","col2","col3","col9","col10").show()

val child2_DF = parentDF.select("col5", "col6","col7","col8","col1","col2").show()

请注意,子数据框中的列计数可能长度不同,并且将小于父数据框的列计数。
我们也可以使用父数据框中所需列的位置索引来引用列名,而无需提及实际名称。
首先导入Spark implicits,它作为帮助类,使用$符号访问列时使用位置索引。
import spark.implicits._
import org.apache.spark.sql.functions._

val child3_DF  = parentDF.select("_c0","_c1","_c2","_c8","_c9").show()

我们还可以根据特定条件选择列。比如说,我们只想选择子数据框中的偶数列。所谓的偶数是指索引为偶数的列,索引从“0”开始计数。
val parentColumns = parentDF.columns.toList


res0: List[String] = List(_c0, _c1, _c2, _c3, _c4, _c5, _c6, _c7,_c8,_c9)

val evenParentColumns =  res0.zipWithIndex.filter(_._2 % 2 == 0).map( _._1).toSeq

res1: scala.collection.immutable.Seq[String] = List(_c0, _c2, _c4, _c6,_c8)

现在将这些列提供给从parentDF中选择。请注意,select API需要seq类型的参数。因此,我们将“evenParentColumns”转换为Seq集合。

val child4_DF = parentDF.select(res1.head, res1.tail:_*).show()

这将显示来自父数据框的偶数索引列。


| _c0 | _c2 | _c4 |_c6 |_c8 |


|ITE00100554|TMAX|null| E| 1 |

|TE00100554 |TMIN|null| E| 4 |

|GM000010962|PRCP|null| E| 7 |

现在,我们只剩下数据框中的偶数编号列。

类似地,我们还可以对数据框列应用其他操作,如下所示:

val child5_DF = parentDF.select($"_c0", $"_c8" + 1).show()

有很多方法可以在Dataframe中选择列,如上所述。


请问您能解释一下 parentDF.select(res1.head, res1.tail:_*) 这行代码的作用吗? - Surender Raja

3

我喜欢dehasis的方法,因为它允许我在一个步骤中选择、重命名和转换列。然而,我不得不对其进行调整,以使其在PySpark中正常工作:

from pyspark.sql.functions import col

spark.read.csv(path).select(
      col('_c0').alias("stn").cast('String'),
      col('_c1').alias("wban").cast('String'),
      col('_c2').alias("lat").cast('Double'),
      col('_c3').alias("lon").cast('Double')
    )
      .where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)

1

问题已解决,只需使用select方法选择数据框中的列:

 val df=spark.read.csv("C:\\Users\\Ahmed\\Desktop\\cabs_trajectories\\cabs_trajectories\\green\\2014\\green_tripdata_2014-09.csv")

val df1=df.select("_c0")

这将对数据框的第一列进行子集处理。

0

只需使用 select,您就可以选择特定的列,为它们指定易读的名称并进行类型转换。例如:

spark.read.csv(path).select(
          '_c0.alias("stn").cast(StringType),
          '_c1.alias("wban").cast(StringType),
          '_c2.alias("lat").cast(DoubleType),
          '_c3.alias("lon").cast(DoubleType)
        )
          .where('_c2.isNotNull && '_c3.isNotNull && '_c2 =!= 0.0 && '_c3 =!= 0.0)

Scala中的别名没有起作用。它需要特定的导入吗? - A.HADDAD
@A.HADDAD,你肯定需要 import spark.implicits._。整个代码可以在我的存储库中查看 https://github.com/dehasi/odsc05/blob/master/observatory/src/main/scala/observatory/Extraction.scala#L54 - dehasi

0

您可以使用以下代码根据它们的索引(位置)选择列。您可以更改变量colNos的数字,以仅选择那些列。

import org.apache.spark.sql.functions.col

val colNos = Seq(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35)
val Df_01 = Df.select(colNos_01 map Df.columns map col: _*)
Df_01.show(20, false)

0
问题是在与另一个数据框连接后选择列。我尝试了以下方法并从连接的数据框中选择了salaryDf的列。希望这可以帮到你。
        val empDf=spark.read.option("header","true").csv("/data/tech.txt")

        val salaryDf=spark.read.option("header","true").csv("/data/salary.txt")

        val joinData= empDf.join(salaryDf,empDf.col("first") === salaryDf.col("first") and  empDf.col("last") === salaryDf.col("last"))

      //**below will select the colums of salaryDf only**

     val finalDF=joinData.select(salaryDf.columns map  salaryDf.col:_*)

//same way we can select the columns of empDf
joinData.select(empDf.columns map  empDf.col:_*)

我们可以在Scala中使用joinData= empDf.join(salaryDf,Seq("first","last")),在Python中使用joinData= empDf.join(salaryDf,["first","last"])。希望这有所帮助。 - Suman Banerjee

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接