结构化流处理的输出未显示在Jupyter笔记本上。

7

我有两个笔记本电脑。第一个笔记本使用 tweepy 从 Twitter 读取推文并将其写入套接字中。另一个笔记本使用 Spark Structured Streaming(Python)从该套接字中读取推文并将结果写入控制台。不幸的是,我在 Jupyter 控制台上没有得到输出。在 Pycharm 上,代码运行良好。

spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# This is Spark Structured Streaming Code which is reading streams from twitter and showing them on console.
tweets = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 7000) \
    .load()

query = tweets \
    .writeStream \
    .option("truncate", "false") \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

你是从Twitter或Socket读取数据吗?因为你的评论说正在读取Twitter。 - Srinivas
我正在从套接字读取数据。 - Abdul Haseeb
一个笔记本正在从Twitter读取推文并将它们写入套接字,另一个笔记本正在使用结构化流从该套接字中读取推文。 - Abdul Haseeb
1个回答

8

我不确定在Jupyter Notebook中是否能够实现这一点。但是,您可以使用内存输出来实现类似的结果。在complete模式下,这很简单,但对于append可能需要进行一些更改。

对于complete模式

complete输出模式下,您的查询应该看起来差不多如下:

query = tweets \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("your_query_name") \
    .start()

请注意,结尾处没有query.awaitTermination()。现在,在另一个单元格中查询your_query_name临时表,并观察连续更新的结果,直到您想停止为止:

from IPython.display import display, clear_output

while True:
    clear_output(wait=True)
    display(query.status)
    display(spark.sql('SELECT * FROM your_query_name').show())
    sleep(1)

关于 append 模式

如果您想使用 append 输出模式,必须使用水印。您也无法使用聚合功能,因此可能需要对代码进行进一步的更改。

query = tweets \
    .withWatermark("timestampColumn", "3 minutes")
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("your_query_name") \
    .start()

显示的代码保持不变。 您还可以以类似方式显示query.lastProgress以获取更详细的信息。

灵感和参考


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接