结构化流处理和将嵌套数据拆分为多个数据集

7
我正在使用Spark的Structured Streaming(2.2.1)和Kafka一起工作,每60秒从传感器接收数据。我对如何打包这个Kafka数据以正确处理它感到困惑。
我需要在Kafka接收数据时进行一些计算。
我的问题是将从Kafka接收的JSON数据解压成我可以处理的数据集。

数据

简化后的数据大致如下:
{
  id: 1,
  timestamp: "timestamp"
  pump: {
    current: 1.0,
    flow: 20.0
    torque: 5.0
  },
  reactors: [
    {
      id: 1,
      status: 200,
    },

    {
      id: 2,
      status: 300,
    }
  ],
  settings: {
    pumpTimer: 20.0,
    reactorStatusTimer: 200.0
  }
}

为了能够使用Spark,我已经为每个案例类结构创建了一些内容。
// First, general package
case class RawData(id: String, timestamp: String, pump: String, reactors: Array[String], settings: String)
// Each of the objects from the data
case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

使用以下方式生成模式:

val rawDataSchema = Encoders.product[RawData].schema

将原始数据转换为Spark Schema

首先,我将从Kafka中获取的'value'字段放入我的通用schema中:

val rawDataSet = df.select($"value" cast "string" as "json")
  .select(from_json($"json", rawDataSchema))
  .select("data.*").as[RawData]

使用这个rawDataSet,我可以将每个单独的对象打包成数据集。
val pump = rawDataSet.select(from_json($"pump", pumpSchema) as 'pumpData)
  .select("pumpData.*").as[Pump]

val settings = rawDataSet.select(from_json($"settings", settingsSchema) as 'settingsData)
  .select("settingsData.*").as[Settings]

这使我得到了每个JSON对象的干净整洁的数据集。

处理数据

以下是我的问题,如果我想比较或计算设置和泵两个数据集之间的某些值,使用Structured Streaming时JOIN无法工作。

val joinedData = pump.join(settings)

错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;

我的做法是不正确的吗?还是有其他方法推荐来处理这个问题?

谢谢

1个回答

2

我将用现在可行的解决方案回答自己的问题。

与其为JSON中的每个对象创建case类,我可以将它们连接在一起作为一个case类,并嵌套对象,如下所示:

case class RawData(
  id: String, 
  timestamp: String, 
  pump: Pump, 
  reactors: Array[Reactor], 
  settings: Settings
)

case class Pump(current: Float, flow: Float, torque: Float)
case class Reactor(id: Int, status: Int)
case class Settings(oos: Boolean, pumpTimer: Float, reactorStatusTimer: Float)

为了将这个转换为可用的数据集,我可以简单地调用以下命令:
val rawDataset = df.select($"value" cast "string" as "json")
  .select(from_json($"json", Encoders.product[RawData].schema) as 'data)
  .select("data.*").as[RawData]
  .withColumn("reactor", explode($"reactors")) // Handles the array of reactors, making one row in the dataset per reactor.

在处理JSON并将其放入我的定义模式后,我可以像这样选择每个特定的传感器:

val tester = rawDataset.select($"pump.current", $”settings.pumpTimer”)

感谢用户6910411指引我正确的方向。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接