我正在尝试设置Spark Streaming来获取Kafka队列中的消息。我遇到了以下错误: py4j.protocol.Py4JJavaError: An error occurred while calling o30.createDirectStream. : org.apache.s...
我正在尝试在Scala IDE中执行一个基本的Spark Streaming示例,但是出现以下错误:Error: Could not find or load main class org.test.spark.streamExample. 有人可以帮我解决这个问题吗?
有没有一种方法可以使用模式将来自kafka的avro消息与spark转换为dataframe?用户记录的模式文件: { "fields": [ { "name": "firstName", "type": "string" }, { "name": "lastName",...
我是Spark的新手,正在运行我的应用程序,从文本文件读取14KB的数据,执行一些转换和操作(collect,collectAsMap),并将数据保存到数据库中。我在我的MacBook上本地运行,拥有16G的内存和8个逻辑核心。Java最大堆设置为12G。这是我用来运行应用程序的命令。bin/...
我正在尝试了解如何使Spark Streaming应用程序更具容错性(特别是在尝试写入下游依赖项时),但我不知道处理尝试将结果写入外部源(如Cassandra、DynamoDB等)失败的最佳方法。 例如,我有一个Spark Streaming作业,从流(Kafka、Flume等)中提取数据,...
我正在使用 Kafka 0.8.2 接收AdExchange的数据,然后使用 Spark Streaming 1.4.1 将数据存储到 MongoDB。 我的问题是,当我重新启动 Spark Streaming 作业(例如更新新版本、修复错误、添加新功能等)时,它将继续读取此时最新的 kaf...
我一直在尝试在线寻找资料,两种资料都是基于微批次的,那么它们有什么区别呢?
我使用Spark Streaming从Twitter接收推文。 我经常会收到许多警告,内容如下: replicated to only 0 peer(s) instead of 1 peers 这个警告是什么意思? 我的代码如下: SparkConf conf = new SparkC...
我有一个Spark流处理上下文,在10秒的间隔内从kafka读取事件数据。我想要将这些事件数据与已存在的PostgreSQL表中的数据进行补充。 我可以使用以下代码加载PostgreSQL表格: val sqlContext = new SQLContext(sc) val data = ...
我正在尝试使用Kafka(版本1.1.0)和Spark Streaming,但是由于以下错误,Spark作业一直崩溃:14/11/21 12:39:23 ERROR TaskSetManager: Task 3967.0:0 failed 4 times; aborting job org.a...