Spark combineByKey with Java lambda expressions


I want to use lambda functions to compute the per-key average of a JavaPairRDD<Integer, Double> named pairs. To do this, I wrote the following code:

java.util.function.Function<Double, Tuple2<Double, Integer>> createAcc = x -> new Tuple2<Double, Integer>(x, 1);

BiFunction<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>>  addAndCount = (Tuple2<Double, Integer> x, Double y) -> {  return new Tuple2(x._1()+y, x._2()+1 );   };

BiFunction<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>>  combine = (Tuple2<Double, Integer> x, Tuple2<Double, Integer> y) -> {  return new Tuple2(x._1()+y._1(), x._2()+y._2() );   };

JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts = pairs.combineByKey(createAcc, addAndCount, combine);

However, Eclipse reports the following error:
The method combineByKey(Function<Double,C>, Function2<C,Double,C>, Function2<C,C,C>) in the type JavaPairRDD<Integer,Double> is not applicable for the arguments (Function<Double,Tuple2<Double,Integer>>,
 BiFunction<Tuple2<Double,Integer>,Double,Tuple2<Double,Integer>>, BiFunction<Tuple2<Double,Integer>,Tuple2<Double,Integer>,Tuple2<Double,Integer>>) 
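You can reproduce this error without Spark. In Java, a lambda has no type of its own. It takes the type of the functional interface it is assigned to. Once a lambda is stored in a `BiFunction` variable, you cannot pass that variable where a different interface with the same shape is expected. The sketch below relies on a hypothetical interface, `SparkStyleFunction2`, which only mimics the shape of `org.apache.spark.api.java.function.Function2`: a `call` method that may throw, plus a `Serializable` supertype. It is not the Spark class. The method `applyOnce` is likewise a made-up stand-in for an API such as `combineByKey`.

```java
import java.io.Serializable;
import java.util.function.BiFunction;

public class TargetTypingDemo {

    // Hypothetical stand-in that mimics the shape of Spark's Function2;
    // it is NOT org.apache.spark.api.java.function.Function2.
    @FunctionalInterface
    interface SparkStyleFunction2<T1, T2, R> extends Serializable {
        R call(T1 v1, T2 v2) throws Exception;
    }

    // Hypothetical stand-in for an API (like combineByKey) that only accepts the Spark-style type.
    static <A, B, R> R applyOnce(SparkStyleFunction2<A, B, R> f, A a, B b) throws Exception {
        return f.call(a, b);
    }

    public static void main(String[] args) throws Exception {
        // The same lambda text can target either interface...
        BiFunction<Double, Double, Double> asBiFunction = (x, y) -> x + y;
        SparkStyleFunction2<Double, Double, Double> asSparkStyle = (x, y) -> x + y;

        // ...but only a variable of the expected interface type is accepted.
        System.out.println(applyOnce(asSparkStyle, 1.5, 2.5)); // prints 4.0

        // applyOnce(asBiFunction, 1.5, 2.5);  // does not compile: a BiFunction is not a SparkStyleFunction2

        // An inline lambda also works, because the method parameter supplies the target type.
        System.out.println(applyOnce((Double x, Double y) -> x * y, 1.5, 2.0)); // prints 3.0

        // The BiFunction is still usable wherever a BiFunction is expected.
        System.out.println(asBiFunction.apply(1.0, 2.0)); // prints 3.0
    }
}
```

The same reasoning applies to the real API. Either declare the variables with Spark's `Function`/`Function2` types, or write the lambdas inline in the `combineByKey(...)` call.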

Try replacing java.util.function.BiFunction with org.apache.spark.api.java.function.Function2. - G Quintana
Thanks! That solved the problem. - Wassim
2 Answers


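The combineByKey method expects org.apache.spark.api.java.function.Function2 rather than java.util.function.BiFunction. Likewise, it expects org.apache.spark.api.java.function.Function rather than java.util.function.Function for the first argument. So you can write: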

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

// createCombiner: the first value seen for a key becomes (sum = value, count = 1)
Function<Double, Tuple2<Double, Integer>> createAcc =
    x -> new Tuple2<>(x, 1);

// mergeValue: fold another value for the same key into the accumulator
Function2<Tuple2<Double, Integer>, Double, Tuple2<Double, Integer>> addAndCount =
    (x, y) -> new Tuple2<>(x._1() + y, x._2() + 1);

// mergeCombiners: merge accumulators for the same key from different partitions
Function2<Tuple2<Double, Integer>, Tuple2<Double, Integer>, Tuple2<Double, Integer>> combine =
    (x, y) -> new Tuple2<>(x._1() + y._1(), x._2() + y._2());

JavaPairRDD<Integer, Tuple2<Double, Integer>> avgCounts =
    pairs.combineByKey(createAcc, addAndCount, combine);

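Please update the first one as well; it should be org.apache.spark.api.java.function.Function. - Wassim
Are you just adding up all the numbers and counting them? Where do you compute the average, i.e. where do you divide by the count? - Peter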
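This addresses the question of where the division happens. `combineByKey` only produces a `(sum, count)` pair per key, and the average is a separate step afterwards. In Spark that step could be, for example, `avgCounts.mapValues(t -> t._1() / t._2())`.

The plain-Java sketch below does not use Spark. It is a simplified local model of the semantics, not Spark's implementation, and the class and method names are hypothetical. It walks through what the three functions do:

- `createCombiner` and `mergeValue` run inside each partition.
- `mergeCombiners` merges the per-partition accumulators.
- The division comes last.

Because this code runs outside Spark, `java.util.function` types are fine here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

public class CombineByKeyAverageSketch {

    // Accumulator: running sum and count for one key.
    record SumCount(double sum, int count) {}

    // Local model of combineByKey: one pass per partition, then a merge across partitions.
    static <K, V, C> Map<K, C> combineByKey(
            List<List<Map.Entry<K, V>>> partitions,
            Function<V, C> createCombiner,
            BiFunction<C, V, C> mergeValue,
            BiFunction<C, C, C> mergeCombiners) {
        Map<K, C> result = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, C> local = new HashMap<>();
            for (Map.Entry<K, V> e : partition) {
                // First occurrence of the key in this partition -> createCombiner,
                // otherwise fold the value into the existing accumulator -> mergeValue.
                local.compute(e.getKey(), (k, acc) -> acc == null
                        ? createCombiner.apply(e.getValue())
                        : mergeValue.apply(acc, e.getValue()));
            }
            // Merge this partition's accumulators into the overall result -> mergeCombiners.
            local.forEach((k, acc) -> result.merge(k, acc, mergeCombiners));
        }
        return result;
    }

    static Map<Integer, Double> averageByKey(List<List<Map.Entry<Integer, Double>>> partitions) {
        Map<Integer, SumCount> sums = combineByKey(
                partitions,
                v -> new SumCount(v, 1),
                (acc, v) -> new SumCount(acc.sum() + v, acc.count() + 1),
                (a, b) -> new SumCount(a.sum() + b.sum(), a.count() + b.count()));
        // The division step: average = sum / count, done only after all combining.
        Map<Integer, Double> averages = new TreeMap<>();
        sums.forEach((k, sc) -> averages.put(k, sc.sum() / sc.count()));
        return averages;
    }

    public static void main(String[] args) {
        // Both keys appear in both partitions, so their accumulators go through mergeCombiners.
        List<List<Map.Entry<Integer, Double>>> partitions = List.of(
                List.of(Map.entry(1, 4.0), Map.entry(1, 6.0), Map.entry(2, 3.0)),
                List.of(Map.entry(1, 5.0), Map.entry(2, 8.0)));
        System.out.println(averageByKey(partitions)); // prints {1=5.0, 2=5.5}
    }
}
```

The model only follows the contract described for the three lambdas. Real Spark decides partitioning, shuffling and spilling on its own, so treat this as an illustration of the semantics rather than a benchmark.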

The explanation is in the inline comments:
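import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

// Assumes an existing JavaSparkContext named jsc
List<List<Integer>> intList = new ArrayList<>();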
intList.add(Arrays.asList(1,2));
intList.add(Arrays.asList(5,6));
intList.add(Arrays.asList(3,8));
intList.add(Arrays.asList(5,7));
intList.add(Arrays.asList(3,4));
System.out.println(intList);  //[[1, 2], [5, 6], [3, 8], [5, 7], [3, 4]]

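JavaRDD<List<Integer>> intRdd = jsc.parallelize(intList);

// Turn each two-element list [k, v] into a (key, value) pair
JavaPairRDD<Integer, Integer> intPairRdd =
        intRdd.mapToPair(l -> new Tuple2<Integer, Integer>(l.get(0), l.get(1)));

JavaPairRDD<Integer, Tuple2<Integer,Integer>> key_ValueSumsAndEncounters = intPairRdd.combineByKey(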
            /* Lambda 1 (createCombiner): called when key k is encountered for the first time in a partition
               - Returns the tuple (value associated with k, 1).
               - The second element is 1 because this is the first encounter.
               - For example, when we encounter (k,v) = (1,4), the result is (1,(4,1))
            */ 
            v -> new Tuple2<Integer, Integer>(v, 1)
            /* Lambda 2 (mergeValue): called on later encounters (2nd and afterwards) of key k in the same partition
               - Returns the tuple (earlier sum + value associated with k,
                                    earlier number of encounters + 1)
               - For example, when we encounter (1,6) and the earlier accumulator is (1,(4,1)),
                 this function returns (1,(4+6,1+1)) = (1,(10,2))
            */
            , (c1, v) -> new Tuple2<Integer, Integer>(c1._1() + v, c1._2() + 1)
            /* Lambda 3 (mergeCombiners): merges two accumulators for the same key built in different partitions
               - Returns the tuple (sum from accumulator1 + sum from accumulator2,
                                    encounters from accumulator1 + encounters from accumulator2)
               - For example, if we have
                 accumulator1 from partition1 = (1,(10,2))
                 accumulator2 from partition2 = (1,(15,4))
                 then this function returns (1,(10+15,2+4)) = (1,(25,6))
            */
            , (c1, c2) -> new Tuple2<Integer, Integer>(c1._1() + c2._1(), c1._2() + c2._2())
           );

System.out.println(key_ValueSumsAndEncounters.collect()); //[(1,(2,1)), (3,(12,2)), (5,(13,2))] (key order may vary)

// kse = (key, (sum, encounters)); dividing sum by encounters gives the average
JavaRDD<Tuple2<Integer, Double>> key_avg = key_ValueSumsAndEncounters.map(
        kse -> new Tuple2<Integer, Double>(kse._1(), (double) kse._2()._1() / kse._2()._2()));
System.out.println(key_avg.collect()); //[(1,2.0), (3,6.0), (5,6.5)] (key order may vary)
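You can cross-check the printed averages without a cluster. The plain-Java sketch below computes the same per-key averages over the local `intList` data using `Collectors.groupingBy` with `Collectors.averagingInt`. The class and method names are hypothetical. It is only a verification aid, not a replacement for the distributed `combineByKey` version.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalAverageCheck {

    // Average of the second element grouped by the first, computed locally (no Spark).
    static Map<Integer, Double> averageByFirst(List<List<Integer>> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                row -> row.get(0),                            // key
                TreeMap::new,                                 // sorted keys for stable printing
                Collectors.averagingInt(row -> row.get(1)))); // mean of the values
    }

    public static void main(String[] args) {
        List<List<Integer>> intList = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(5, 6), Arrays.asList(3, 8),
                Arrays.asList(5, 7), Arrays.asList(3, 4));
        System.out.println(averageByFirst(intList)); // prints {1=2.0, 3=6.0, 5=6.5}
    }
}
```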

Content provided by Stack Overflow.