Spark:使用Scala从S3读取CSV文件

4

我正在编写一个Spark作业,尝试使用Scala读取文本文件,在我的本地机器上,以下代码可以正常工作。

  val myFile = "myLocalPath/myFile.csv"
  for (line <- Source.fromFile(myFile).getLines()) {
    val data = line.split(",")
    myHashMap.put(data(0), data(1).toDouble)
  }

然后我尝试在 AWS 上让它工作,我做了以下操作,但似乎没有正确地读取整个文件。在 s3 上读取这样的文本文件的正确方式是什么?非常感谢!

val credentials = new BasicAWSCredentials("myKey", "mySecretKey");
val s3Client = new AmazonS3Client(credentials);
val s3Object = s3Client.getObject(new GetObjectRequest("myBucket", "myFile.csv"));

val reader = new BufferedReader(new InputStreamReader(s3Object.getObjectContent()));

var line = ""
while ((line = reader.readLine()) != null) {
      val data = line.split(",")
      myHashMap.put(data(0), data(1).toDouble)
      println(line);
}
3个回答

1
我认为它的工作方式如下:

    val s3Object= s3Client.getObject(new GetObjectRequest("myBucket", "myPath/myFile.csv"));

    val myData = Source.fromInputStream(s3Object.getObjectContent()).getLines()
    for (line <- myData) {
        val data = line.split(",")
        myMap.put(data(0), data(1).toDouble)
    }

    println(" my map : " + myMap.toString())

0

即使不导入amazons3库,也可以通过SparkContext textfile实现。使用以下代码:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
val s3Login = "s3://AccessKey:Securitykey@Externalbucket"
val filePath = s3Login + "/Myfolder/myscv.csv"
for (line <- sc.textFile(filePath).collect())
{
    var data = line.split(",")
    var value1 = data(0)
    var value2 = data(1).toDouble
}

在上面的代码中,sc.textFile将从您的文件中读取数据并存储在line RDD中。然后它使用,将每行拆分到循环内的不同RDD data中。然后,您可以使用索引从此RDD访问值。

这段代码返回错误信息 "java.io.IOException: No FileSystem for scheme: s3"。 - Mitaksh Gupta
你能解释一下答案吗?我也遇到了一个java.io.FileNotFoundException的错误。 - ibaralf
Mitkash和ibaalf - 请分享你们的代码,以便我调试。可能会有一些打字错误,因为这对我来说完全正常。 - Sarath Subramanian

0

使用sc.textFile("s3://myBucket/myFile.csv")读入csv文件。这将为您提供一个RDD[String]。将其放入map中

val myHashMap = data.collect
                    .map(line => {
                      val substrings = line.split(" ")
                      (substrings(0), substrings(1).toDouble)})
                    .toMap

您可以使用sc.broadcast来广播您的映射,以便在所有工作节点上都可以轻松访问。

(请注意,如果您喜欢,当然也可以使用Databricks的“spark-csv”包来读取csv文件。)


我的实用函数需要 myHashMap。所以我的代码是这样的:output = input.map { t => myUtiltyFunction(myHashMap, t)} 是否有可能避免每次传递 myHashMap 给 myUtitlityFunction?有没有一种方法可以广播 myHashMap 并让 myUtitlityFunction 直接知道它?非常感谢! - Edamame
另外,我不想使用sc.textFile("s3://myBucket/myFile.csv"),因为我希望使代码通用,即使没有Spark上下文也可以使用。谢谢。 - Edamame
你应该意识到,如果你让你的实用函数直接读取地图,并像你描述的那样使用实用函数 output = input.map { t => myUtiltyFunction(...)},那么地图将会被读取和创建每一个输入rdd的行。我真的不认为你想要这样做。另一方面,如果你广播变量(使用sc.broadcast),你只需要在驱动程序上读取和创建一次地图,然后所有的工作节点都可以直接访问它。为什么你不想把地图传递给实用函数呢?这对我来说很奇怪。 - Glennie Helles Sindholt
你确定地图是按照输入RDD的每一行创建的,而不是按照任务创建的吗?我不想传递HashMap的原因是:1. 代码的清晰度。2. 我希望相同的代码可以在其他读取输入数据到实用程序函数的场景中使用。 - Edamame
1
如果你使用 map 而不是 mapPartition,它会将实用函数应用于每一行,因此如果你的实用函数负责创建映射,则会为每一行执行一次。如果你使用 mapPartitions,它将仅为每个分区创建一次映射,但(当然取决于你的数据大小)这仍然可能会增加显着的开销(io 永远不便宜)。在我看来,你应该专注于编写最适合并行处理(Spark)的代码,并少关注代码的其他微不足道的(非并行)用途。 - Glennie Helles Sindholt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接