Spark:读取InputStream而不是文件

16

我在Java应用程序中使用SparkSQL处理CSV文件,使用Databricks进行解析。

我处理的数据来自不同的来源(远程URL、本地文件、Google Cloud Storage),我习惯将所有东西转换为InputStream,这样无需知道数据来源就可以解析和处理数据。

我看到的所有关于Spark的文档都是从路径读取文件,例如

SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);

DataFrame df = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("path/to/file.csv");

DataFrame dfGrouped = df.groupBy("varA","varB")
    .avg("varC","varD");

dfGrouped.show();

我想做的是从InputStream中读取,或者只是从已经在内存中的字符串中读取。类似以下内容:

InputStream stream = new URL(
    "http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
    ).openStream();

DataFrame dfRemote = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(stream);

String someString = "imagine,some,csv,data,here";

DataFrame dfFromString = sqlc.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .read(someString);

这里有什么简单的地方我忽略了吗?

我已经阅读了一些关于Spark Streaming和自定义接收器的文档,但据我所知,这是为了打开一个将持续提供数据的连接。 Spark Streaming似乎会将数据分成块并对其进行处理,期望更多的数据以不间断的流形式进入。

我最好的猜测是,作为Hadoop的后代,Spark期望大量的数据可能存储在某个文件系统中。但由于Spark无论如何都在内存中进行处理,因此我认为SparkSQL应该能够解析已经在内存中的数据。

任何帮助都将不胜感激。

1个回答

4
您可以使用至少四种不同的方法来使自己的生活更加轻松:
  1. 使用输入流,将数据写入本地文件(使用SSD加速),然后再用Spark读取文件。

  2. 使用Hadoop文件系统连接器连接S3、Google Cloud Storage,并将所有操作转换为文件操作。(这种方法不能解决从任意URL读取的问题,因为HDFS没有对应的连接器。)

  3. 将不同类型的输入表示为不同的URI,并创建一个实用函数,检查URI并触发相应的读取操作。

  4. 与(3)相同,但使用用例类代替URI,并根据输入类型进行重载。


1
我实际上正在尝试第五种选择,即将返回值序列化为JavaRDD,然后将RDD转换为DataFrame。感谢您的帮助。 - Nate Vaughan
你介意帮我理解一下你是如何将字符串转换为RDD,然后再转换为Dataframe的吗?在我的情况下,输入流是一个带有标题的CSV文件(可以将其转换为字符串)。我需要提供模式吗?还是我可以使用.option("header", "true")来自动检测模式? - Ponns

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接