将Spark结构化流数据框转换为Pandas数据框。

3

我已经设置了一个Spark Streaming应用程序来从Kafka主题中获取数据,但我需要使用一些API来处理Pandas数据框。但是当我尝试转换时,出现了以下错误:

: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2832)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2809)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

这是我的Python代码

spark = SparkSession\
    .builder\
    .appName("sparkDf to pandasDf")\
    .getOrCreate()

sparkDf = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "mytopic")\
    .option("startingOffsets", "earliest")\
    .load()


pandas_df =  sparkDf.toPandas()

query = sparkDf.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

现在我知道我正在创建流数据框的另一个实例,但无论我在哪里尝试使用start()和awaitTermination(),我都会得到相同的错误。

有什么建议吗?

1个回答

7

简而言之,这样的操作是不起作用的。

现在我意识到我正在创建流式DataFrame的另一个实例。

问题在于你并没有真正这样做。在DataFrame上调用toPandas会创建一个简单的、本地的、非分布式的Pandas DataFrame驱动节点内存中。

它不仅与Spark无关,而且作为一种抽象,与Structured Streaming本质上不兼容——Pandas DataFrame表示一组固定的元组,而结构化流表示一组无限的元组。

目前不太清楚你想要实现什么,可能存在XY问题,但如果你确实需要在Structured Streaming中使用Pandas,可以尝试使用pandas_udf——SCALARGROUPED_MAP变量至少与基本的基于时间的触发器兼容(其他变量也可能被支持,尽管其中一些组合显然没有任何意义,我也不知道有任何官方的兼容性矩阵)。


唉,我花了那么多时间尝试让它运行起来,结果发现无法实现。谢谢你的回复。 - anonuser1234
@anonuser1234 请不要攻击信息传递者 :) 但是,如果您提供有关为什么需要Pandas对象的其他详细信息,我可以尝试扩展答案。 - user10938362
当然,我正在处理的项目使用Panda Dataframes来进行一些NLP工作。我试图从Spark Structure流中获取我们需要的数据,然后将其作为panda DF发送,并让NLP人员处理它们。 - anonuser1234
如果主要目标是建模/在线学习,那么Spark结构化流处理在这里可能不会太有帮助,尽管foreach写入器可能会提供一些机会。对于预测等任务,pandas_udfs应该可以胜任。 - user10938362
我们想利用Apache Spark及其RDDs(Dataframes运行在其上)来处理大量数据。 - anonuser1234
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接