Spark:将数据框写入S3存储桶

3

我正在尝试将DF数据写入S3存储桶。正如预期的那样,它工作得很好。现在我想根据条件将数据写入S3存储桶。

在数据帧中,我有一个名为Flag的列,在该列中值为T和F。现在的条件是如果Flag为F,则应将数据写入S3存储桶,否则不写入。请查看以下详细信息。

DF数据:

1015,2017/08,新潟,101,SW,39,1015,2017/08,山形,101,SW,10,29,74.35897435897436,11.0,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,大分,101,SW,14,25,64.1025641025641,15.4,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,山口,101,SW,6,33,84.61538461538461,6.6,T
1015,2017/08,新潟,101,SW,39,1015,2017/08,愛媛,101,SW,5,34,87.17948717948718,5.5,T
1015,2017/08,新潟,101,SW,39,1015,2017/08,神奈川,101,SW,114,75,192.30769230769232,125.4,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,富山,101,SW,12,27,69.23076923076923,13.2,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,高知,101,SW,3,36,92.3076923076923,3.3,T
1015,2017/08,新潟,101,SW,39,1015,2017/08,岩手,101,SW,11,28,71.7948717948718,12.1,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,三重,101,SW,45,6,15.384615384615385,49.5,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,京都,101,SW,23,16,41.02564102564102,25.3,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,静岡,101,SW,32,7,17.94871794871795,35.2,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,鹿児島,101,SW,18,21,53.84615384615385,19.8,F
1015,2017/08,新潟,101,SW,39,1015,2017/08,福島,101,SW,17,22,56.41025641025641,18.7,F

代码:

val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3a://test_system/transcation.csv")
    df.createOrReplaceTempView("data")
    val res = spark.sql("select count(*) from data")
    res.show(10)
    res.coalesce(1).write.format("csv").option("header","true").mode("Overwrite")
     .save("s3a://test_system/Output/Test_Result")
     res.createOrReplaceTempView("res1")
     val res2 = spark.sql("select distinct flag from res1 where flag = 'F'")
     if (res2 ==='F')
     {
     //writing to s3 bucket as raw data .Here transcation.csv file.
     df.write.format("csv").option("header","true").mode("Overwrite")
     .save("s3a://test_system/Output/Test_Result/rawdata")
     }

我正在尝试这种方法,但无法将df数据导出到s3存储桶。如何使用条件将数据导出/写入S3存储桶?
非常感谢您的帮助。

res2是一个数据框,这就是你想要写入S3的内容。 - sramalingam24
1
不。我想根据Flag列的条件编写df数据。我必须将该列条件放在Flag == F,然后它将写入df.write.....否则它不会写入。 - Raghu kanala
你的res1表只是一个计数表,它是在代码第3行创建的。 - user238607
你是想要保存只有标记为“F”的记录,还是只是想查看整个数据集中是否包含“F”值的标记? - maogautam
1个回答

5

我假设您想在数据框中写入包含“F”标志的数据。

val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("s3a://test_system/transcation.csv")
df.createOrReplaceTempView("data")
val res = spark.sql("select count(*) from data")
res.show(10)
res.coalesce(1).write.format("csv").option("header","true").mode("Overwrite")
  .save("s3a://test_system/Output/Test_Result")
res.createOrReplaceTempView("res1")

这里我们使用了data表格,因为res1表只是一个计数表格,而且从结果数据框中,我们使用first()函数选择了第一行,并从该行中使用getAs[String](0)选择第一列。

val res2 = spark.sql("select distinct flag from data where flag = 'F'").first().getAs[String](0)

println("Printing out res2 = " + res2)

在这里,我们正在比较上面提取的字符串和字符串"F"。请记住"F"是一个字符串,而'F'是Scala中的字符。

if (res2.equals("F"))
{
  println("Inside the if loop")
  //writing to s3 bucket as raw data .Here transcation.csv file.
  df.write.format("csv").option("header","true").mode("Overwrite")
    .save("s3a://test_system/Output/Test_Result/rawdata")
}

3
因为在每次写操作时都要进行查询,使用s3的覆盖写功能会使操作变得非常耗费资源,建议避免使用。 - Eric Meadows

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接