如何将Spark Streaming与Cassandra连接?

3

我正在使用

Cassandra v2.1.12
Spark v1.4.1
Scala 2.10

而Cassandra正在监听

rpc_address:127.0.1.1
rpc_port:9160

例如,为了连接kafka和spark-streaming,在每4秒监听kafka时,我有以下的spark作业。
sc = SparkContext(conf=conf)
stream=StreamingContext(sc,4)
map1={'topic_name':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)

每4秒,spark-streaming会监听kafka broker并输出内容。

同样地,我希望spark streaming可以监听cassandra,并在每4秒输出指定表的内容

如何将上述流处理代码转换为使用cassandra而不是kafka?


非流处理解决方案

显然,我可以将查询保持在无限循环中运行,但这并不是真正的流处理,对吧?

Spark作业:

from __future__ import print_function
import time
import sys

from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming import *

sc = SparkContext(appName="sparkcassandra")
while(True):
    time.sleep(5)
    sqlContext = SQLContext(sc)
    stream=StreamingContext(sc,4)
    lines = stream.socketTextStream("127.0.1.1", 9160)
    sqlContext.read.format("org.apache.spark.sql.cassandra")\
                 .options(table="users", keyspace="keyspace2")\
                 .load()\
                 .show()

像这样运行

sudo ./bin/spark-submit --packages \
datastax:spark-cassandra-connector:1.4.1-s_2.10 \
examples/src/main/python/sparkstreaming-cassandra2.py

我得到了表格数值,大致看起来像这样

lastname|age|city|email|firstname

那么从cassandra中“流式传输”数据的正确方法是什么?


2个回答

2
目前从C*流式传输数据的“正确方法”不是直接从C*流式传输数据 :) 相反,通常更有意义的做法是在C*之前放置消息队列(如Kafka),然后从那里进行流式传输。虽然如果聚簇键基于插入时间,C*可以支持增量表读取,但它并不容易支持。
如果您有兴趣将C*用作流源,请务必查看并评论https://issues.apache.org/jira/browse/CASSANDRA-8844 Change Data Capture,这很可能是您要寻找的内容。
如果您实际上只是尝试定期读取完整表并执行某些操作,则最好使用cron作业启动批处理操作,因为您无论如何都无法恢复状态。

0

目前在Spark 1.6中,Cassandra不支持作为流源的本地支持,您必须为自己的情况实现自定义接收器(监听Cassandra并每4秒输出指定表的内容。

请参考实现指南:

Spark Streaming Custom Receivers


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接