在Clojure中,我如何使用transducers实现`frequencies`的高性能版本?

6
(问题来源:Fernando Abrao。)
我听说过Clojure中使用转换器的性能优势,但我不知道该如何使用它们。
假设我有一个名为qos/device-qos-range的函数,它返回一系列的映射,其中包含一些十进制的:samplevalue,例如:
[
  { :samplevalue 1.3, ... },
  { :othervalue -27.7, ... },
  { :samplevalue 7.5, ... },
  { :samplevalue 1.9, ... },
]

我想看到每个整数箱中有多少:samplevalue,例如:

(frequencies
  (reduce #(if (not (nil? (:samplevalue %2)))
             (conj %1 (.intValue (:samplevalue %2))))
          []
          (qos/device-qos-range origem device qos alvo inicio fim)))

;; => {1 2, 7 1}

如何使用转换器将此内容转换为快速版本,以消除中间数据结构(例如reduce返回的结构)?如果能编写可利用多个核心进行并行处理的代码,则可获得额外加分。

1个回答

7

(答案来源: Renzo Borgatti (@reborg).)

首先,让我们设置一些示例数据,稍后我们将用于性能测试。这个向量包含500k个具有相同键的映射。值重叠了1/5的时间。

(def data 
 (mapv hash-map 
       (repeat :samplevalue) 
       (concat (range 1e5)
               (range 1e5)
               (range 1e5)
               (range 1e5)
               (range 1e5))))

现在让我们使用转换器进行转换。请注意,此解决方案不是并行的。我将您的.intValue缩短为int,它们具有相同的功能。另外,条件地从每个映射中获取:samplevalue可以缩短为(keep :samplevalue sequence),这等效于(remove nil? (map :samplevalue sequence))。我们将使用Criterium进行基准测试。
(require '[criterium.core :refer [quick-bench]])
(quick-bench
  (transduce
    (comp
      (keep :samplevalue)
      (map int))
    (completing #(assoc! %1 %2 (inc (get %1 %2 0))) persistent!)
    (transient {})
    data))
;; My execution time mean: 405 ms

请注意,这次我们不再将frequencies作为外部步骤调用。相反,我们将其编入操作中。就像frequencies所做的一样,我们在临时哈希图上执行了操作以获得额外的性能。我们通过使用瞬态哈希图作为种子,并在其中调用persistent!来完成最终值来实现这一点。
我们可以使其并行化。为了达到最佳性能,我们使用可变的Java ConcurrentHashMap而不是不变的Clojure数据结构。
(require '[clojure.core.reducers :as r])
(import '[java.util HashMap Collections Map]
        'java.util.concurrent.atomic.AtomicInteger
        'java.util.concurrent.ConcurrentHashMap)

(quick-bench
  (let [concurrency-level (.availableProcessors (Runtime/getRuntime))
        m (ConcurrentHashMap. (quot (count data) 2) 0.75 concurrency-level)
        combinef (fn ([] m) ([_ _]))  ; just return `m` from the combine step
        rf (fn [^Map m k]
             (let [^AtomicInteger v (or (.get m k) (.putIfAbsent m k (AtomicInteger. 1)))]
               (when v (.incrementAndGet v))
               m))
        reducef ((comp (keep :samplevalue) (map int)) rf)]
    (r/fold combinef reducef data)
    (into {} m)))
;; My execution time mean: 70 ms

在这里,我们使用 clojure.core.reducers 库中的 fold 来实现并行。请注意,在并行上下文中使用的任何 transducers 都需要是无状态的。还要注意,ConcurrentHashMap 不支持将 nil 用作键或值;幸运的是,我们不需要在这里这样做。
输出最终被转换为一个不可变的 Clojure hashmap。您可以删除该步骤,并只使用 ConcurrentHashMap 实例以获得额外的加速 - 在我的机器上,删除 into 步骤使整个 fold 花费约 26ms。 编辑 2017-11-20: 用户 @clojuremostly 正确指出,此答案的早期版本在初始化并发哈希映射实例的 let 块内调用了 quick-bench,这意味着基准测试对其所有运行都使用相同的实例。我将调用 quick-bench 移到 let 块之外。它并没有显著影响结果。

1
我认为你不应该在第二个基准测试中重复使用ConcurrentHashMap。 - ClojureMostly
1
@ClojureMostly - 很好的发现,谢谢!已更新答案,请参见最后一段。时间没有显著变化。 - Jeff Terrell Ph.D.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接