如何将Spark Streaming数据转换为Spark DataFrame

10

到目前为止,Spark还没有为流数据创建DataFrame。但是,在进行异常检测时,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分工作,但是在尝试使用流数据进行实时异常检测时,出现了问题。我尝试了几种方法,仍然无法将DStream转换为DataFrame,也无法将DStream内部的RDD转换为DataFrame。

以下是我最新版本代码的部分内容:

import sys
import re

from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator


sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)

model_inputs = sys.argv[1]

def streamrdd_to_df(srdd):
    sdf = sqlContext.createDataFrame(srdd)
    sdf.show(n=2, truncate=False)
    return sdf

def main():
    indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
    inrdd = indata.map(lambda r: get_tuple(r))
    Features = Row('rawFeatures')
    features_rdd = inrdd.map(lambda r: Features(r))
    features_rdd.pprint(num=3)
    streaming_df = features_rdd.flatMap(streamrdd_to_df)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()

如您在main()函数中所见,当我使用ssc.socketTextStream()方法读取输入流数据时,它会生成DStream,然后我尝试将DStream中的每个单独元素转换为Row,希望稍后可以将数据转换为DataFrame。

如果我在这里使用ppprint()打印features_rdd,它可以正常工作,这使我认为,features_rdd中的每个单独元素都是RDD批处理,而整个features_rdd是一个DStream。

然后我创建了streamrdd_to_df()方法,并希望将每个RDD批处理转换为dataframe,但它给出了错误,显示:

ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

有没有想法可以在Spark流数据上执行DataFrame操作?

7个回答

5

您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Cherry Wu
现在与其发布时相比,它已经有了很大的改进。 - OSK

2

仔细阅读错误提示,它说没有注册任何输出操作。Spark是惰性执行的,只有在有结果需要生成时才会执行作业/代码。在您的程序中没有“输出操作”,Spark正在抱怨同样的问题。

在DataFrame上定义foreach()或原始SQL查询,然后打印结果即可解决问题。


谢谢@Sumit,我确实尝试输出结果。在streamrdd_to_df()函数中,我使用sdf.show(n=2, truncate=False)来打印结果,但是它无法... - Cherry Wu
我执行了代码,它对我起作用了。我唯一做的更改是将 get_tuple 替换为 tuple 并删除了 pyspark.mllib... 导入。虽然输出没有意义,但没有错误。错误信息中还有其他内容吗?你能粘贴完整的堆栈跟踪吗? - Sumit

0

无需将DStream转换为RDD。按定义,DStream是RDD的集合。只需使用DStream的foreach()方法循环遍历每个RDD并采取行动即可。

val conf = new SparkConf()
  .setAppName("Sample")
val spark = SparkSession.builder.config(conf).getOrCreate()
sampleStream.foreachRDD(rdd => {
    val sampleDataFrame = spark.read.json(rdd)
}

0
为什么不使用类似这样的东西:
def socket_streamer(sc): # retruns a streamed dataframe
    streamer = session.readStream\
        .format("socket") \
        .option("host", "localhost") \
        .option("port", 9999) \
        .load()
    return streamer

上述函数的输出本身(或者一般来说是readStream)就是一个DataFrame。你不需要担心df,它已经被Spark自动创建了。 请参阅Spark结构化流编程指南


0

0

Spark 文档 中有关于使用 DStream 的介绍。基本上,您需要在流对象上使用 foreachRDD 与其进行交互。

以下是一个示例(请确保您创建了一个 Spark 会话对象):

def process_stream(record, spark):
    if not record.isEmpty():
        df = spark.createDataFrame(record) 
        df.show()


def main():
    sc = SparkContext(appName="PysparkStreaming")
    spark = SparkSession(sc)
    ssc = StreamingContext(sc, 5)
    dstream = ssc.textFileStream(folder_path)
    transformed_dstream = # transformations

    transformed_dstream.foreachRDD(lambda rdd: process_stream(rdd, spark))
    #                   ^^^^^^^^^^
    ssc.start()
    ssc.awaitTermination()

-2

使用Spark 2.3 / Python 3 / Scala 2.11(使用databricks),我能够使用临时表和Scala代码片段(在笔记本中使用magic):

Python部分:

ddf.createOrReplaceTempView("TempItems")

然后在新的单元格中:

%scala
import java.sql.DriverManager
import org.apache.spark.sql.ForeachWriter

// Create the query to be persisted...
val tempItemsDF = spark.sql("SELECT field1, field2, field3 FROM TempItems")

val itemsQuery = tempItemsDF.writeStream.foreach(new ForeachWriter[Row] 
{      
  def open(partitionId: Long, version: Long):Boolean = {
    // Initializing DB connection / etc...
  }

  def process(value: Row): Unit = {
    val field1 = value(0)
    val field2 = value(1)
    val field3 = value(2)

    // Processing values ...
  }

  def close(errorOrNull: Throwable): Unit = {
    // Closing connections etc...
  }
})

val streamingQuery = itemsQuery.start()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接