How to pivot a streaming dataset?


I am trying to pivot a Spark streaming Dataset (Structured Streaming), but I hit an AnalysisException (excerpt below).

Could anyone confirm that pivoting is indeed not supported in Structured Streaming (Spark 2.0), and perhaps suggest alternative solutions?

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)

3 Answers


Summary: pivot aggregation is not directly supported by Spark Structured Streaming, up to and including 2.4.4.

You can work around it with DataStreamWriter.foreachBatch or the more general DataStreamWriter.foreach.
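
A minimal sketch of the foreachBatch route (assuming Spark 2.4+ and a streaming DataFrame called events with name and value columns; those names are placeholders, not from the question): inside foreachBatch each micro-batch arrives as a regular batch Dataset, so the batch-only pivot is allowed there.

import org.apache.spark.sql.DataFrame

// A typed function value sidesteps the Scala 2.12 overload ambiguity
// between the Scala and Java foreachBatch variants.
val pivotEachBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch
    .groupBy("name")
    .pivot("value")   // plain batch pivot; fine inside foreachBatch
    .count
    .show
}

events.writeStream
  .foreachBatch(pivotEachBatch)
  .start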


I use the latest version of Spark, 2.4.4, as of today.
scala> spark.version
res0: String = 2.4.4

UnsupportedOperationChecker (which you can find in the stack trace) checks whether the logical plan of a streaming query uses supported operations only.

When you do pivot you have to groupBy first, as that is the only interface that gives you pivot.

There are two issues with pivot:

  1. pivot wants to know how many columns to generate values for, and hence does a collect, which is not possible with a streaming Dataset.

  2. pivot is actually another aggregation (besides groupBy), which Spark Structured Streaming does not support.

Let's look at issue 1 first, with no pivot column values defined.

val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp") // <-- pivot with no values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
rate
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.$anonfun$checkForBatch$1$adapted(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreachUp$1$adapted(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3365)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
  at org.apache.spark.sql.RelationalGroupedDataset.pivot(RelationalGroupedDataset.scala:384)
  ... 49 elided

The last two lines show the issue: pivot does a collect (under the covers) and hence the exception.
The other issue is that even if you specified the values to pivot on, you would then hit the other problem, multiple aggregations (and you can see that this time the check is actually for streaming, not for batch as happened in the first case).
val sq = spark
  .readStream
  .format("rate")
  .load
  .groupBy("value")
  .pivot("timestamp", Seq(1)) // <-- pivot with explicit values
  .count
  .writeStream
  .format("console")
scala> sq.start
org.apache.spark.sql.AnalysisException: Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;
Project [value#128L, __pivot_count(1) AS `count` AS `count(1) AS ``count```#141[0] AS 1#142L]
+- Aggregate [value#128L], [value#128L, pivotfirst(timestamp#127, count(1) AS `count`#137L, 1000000, 0, 0) AS __pivot_count(1) AS `count` AS `count(1) AS ``count```#141]
   +- Aggregate [value#128L, timestamp#127], [value#128L, timestamp#127, count(1) AS count(1) AS `count`#137L]
      +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dd63368,rate,List(),None,List(),None,Map(),None), rate, [timestamp#127, value#128L]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:93)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:250)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
  ... 49 elided

In most cases you can use conditional aggregation as a workaround. The equivalent of
df.groupBy("timestamp").
   pivot("name", Seq("banana", "peach")).
   sum("value")

is:

df.filter($"name".isin(Seq("banana", "peach"):_*)).
   groupBy("timestamp").
   agg(
     sum(when($"name".equalTo("banana"), $"value").
         otherwise("null")).
         cast(IntegerType).alias("banana"),
     sum(when($"name".equalTo("peach"), $"value").
         otherwise("null")).
         cast(IntegerType).alias("peach")
   )
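
Because this is a single streaming aggregation, it passes the UnsupportedOperationChecker. A minimal sketch of running it end to end, assuming a streaming DataFrame df with timestamp, name and value columns (the source and column names are placeholders, not from the question):

import org.apache.spark.sql.functions.{sum, when}
import org.apache.spark.sql.types.IntegerType
import spark.implicits._   // for the $"..." syntax; spark is the SparkSession

df.filter($"name".isin("banana", "peach"))
  .groupBy("timestamp")
  .agg(
    sum(when($"name" === "banana", $"value")).cast(IntegerType).alias("banana"),
    sum(when($"name" === "peach", $"value")).cast(IntegerType).alias("peach"))
  .writeStream
  .outputMode("complete")   // without a watermark, a streaming aggregation needs complete or update mode
  .format("console")
  .start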


Here is a simple Java example based on Jacek's answer above:

JSON array:

[{
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-01"
    },
    {
        "customer_id": "d6315a00",
        "product": "Super widget",
        "price": 10,
        "bought_date": "2019-01-02"
    },
    {
        "customer_id": "d6315a00",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-08-20"
    },
    {
        "customer_id": "d6315cd0",
        "product": "Food widget",
        "price": 4,
        "bought_date": "2019-09-19"
    }, {
        "customer_id": "d6315e2e",
        "product": "Bike widget",
        "price": 10,
        "bought_date": "2019-01-01"
    }, {
        "customer_id": "d6315a00",
        "product": "Bike widget",
        "price": 10,
        "bought_date": "2019-03-10"
    },
    {
        "customer_id": "d631614e",
        "product": "Garage widget",
        "price": 4,
        "bought_date": "2019-02-15"
    }
]

Java code:

package io.centilliard;

import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.from_json;

import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Function2;
import scala.runtime.BoxedUnit;

public class Pivot {

    public static void main(String[] args) throws StreamingQueryException, AnalysisException {

        StructType schema = new StructType(new StructField[]{
                new StructField("customer_id", DataTypes.StringType, false, Metadata.empty()),  
                new StructField("product", DataTypes.StringType, false, Metadata.empty()),          
                new StructField("price", DataTypes.IntegerType, false, Metadata.empty()),               
                new StructField("bought_date", DataTypes.StringType, false, Metadata.empty())
            });

        ArrayType  arrayType = new ArrayType(schema, false);

        SparkSession spark = SparkSession
                .builder()
                .appName("SimpleExample")
                .getOrCreate();

        // Create a DataSet representing the stream of input lines from Kafka
        Dataset<Row> dataset = spark
                        .readStream()
                        .format("kafka")                
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", "utilization")
                        .load()
                        .selectExpr("CAST(value AS STRING) as json");

        // Parse the Kafka value as a JSON array and explode it into one row per object
        Column col = new Column("json");
        Column data = from_json(col, arrayType).as("data");
        Column explode = explode(data);
        Dataset<Row> customers = dataset.select(explode).select("col.*");

        DataStreamWriter<Row> dataStreamWriter = customers.writeStream();

        // foreachBatch hands each micro-batch over as a regular (batch) Dataset,
        // so the pivot inside the callback runs as an ordinary batch pivot
        StreamingQuery dataStream = dataStreamWriter.foreachBatch(new Function2<Dataset<Row>, Object, BoxedUnit>() {

            @Override
            public BoxedUnit apply(Dataset<Row> dataset, Object object) {               

                // Pivot the micro-batch on "product"; running this directly on the
                // streaming Dataset would be rejected by the analyzer
                dataset
                .groupBy("customer_id", "product", "bought_date")
                .pivot("product")
                .sum("price")
                .orderBy("customer_id")
                .show();

                return BoxedUnit.UNIT;
            }
        })
        .start();

        dataStream.awaitTermination();
    }

}

Output:

+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|customer_id|      product|bought_date|Bike widget|Food widget|Garage widget|Super widget|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+
|   d6315a00|  Bike widget| 2019-03-10|         20|       null|         null|        null|
|   d6315a00| Super widget| 2019-01-02|       null|       null|         null|          20|
|   d6315a00| Super widget| 2019-01-01|       null|       null|         null|          40|
|   d6315a00|  Food widget| 2019-08-20|       null|          8|         null|        null|
|   d6315cd0|  Food widget| 2019-09-19|       null|          8|         null|        null|
|   d6315e2e|  Bike widget| 2019-01-01|         20|       null|         null|        null|
|   d631614e|Garage widget| 2019-02-15|       null|       null|            8|        null|
+-----------+-------------+-----------+-----------+-----------+-------------+------------+
