如何使用Java API在Apache Spark Dataset中按降序排序?

3
我正在使用Spark Session读取文件,然后将单词拆分并计算其出现次数。我需要按降序显示数据。
SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.master", "local")
            .getOrCreate();

// Split each line on whitespace and periods, then strip commas. String.split
// produces empty tokens for consecutive delimiters (e.g. ". ") and for blank
// lines, and a token that is only "," becomes empty after the replace; those
// empty strings are dropped so that "" is not counted as a word.
JavaRDD<Word> textFile = sparkSession
            .read()
            .textFile("/Users/myname/Documents/README.txt")
            .javaRDD()
            .flatMap(s -> Arrays.asList(s.split("[\\s.]")).iterator())
            .map(w -> w.replace(",", ""))
            .filter(w -> !w.isEmpty())
            .map(w -> {
                Word word = new Word();
                word.setWord(w);
                return word;
            });

    Dataset<Row> df = sparkSession.createDataFrame(textFile, Word.class);
    // Count occurrences per word and show them in descending order of count.
    // With `import static org.apache.spark.sql.functions.*;` the ordering can
    // equivalently be written as .orderBy(desc("count")).
    df.groupBy("word").count().orderBy(org.apache.spark.sql.functions.col("count").desc()).show();

当我使用 org.apache.spark.sql.functions.col("count").desc() 时,它可以正常工作;但我无法按照 https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/functions.html#desc(java.lang.String) 中定义的方式进行排序,即 df.sort(asc("dept"), desc("age"))。Stack Overflow 问题"如何在Spark SQL中按列降序排序?"中的方法也没有起作用,我猜那是针对 Scala 的。Java 中的等效写法是什么?
3个回答

12

在Java中,您需要静态导入 functions 类中的方法:

import static org.apache.spark.sql.functions.*;

2

按照 Spark Java 文档的写法,您的代码应该可以工作,但您没有贴出导入语句。desc() 和 asc() 是 functions 类中的静态方法;如果没有导入它们,就需要写出完整的类名:org.apache.spark.sql.functions.asc("dept")、org.apache.spark.sql.functions.desc("age")

或者使用静态导入:import static org.apache.spark.sql.functions.*;


0

我使用的是 Spark 2.4.0。

  1. 将配置项 spark.kryo.registrationRequired 设置为 false;

或者

  2. 在 Kryo 中注册以下类:


    // Registers with Kryo the Spark SQL (catalyst) classes involved in sorting.
    // Only needed when spark.kryo.registrationRequired is true; the alternative is
    // to set that key to false. This list was posted for Spark 2.4.0.
    // NOTE(review): `kryo` is not declared in this snippet; presumably this is the
    // body of a KryoRegistrator#registerClasses(Kryo kryo). Confirm.
    kryo.register(org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder[].class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.SortOrder.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.BoundReference.class);
            kryo.register(org.apache.spark.sql.catalyst.trees.Origin.class);
            // Scala `object`s compile to classes whose names end in `$`, hence the
            // NullsFirst$ / Descending$ / NullsLast$ class literals.
            kryo.register(org.apache.spark.sql.catalyst.expressions.NullsFirst$.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.Descending$.class);
            kryo.register(org.apache.spark.sql.catalyst.expressions.NullsLast$.class);

     // The classes below are loaded by binary name. Class.forName throws the checked
     // ClassNotFoundException, so the enclosing method must catch or declare it.
     // Names such as Ordering$$anon$4 and InternalRow$$anonfun$getAccessor$8 are
     // compiler-generated (anonymous classes / closures).
     // NOTE(review): their numeric suffixes may differ between Scala/Spark builds.
     // Verify against your version.
     kryo.register(Class.forName("scala.math.Ordering$$anon$4"));
                kryo.register(Class.forName("scala.reflect.ClassTag$$anon$1"));
                kryo.register(Class.forName("org.apache.spark.sql.catalyst.InternalRow$$anonfun$getAccessor$8"));
                kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.Ascending$"));


