Spark Streaming 动态查找表实现

10

我目前正在研究使用Spark Streaming来接收类似于日志文件的条目,并出于统计目的对它们进行一些计算。

现在可以从HBase和Hive访问保存在HDFS上的数据集,其中需要查找一些数据并对其进行转换,例如IP和机器名称以及机器所有者之间的映射。

预计Spark应用程序将在我们的集群上每天运行数周而不需要重新启动。 然而,这些参考表格每隔几小时更新一次。

如果使用的数据略旧,则可以接受,但数据两周以上就不可以了。 因此,我想知道如何在我的map和reduce阶段中查找数据以进行转换和增强。 我有一些想法。

  1. 广播变量可以读取数据集并有效地传递它。 但是,一旦设置了广播变量,就无法更改它,而在驱动程序类中再次获取数据,取消持久化并广播新的数据集将无效,因为工作节点的指针都指向旧的数据集。 我不知道是否有办法解决这个问题。

  2. HBase get()查询可以进行。 如果根据查找的键将数据定向到reducer,则每个reducer可以拥有整个数据集的子集缓存,并且可以拥有自己的本地缓存。 HBase在获取单个记录时应具有最小延迟。

  3. 还有其他方法吗?

1个回答

3

这里有两个选项。

第一个选项是在DStream上使用foreachRDD转换。 foreachRDD在驱动程序端执行,这意味着你可以在那里创建任何新的RDD。你可以存储时间计数器并每10-15分钟从HDFS重新读取文件。

第二种方法是在DStream上使用transform转换读取某个文件,并将其结果保存在内存中。使用这种方法,你必须让每个执行程序都读取整个查找表,这是不高效的。

我建议你使用第一种方法。为了更加精确,你可以在某处存储数据最后更新的标志,并将其存储在Spark应用程序中。在每次迭代中,你检查此标志的值(例如,在HBase或Zookeeper中存储),并将其与本地存储的值进行比较 - 如果它不同,则重新读取查找表;否则,使用旧的进行操作。


我有一个相关的问题。我的查找表大约有200万行,是静态的。键是一个大约100个字符的字符串,值是一个大约10个字符的字符串。现在我将这些数据存储在一个索引的mongo db集合中,并在转换步骤中进行查找。我批量调用,所以每次转换只产生一次命中,但仍然需要进行网络调用。将这么大的查找表作为Spark广播变量是否有意义? - Sean Glover
1
每个记录 110 字节的 200 万条数据仅占用 220MB 的空间 - 对于广播变量来说并不多。每个硬件节点配备一个执行器应该可以确保最小化需要在集群中存储这些 220MB 数据的副本数量。如果这些数据是静态的,则可以在处理开始时将其加载到内存中,以便稍后使用。我不建议你使用像 MongoDB 这样的集中式东西,因为随着集群的扩大,它将成为你的瓶颈。如果数据完全静态,您可以考虑将数据存储在每个节点上的文件中或在每个节点上的本地存储中(例如 redis)。 - 0x0FFF

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接