使用Spark向Cassandra写入数据

3

我有一个用Scala编写的Spark作业,我只是想把来自Kafka生产者的一行用逗号分隔,写入Cassandra数据库中。 但是我无法调用saveToCassandra。 我看到了一些单词计数的例子,他们将映射结构写入具有两列的Cassandra表中,并且似乎工作正常。 但我有很多列,我发现数据结构需要并行化处理。 这是我的代码示例:

object TestPushToCassandra extends SparkStreamingJob {
def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid

def runJob(ssc: StreamingContext, config: Config): Any = {

val bp_conf=BpHooksUtils.getSparkConf()
val brokers=bp_conf.get("bp_kafka_brokers","unknown_default")


val input_topics = config.getString("topics.in").split(",").toSet


val output_topic = config.getString("topic.out")


val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics)


val lines = messages.map(_._2)
val words = lines.flatMap(_.split(","))

val li = words.par

li.saveToCassandra("testspark","table1", SomeColumns("col1","col2","col3"))
li.print()



words.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{

        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        val outMsg=x+" from spark"
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String](output_topic,null,outMsg)
        producer.send(message)
      }
    }


  )
)


ssc.start()
ssc.awaitTermination()
}
}

我认为我没有掌握Scala的语法正确。

提前感谢。


1
调用words.par几乎肯定不是正确的做法。DStream“words”已经是一个本质上已经分布和并行化的DStream。你没有那个问题吗? - RussS
它可以在没有“.par”文件的情况下工作,但现在我想知道如何拆分值以提取col1、col2、col3的值?例如,如果在kafka生产者中我写入“val1,val2,val3”,那么我该如何分别提取这些值并存储在col1、col2和col3中? - user3925365
你是说你不能用 .split(",") 分割字符串吗? - RussS
是的,完全正确。所以基本上如果我从生产者传递“val1,val2,val3”,那么在上面的代码中,我的变量“lines”和“words”的代码应该是什么,这样我就可以在“words”变量上调用saveToCassandra? - user3925365
1个回答

1
你需要将文本中的DStream更改为连接器可以处理的内容,例如元组。
val words = lines
  .map(_.split(","))
  .map( wordArr => (wordArr(0), wordArr(1), wordArr(2)) 

或者一个案例类。
case class YourRow(col1: String, col2: String, col3: String)
val words = lines
  .map(_.split(","))
  .map( wordArr => YourRow(wordArr(0), wordArr(1), wordArr(2)))

或者是CassandraRow。

这是因为如果你只放置一个数组,它可能是C*中要插入的数组,而不是3列。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md


感谢您的回答。当我尝试使用您的解决方案时,它在数据库中存储的位有所不同。也许我错过了一些小的语法问题。如果我从Kafka生产者传递abc、def、ghi,这是我的代码:val lines = messages.map(.2) val words = lines.flatMap(.split(" ")) val innerWords = words.flatMap(.split(",")) val wordCounts = innerWords.map(wordArr = (wordArr(0),wordArr(1),wordArr(2)))wordCounts.saveToCassandra("keyspace01","table1", SomeColumns("col1","col2","col3"))这段代码会在数据库中生成三个条目,即第一个:a,b,c 第二个:d,e,f 第三个:g,h,i。 - user3925365
哎呀,我不应该复制你的代码行,那应该是map而不是flatMap。 - RussS
如果我在使用map函数时将单词放在第一位进行分割,那么在words.foreachRDD函数的"case x:String"行会出现编译器错误。它说“scrutinee与pattern类型不兼容;找到:String,需要:Array [String]”。 - user3925365
如果您只想将数据传输到C*,那么使用Kafka就没有必要了,可以直接使用Spark Streaming中的fileStream。 - RussS
谢谢!我会研究一下的! - user3925365
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接