Partitioning S3 by column in Apache Spark

I have a use case where we want to read JSON files from S3. Then, based on the value of a particular JSON node, we want to group the data and write it back to S3.

I am able to read the data, but I can't find a good example of how to partition the data by a JSON key and then upload it to S3. Can anyone provide an example or point me to a tutorial that covers this use case?
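
Roughly, the read looks like this (the bucket and path below are placeholders, assuming the s3a connector):

val df = spark.read
  .format("json")
  .load("s3a://my-bucket/path/to/input/") // hypothetical input location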

After creating the dataframe, I get the following schema for my data:

root
 |-- customer: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |-- experiment: string (nullable = true)
 |-- expiryTime: long (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- programId: string (nullable = true)
 |-- score: double (nullable = true)
 |-- startTime: long (nullable = true)
 |-- targetSets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- featured: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- data: struct (nullable = true)
 |    |    |    |    |    |-- asinId: string (nullable = true)
 |    |    |    |    |-- pk: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- reason: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- recommended: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I want to partition the data based on a random hash of the customerId column. But when I do the following:
df.write.partitionBy("customerId").save("s3/bucket/location/to/save");

it fails with the error:
org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));

Please tell me how I can access the customerId column.

1 Answer

Let's take the following sample dataset, sample.json, as an example.

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"}
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"}
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"}

Now let's start hacking on it with Spark.
val jsonDf = spark.read
  .format("json")
  .load("path/of/sample.json")

jsonDf.show()

+---------+-------+-----+-----+
|     CITY|CUST_ID|STATE|  ZIP|
+---------+-------+-----+-----+
| San Jose| 115734|   CA|95106|
|Allentown| 115728|   PA|18101|
|Allentown| 115730|   PA|18101|
|San Mateo| 114728|   CA|94401|
| Somerset| 114726|   NJ| 8873|
+---------+-------+-----+-----+

Then partition the dataset by the "ZIP" column and write it to S3:

jsonDf.write
  .partitionBy("ZIP")
  .save("s3/bucket/location/to/save")
  // one-liner authentication to S3 (credentials embedded in the URI)
  //.save(s"s3n://$accessKey:$secretKey@$bucketName/location/to/save")

Note: For this code to succeed, the S3 access key and secret key must be configured correctly. See this answer for Spark/Hadoop integration with S3.
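
A minimal sketch of one way to do that, assuming the s3a connector is on the classpath and using placeholder credential values, is to set the keys on the Hadoop configuration before writing:

val accessKey = "<your-access-key>" // placeholder
val secretKey = "<your-secret-key>" // placeholder

// Configure the s3a filesystem credentials on the active SparkSession.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)

jsonDf.write
  .partitionBy("ZIP")
  .save("s3a://bucketName/location/to/save") // bucketName is a placeholder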

Edit: Solution for "Partition column customerId not found in schema" (per the comments)

customerId lives inside the customer struct, so try extracting customerId first and then partitioning on it:

df.withColumn("customerId", $"customer.customerId")
  .drop("customer")
  .write.partitionBy("customerId")
  .save("s3/bucket/location/to/save")
