如何在pyspark中高效地向RDD添加新键

3
我有两个RDD格式,第一个是((provider, currency), value),其中关键字为(provider, currency),第二个是(provider, value),其中关键字为provider
我的目的是将RDD A(provider, value)格式转换为((provider, currency), value)。我有一个((provider, currency), value) RDD——B,我会使用它的关键字。然后,我将使用这些关键字扩展RDD A,使得来自(provider, value) RDD的每个value都可以在新的((provider, currency), value) RDD中为每个currency重复出现。
如何以有效的方式进行此操作,而不必通过collect() RDD并循环遍历它们呢?
例如:
RDD A中的一项可能是:
(1773570, 4135.7998046875)

那么 RDD B 中的一些键将会是:

[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]

输出的RDD应该是:
[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]

一个可能的解决方案是:
def get_keys(rdd):
    return rdd.map(lambda item: (item[0])).collect()

def canonicalize_keys(sc, feature, keys):
    def transform(item, keys):
        return [
            ((item[0], currency_pair), item[1])
                for provider_id, currency_pair in keys
                    if provider_id == item[0]]
    return sc.parallelize(feature
        .map(lambda item: transform(item, keys))
        .reduce(lambda a, b: a + b))

在这里,我使用 get_keys 从 RDD B 获取密钥,然后使用这些密钥转换 RDD A。问题在于,如果有很多 currency_pairs,JVM 就会出现 OutOfMemoryErrors。


我不确定我完全理解了,您想要有效的提供者->货币对来自第一个RDD的(提供者、货币)键,然后使用这个键展开第二个RDD,以便每个(提供者、价值)实际上对应于每个货币的实例重复一次? - mattinbits
是的,没错。我会编辑帖子使其更加清晰明确。 - George Lydakis
还不是很清楚。你能提供一个输入和期望输出的例子吗? - eliasah
我编辑了帖子并添加了一个例子。 - George Lydakis
更新了一篇帖子,提供了一个可能的解决方案,但在使用更大的数据集时存在内存问题。 - George Lydakis
1个回答

4
尝试这个: 给定Ardd = RDD[(provider, value)]Brdd = RDD[((provider, currency), value)],你想要做的是将ArddBrdd连接起来,使得newRDD的形式为RDD[((provider, currency), value)],其中value指的是从Ardd中找到的值。
为了实现这一点,我们需要这样做: 一行代码解决方案:
newRDD = Ardd.join(Brdd.map(lambda x: x[0])).map(lambda x: ((x[0], x[1][1]), x[1][0]))

带解释的步骤:

  1. Brdd获取密钥:Brdd_keys = Brdd.map(lambda x: x[0])。输出格式为:RDD[(提供者, 货币)]

  2. ArddBrdd_keys连接起来:AB = Ardd.join(Brdd_keys)。输出格式为:RDD[(提供者, (数值, 货币))]

  3. 映射到最终形式:newRDD = AB.map(lambda x: ((x[0], x[1][1]), x[1][0]))。现在的输出格式为:RDD[((提供者, 货币), 数值)]


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接