使用Spark Streaming从Cassandra读取数据

11

我在使用Spark Streaming从Cassandra读取数据时遇到了问题。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#reading-from-cassandra-from-the-streamingcontext

如上链接,我使用
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)

从cassandra选择数据,但似乎spark streaming只有一次查询,而我希望它能使用10秒的间隔持续查询。

我的代码如下,期待您的回复。

谢谢!

import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue


object SimpleApp {
def main(args: Array[String]){
    val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")

    val ssc = new StreamingContext(conf, Seconds(10))

    val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

    //rdd.collect().foreach(println)

    val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()


    val dstream = ssc.queueStream(rddQueue)

    dstream.print()

    ssc.start()
    rdd.collect().foreach(println)
    rddQueue += rdd
    ssc.awaitTermination()
}  

}


你能描述一下你想要实现什么吗?每个时间间隔都读取完整的表格吗?流数据来自哪里? - maasg
@maasg 我想在每个时间间隔(例如10秒)读取表格,以查询与时间相关的一些记录。这意味着我想让Cassandra成为Spark Streaming的数据源。总之,我在创建DStream时遇到了阻碍。您能否给出一些提示和示例?非常感谢! - Yao Yu
2个回答

9
您可以使用CassandraRDD作为输入来创建ConstantInputDStream。 ConstantInputDStream将在每个流间隔提供相同的RDD,并通过在该RDD上执行操作来触发RDD谱系的材料化,从而每次执行查询时都会在Cassandra上执行。

请确保被查询的数据不会无限增长,以避免增加查询时间并导致不稳定的流处理。

类似以下代码应该能解决问题(以您的代码为起点):

import org.apache.spark.streaming.dstream.ConstantInputDStream

val ssc = new StreamingContext(conf, Seconds(10))

val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")

val dstream = new ConstantInputDStream(ssc, cassandraRDD)

dstream.foreachRDD{ rdd => 
    // any action will trigger the underlying cassandra query, using collect to have a simple output
    println(rdd.collect.mkString("\n")) 
}
ssc.start()
ssc.awaitTermination()

4
如果我只想阅读自上一个RDD处理以来保存在表格中的新数据,是否可能? - Yuri Shkuro
4
有没有办法防止重复获取旧数据?它会导致无限循环。 - Thiago Pereira
据我所知,目前还不可能。 - maasg
只需将数据传输到Kafka,然后从那里传输到Spark,如果需要持久性,则将其转储到Cassandra即可。 - Adrian
@maasg,使用Spark Streaming与仅使用Spark批处理作业相比,我们可以获得什么好处?是否有一种方法可以流式传输Cassandra的实时数据? - Pruthvi Chitrala
显示剩余2条评论

1
我遇到了同样的问题,并通过创建InputDStream类的子类找到了解决方案。需要定义start()compute()方法。 start()可用于准备工作,主要逻辑在compute()中。它应返回Option[RDD[T]]。 为使该类更加灵活,定义了InputStreamQuery特性。
trait InputStreamQuery[T] {
  // where clause condition for partition key
  def partitionCond : (String, Any)
  // function to return next partition key
  def nextValue(v:Any) : Option[Any]
  // where clause condition for clustering key
  def whereCond : (String, (T) => Any)
  // batch size
  def batchSize : Int
}

对于Cassandra表 keyspace.test,创建test_by_date,通过分区键date重新组织表。
CREATE TABLE IF NOT exists keyspace.test
(id timeuuid, date text, value text, primary key (id))

CREATE MATERIALIZED VIEW IF NOT exists keyspace.test_by_date AS
SELECT *
FROM  keyspace.test
WHERE id IS NOT NULL 
PRIMARY KEY (date, id)
WITH CLUSTERING ORDER BY ( id ASC );

test表的一种可能的实现方式如下:

class class Test(id:UUID, date:String, value:String)

trait InputStreamQueryTest extends InputStreamQuery[Test] {
  val dateFormat = "uuuu-MM-dd"

  // set batch size as 10 records
  override def batchSize: Int = 10

  // partitioning key conditions, query string and initial value
  override def partitionCond: (String, Any) = ("date = ?", "2017-10-01")
  // clustering key condition, query string and function to get clustering key from the instance
  override def whereCond: (String, Test => Any) = (" id > ?", m => m.id)
  // return next value of clustering key. ex) '2017-10-02' for input value '2017-10-01'
  override def nextValue(v: Any): Option[Any] = {

    import java.time.format.DateTimeFormatter

    val formatter = DateTimeFormatter.ofPattern( dateFormat)
    val nextDate = LocalDate.parse(v.asInstanceOf[String], formatter).plusDays(1)
    if ( nextDate.isAfter( LocalDate.now()) ) None
    else Some( nextDate.format(formatter))
  }
}

它可以在 CassandraInputStream 类中使用,如下所示。
class CassandraInputStream[T: ClassTag]
(_ssc: StreamingContext, keyspace:String, table:String)
(implicit rrf: RowReaderFactory[T], ev: ValidRDDType[T]) 
extends InputDStream[T](_ssc) with InputStreamQuery[T] {

var lastElm:Option[T] = None
var partitionKey : Any = _

override def start(): Unit = {

  // find a partition key which stores some records
  def findStartValue(cql : String, value:Any): Any = {
    val rdd  = _ssc.sparkContext.cassandraTable[T](keyspace, table).where(cql, value).limit(1)

    if (rdd.cassandraCount() > 0 ) value
    else {
      nextValue(value).map( findStartValue( cql, _)).getOrElse( value)
    }
  }
  // get query string and initial value from partitionCond method
  val (cql, value) = partitionCond
  partitionKey = findStartValue(cql, value)
}

override def stop(): Unit = {}

override def compute(validTime: Time): Option[RDD[T]] = {
  val (cql, _) = partitionCond
  val (wh, whKey) = whereCond

  def fetchNext( patKey: Any) : Option[CassandraTableScanRDD[T]] = {
    // query with partitioning condition
    val query = _ssc.sparkContext.cassandraTable[T](keyspace, table).where( cql, patKey)

    val rdd = lastElm.map{ x =>
      query.where( wh, whKey(x)).withAscOrder.limit(batchSize)
    }.getOrElse( query.withAscOrder.limit(batchSize))

    if ( rdd.cassandraCount() > 0 ) {
      // store the last element of this RDD
      lastElm = Some(rdd.collect.last)
      Some(rdd)
    }
    else {
      // find the next partition key which stores data
      nextValue(patKey).flatMap{ k =>
        partitionKey = k
        fetchNext(k)}
    }
  }

  fetchNext( partitionKey)
}
}

将所有类组合在一起,
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(10))

val dstream = new CassandraInputStream[Test](ssc, "keyspace", "test_by_date") with InputStreamQueryTest

dstream.map(println).saveToCassandra( ... )

ssc.start()
ssc.awaitTermination()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接