Java Spark:GroupBy时出现堆栈溢出错误

4
我正在使用Java的Spark 2.3.1版本。
我有一个数据集,想要根据给定的列列表对其进行分组以进行一些聚合(例如count())。分组必须按照给定的列列表进行。
我的函数如下:
public Dataset<Row> compute(Dataset<Row> data, List<String> columns){

    final List<Column> columns_col = new ArrayList<Column>();

    for (final String tag : columns) {
        columns_col.add(new Column(tag));
    }

    Seq<Column> columns_seq = JavaConverters.asScalaIteratorConverter(columns_col.iterator()).asScala().toSeq();

    System.out.println("My columns : "+columns_seq.mkString(", "));
    System.out.println("Data count : "+data.count());

    final Dataset<Row> dataset_count = data.groupBy(columns_seq).agg(count(col("value")));

    System.out.println("Result count : "+dataset_count.count()); 

    return dataset_count;
}       

当我这样调用它时:

Dataset<Row> df = compute(MyDataset, Arrays.asList("field1","field2","field3","field4"));

我在 dataset_count.count() 上遇到了 StackOverflowError:
My columns : field1, field2, field3, field4
Data count : 136821
Exception in thread "main" java.lang.StackOverflowError
    at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
    at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
    at scala.collection.immutable.Stream.drop(Stream.scala:858)
    at scala.collection.immutable.Stream.drop(Stream.scala:202)
    at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:64)
    at scala.collection.immutable.Stream.apply(Stream.scala:202)
    ...

但是如果我在我的函数中替换了这一行
final Dataset<Row> dataset_count = data.groupBy(columns_seq).agg(count(col("value")));

通过

final Dataset<Row> dataset_count = data.groupBy("field1","field2","field3","field4").agg(count(col("value")));

我的程序没有错误,并且计算得很好:

My columns : field1, field2, field3, field4
Data count : 136821
Result count : 74698

这个问题可能出现在哪里?有没有办法根据一列未知的列名来对数据集进行分组?

1
尝试使用以下代码替换: "Seq<Column> columns_seq = JavaConversions.asScalaBuffer(columns_col).seq()" - Abdennacer Lachiheb
@AbdennacerLachiheb,你真的从我数小时甚至数天的挫败中拯救了我。请考虑将您的评论作为答案 - 这就是解决方案。 - PALEN
@PALEN 很高兴它对你有用,我将其添加为答案,这可能会帮助其他人。 - Abdennacer Lachiheb
2个回答

2
尝试使用这个替代方法:
Seq<Column> columns_seq = JavaConversions.asScalaBuffer(columns_col).seq();

0
替换
JavaConverters.asScalaIteratorConverter(columns_col.iterator()).asScala().toSeq();

for:

JavaConversions.asScalaBuffer(columns_col).seq()

对我来说已经完成了(完全测试过)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接