如何在Spark Java中遍历/迭代数据集?

8

我正在尝试遍历数据集以执行一些字符串相似度计算,如Jaro Winkler或余弦相似度。我将我的数据集转换为行列表,然后使用for语句遍历,但这并不是Spark中高效的方式。因此,我期待在Spark中找到更好的方法。

public class sample {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Example").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

        List<Row> data = Arrays.asList(RowFactory.create("Mysore","Mysuru"),
                RowFactory.create("Name","FirstName"));
        StructType schema = new StructType(
                new StructField[] { new StructField("Word1", DataTypes.StringType, true, Metadata.empty()),
                        new StructField("Word2", DataTypes.StringType, true, Metadata.empty()) });

        Dataset<Row> oldDF = spark.createDataFrame(data, schema);
        oldDF.show();
        List<Row> rowslist = oldDF.collectAsList(); 
    }
}

我发现很多JavaRDD的例子并不清楚,一个Dataset的例子会对我很有帮助。


当我对我的数据集执行 oldDF.collectAsList(); 时,我收到了 IllegalArgumentException。有人知道可能的原因吗? - Dhirendra Gautam
2个回答

28

您可以按照以下方式使用org.apache.spark.api.java.function.ForeachFunction

oldDF.foreach((ForeachFunction<Row>) row -> System.out.println(row));

谢谢你的回答。有没有办法同时访问前一个元素和当前元素?我想比较上一行和当前行中的一列,并做出决定。提前致谢! - Saisumanth Gopisetty

3
对于不支持Lambda表达式的旧版Java JDK,您可以在导入后使用以下代码:

import org.apache.spark.api.java.function.VoidFunction;

yourDataSet.toJavaRDD().foreach(new VoidFunction<Row>() {
        public void call(Row r) throws Exception {
            System.out.println(r.getAs("your column name here"));
        }
    });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接