我有两个RDD格式,第一个是
我的目的是将RDD A从
如何以有效的方式进行此操作,而不必通过collect() RDD并循环遍历它们呢?
例如:
RDD A中的一项可能是:
输出的RDD应该是:
一个可能的解决方案是:
((provider, currency), value)
,其中关键字为(provider, currency)
,第二个是(provider, value)
,其中关键字为provider
。我的目的是将RDD A从
(provider, value)
格式转换为((provider, currency), value)
。我有一个((provider, currency), value)
RDD——B,我会使用它的关键字。然后,我将使用这些关键字扩展RDD A,使得来自(provider, value)
RDD的每个value
都可以在新的((provider, currency), value)
RDD中为每个currency
重复出现。如何以有效的方式进行此操作,而不必通过collect() RDD并循环遍历它们呢?
例如:
RDD A中的一项可能是:
(1773570, 4135.7998046875)
那么 RDD B 中的一些键将会是:
[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]
输出的RDD应该是:
[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]
一个可能的解决方案是:
def get_keys(rdd):
return rdd.map(lambda item: (item[0])).collect()
def canonicalize_keys(sc, feature, keys):
def transform(item, keys):
return [
((item[0], currency_pair), item[1])
for provider_id, currency_pair in keys
if provider_id == item[0]]
return sc.parallelize(feature
.map(lambda item: transform(item, keys))
.reduce(lambda a, b: a + b))
在这里,我使用 get_keys
从 RDD B 获取密钥,然后使用这些密钥转换 RDD A。问题在于,如果有很多 currency_pairs,JVM 就会出现 OutOfMemoryErrors。