在Spark Java中将JavaRDD转换为DataFrame

12

我正在尝试处理日志文件。首先,我读取日志文件并根据我的要求拆分这些文件,并将每个列保存到单独的JavaRDD中。现在我需要将这些JavaRDD转换为DataFrames以进行未来操作。这是我迄今为止尝试过的代码:

         SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
         JavaSparkContext sc = new JavaSparkContext(conf);
         JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
         JavaRDD<String> urlrdd=diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
         System.out.println(urlrdd.take(1));
         SQLContext sql = new SQLContext(sc);

这是我尝试将JavaRDD转换为DataFrame的方式:

DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class);

但是上面的代码行不起作用。我对Model.class感到困惑。 有人能给我建议吗? 谢谢。
4个回答

30

导入:

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

为URL创建一个POJO类。我建议你编写日志行,其中包括url、日期、时间、方法、目标等作为成员。
public static class Url implements Serializable {
  private String value;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}  

从文本文件创建一个Url对象的RDD。
JavaRDD<Url> urlsRDD = spark.read()
  .textFile("/Users/karuturi/Downloads/log.txt")
  .javaRDD()
  .map(new Function<String, Url>() {
    @Override
    public Url call(String line) throws Exception {
      String[] parts = line.split("\\t");
      Url url = new Url();
      url.setValue(parts[0].replaceAll("[", ""));
      return url;
    }
  });

从RDD创建DataFrame。
Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class);

RDD转DataFrame - Spark 2.0
RDD转DataFrame - Spark 1.6


1
如果我想将一个包含SparseVectorJavaRDD进行转换怎么办? - Alberto Bonsanto

6
你可以像这样做(我正在实时从Scala转换,所以请原谅任何拼写错误):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        return RowFactory.create(record());
    }
}
// now you wish to create the target schema. This is basically a list of
// fields (each field would be a column) which you are adding to a StructType
List<StructField> fields = new ArrayList<>();
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);

// now you can create the dataframe:
DataFrame df= sqlContext.createDataFrame(rowRDD, schema);    

还有几点需要注意:

  • 当你只取第一个元素时,为什么要使用flatmap? 你可以直接这样做:

    JavaRDD<String> urlrdd=diskfile.flatMap(line -> line.split("\t")[0]);

  • 我猜在实际情况下,你可能想删除url中的'['(你可以在map中轻松完成)。

  • 如果你正在转移到spark 2.0或更高版本,则应该使用spark session (spark)而不是sqlContext。

  • 你可以创建一个带有所有列的单个dataframe。你可以通过将所有字段添加到模式中来实现这一点(即不仅仅是对字段进行单个添加)。而不是使用urlrdd,使用diskfile,并在“public Row call”创建内部执行分割。大致如下:

    JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() { @override public Row call(String record) throws Exception { String[] recs = record.split("\t") return RowFactory.create(recs[0], recs[1], ...); } });

  • 你可以直接创建它:只需要使用

    sqlContext.read.option("sep","\t").csv.load(filename,schema)


4
只需根据7列表格对数据进行扁平映射,并使用下面的代码片段即可。
String[] columns = new String[7] {"clumn1","column2","column3","column4","column5","column6","column7"};
List<String> tableColumns = Arrays.asList(columns);

StrucType schema = createSchema(tableColumns);

    public StructType createSchema(List<String> tableColumns){

        List<StructField> fields  = new ArrayList<StructField>();
        for(String column : tableColumns){         

                fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));            

        }
        return DataTypes.createStructType(fields);
    }

sqlContext.createDataFrame(urlRDD, schema);

0

3
在Spark Java中不支持"import sqlContext.implicits._"命令。 - ROOT
是的,抱歉刚看到这个消息。你最好的选择是使用sqlContext来读取文件。因为将rdd转换为dataframe需要使用反射,所以为了减少额外的计算,请使用sqlContext来读取文件。 - Akash Sethi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接