private static SparkSession session;

    /**
     * Builds two small in-memory Datasets (persons and their professions), joins
     * them on the person id and shows the joined rows ordered by person id.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        /* DUMMY DATA creation */
        List<Person> personsList = Arrays.asList(
            new Person(1, "courriel_1@gmail.com", "nom_1"),
            new Person(2, "courriel_2@gmail.com", "nom_2"),
            new Person(3, "courriel_3@gmail.com", "nom_3"),
            new Person(4, "courriel_4@gmail.com", "nom_4")
        );

        List<Profession> professionList = Arrays.asList(
            new Profession(1, 2, "profession_4"),
            new Profession(2, 1, "profession_2"),
            new Profession(3, 1, "profession_5"),
            new Profession(4, 2, "profession_2"),
            new Profession(5, 2, "profession_5"),
            new Profession(6, 3, "profession_7"),
            new Profession(7, 3, "profession_2"),
            new Profession(8, 4, "profession_2"),
            new Profession(9, 4, "profession_7")
        );

        // Session initialisation
        session = SparkUtils.initSession("test jointure");

        /* Convert from Java list to Spark Dataset */
        Dataset<Row> rowPerson = session.createDataFrame(personsList, Person.class);
        System.out.println("rowPerson.show();");
        rowPerson.show();

        Dataset<Row> personRenamed = rowPerson.withColumnRenamed("id", "personId");
        System.out.println("personRenamed.show();");
        personRenamed.show();

        Dataset<Row> rowProfession = session.createDataFrame(professionList, Profession.class);
        System.out.println("rowProfession.show();");
        rowProfession.show();

        // Rename the foreign key so the join condition below is unambiguous.
        Dataset<Row> professionRenamed = rowProfession.withColumnRenamed("personId", "personFk");
        System.out.println("professionRenamed.show();");
        professionRenamed.show();

        /* INNER JOIN IN Spark Java */
        Dataset<Row> innerJoinData = personRenamed.join(professionRenamed,
                    personRenamed.col("personId").equalTo(professionRenamed.col("personFk")));

        System.out.println("innerJoinData.show();");
        innerJoinData.show();

        // Each projected column is qualified with the Dataset it comes from.
        // Both join sides can expose a column with the same name (Person as
        // declared has a "profession" property, as does Profession), and an
        // unqualified select("profession") would then be ambiguous.
        // For descending order use col("personId").desc() instead of .asc().
        // Rows that tie on personId come back in no guaranteed order.
        // NOTE(review): Encoders.bean maps each Jointure property to the column of
        // the same name. Jointure as posted also declares "prenom", which is not
        // selected here. Confirm the bean matches these columns.
        Dataset<Jointure> joinResult = innerJoinData
                .select(
                        personRenamed.col("personId"),
                        personRenamed.col("nom"),
                        personRenamed.col("courriel"),
                        professionRenamed.col("id"),
                        professionRenamed.col("profession"))
                .orderBy(org.apache.spark.sql.functions.col("personId").asc())
                .as(Encoders.bean(Jointure.class));
        System.out.println("joinResult.show();");
        joinResult.show();
        System.out.println("joinResult.printSchema();");
        joinResult.printSchema();

        // Release Spark resources before the hard exit.
        session.stop();
        System.exit(0);
    }


 /**
  * JavaBean describing a person. It is used as the schema source for
  * {@code createDataFrame(personsList, Person.class)}.
  *
  * <p>Spark derives the bean's columns from its getters, so the accessors are
  * written out rather than elided.
  *
  * <p>NOTE(review): if this class is declared inside the class that holds
  * {@code main}, it must be {@code static}. {@code new Person(...)} is called
  * from the static {@code main}, and a non-static inner class needs an
  * enclosing instance.
  */
 public class Person implements Serializable {

    private static final long serialVersionUID = 7327130742162877288L;

    private long personId;
    private String nom;
    private String prenom;
    private String courriel;
    private String profession;
    private String ville;

    public Person(long personId, String nom, String prenom, String courriel, String profession, String ville) {
        this.personId = personId;
        this.nom = nom;
        this.prenom = prenom;
        this.courriel = courriel;
        this.profession = profession;
        this.ville = ville;
    }

    /**
     * Creates a person from an id, an e-mail address ({@code courriel}) and a
     * {@code nom}, matching the {@code new Person(id, courriel, nom)} calls used
     * to build the dummy data. All other fields stay {@code null}.
     *
     * @param personId the person's identifier
     * @param courriel the person's e-mail address
     * @param nom      the person's name
     */
    public Person(long personId, String courriel, String nom) {
        this.personId = personId;
        this.courriel = courriel;
        this.nom = nom;
    }

    /** No-arg constructor required for JavaBean instantiation. */
    public Person() {
    }

    public long getPersonId() {
        return personId;
    }

    public void setPersonId(long personId) {
        this.personId = personId;
    }

    public String getNom() {
        return nom;
    }

    public void setNom(String nom) {
        this.nom = nom;
    }

    public String getPrenom() {
        return prenom;
    }

    public void setPrenom(String prenom) {
        this.prenom = prenom;
    }

    public String getCourriel() {
        return courriel;
    }

    public void setCourriel(String courriel) {
        this.courriel = courriel;
    }

    public String getProfession() {
        return profession;
    }

    public void setProfession(String profession) {
        this.profession = profession;
    }

    public String getVille() {
        return ville;
    }

    public void setVille(String ville) {
        this.ville = ville;
    }
 }

   /**
    * JavaBean describing one profession entry of a person. It is used as the
    * schema source for {@code createDataFrame(professionList, Profession.class)}.
    *
    * <p>Spark derives the bean's columns from its getters, so the accessors are
    * written out rather than elided.
    *
    * <p>NOTE(review): if this class is declared inside the class that holds
    * {@code main}, it must be {@code static}, because {@code new Profession(...)}
    * is called from the static {@code main}.
    */
   public class Profession implements Serializable {

       private static final long serialVersionUID = 7845266779357094461L;

       private long id;
       // Id of the person this entry belongs to. main renames this column to
       // personFk and joins it against the persons' personId.
       private long personId;
       private String profession;

       public Profession(long id, long personId, String profession) {
           this.id = id;
           this.personId = personId;
           this.profession = profession;
       }

       /** No-arg constructor required for JavaBean instantiation. */
       public Profession() {
       }

       public long getId() {
           return id;
       }

       public void setId(long id) {
           this.id = id;
       }

       public long getPersonId() {
           return personId;
       }

       public void setPersonId(long personId) {
           this.personId = personId;
       }

       public String getProfession() {
           return profession;
       }

       public void setProfession(String profession) {
           this.profession = profession;
       }
   }

   /**
    * JavaBean receiving the joined person/profession rows via
    * {@code .as(Encoders.bean(Jointure.class))}.
    *
    * <p>The bean encoder maps each property to the column of the same name, so
    * the accessors are written out rather than elided.
    *
    * <p>NOTE(review): every property declared here (including {@code prenom})
    * must be present among the columns the query selects before {@code .as(...)}.
    * Confirm against the select that produces the joined Dataset.
    */
   public class Jointure implements Serializable {

       private static final long serialVersionUID = 4341834876589947018L;

       private long id;
       private String nom;
       private String prenom;
       private String courriel;
       private String profession;

       public Jointure(long id, String nom, String prenom, String courriel, String profession) {
           this.id = id;
           this.nom = nom;
           this.prenom = prenom;
           this.courriel = courriel;
           this.profession = profession;
       }

       /** No-arg constructor required for JavaBean instantiation. */
       public Jointure() {
       }

       public long getId() {
           return id;
       }

       public void setId(long id) {
           this.id = id;
       }

       public String getNom() {
           return nom;
       }

       public void setNom(String nom) {
           this.nom = nom;
       }

       public String getPrenom() {
           return prenom;
       }

       public void setPrenom(String prenom) {
           this.prenom = prenom;
       }

       public String getCourriel() {
           return courriel;
       }

       public void setCourriel(String courriel) {
           this.courriel = courriel;
       }

       public String getProfession() {
           return profession;
       }

       public void setProfession(String profession) {
           this.profession = profession;
       }
   }

    rowPerson.show();
    +--------------------+---+-----+
    |            courriel| id|  nom|
    +--------------------+---+-----+
    |courriel_1@gmail.com|  1|nom_1|
    |courriel_2@gmail.com|  2|nom_2|
    |courriel_3@gmail.com|  3|nom_3|
    |courriel_4@gmail.com|  4|nom_4|
    +--------------------+---+-----+

    personRenamed.show();
    +--------------------+--------+-----+
    |            courriel|personId|  nom|
    +--------------------+--------+-----+
    |courriel_1@gmail.com|       1|nom_1|
    |courriel_2@gmail.com|       2|nom_2|
    |courriel_3@gmail.com|       3|nom_3|
    |courriel_4@gmail.com|       4|nom_4|
    +--------------------+--------+-----+

    rowProfession.show();
    +---+--------+------------+
    | id|personId|  profession|
    +---+--------+------------+
    |  1|       2|profession_4|
    |  2|       1|profession_2|
    |  3|       1|profession_5|
    |  4|       2|profession_2|
    |  5|       2|profession_5|
    |  6|       3|profession_7|
    |  7|       3|profession_2|
    |  8|       4|profession_2|
    |  9|       4|profession_7|
    +---+--------+------------+

    professionRenamed.show();
    +---+--------+------------+
    | id|personFk|  profession|
    +---+--------+------------+
    |  1|       2|profession_4|
    |  2|       1|profession_2|
    |  3|       1|profession_5|
    |  4|       2|profession_2|
    |  5|       2|profession_5|
    |  6|       3|profession_7|
    |  7|       3|profession_2|
    |  8|       4|profession_2|
    |  9|       4|profession_7|
    +---+--------+------------+

    innerJoinData.show();
    +--------------------+--------+-----+---+--------+------------+
    |            courriel|personId|  nom| id|personFk|  profession|
    +--------------------+--------+-----+---+--------+------------+
    |courriel_2@gmail.com|       2|nom_2|  1|       2|profession_4|
    |courriel_1@gmail.com|       1|nom_1|  2|       1|profession_2|
    |courriel_1@gmail.com|       1|nom_1|  3|       1|profession_5|
    |courriel_2@gmail.com|       2|nom_2|  4|       2|profession_2|
    |courriel_2@gmail.com|       2|nom_2|  5|       2|profession_5|
    |courriel_3@gmail.com|       3|nom_3|  6|       3|profession_7|
    |courriel_3@gmail.com|       3|nom_3|  7|       3|profession_2|
    |courriel_4@gmail.com|       4|nom_4|  8|       4|profession_2|
    |courriel_4@gmail.com|       4|nom_4|  9|       4|profession_7|
    +--------------------+--------+-----+---+--------+------------+

    joinResult.show();
    +--------+-----+--------------------+---+------------+
    |personId|  nom|            courriel| id|  profession|
    +--------+-----+--------------------+---+------------+
    |       1|nom_1|courriel_1@gmail.com|  3|profession_5|
    |       1|nom_1|courriel_1@gmail.com|  2|profession_2|
    |       2|nom_2|courriel_2@gmail.com|  4|profession_2|
    |       2|nom_2|courriel_2@gmail.com|  5|profession_5|
    |       2|nom_2|courriel_2@gmail.com|  1|profession_4|
    |       3|nom_3|courriel_3@gmail.com|  7|profession_2|
    |       3|nom_3|courriel_3@gmail.com|  6|profession_7|
    |       4|nom_4|courriel_4@gmail.com|  8|profession_2|
    |       4|nom_4|courriel_4@gmail.com|  9|profession_7|
    +--------+-----+--------------------+---+------------+

    joinResult.printSchema();
    root
     |-- personId: long (nullable = false)
     |-- nom: string (nullable = true)
     |-- courriel: string (nullable = true)
     |-- id: long (nullable = false)
     |-- profession: string (nullable = true)





网页内容由 Stack Overflow 提供,点击上面的原文链接可以查看英文原文。