如何更新RDD?

21
我们正在开发Spark框架,将历史数据转移到RDD集合中。
基本上,RDD是不可变的、只读的数据集,在其上进行操作。基于此,我们已经将历史数据移入RDD,并对这些RDD进行过滤/映射等计算操作。
现在有一个用例,其中RDD中的一部分数据得到更新,我们必须重新计算值。
HistoricalData以RDD形式存在。我根据请求范围创建另一个RDD,并将该RDD的引用保存在ScopeCollection中。
到目前为止,我能够考虑以下方法: 方法1:广播更改:
  • 对于每个更改请求,我的服务器获取特定范围的RDD并生成作业。
  • 在作业中,在RDD上应用映射阶段 -

    2.a.对于RDD中的每个节点,在广播上执行查找并创建一个新值,该新值现在已更新,从而创建一个新的RDD。
    2.b.现在我再次在此新RDD(步骤2.a)上执行所有计算,如乘法、缩减等。
    2.c.将此RDD引用保存回我的ScopeCollection
方法2:为更新创建一个RDD:
  • 对于每个更改请求,我的服务器获取特定范围的RDD并生成作业。
  • 在每个RDD上,与具有更改的新RDD进行连接。
  • 现在我再次在此新RDD(步骤2)上执行所有计算,如乘法、缩减等
方法3: 我曾想过创建流式RDD,其中我将不断更新同一RDD并进行重新计算。但是据我所了解,它可以从Flume或Kafka中获取流。而在我的情况下,值基于用户交互在应用程序本身中生成。因此,我无法看到流式RDD在我的上下文中的任何集成点。
对于这种情况,有哪种方法更好或其他适合的方法吗?
谢谢!
2个回答

9
这里介绍的用例非常适合Spark Streaming。另外两个选项需要考虑一个问题:“如何提交RDD的重新计算?”Spark Streaming提供了一个框架,可以基于流式数据源不断地向Spark提交任务并将数据以RDD形式保存。Kafka和Flume只是其中两种可能的流式数据源。你可以使用SocketInputDStream进行套接字通信,使用FileInputDStream读取目录中的文件,或者甚至使用QueueInputDStream共享队列。如果这些选项都不适合你的应用程序,你可以编写自己的InputDStream
在这个使用案例中,使用Spark Streaming,您将读取基本RDD并使用传入的dstream逐步转换现有数据并维护不断发展的内存状态。 dstream.transform将允许您将基本RDD与给定批处理间隔期间收集的数据组合在一起,而updateStateByKey操作可以帮助您构建由键寻址的内存状态。有关更多信息,请参见文档
如果没有关于应用程序的更多详细信息,则很难在代码级别上了解使用Spark Streaming可能实现的内容。我建议您探索这条路,并为任何特定主题提出新问题。

谢谢@maasg。关于你的问题“如何提交重新计算”,我已经编辑了我的问题。 - user1441849
你尝试过原型设计这些想法吗?用一些代码会更清晰。例如,不确定你计划如何实现“我保存这个RDD的引用”,以及这将如何帮助。否则,你还需要进行缓存,否则RDD将从头开始重新计算。另一个选择是研究Spark Job Server:https://github.com/spark-jobserver/spark-jobserver - maasg

1
我建议您查看IndexedRDD implementation,它提供了可更新的键值对RDD。这可能会给您一些启示。
这个想法基于键的知识,这使您可以将要更新的数据块与已创建的RDD的相同键进行压缩。在更新期间,可以过滤掉先前版本的数据。
拥有历史数据,我认为您必须拥有某种事件的身份标识。
关于流式处理和消费,可以使用TCP端口。这样,驱动程序可能会打开一个TCP连接,Spark希望从中读取并发送更新。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接