不可能从单个转换中产生多个RDD。如果您想要拆分一个RDD,则必须为每个拆分条件应用一个filter
。例如:
def even(x): return x % 2 == 0
def odd(x): return not even(x)
rdd = sc.parallelize(range(20))
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
如果您只有一个二进制条件且计算代价很高,您可能更喜欢使用类似于以下方式的方法:
kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()
rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()
这意味着只有一个谓词计算,但需要额外遍历所有数据。
需要注意的是,只要输入RDD被正确缓存且没有其他关于数据分布的假设,重复筛选和嵌套if-else的for循环在时间复杂度上没有显著差异。
对于N个元素和M个条件,您需要执行的操作数量明显与N乘以M成正比。对于for循环,它应该接近(N + MN) / 2,而重复的筛选恰好是NM,但归根结底它无非就是O(NM)。您可以查看我的讨论Jason Lenderman以了解一些优缺点。
在非常高的层面上,您应该考虑两件事:
Spark transformations are lazy, until you execute an action your RDD is not materialized
Why does it matter? Going back to my example:
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
If later I decide that I need only rdd_odd
then there is no reason to materialize rdd_even
.
If you take a look at your SAS example to compute work.split2
you need to materialize both input data and work.split1
.
RDDs provide a declarative API. When you use filter
or map
it is completely up to Spark engine how this operation is performed. As long as the functions passed to transformations are side effects free it creates multiple possibilities to optimize a whole pipeline.
在一天结束时,这个案例并不特殊到足以证明它自己的转换。
这个带有过滤器模式的地图实际上是在核心Spark中使用的。请参见我的回答
How does Sparks RDD.randomSplit actually split the RDD和
randomSplit
方法的
相关部分。
如果唯一的目标是在输入上实现拆分,则可以对
DataFrameWriter
使用
partitionBy
子句,其文本输出格式:
def makePairs(row: T): (String, String) = ???
data
.map(makePairs).toDF("key", "value")
.write.partitionBy($"key").format("text").save(...)
* Spark中只有3种基本的转换类型:
- RDD[T] => RDD[T]
- RDD[T] => RDD[U]
- (RDD[T], RDD[U]) => RDD[W]
其中T、U、W可以是原子类型或产品/元组(K,V)。任何其他操作都必须使用上述某种组合来表达。您可以查看原始RDD论文以获取更多详细信息。
** https://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman
*** 另请参见Scala Spark:将集合拆分为几个RDD?