我开始学习Spark引擎中的spark-streaming,并且对数据分析和Spark非常陌生。我只想创建一个小型的IOT应用程序,以便预测未来的数据。
我有一个Tiva硬件,它发送以下实时传感器JSON数据:
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
这里的t是Unix时间戳,用于发布数据。sensors是一个传感器数组,每个传感器('s')的数据为'd'。
我的目标是消耗这些数据并创建一个对象,该对象通过spark-streaming,然后将所有数据通过spark的Mlib(机器学习)或等效库来预测未来数据。
我想要一个大致的想法,是否有可能使用所有技术选择实现这一目标
- 我已决定使用什么?
- 如何消耗嵌套的JSON?我尝试使用SQLContext但没有成功。
- 实现我在这里尝试做的事情的一般指导方针。
这是我使用的代码,用于从KAFKA消费消息。
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
// TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
Set<String> topics = Collections.singleton("RAH");
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topics);
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
System.out.println(message._2());
return message._2();
};
});
System.out.println(" json is 0------ 0"+ json);
json.foreachRDD(rdd -> {
rdd.foreach(
record -> System.out.println(record));
});
ssc.start();
ssc.awaitTermination();
PS:我希望使用Java来实现,以保持线性和良好的性能。
sqlContext
读取 JSON 字符串时,遇到了什么问题?是Task not serializable
的问题吗? - Shankar