如何在Spark中根据日期时间值筛选数据集

11
我试图根据数据的日期时间字段进行过滤。这是我的一份数据样本:
303,0.00001747,4351040,75.9054,"2019-03-08 19:29:18"

这是我如何初始化Spark:

    SparkConf conf = new SparkConf().setAppName("app name").setMaster("spark://192.168.1.124:7077");
    JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));

首先,我将上述数据读入到我的自定义对象中,如下所示:

    // Read data from file into custom object
    JavaRDD<CurrencyPair> rdd = sc.textFile(System.getProperty("user.dir") + "/data/data.csv", 2).map(
        new Function<String, CurrencyPair>() {
            public CurrencyPair call(String line) throws Exception {
                String[] fields = line.split(","); // Split line from commas

                // read each data into custom object
                CurrencyPair cp = new CurrencyPair();
                cp.setId(Integer.parseInt(fields[0].trim()));
                cp.setValue(Double.parseDouble(fields[1].trim()));
                cp.setBaseVolume(Double.parseDouble(fields[2].trim()));
                cp.setQuoteVolume(Double.parseDouble(fields[3].trim()));
                cp.setTimeStamp(new Date(fields[4].trim()));

                System.out.println("Date:" + fields[4].trim()); // To see if it will print or not

                return cp;
            }
        }
     );

为了获取时间戳比某个时间晚的数据,我编写了以下过滤器:

    Calendar calendar = Calendar.getInstance();
    calendar.add(Calendar.DAY_OF_MONTH, -10); // This is for test issue

    // My filter to get the data for a certain time range
    Function<CurrencyPair, Boolean> filter = new Function<CurrencyPair, Boolean>() {
        @Override
        public Boolean call(CurrencyPair currencyPair) throws Exception {
            if(calendar.getTime().compareTo(currencyPair.getTimeStamp()) > 0){
                return false;
            }else{
                return true;
            }
        }
    };

这是我的自定义对象的外观:

public class CurrencyPair implements java.io.Serializable {

    private int id;
    private double value;
    private double baseVolume;
    private double quoteVolume;
    private Date timeStamp;

    // all getters and setters are here, but no constructor
}

为了检查我的过滤器的结果,我尝试查看其中一些结果(这里是前100个):


Iterator<CurrencyPair> result = rdd.repartition(100).filter(filter).toLocalIterator();
int counter = 0;
while (counter < 100 && result.hasNext()){
    System.out.println("Here: " + result.next());
    counter++;
}

但问题在于,当我运行我的代码时,在写出前100个结果的那一行(这里:System.out.println("Here: " + result.next());)出现了以下异常:

错误:

19/05/12 00:05:47 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on 192.168.1.124, executor 0: java.lang.ClassCastException (cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD

在我的筛选器中,我使用System.out.println将日期时间字符串写入控制台,但是我在控制台中看不到它的输出结果。我做错了什么?我该如何实现这一点?


编辑:我注意到我实际上下载了2.3.0版本的Spark,但在我的maven文件中,我使用的是2.4.2版本。因此,我将我的maven文件更改为2.3.0版本。

这次我收到以下错误:

19/05/14 00:35:35 INFO BlockManager: BlockManager stopped
19/05/14 00:35:35 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/14 00:35:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/14 00:35:36 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)
19/05/14 00:35:36 INFO SparkContext: SparkContext already stopped.
19/05/14 00:35:36 INFO SparkContext: Successfully stopped SparkContext
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:516)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2520)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at spark.BasicSpark.readDataFile(BasicSpark.java:107)
at spark.BasicSpark.getWholeData(BasicSpark.java:39)
at controller.TableScreenController$2.handle(TableScreenController.java:66)
at controller.TableScreenController$2.handle(TableScreenController.java:62)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:86)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:49)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Node.fireEvent(Node.java:8411)
at javafx.scene.control.Button.fire(Button.java:185)
at com.sun.javafx.scene.control.behavior.ButtonBehavior.mouseReleased(ButtonBehavior.java:182)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:96)
at com.sun.javafx.scene.control.skin.BehaviorSkinBase$1.handle(BehaviorSkinBase.java:89)
at com.sun.javafx.event.CompositeEventHandler$NormalEventHandlerRecord.handleBubblingEvent(CompositeEventHandler.java:218)
at com.sun.javafx.event.CompositeEventHandler.dispatchBubblingEvent(CompositeEventHandler.java:80)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:238)
at com.sun.javafx.event.EventHandlerManager.dispatchBubblingEvent(EventHandlerManager.java:191)
at com.sun.javafx.event.CompositeEventDispatcher.dispatchBubblingEvent(CompositeEventDispatcher.java:59)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:58)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.BasicEventDispatcher.dispatchEvent(BasicEventDispatcher.java:56)
at com.sun.javafx.event.EventDispatchChainImpl.dispatchEvent(EventDispatchChainImpl.java:114)
at com.sun.javafx.event.EventUtil.fireEventImpl(EventUtil.java:74)
at com.sun.javafx.event.EventUtil.fireEvent(EventUtil.java:54)
at javafx.event.Event.fireEvent(Event.java:198)
at javafx.scene.Scene$MouseHandler.process(Scene.java:3757)
at javafx.scene.Scene$MouseHandler.access$1500(Scene.java:3485)
at javafx.scene.Scene.impl_processMouseEvent(Scene.java:1762)
at javafx.scene.Scene$ScenePeerListener.mouseEvent(Scene.java:2494)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:394)
at com.sun.javafx.tk.quantum.GlassViewEventHandler$MouseEventNotification.run(GlassViewEventHandler.java:295)
at java.security.AccessController.doPrivileged(Native Method)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.lambda$handleMouseEvent$350(GlassViewEventHandler.java:432)
at com.sun.javafx.tk.quantum.QuantumToolkit.runWithoutRenderLock(QuantumToolkit.java:389)
at com.sun.javafx.tk.quantum.GlassViewEventHandler.handleMouseEvent(GlassViewEventHandler.java:431)
at com.sun.glass.ui.View.handleMouseEvent(View.java:555)
at com.sun.glass.ui.View.notifyMouse(View.java:937)
at com.sun.glass.ui.gtk.GtkApplication._runLoop(Native Method)
at com.sun.glass.ui.gtk.GtkApplication.lambda$null$208(GtkApplication.java:245)
at java.lang.Thread.run(Thread.java:748)

