TL;DR - 我在一个PySpark应用程序中有一个看起来像是字符串DStream的东西。我想将其作为
我正在开发一个PySpark应用程序,使用Spark Streaming从Kafka中获取数据。我的消息是字符串,我想调用Scala代码中的方法,并传递一个
我的问题是:如何从
以下是我想出的最简单的Python代码:
在Scala方面,我有以下内容:
现在,假设我向Kafka发送了一些数据:
Scala代码中的
这表明字符串实际上是作为字节数组传递的。
如果我只尝试将这个字节数组转换成字符串(我知道我甚至没有指定编码):
我得到一个看起来像(特殊字符可能被剥离)的东西:
这表明Python字符串已经被序列化(pickled?)。我该如何检索出一个正确的Java字符串呢?
DStream[String]
发送到Scala库。然而,Py4j不能转换字符串。我正在开发一个PySpark应用程序,使用Spark Streaming从Kafka中获取数据。我的消息是字符串,我想调用Scala代码中的方法,并传递一个
DStream[String]
实例。然而,我无法在Scala代码中接收到正确的JVM字符串。在我看来,Python字符串没有被转换成Java字符串,而是被序列化了。我的问题是:如何从
DStream
对象中获取Java字符串?以下是我想出的最简单的Python代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
我正在PySpark中运行此代码,并将其传递到我的JAR路径:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
在Scala方面,我有以下内容:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
现在,假设我向Kafka发送了一些数据:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Scala代码中的
println
语句会打印出以下类似内容:[B@758aa4d9
我原本期望得到 foo bar
。
现在,如果我将 Scala 代码中简单的 println
语句替换为以下内容:
rdd.foreach(v => println(v.getClass.getCanonicalName))
我得到:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
这表明字符串实际上是作为字节数组传递的。
如果我只尝试将这个字节数组转换成字符串(我知道我甚至没有指定编码):
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
我得到一个看起来像(特殊字符可能被剥离)的东西:
�]qXfoo barqa.
这表明Python字符串已经被序列化(pickled?)。我该如何检索出一个正确的Java字符串呢?