Scala 转 Pyspark

3
我正在尝试在Dstream和静态RDD之间执行连接。 PySpark
  #Create static data
    ip_classification_rdd = sc.parallelize([('log_name','enrichment_success')])
    #Broadcast it to all nodes
    ip_classification_rdd_broadcast = sc.broadcast(ip_classification_rdd)
    #Join stream with static dataset on field log_name      
    joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_rdd[log_name]))

我得到了这个异常:“看起来您正在尝试广播一个RDD或从一个RDD引用”
然而,有人在这里提出了同样的需求:如何将DStream与非流文件连接? 这是解决方案:
val vdpJoinedGeo = goodIPsFltrBI.flatMap{ip => geoDataBC.value.get(ip).map(data=> (ip,data)}

在Pyspark中相应的等价物是什么?
1个回答

0

你的代码需要做一些更改:

  • 不能向广播一个RDD,而是要在底层“数据”上进行广播:
  • 然后使用value()方法在闭包内获取广播变量。

以下是您更新后的代码的近似示例:

 #Create static data
    data = [('log_name','enrichment_success')])
    #Broadcast it to all nodes
    ip_classification_broadcast = sc.broadcast(data)
    #Join stream with static dataset on field log_name      
    joinedStream = kafkaStream.transform(lambda rdd:  \
        rdd.join(ip_classification_broadcast.value().get[1]))

错误日志片段: rdd.join(ip_classification_broadcast.value().get()[log_name])) TypeError: 'list' 对象不可调用。 - steven
看起来 value 是一个属性而不是一个方法 - 所以我已经更新了上面的代码:尝试使用 .get[log_name] 而不是 get()[log_name]。由于我没有你的代码和测试设置,你需要稍微调整这些细节。 - WestCoastProjects
为了调试,我运行了join函数的内容。 ip_classification_broadcast.value().get[log_name] 的结果是:TypeError: 'list' object is not callable 这个:ip_classification_broadcast.value[0] 的结果是:('log_name', 'enrichment_success') 但是当我运行spark-submit时,出现了以下错误: AttributeError: 'tuple' object has no attribute 'mapValues' - steven
1
啊 - 这意味着我们接近了!您可能需要重新构造data,使其成为一个dict而不是一个tuple。但在这之前 - 通过使用当前的tuple:以下内容应该编译并给出enrichment_success作为输出:ip_classification_broadcast.value().get[1]。请注意,[1]访问元组的第二个元素。稍后,您将希望能够通过键“log_name”访问:这将需要重构。 - WestCoastProjects
案例3: ip_classification_rdd = sc.parallelize([('log_name','enrichment_success')]) joinedStream = kafkaStream.transform(lambda rdd: rdd.join(ip_classification_broadcast))编译通过但打印无输出。 - steven
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接