Spark: 将DataSet中的2列合并为一列

3

我有一个表格,其中包含2个不同列的ID。我还有另一个表格,其中包含与这些ID相关联的对象。我想过滤掉Table 2中的ID,其存在于Table 1的id1或id2中。

表 1:

| id1  | id2 |
|  1   |  1  |
|  1   |  1  |
|  1   |  3  |
|  2   |  5  |
|  3   |  1  | 
|  3   |  2  |
|  3   |  3  |

表2:

| id  | obj   |
|  1  |  'A'  |
|  2  |  'B'  |
|  3  |  'C'  |
|  4  |  'D'  | 
|  5  |  'E'  |  
|  6  |  'F'  |
|  7  |  'G'  |

我的想法是从table1中创建一个包含唯一ID的列表,上面的示例中唯一ID应该是[1, 2, 3, 5]。

然后根据该列表过滤数据框,将得到结果。

| id  | obj   |
|  1  |  'A'  |
|  2  |  'B'  |
|  3  |  'C'  |
|  5  |  'E'  |  

尽管我对解决方案的可扩展性存在顾虑。列表可能很大,有些情况甚至可能无法加载到内存中。在这种情况下有什么建议吗?谢谢。
3个回答

1
使用Spark SQL - 注意 - Spark中的连接操作涉及到一系列性能考虑,包括数据框大小、键分布等,请务必熟悉这些内容。
通常情况下:
table2.as("t2")
  .join(
    table1.as("t1"),
    $"t2.id" === $"t1.id1" || $"t2.id" === $"t1.id2",
    "left"
  )
  .where($"t1.id1".isNull)
  .select("t2.*")

1
另一种方法:

val id_table = table1.select(explode(array('*)).as("id")).distinct()
val result = table2.join(id_table,"id")
result.show()

输出:

+---+---+
| id|obj|
+---+---+
|  1|'A'|
|  2|'B'|
|  3|'C'|
|  5|'E'|
+---+---+

1
下面的方法可以运作。
      import spark.implicits._
      val t1 = Seq((1,1),(1,1),(1,3),(2,5),(3,1),(3,2),(3,3))
      val t2 = Seq((1,"A"),(2,"B"),(3,"C"),(4,"D"),(5,"E"),(6,"F"),(7,"G"))
      val tt1 = sc.parallelize(t1).toDF("id1","id2")
                  .persist(StorageLevel.MEMORY_AND_DISK)
      val tt2 = sc.parallelize(t2).toDF("id", "obj")
                  .persist(StorageLevel.MEMORY_AND_DISK)

      tt1.show()
      tt2.show()

      tt1.createOrReplaceTempView("table1")
      tt2.createOrReplaceTempView("table2")

     val output = sqlContext.sql(
        """
          |SELECT DISTINCT id, obj
          |FROM table1 t1
          |JOIN table2 t2 ON(t1.id1 = t2.id) OR (t1.id2 = id)
          |ORDER BY id
          |""".stripMargin).persist(StorageLevel.MEMORY_AND_DISK)

      output.show()

输出

+---+---+
| id|obj|
+---+---+
|  1|  A|
|  2|  B|
|  3|  C|
|  5|  E|
+---+---+

对于内存问题,你可以将数据持久化到内存和磁盘中,但是还有更多的选项,你可以选择最适合你特定问题的最佳选项,你可以查看以下链接: RDD Persistence

我也会考虑通过配置分区数来解决问题:

spark.sql.shuffle.partitions
/*
Configures the number of partitions to use when shuffling data for joins or aggregations.
*/

  val spark = SparkSession
    .builder()
    .appName("MySparkProcess")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","400") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","MySparkProcess") // To silence Metrics warning
    .getOrCreate()

我建议您查看以下链接以进行进一步的配置:

性能调整


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接