Spark Streaming UpdateStateByKey

5

我正在运行一个spark streaming,使用updateStateByKey函数保存类似于NetworkWordCount示例中计算的历史数据。

我尝试流式传输一个包含3lac记录的文件,并且每1500条记录后暂停1秒钟。我使用了3个worker。

  1. 随着时间的推移,updateStateByKey不断增长,然后程序抛出以下异常:

ERROR Executor: Exception in task ID 1635 java.lang.ArrayIndexOutOfBoundsException: 3

14/10/23 21:20:43 ERROR TaskSetManager: Task 29170.0:2 failed 1 times; aborting job
14/10/23 21:20:43 ERROR DiskBlockManager: Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/24

14/10/23 21:20:43 ERROR Executor: Exception in task ID 8037
java.io.FileNotFoundException: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/22/shuffle_81_0_1 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)

如何处理这个问题? 我猜想updateStateByKey应该定期重置,因为它的增长速度很快,请分享一些关于何时以及如何重置updateStateByKey的示例。或者还有其他问题吗?请指点迷津。

任何帮助都将不胜感激。谢谢您的时间。

1个回答

0
你设置了 CheckPoint 吗? ssc.checkpoint("检查点路径")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接