20得票2回答
如何同时刷新表格?

我正在使用Spark Streaming 2.1。我想定期刷新一些缓存的表(由Spark提供的数据源,例如parquet、MySQL或用户定义的数据源)。 如何刷新表格? 假设我有一些表格是通过以下方式加载的: spark.read.format("").load().createTe...

20得票1回答
在yarn-cluster模式下,如果设置了num-executors,Spark Kafka Direct DStream需要多少个执行器和RDD分区?

我正在尝试使用Spark Kafka直接流方法。根据文档,它通过创建与kafka主题分区数量相同的RDD分区来简化并行处理。根据我的理解,Spark将为每个RDD分区创建一个执行器来执行计算。 因此,当我以yarn-cluster模式提交应用程序,并将选项num-executors指定为不...

19得票3回答
当数据源运行完毕时如何停止Spark Streaming

我有一个Spark流任务,每5秒从Kafka读取数据,对传入的数据进行一些转换,然后写入文件系统。 这个任务实际上不需要是流式任务,而且我只想每天运行一次它,以便将消息排入文件系统。但我不确定如何停止它。 如果我将超时时间传递给streamingContext.awaitTerminati...

18得票3回答
创建Kafka流时出现了AbstractMethodError错误

我正在尝试使用createDirectStream方法打开Kafka(尝试版本0.11.0.2和1.0.1)的流,并收到此AbstractMethodError错误: Translated: 我正在尝试使用createDirectStream方法打开Kafka流(尝试版本0.11.0.2和1...

18得票2回答
如何从迭代器创建Spark RDD?

为了让问题更加清晰明了,我并不是在寻找像数组/列表一样的RDD。List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7); // sample JavaRDD<Integer> rdd = new JavaSpark...

18得票5回答
如何在spark-submit命令中指定要使用的Java版本?

我希望在远程服务器上的yarn集群上运行一个Spark流应用程序。服务器上默认的Java版本是1.7,但我的应用程序需要使用1.8,而且1.8也已经安装在服务器上,但不是默认版本。是否有一种方法可以通过spark-submit指定Java 1.8的位置,以便避免出现主要次要错误?

18得票1回答
使用Kafka的Spark流处理 - createDirectStream与createStream的区别

我们之前一直在使用Spark Streaming和Kafka,使用的是KafkaUtils中的createStream方法。最近开始尝试使用createDirectStream方法,并且喜欢它的两个优点: 1) 更好/更容易实现“确切一次”语义 2) 更好地将kafka主题分区与rdd分区...

17得票7回答
Java.lang.NoClassDefFoundError: org/apache/spark/streaming/twitter/TwitterUtils$ 在运行 TwitterPopularTags 时发生。

我是一个Spark Streaming和Scala的初学者。由于项目需求,我试图在Github上运行TwitterPopularTags示例。由于SBT汇编对我不起作用,而且我不熟悉SBT,因此我正在尝试使用Maven进行构建。经过许多初始问题,我能够创建jar文件。但是,在尝试执行它时,我遇...

17得票1回答
Kafka主题分区到Spark流处理

我有一些使用案例需要更加明确,关于Kafka主题分区 -> Spark流资源利用。 我使用Spark独立模式,因此我只有"执行器总数"和"执行器内存"这两个设置。据我所知并根据文档,在Spark流中引入并行性的方法是使用分区的Kafka主题 -> RDD将具有与Kafka相同数量...

17得票2回答
SBT测试错误: java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream

当我在Windows上使用Scalatest为我的Spark Streaming代码执行单元测试时,使用以下命令会出现异常: sbt testOnly <<ClassName>> * * * * * * 2018-06-18 02...