I have a use case where we want to read JSON files from S3. Then, based on a particular JSON node value, we want to group the data and write it back to S3.
I am able to read the data, but I can't find a good example of how to partition the data by a JSON key and then upload it to S3. Can anyone provide an example, or point me to a tutorial, that covers this use case?
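For reference, this is roughly how I am reading the data (a minimal sketch; the bucket and path are placeholders, and I am assuming the standard spark.read.json API):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("GroupByJsonKey")
  .getOrCreate()

// Read JSON from S3 into a DataFrame; Spark infers the schema shown below.
val df = spark.read.json("s3://my-bucket/path/to/input/")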
After creating the DataFrame, this is the schema I get:
root
|-- customer: struct (nullable = true)
| |-- customerId: string (nullable = true)
|-- experiment: string (nullable = true)
|-- expiryTime: long (nullable = true)
|-- partitionKey: string (nullable = true)
|-- programId: string (nullable = true)
|-- score: double (nullable = true)
|-- startTime: long (nullable = true)
|-- targetSets: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- featured: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- data: struct (nullable = true)
| | | | | |-- asinId: string (nullable = true)
| | | | |-- pk: string (nullable = true)
| | | | |-- type: string (nullable = true)
| | |-- reason: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- recommended: array (nullable = true)
| | | |-- element: string (containsNull = true)
I want to partition the data based on a random hash of the customerId column. But when I do the following:
df.write.partitionBy("customerId").save("s3/bucket/location/to/save");
it fails with this error:
org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));
Please tell me how I can access the customerId column here.
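From the error it looks like customerId is nested inside the customer struct, so partitionBy can't see it at the top level. I am guessing the nested field first has to be promoted to a top-level column, something like the sketch below (using withColumn and col; I'm not sure this is the idiomatic fix):

import org.apache.spark.sql.functions.col

// Pull the nested field out of the struct into a top-level column,
// then partition the output directories by that column.
val partitioned = df.withColumn("customerId", col("customer.customerId"))
partitioned.write
  .partitionBy("customerId")
  .save("s3/bucket/location/to/save")

Is this the right approach, or is there a better way to partition on a nested JSON key?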