如何将流式数据集写入Cassandra?

9

我有一个Python流数据源DataFrame df,其中包含所有我想要放入一个Cassandra表格的数据,并且我想使用spark-cassandra-connector。我已经尝试过两种方法:

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .save() 

query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .start()

query.awaitTermination()

然而我一直在遇到这些错误,分别如下:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;

并且

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.

有没有办法将我流式处理的DataFrame发送到我的Cassandra表中?
2个回答

6
目前在Spark Cassandra Connector中没有Cassandra的流式Sink。您需要实现自己的Sink或等待其可用。
如果您使用Scala或Java,则可以使用foreach操作符,并像Using Foreach中描述的那样使用ForeachWriter

1
有没有办法将我的流式DataFrame转换为非流式数据框? - user2361174
2
不,目前没有任何转换方式(至少我不知道有)。 - RussS
你有Java的工作示例吗?看起来所有的解决方案都需要使用Scala实现的trait CassandraConnector.withSessionDo,所以Kotlin或Java没有成功的机会。 - reith
4
这个问题问的是今天(也就是2018年)还是否仍然正确? - user1870400
在Spark 2.3及以上版本中,当创建自定义sink时,在addbatch方法中似乎Spark不允许您对数据框调用.write()。它会抛出OP分享的错误。有人知道这里的替代方案吗? - DataGeek
显示剩余3条评论

5

我知道这是一篇旧文章,为了以后的参考更新一下。

您可以像下面这样从流数据中批量处理它:

def writeToCassandra(writeDF, epochId):
 writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table_name", keyspace="keyspacename")\
    .mode("append") \
    .save()

query = sdf3.writeStream \
.trigger(processingTime="10 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()

它给我报错,即无法找到数据源:org.apache.spark.sql.cassandra。我正在使用--packages选项传递包。 - Abdul Haseeb

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接