到目前为止,Spark还没有为流数据创建DataFrame。但是,在进行异常检测时,使用DataFrame进行数据分析更加方便快捷。我已经完成了这部分工作,但是在尝试使用流数据进行实时异常检测时,出现了问题。我尝试了几种方法,仍然无法将DStream转换为DataFrame,也无法将DStream内部的RDD转换为DataFrame。
以下是我最新版本代码的部分内容:
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import KMeans, KMeansModel, StreamingKMeans
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
import operator
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
model_inputs = sys.argv[1]
def streamrdd_to_df(srdd):
sdf = sqlContext.createDataFrame(srdd)
sdf.show(n=2, truncate=False)
return sdf
def main():
indata = ssc.socketTextStream(sys.argv[2], int(sys.argv[3]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('rawFeatures')
features_rdd = inrdd.map(lambda r: Features(r))
features_rdd.pprint(num=3)
streaming_df = features_rdd.flatMap(streamrdd_to_df)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
如您在main()函数中所见,当我使用ssc.socketTextStream()方法读取输入流数据时,它会生成DStream,然后我尝试将DStream中的每个单独元素转换为Row,希望稍后可以将数据转换为DataFrame。
如果我在这里使用ppprint()打印features_rdd,它可以正常工作,这使我认为,features_rdd中的每个单独元素都是RDD批处理,而整个features_rdd是一个DStream。
然后我创建了streamrdd_to_df()方法,并希望将每个RDD批处理转换为dataframe,但它给出了错误,显示:
ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
有没有想法可以在Spark流数据上执行DataFrame操作?