如何从Cassandra表中流式传输数据?

7
我希望能够从实时更新的Cassandra表中流式传输数据。是的,它是一个数据库,但是否有办法做到这一点?如果有,应保留偏移量或者可以使用哪些CQL查询?

有人知道如何在spark-cassandra-connector中使用它吗? 它自己处理偏移量并实时流式传输数据吗? - krish at
3个回答

7
短答案是否定的。
长答案是,通过艰苦的努力和聪明的聚集键,您可能可以做到这一点。基本上,如果您插入具有始终增加的聚集键的数据,则始终只能在最近的时间间隔内扫描聚集键。当然,这将错过窗口外的乱序插入。这可能对您的用例足够好或不够好。
未来的最佳答案是Change Data Capture:https://issues.apache.org/jira/browse/CASSANDRA-8844

可能也会触发? - Jeff Jirsa
有人知道如何在 Spark-Cassandra-Connector 中使用吗?它是否能够自动处理偏移量并以近乎实时的方式流式传输数据? - krish at
不可以,也不行。正如我所说的,您必须为自己的表编写一些自定义代码(或设计自定义触发器和接收器),它都无法实现这些功能。 - RussS
1
如果你真的需要流式处理,大多数人会发现将数据插入队列中,然后使用类似于Spark Streaming的工具来分析数据是最容易的。 - RussS
@RussS,未来已经到来了!那个问题几个月前就已经解决了。那么你有什么想法如何在Spark中使用它吗? - Vagif
显示剩余3条评论

0

我知道你特别问的是如何从Cassandra中流式传输数据,但我想建议像Apache Kafka这样的技术更适合你要做的事情。它被许多其他大公司使用,并具有出色的实时性能。

Jay Kreps撰写了一篇开创性的博客文章The Log: What every software engineer should know about real-time data's unifying abstraction,很好地解释了Kafka的目的和设计。博客文章中的一个关键引用总结了Kafka的作用:

将所有组织的数据放入一个中央日志中进行实时订阅。


0

如果要从Cassandra流式传输数据,您可以使用PageSize选项,如下所示:

iter := cass.Query(`SELECT * FROM cmuser.users;`).PageSize(100).Iter()

以上是一个使用Golang的示例。PageSize的描述如下:

PageSize将告诉迭代器以n个为一组的页面大小获取结果。 这对于迭代大型结果集非常有用,但是将页面大小设置得太低可能会降低性能。 此功能仅适用于Cassandra 2及以上版本。


如果您的表中有大量数据,请不要这样做... - Alex Ott
为什么?PageSize 应该有助于防止太多的数据被读入 golang 进程。 - user12834955
因为需要访问不同的节点,由于没有负载均衡等,这会给协调者带来负担...更好的解决方案是按令牌范围进行扫描,并将它们明确路由到副本。 - Alex Ott
然后,请使用令牌范围添加答案,并在此处链接。 - user12834955
关于select * from table的回答实际上没有回答原始问题-作者想要只获取自上次扫描以来的更改。最好的答案来自@RussS...如果你对令牌范围扫描感兴趣-我有一个关于它的单独答案:https://dev59.com/nbDma4cB1Zd3GeqPEv10#54108700 - Alex Ott

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接