Apache Spark Lambda表达式 - 序列化问题

7

我尝试在Spark任务中使用lambda表达式,但是它抛出了“java.lang.IllegalArgumentException:无效的lambda反序列化”异常。当代码类似于“transform(pRDD->pRDD.map(t->t._2))”时,会抛出此异常。以下是代码片段。

JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map( 
(Function<Tuple2<String,Integer>,Integer>)t->t._2));


JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
JavaDStream<Integer> con = aggregate.transform(
(Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>> & Serializable)pRDD-> pRDD.map( 
(Function<Tuple2<String,Integer>,Integer> & Serializable)t->t._2));

上述两个选项都没有起作用。但是,如果我将下面的对象“f”作为参数传递,而不是lambda表达式“t->t_.2”,它就能正常工作。
Function f = new Function<Tuple2<String,Integer>,Integer>(){
@Override
public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
return paramT1._2;
}
}; 

请问Lambda表达式的正确格式是什么?

    public static void main(String[] args) {

            Function f = new Function<Tuple2<String,Integer>,Integer>(){

                @Override
                public Integer call(Tuple2<String,Integer> paramT1) throws Exception {
                    return paramT1._2;
                }

            };

            JavaStreamingContext ssc = JavaStreamingFactory.getInstance();

            JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 9999);
            JavaDStream<String> words =  lines.flatMap(s->{return Arrays.asList(s.split(" "));});
            JavaPairDStream<String,Integer> pairRDD =  words.mapToPair(x->new Tuple2<String,Integer>(x,1));
            JavaPairDStream<String,Integer> aggregate = pairRDD.reduceByKey((x,y)->x+y);
            JavaDStream<Integer> con = aggregate.transform(
                    (Function<JavaPairRDD<String,Integer>, JavaRDD<Integer>>)pRDD-> pRDD.map( 
                            (Function<Tuple2<String,Integer>,Integer>)t->t._2));
          //JavaDStream<Integer> con = aggregate.transform(pRDD-> pRDD.map(f)); It works
            con.print();

            ssc.start();
            ssc.awaitTermination();


        }

我可以问一下,为什么你希望像内部类一样序列化 lambda 表达式呢?lambda 表达式的序列化被强烈不建议。 - eliasah
@eliasah 在Spark中,对lambda表达式进行序列化是非常标准和预期的操作。它是一个并行执行引擎,将作业序列化以远程运行。 - whaleberg
4个回答

3

我不知道为什么Lambda不起作用。也许问题出现在嵌套了Lambda的Lambda中。这似乎已经被Spark文档所认可。

将此与http://spark.apache.org/docs/latest/programming-guide.html#basics中的示例进行对比:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

以下是来自http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation的示例:

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
final JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(
  new Function<JavaPairRDD<String, Integer>, JavaPairRDD<String, Integer>>() {
    @Override public JavaPairRDD<String, Integer> call(JavaPairRDD<String, Integer> rdd) throws Exception {
      rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
      ...
    }
  });

第二个示例使用了一个Function子类而不是lambda表达式,这可能是因为您发现的同样问题。
我不知道这对您是否有用,但是嵌套的lambda表达式在Scala中肯定可以工作。考虑上一个示例的Scala版本:
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
})

很明显这不是一个完整的答案,抱歉。如果它没有用处,请随意给它点踩! - Daniel Darabos
感谢您的回复。是的,在Apache Spark中,嵌套的lambda表达式无法被识别。 - user1182253
请务必记住,匿名类不是静态类。它们具有指向其父对象的指针,该对象也将由Spark序列化,这可能不是您想要的。使用命名的静态内部类可能更安全。 - whaleberg

0

我认为问题在于Java中的lambda函数实际上是一个“类”,它在java.util.function包内实现了一个接口,例如Function接口(https://docs.oracle.com/javase/8/docs/api/java/util/function/Function.html)。我发现这些接口没有扩展Serializable...这就是问题所在...

...当您在Spark函数内部使用lambda时... Spark会尝试序列化lambda“类”...但它没有实现Serializable。

您可以尝试使用以下内容强制进行Serializable:

Runnable r = (Runnable & Serializable)() -> System.out.println("Serializable!");

0

我之前也遇到过类似的问题,而我解决这个问题的方法是简单地创建一个SerializableFunction,如下所示:

import java.io.Serializable;
import java.util.function.Function;

interface SerializableFunction<T, R> extends Function<T, R>, Serializable {
}

把你所有的Function都替换成SerializableFunction

private static final SerializableFunction<Row, Boolean> SAMPLE_FUNCTION = row -> {
    final String userId = row.getAs("user_id");
    return userId != null;
};

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接