用Scala转换PySpark RDD

5
TL;DR - 我在一个PySpark应用程序中有一个看起来像是字符串DStream的东西。我想将其作为DStream[String]发送到Scala库。然而,Py4j不能转换字符串。
我正在开发一个PySpark应用程序,使用Spark Streaming从Kafka中获取数据。我的消息是字符串,我想调用Scala代码中的方法,并传递一个DStream[String]实例。然而,我无法在Scala代码中接收到正确的JVM字符串。在我看来,Python字符串没有被转换成Java字符串,而是被序列化了。
我的问题是:如何从DStream对象中获取Java字符串?
以下是我想出的最简单的Python代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

我正在PySpark中运行此代码,并将其传递到我的JAR路径:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

在Scala方面,我有以下内容:
package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

现在,假设我向Kafka发送了一些数据:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

Scala代码中的println语句会打印出以下类似内容:
[B@758aa4d9

我原本期望得到 foo bar

现在,如果我将 Scala 代码中简单的 println 语句替换为以下内容:

rdd.foreach(v => println(v.getClass.getCanonicalName))

我得到:

java.lang.ClassCastException: [B cannot be cast to java.lang.String

这表明字符串实际上是作为字节数组传递的。
如果我只尝试将这个字节数组转换成字符串(我知道我甚至没有指定编码):
      def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
        val dstream = jdstream.dstream
        dstream.foreachRDD(rdd => {
          rdd.foreach(bytes => println(new String(bytes)))
        })
      }

我得到一个看起来像(特殊字符可能被剥离)的东西:
�]qXfoo barqa.

这表明Python字符串已经被序列化(pickled?)。我该如何检索出一个正确的Java字符串呢?
1个回答

7
长话短说,没有支持的方法来做这样的事情。不要在生产环境中尝试此操作。已经警告过你了。
通常情况下,Spark仅在驱动程序上执行一些基本的RPC调用,并不会在任何其他计算机上启动Py4j网关,因此并没有使用Py4j。当需要时(主要是MLlib和SQL的某些部分),Spark使用Pyrolite序列化JVM和Python之间传递的对象。
此API的这一部分是私有的(Scala)或内部的(Python),因此并不适用于通常的用途。虽然理论上你可以按批次访问它:
package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}

完整流:

object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}

或者将单独的批次暴露为DataFrames(可能是最不邪恶的选项):

object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}

并按照以下方式使用这些包装器:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

这并不被支持,未经测试,因此除了使用Spark API进行实验之外,对其他任何事情都没有用处。


我很高兴能够帮忙。这里可能有点夸张了。如果你的目标是构建语言无关的扩展,那么你实际上无法避免与内部进行调试,尽管开发人员在这里做出了有意识的决定,但对此进行操作并不适合胆小的人。 - zero323
嗨@zero323,我正在进行相同的过程,但在过程中遇到了很大的问题。我创建了一个对象来将我的Python应用程序与Kerberized Kafka通信。但是当我创建对象时,Spark的JVM找不到对象中的函数。如果我创建一个类,它会找到该类。但由于错误:“pyKafka([org.apache.spark.api.java.JavaRDD,class java.lang.String])不存在”,无法发送rdd对象。我按照步骤操作。可能出了什么问题? - Thiago Baldim

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接