我在初始化Spark上下文的代码行处遇到了以下错误:

JavaSparkContext sc = JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(conf));

编辑2:当我使用local而不是我的自己的spark主节点IP时,它运行得很好。但是我需要在自己的IP上运行它。那么我的主节点可能出了什么问题呢?


编辑3:我已将整个错误堆栈上传到第一次编辑下方的代码片段中。


问题可能不在打印语句上,Spark操作是惰性求值的,这意味着它们直到请求数据时才会被执行(因此只有在那时您才会得到运行时异常)。您能否发布使用筛选器的完整代码? - matanper
@matanper,我已经在这里发布了这部分内容:"textFile.repartition(100).filter(filter).toLocalIterator();"。除此以外,我没有写任何代码片段。 - JollyRoger
@matanper,顺便感谢您的评论。我注意到我忘记了初始化Spark的部分,所以我编辑了问题。我的Maven文件中的版本是错误的,所以我进行了更改,但这次出现了另一个错误。我把所有内容都添加到了问题中。 - JollyRoger
看起来 Spark 没有运行或 URL 错误,请尝试使用 Telnet 查看 Spark 是否在此地址上侦听。 - matanper
当我尝试使用命令 telnet 192.168.1.124 7077 时,我得到了以下输出:Trying 192.168.1.124... Connected to 192.168.1.124. Escape character is '^]'。 - JollyRoger
显示剩余7条评论
1个回答

2
这种情况发生在运行时未找到定义lambda表达式的类。如果您正在尝试从本地IDE运行远程集群上的作业,则需要添加setJars。"Original Answer"翻译成"最初的答案"。
SparkConf conf = new SparkConf()
                .setAppName("app name")
                .setJars(new String[]{"/fatJarPath/jar.path"})
                .setMaster("spark://Remote_spark_ip:port");

或者您可以构建fat jar并使用spark-submit提交作业。

注:fat jar是包含所有依赖项的可执行jar文件。


1
谢谢你的回答。实际上我之前尝试了你说的方法。但是这一次,除了我在其上运行IDE的主节点之外的节点都出现了文件未找到的错误。因此,我将JAR文件添加到与主节点相同的目录中,以便所有从节点都可以使用。然后我解决了这个错误,但是这一次它开始报错:Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD - JollyRoger
1
当我尝试从RDD中收集数据时(针对我上面分享的第二个代码片段),我遇到了上述错误。问题可能出在我构建JAR文件的方式上。我只是使用了IntelliJ的构建工件功能,也许我需要使用Maven来完成这个过程。你有什么建议吗? - JollyRoger
1
这个stackoverflow的答案应该能帮助你构建fat jar(uber jar)https://dev59.com/io7ea4cB1Zd3GeqPGNGt - m-bhole
1
我尝试了,但不幸的是我得到了我之前提到的同样的错误。 - JollyRoger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接