使用Apache Spark和Java将CSV解析为DataFrame/DataSet

20

我是Spark的新手,想要使用group-by和reduce从CSV文件中找到以下信息(每个雇员一行):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

我想要通过按照Department, Designation, State分组,添加 sum(costToCompany)TotalEmployeeCount 附加列来简化关于 CSV 的内容。

应该得到如下结果:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

有没有使用转换和操作来实现这一点的方法。或者我们应该采用RDD操作?


1
请问您能否将CSV块(输入和结果)组织起来,以清晰地区分标题和每一行?目前不清楚哪一行开始或结束。 - emecas
1
请查看此链接以了解如何使用Spark 2.x +进行操作:https://dev59.com/MF8e5IYBdhLWcg3wyM7N#44889688 - mrsrinivas
4个回答

40

过程

  • 创建一个类(模式)来封装您的结构(如果您使用Java,则不是必需的,但这将使您的代码更易于阅读)

public class Record implements Serializable {
  String department;
  String designation;
  long costToCompany;
  String state;
  // constructor , getters and setters  
}
  • 加载 CVS(JSON) 文件

  • JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    
    JavaRDD<Record> rdd_records = sc.textFile(data).map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
             return sd;
          }
    });
    

    现在您有两种方法:

    A. SparkSQL

    • 注册一个表(使用您定义的架构类)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • 使用您所需的查询-分组方式查询表格

  • JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
    
  • 在这里,您还可以使用SQL方法执行任何其他查询

  • B. Spark

    • 使用复合键进行映射:DepartmentDesignationState

    • JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
      rdd_records.mapToPair(new
        PairFunction<Record, String, Tuple2<Long, Integer>>(){
          public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
            Tuple2<String, Tuple2<Long, Integer>> t2 = 
            new Tuple2<String, Tuple2<Long,Integer>>(
              record.Department + record.Designation + record.State,
              new Tuple2<Long, Integer>(record.costToCompany,1)
            );
            return t2;
      }
      
    • 使用组合键进行reduceByKey操作,对costToCompany列求和,并按键累加记录数。

    • });

      JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
       records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
       Integer>, Tuple2<Long, Integer>>() {
          public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
          Tuple2<Long, Integer> v2) throws Exception {
              return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
          }
      });
      

    请注意,B方法也使用Record类和加载步骤。我不确定错误指的是哪个符号,请包含完整的错误跟踪信息。这也可能是由于您的数据输入文件导致的,您是否更改了输入内容? - emecas
    嗨@emecas,感谢您的出色回答。我正在使用您的代码,但是出现了一些问题,表格为空且没有模式。JavaSchemaRDD表= sqlContext.applySchema(rdd_records, Record.class);当我保存table.saveAsTextFile()时,在part文件的所有行中都打印[]。 - Umesh K
    不要忘记在你的模式类(Record)上填写(构造函数),获取器和设置器部分。请参见@user449355的问答http://stackoverflow.com/a/30103554/833336。 - emecas

    25

    CSV文件可以使用Spark内置的CSV读取器进行解析。成功读取文件后,它将返回DataFrame/DataSet。在DataFrame/DataSet上,您可以轻松地应用类似于SQL的操作。

    使用Java与Spark 2.x(及以上版本)

    创建SparkSession对象,也称为spark

    import org.apache.spark.sql.SparkSession;
    
    SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL Example")
        .getOrCreate();
    

    使用StructType为行创建模式

    import org.apache.spark.sql.types.StructType;
    
    StructType schema = new StructType()
        .add("department", "string")
        .add("designation", "string")
        .add("ctc", "long")
        .add("state", "string");
    

    从CSV文件创建数据框并应用模式
    Dataset<Row> df = spark.read()
        .option("mode", "DROPMALFORMED")
        .schema(schema)
        .csv("hdfs://path/input.csv");
    

    从CSV文件读取数据的更多选项

    现在我们可以用两种方式对数据进行聚合

    1. SQL way

    Register a table in spark sql metastore to perform SQL operation

    df.createOrReplaceTempView("employee");
    

    Run SQL query on registered dataframe

    Dataset<Row> sqlResult = spark.sql(
        "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
            + " FROM employee GROUP BY department, designation, state");
    
    sqlResult.show(); //for testing
    

    We can even execute SQL directly on CSV file with out creating table with Spark SQL


    2. Object chaining or Programming or Java-like way

    Do the necessary import for sql functions

    import static org.apache.spark.sql.functions.count;
    import static org.apache.spark.sql.functions.sum;
    

    Use groupBy and agg on dataframe/dataset to perform count and sum on data

    Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
        .agg(sum("ctc"), count("department"));
    // After Spark 1.6 columns mentioned in group by will be added to result by default
    
    dfResult.show();//for testing
    
    依赖库
    "org.apache.spark" % "spark-core_2.11" % "2.0.0" 
    "org.apache.spark" % "spark-sql_2.11" % "2.0.0"
    

    使用Scala和Spark 2.x读取CSV:https://dev59.com/questions/RF0b5IYBdhLWcg3wA9LI#39533431 - mrsrinivas

    4
    以下内容可能不完全正确,但应该能够让您了解如何处理数据。虽然不太美观,但应该用case类等方式进行替换,但作为如何使用Spark API的快速示例,希望它足够 :)
    val rawlines = sc.textfile("hdfs://.../*.csv")
    case class Employee(dep: String, des: String, cost: Double, state: String)
    val employees = rawlines
      .map(_.split(",") /*or use a proper CSV parser*/
      .map( Employee(row(0), row(1), row(2), row(3) )
    
    # the 1 is the amount of employees (which is obviously 1 per line)
    val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))
    
    val results = keyVals.reduceByKey{ a,b =>
        (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
    }
    
    #debug output
    results.take(100).foreach(println)
    
    results
      .map( keyval => someThingToFormatAsCsvStringOrWhatever )
      .saveAsTextFile("hdfs://.../results")
    

    或者您可以使用SparkSQL:

    val sqlContext = new SQLContext(sparkContext)
    
    # case classes can easily be registered as tables
    employees.registerAsTable("employees")
    
    val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
      from employees 
      group by dep,des,state"""
    

    感谢您的迅速回复。我想要一个按组分组的结果,就像在MySQL中选择employeeTable表中的DeptdesignationstatecostToCompany总和,按Deptdesignationstate分组一样,而不仅仅是一个部门(如销售部门)。 - mithra
    然后简单地跳过筛选步骤。我已经相应地更新了代码。目标是将行转换为键值元素,其中键包含您要分组的标识符,而值包含要减少的值。在这种情况下,我们按部门、职称和州分组,并希望总结员工数量以及成本,因此这些都是值。 - jkgeyti
    谢谢,非常感谢,我会尝试的。你救了我的一天! - mithra

    4

    对于 JSON,如果您的文本文件每行都包含一个 JSON 对象,则可以使用 sqlContext.jsonFile(path) 将其作为 SchemaRDD 加载到 Spark SQL 中(模式将自动推断)。然后,您可以将其注册为表并使用 SQL 进行查询。您还可以手动将文本文件加载为包含每个记录中一个 JSON 对象的 RDD[String],并使用 sqlContext.jsonRDD(rdd) 将其转换为 SchemaRDD。当您需要预处理数据时,jsonRDD 很有用。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接