在Spark的groupByKey和countByKey中使用JodaTime

5

我有一个非常简单的 Spark 程序(使用 Clojure 中的 Flambo,但应该很容易跟进)。这些都是JVM上的对象。我正在一个 local 实例上测试(尽管我猜想 Spark 仍然进行序列化和反序列化)。

(let [dt (t/date-time 2014)
      input (f/parallelize sc [{:the-date dt :x "A"}
                               {:the-date dt :x "B"}
                               {:the-date dt :x "C"}
                               {:the-date dt :x "D"}])
      by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x])))

输入是一个由四个元组组成的RDD,每个元组都有相同的日期对象。第一个map生成一个键值对RDD,其中键为日期,值为x。

input的内容如预期所示:

=> (f/foreach input prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]

需要明确的是,相等性和 .hashCode 适用于日期对象:

=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926

他们是JodaTime的DateTime实例,实现了预期的相等性比较
当我尝试使用countByKey时,得到了预期的结果:
=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}

但是当我使用groupByKey时,它似乎不起作用。
=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]

所有的键都是相同的,因此我期望结果是一个具有日期为键和值为["A", "B", "C", "D"]的单个条目。由于所有的值都是列表,所以发生了一些事情。
某种方式上,groupByKey没有正确地将键进行比较。但是countByKey可以做到。这两者之间有什么区别?我该如何使它们行为相同?
有任何想法吗?
1个回答

3
我正在接近一个答案。我认为这应该放在答案部分而不是问题部分。
按键分组,转换为本地收集,提取第一项(日期)。
=> (def result-dates (map first (f/collect (f/group-by-key by-date))))
=> result-dates
(#<DateTime 2014-01-01T00:00:00.000Z>
 #<DateTime 2014-01-01T00:00:00.000Z>
 #<DateTime 2014-01-01T00:00:00.000Z>
 #<DateTime 2014-01-01T00:00:00.000Z>)

哈希码都是相同的

=> (map #(.hashCode %) result-dates)
(1260848926
 1260848926
 1260848926 
 1260848926)

毫秒数都是相同的:
=> (map #(.getMillis %) result-dates)
(1388534400000
 1388534400000
 1388534400000
 1388534400000)

equals 失败了,但 isEquals 成功了

=> (.isEqual (first result-dates) (second result-dates))
true

=> (.equals (first result-dates) (second result-dates))
false

.equals文档中写道:

基于毫秒级别的瞬间和年代,将此对象与指定对象进行相等比较

它们的毫秒数都相等,年代似乎是:

=> (map #(.getChronology %) result-dates)
(#<ISOChronology ISOChronology[UTC]>
 #<ISOChronology ISOChronology[UTC]>
 #<ISOChronology ISOChronology[UTC]>
 #<ISOChronology ISOChronology[UTC]>)

然而,这些时间表并不等同。
=> (def a (first result-dates))
=> (def b (second result-dates))

=> (= (.getChronology a) (.getChronology b))
false

虽然哈希码会发生变化,但是
=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b)))
true

但是joda.time.Chronology没有提供自己的equals方法,而是继承了Object类的equals方法,该方法仅使用引用相等性。

我的理论是这些日期都被反序列化为它们自己的不同的构造的Chronology对象,但JodaTime有自己的序列化器可能会处理这个问题。也许一个自定义的Kryo序列化器可以帮助解决这个问题。

目前,我在Spark中使用JodaTime的解决方案是通过调用toInstant使用org.joda.time.Instantjava.util.Date而不是org.joda.time.DateTime

这两种方法都涉及到抛弃时区信息,这并不理想,所以如果有更多的信息,将非常受欢迎!


也许你可以使用毫秒级别的时间戳来代替日期/时间对象。看起来这是更安全的选择。我们在使用基于内存位置的其他哈希键入数据时遇到了问题,比如Java枚举类型。它们在分布式环境中无法正常工作。 - maasg
谢谢,这就是我(认为我)用Instant建议的内容。很高兴知道我不是唯一有这个问题的人! - Joe
你是否在RDD中使用了异构的年代和时区混合?如果没有,我建议将该信息保留在RDD级别,并节省每个记录的内存占用(就像你使用“Instant”一样)。 - Mike Park
这些都在同一个时区,即使是相同的原始日期对象,也会出现这种行为。我可以使用Instant(或者甚至是一组数字)来处理,但这仍然很奇怪。 - Joe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接