如何使用pySpark Databricks从BinaryType中提取列?

3

问题:从数据框的二进制类型列中提取列。该数据框从 Azure 的 blob 存储帐户加载。

环境:

  • Databricks 5.4(包括 Apache Spark 2.4.3)
  • Python 3.5.2

过程:

  1. 从avro文件获取数据
  2. 提取有用信息并将更用户友好的版本写回到 parquet 文件中

Avro 模式:


    SequenceNumber:long
    Offset:string
    EnqueuedTimeUtc:string
    SystemProperties:map
        key:string
        value:struct
            member0:long
            member1:double
            member2:string
            member3:binary
    Properties:map
        key:string
        value:struct
            member0:long
            member1:double
            member2:string
            member3:binary
    Body:binary

我在从Body:binary获取数据时遇到了困难。我使用下面的代码片段将列转换为字符串

df = df.withColumn("Body", col("Body").cast("string"))

我成功地使用以下代码提取了主体列中的列列表:

        #body string looks like json
        dfBody = df.select(df.Body)
        jsonList = (dfBody.collect())
        jsonString = jsonList[0][0]
        columns = []
        data = json.loads(jsonString)

        for key, value in data.items():
            columns.append(key)

        columns.sort()
        print(columns) 

这个列表有一些有趣的列,如ID、状态和名称。

问题: 如何添加ID列到位于body二进制列中并添加到当前数据帧中。总的来说,我想要展开二进制列。二进制列可能也有数组。

1个回答

2

您不需要收集数据框,而是应该能够转换和展平body字段。从外观上看,您正在使用来自事件中心的avro捕获。这是我用来处理此问题的代码:

最初的回答:

from pyspark.sql.types import StringType, IntegerType, StructType, StructField
from pyspark.sql.functions import from_json, col

# Create a schema that describes the Body field
sourceSchema = StructType([
        StructField("Attribute1", StringType(), False),
        StructField("Attribute2", StringType(), True),
        StructField("Attribute3", StringType(), True),
        StructField("Attribute4", IntegerType(), True)])


# Convert Body to String and then Json applying the schema
df = df.withColumn("Body", col("Body").cast("string"))
jsonOptions = {"dateFormat" : "yyyy-MM-dd HH:mm:ss.SSS"}
df = df.withColumn("Body", from_json(df.Body, sourceSchema, jsonOptions))

# Flatten Body
for c in df.schema['Body'].dataType:
    df = df.withColumn(c.name, col("Body." + c.name))

我认为你需要的关键部分是from_json函数。最初的回答中已经提到了这个函数。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接