如何在Java中使用Apache Spark将DataFrame转换为Dataset?

17

我可以很容易地将DataFrame转换为Scala中的Dataset:

case class Person(name:String, age:Long)
val df = ctx.read.json("/tmp/persons.json")
val ds = df.as[Person]
ds.printSchema

但是在Java版本中,我不知道如何将Dataframe转换为Dataset? 有什么思路吗?

我的努力:

DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = new Encoder<>();
Dataset<Person> ds = new Dataset<Person>(ctx,df.logicalPlan(),encoder);
ds.printSchema();

但编译器报错:

Error:(23, 27) java: org.apache.spark.sql.Encoder is abstract; cannot be instantiated

编辑(解决方案):

基于@Leet-Falcon的回答提供的解决方案:

DataFrame df = ctx.read().json(logFile);
Encoder<Person> encoder = Encoders.bean(Person.class);
Dataset<Person> ds = new Dataset<Person>(ctx, df.logicalPlan(), encoder);

2
似乎Java /w Spark 1.6缺少API。 - Leet-Falcon
我该如何用Java编写缺失的API? - Milad Khajavi
2个回答

14

官方Spark文档建议在Dataset API中使用以下方式:

通过调用Encoders上的静态方法来指定Java编码器。

List<String> data = Arrays.asList("abc", "abc", "xyz");
Dataset<String> ds = context.createDataset(data, Encoders.STRING());

编码器可以组合成元组:

Encoder<Tuple2<Integer, String>> encoder2 = Encoders.tuple(Encoders.INT(), Encoders.STRING());
List<Tuple2<Integer, String>> data2 = Arrays.asList(new scala.Tuple2(1, "a");
Dataset<Tuple2<Integer, String>> ds2 = context.createDataset(data2, encoder2);

或者通过Encoders#bean从Java Beans构建:

Encoders.bean(MyClass.class);

将DataFrame转换为Dataset会有性能影响吗?具体而言,当我将编码器附加到一个DataFrame时会发生什么?编码器是“懒惰”的(即除非调用类型操作,否则它不会执行任何操作)还是必须先处理DataFrame? - Marsellus Wallace

6
如果您想在Java中将通用的DF转换为Dataset,可以像下面这样使用RowEncoder类:
DataFrame df = sql.read().json(sc.parallelize(ImmutableList.of(
            "{\"id\": 0, \"phoneNumber\": 109, \"zip\": \"94102\"}"
    )));

    Dataset<Row> dataset = df.as(RowEncoder$.MODULE$.apply(df.schema()));

在Scala中可以这样写: df.as[Foo](encoder)``` - Painy James
问题是DataFrame类在Spark 2中已经消失了! - Thomas Decaux
它目前无法正常工作。Dataset<Row> dataset = df.as(RowEncoder$.MODULE$.apply(df.schema())); 出现了一个“不兼容类型”的错误。 - Vamshavardhan Reddy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接