将Spark DataFrame转换为Pojo对象

5
请看下面的代码:
    //Create Spark Context
    SparkConf sparkConf = new SparkConf().setAppName("TestWithObjects").setMaster("local");
    JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
    //Creating RDD
    JavaRDD<Person> personsRDD = javaSparkContext.parallelize(persons);
    //Creating SQL context
    SQLContext sQLContext = new SQLContext(javaSparkContext);
    DataFrame personDataFrame = sQLContext.createDataFrame(personsRDD, Person.class);
    personDataFrame.show();
    personDataFrame.printSchema();
    personDataFrame.select("name").show();
    personDataFrame.registerTempTable("peoples");
    DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");
    result.show();

接下来我需要将数据框"result"转换为人员对象或列表。提前感谢。

3个回答

11

DataFrame只是Dataset[Row]的类型别名。这些操作也被称为“未经过类型定义的转换”,与强类型的Scala/Java Datasets带来的“经过类型定义的转换”形成对比。

在Spark中,将Dataset[Row]转换为Dataset[Person]非常简单。

DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");

在这一点上,Spark将您的数据转换为DataFrame = Dataset[Row],它是一个通用的Row对象集合,因为它不知道确切的类型。

// Create an Encoders for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class); 
Dataset<Person> personDF = result.as(personEncoder);
personDF.show();

现在,Spark将Dataset [Row]转换为特定类型的Scala / Java JVM对象Dataset [Person],并根据类Person进行操作。

请参考databricks提供的以下链接以获取更多详细信息:

https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html


似乎编码器试图设置所有类参数,而不仅仅是构造函数中的参数。例如,如果我有一个类如下:class A { int p1 int p2 public A(int p1) { this.p1 = p1 this.p2 = p1 * 2 } }在上述情况下,编码器要求数据框架中同时提供p1和p2,这很奇怪。 - Behrad3d

2

如果有人想要将 json字符串列Dataset<Row> 中转化为 Dataset<PojoClass>

示例POJO:Testing

@Data
public class Testing implements Serializable {
    private String name;
    private String dept;
}

在上面的代码中,@Data 是来自 Lombok 的注解,用于为这个 Testing 类生成 getter 和 setter。 Spark 中的实际转换逻辑
@Test
void shouldConvertJsonStringToPojo() {
   var sparkSession  = SparkSession.builder().getOrCreate(); 
   var structType =  new StructType(new StructField[] {
        new StructField("employee", DataTypes.StringType, false, Metadata.empty()),
    });

    var ds = sparkSession.createDataFrame(new ArrayList<>(
        Arrays.asList(RowFactory.create(new Object[]{"{ \"name\": \"test\", \"dept\": \"IT\"}"}))), structType);

    var objectMapper = new ObjectMapper();
    var bean = Encoders.bean(Testing.class);

    var testingDataset = ds.map((MapFunction<Row, Testing>) row -> {
        var dept = row.<String>getAs("employee");

        return objectMapper.readValue(dept, Testing.class);
    }, bean);

    assertEquals("test", testingDataset.head().getName());
}

2

正如方法所提供的,我们只能逐个获取每个值,而不能作为整个对象获取。 - Don Mathew
@DonMathew,目前你不会得到更好的东西。如果更容易的话,你可以将其转换为JSON并从那里开始。即将推出的“DataSet” API应该会给你想要的东西,尽管我现在不确定POJO是否需要解码器。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接