通过Spark SQL查询Cassandra UDT

3
我们希望通过SparkSQL从Cassandra DB查询数据。问题在于,数据以UDT的形式存储在Cassandra中。UDT的结构是深度嵌套的,并且包含长度可变的数组,因此将数据分解为平面结构将非常困难。我找不到任何有效的示例来演示如何通过SparkSQL查询这样的UDT,特别是基于UDT值过滤结果的情况。
另外,您能否建议不同的ETL管道(查询引擎、存储引擎等),这将更适合我们的用例?
我们的ETL管道:Kafka(重复事件)-> Spark流-> Cassandra(去重以仅存储最新事件)<- Spark SQL <-分析平台(UI)
我们迄今尝试过的解决方案:
1)Kafka-> Spark-> Parquet <- Apache Drill
一切正常,我们可以查询和过滤数组和嵌套数据结构。
问题:无法去重数据(使用最新事件重写parquet文件)

2) Kafka -> Spark -> Cassandra <- Presto

问题已解决1): 数据去重。

问题:Presto不支持UDT类型 (Presto文档, Presto问题)

我们的主要需求是:

  • 支持数据去重。我们可能会收到许多具有相同ID的事件,并且我们只需要存储最新的一条。
  • 存储具有数组的深层嵌套数据结构
  • 分布式存储,可扩展以适应未来的扩展
  • 具有类似SQL查询支持的分布式查询引擎(用于与Zeppelin、Tableau、Qlik等连接)。查询不必实时运行,几分钟的延迟是可以接受的。
  • 支持模式演化(AVRO风格)

感谢您的任何建议

1个回答

1
你可以使用点语法对嵌套元素执行查询。例如,如果我有以下CQL定义:
cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;

 id | t2
----+-------------------------------
  1 | {id: 1, t1: {id: 1, t: 't1'}}
  2 | {id: 2, t1: {id: 2, t: 't2'}}

(2 rows)

然后我可以按以下方式加载该数据:
scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
     options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
|  2|[2,[2,t2]]|
+---+----------+

然后查询数据,仅选择UDT字段中特定的值:

scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> res.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
+---+----------+

你可以使用spark.sql或相应的.filter函数,具体取决于你的编程风格。这种技术适用于来自不同来源(例如JSON等)的任何结构类型数据。
但请注意,当按分区键/聚簇列进行查询时,您将无法获得来自Cassandra连接器的优化。

你好Alex,感谢你的回答,这正是我在寻找的。当我启动Spark Thrift Server时,我甚至可以通过JDBC进行查询(与分析工具连接)。我对你最后一句话中关于性能的问题感到担忧。当我将分区键(或分区键列表)包含在选择中时,我希望Spark会将它们“下推”到Cassandra? - Tomas Bartalos
是的,如果您将分区键和/或聚簇列包含到条件中,那么连接器将会将过滤器下推到Cassandra,并在减少的数据集上执行UDT的过滤,这与仅在UDT上进行过滤不同,后者需要读取所有数据并在Spark级别上执行过滤。 - Alex Ott
@AlexOtt 这个例子似乎不能在我执行 "select t2.t1.id" 的情况下正常工作,而我想要从那里创建数据框架。有没有什么解决方法? - user1228785
这段代码在真实的机器上执行。你能把它作为一个单独的问题发布吗?包括你遇到的错误、实际代码等。 - Alex Ott

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接