如何在PySpark中检查空的RDD

3

tweetStream.foreachRDD((rdd, time) => {
  val count = rdd.count()
  if (count > 0) {
    var fileName =  outputDirectory + "/tweets_" + time.milliseconds.toString    
    val outputRDD = rdd.repartition(partitionsEachInterval) 
    outputRDD.saveAsTextFile(fileName) 
}

我正在尝试以Python的方式检查流数据中的计数值或空RDD,但很难找到方法,我还尝试了下面链接中的示例。 http://spark.apache.org/docs/latest/streaming-programming-guide.html
3个回答

5

RDD.isEmpty:

若 RDD 不包含任何元素,则返回 true。


sc.range(0, 0).isEmpty()

True

sc.range(0, 1).isEmpty()

False

1

正如用户6910411建议的那样,您可以简单地使用{{link1:RDD.isEmpty}}:

df.rdd.isEmpty()

它返回布尔值。

0
尝试使用以下代码片段。
def process_rdd(rdd):
    print rdd.count()
    print("$$$$$$$$$$$$$$$$$$$$$$")
    streamrdd_to_df(rdd)

def empty_rdd():
    print "###The current RDD is empty. Wait for the next complete RDD ###"

clean.foreachRDD(lambda rdd: empty_rdd() if rdd.count() == 0 else process_rdd(rdd))

请格式化您的答案,以提高质量...特别是添加代码块。def fibonacci(n): if n <= 1: return n else: return (fibonacci(n-1) + fibonacci(n-2)) for i in range(10): print(fibonacci(i))以上是一个简单的 Python 代码,用于生成斐波那契数列。该代码使用递归函数来计算每个数字。在主程序中,我们使用 for 循环来打印前十个斐波那契数。 - nakashu
这里的“clean”是什么意思? - Itération 122442

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接