Java Spark Streaming JSON 解析

3

我开始学习Spark引擎中的spark-streaming,并且对数据分析和Spark非常陌生。我只想创建一个小型的IOT应用程序,以便预测未来的数据。

我有一个Tiva硬件,它发送以下实时传感器JSON数据:

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]

这里的t是Unix时间戳,用于发布数据。sensors是一个传感器数组,每个传感器('s')的数据为'd'。

我的目标是消耗这些数据并创建一个对象,该对象通过spark-streaming,然后将所有数据通过spark的Mlib(机器学习)或等效库来预测未来数据。

我想要一个大致的想法,是否有可能使用所有技术选择实现这一目标

  1. 我已决定使用什么?
  2. 如何消耗嵌套的JSON?我尝试使用SQLContext但没有成功。
  3. 实现我在这里尝试做的事情的一般指导方针。

这是我使用的代码,用于从KAFKA消费消息。

SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    // TODO: processing pipeline
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
    Set<String> topics = Collections.singleton("RAH");


    JavaPairInputDStream<String, String> directKafkaStream = 
            KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
                    StringDecoder.class, kafkaParams, topics);


    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
        public String call(Tuple2<String,String> message) throws Exception {
            System.out.println(message._2());
            return message._2();
        };
    });


    System.out.println(" json is  0------ 0"+ json);



    json.foreachRDD(rdd -> {
        rdd.foreach(
                record -> System.out.println(record));
    });

    ssc.start();
    ssc.awaitTermination(); 

PS:我希望使用Java来实现,以保持线性和良好的性能。


你能发布一下你尝试过的代码吗?可以使用Spark SQL和Streaming实现。 - Shankar
问题中发布的代码。 - Rahul Borkar
当你尝试使用 sqlContext 读取 JSON 字符串时,遇到了什么问题?是 Task not serializable 的问题吗? - Shankar
2个回答

5

如果您正在使用Spark 2.0,则可以从SparkSession中读取JSON。

json.foreachRDD( rdd -> {

      DataFrame df= spark.read.json(rdd)
      //process json with this DF.
}

或者您可以将RDD转换为包含Row的RDD,然后使用createDataFrame方法。

json.foreachRDD( rdd -> {

          DataFrame df= spark.createDataFrame(rdd);
          //process json with this DF.
    }

从DF中处理嵌套JSON是可行的,您可以查看这篇文章。另外,一旦您将json转换为DF,您可以在任何Spark模块中使用它(例如spark sql、ML)。

在我的情况下,我尝试使用的SQLContext构造函数已被弃用。而且我不知道如何使用'JavaSparkContext'获取'sc'(SparkContext)。 - Rahul Borkar
@RahulBorkar:你可以将JavaSparkContext传递给SQLContext(javasparkContext)。 - Shankar
它在Spark 2.11中已弃用。另外,当我尝试运行您的代码时,我遇到了“方法transform(Function <JavaRDD <String>,JavaRDD <Object >>)对于类型JavaDStream <String>来说是不明确的”错误。 - Rahul Borkar
我找不到“DataFrame”类。我需要添加其他库吗?另外,如何获得相同的SparkSession? - Rahul Borkar

2

回答你的问题:

1)我决定使用的所有技术选择都能实现这一点吗?

`Ans: Yes it can be done and quiet a normal use-case for spark.`

2) 我该如何解析嵌套的JSON?我尝试使用SQLContext但没有成功。

`Ans: Nested JSON with SQLContext is little tricky. You may want to use Jackson or some other JSON library.`

3) 实现我在这里尝试做的事情的一般指南。

答: 通过kafka消费消息似乎很好,但是只支持有限的流式机器学习算法。

如果您想使用其他机器学习算法或第三方库,也许您应该将模型创建视为批处理作业,在末尾发出模型。 流处理作业应加载模型并获取数据流进行预测。


你能指导我找到这种用例的正确文档吗?这将非常有帮助。 - Rahul Borkar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接