何时使用非阻塞/线程和阻塞/ goroutines 与clojure core.async相关?

10

我正在编写一个ETL进程,从产品数据库中读取事件级别的数据,进行转换/聚合,并将其写入分析数据仓库。我使用Clojure的core.async库将这些进程分开为并发执行的组件。下面是我现在代码的主要部分:

    (ns data-staging.main
        (:require [clojure.core.async :as async])
        (:use [clojure.core.match :only (match)]
              [data-staging.map-vecs]
              [data-staging.tables])
        (:gen-class))

    (def submissions (make-table "Submission" "Valid"))
    (def photos (make-table "Photo"))
    (def videos (make-table "Video"))
    (def votes (make-table "Votes"))

    ;; define channels used for sequential data processing
    (def chan-in (async/chan 100))
    (def chan-out (async/chan 100))

    (defn write-thread [table]
        "infinitely loops between reading subsequent 10000 rows from 
         table and ouputting a vector of the rows(maps) 
         into 'chan-in'"
        (while true
            (let [next-rows (get-rows table)]
                (async/>!! chan-in next-rows)
                (set-max table (:max-id (last next-rows))))))

    (defn aggregator []
        "takes output from 'chan-in' and aggregates it by coupon_id, date.
         then adds / drops any fields that are needed / not needed and inputs
         into 'chan-out'"
        (while true
            (->>
                (async/<!! chan-in)
                aggregate
                (async/>!! chan-out))))

    (defn read-thread []
        "reads data from chan out and interts into Analytics DB" 
        (while true 
            (upsert (async/<!! chan-out))))

    (defn -main []
        (async/thread (write-thread submissions))
        (async/thread (write-thread photos))
        (async/thread (write-thread videos))
        (async/thread-call aggregator)
        (async/thread-call read-thread))

正如您所看到的,我将每个操作系统组件放在自己的线程上,并在通道上使用阻塞 !! 调用。对于这种情况,感觉使用非阻塞 ! 调用以及 Go 协程可能更好,特别是对于数据库读取,它们大部分时间都在执行 I/O 并等待产品数据库中的新行。如果是这种情况,那么实现最佳方法是什么?我对两种方法之间的所有权衡和如何有效地使用 Go 协程还有一些不清楚。此外,任何关于如何改进整体架构的建议都将不胜感激!


如果这个问题难以回答,还请有人指出我如何编辑它以使其更清晰。此外,如果有人知道一个好的Clojure go routines在线教程,我也会很感激的。 - Sean Geoffrey Pietz
2个回答

17

我个人认为你在这里使用线程可能是正确的选择。go-blocks 的非阻塞特性来自于“parking”,这是 core.async 状态机使用的一种特殊的伪阻塞方式 —— 但由于你的数据库调用真正地阻塞,而不是将状态机置于停放状态,因此你只需要从 core.async 线程池中阻塞某个线程。这取决于同步调用需要花费多长时间,因此这是一种需要基准测试的情况,但我强烈怀疑在这里使用线程是正确的方法。

唯一的例外是您的聚合器函数。在我看来,它似乎可以与 chan-out 的定义合并,如 (def chan-out (map< aggregate chan-in))

关于 go-blocks 和线程的概述,Martin Trojer 写了一篇很好的 对这两种方法的分析,以及哪种情况下哪种方法更快。简要概括一下,go-blocks 适用于将已经异步的库适配到 core.async 中使用,而线程则适用于将同步部分转换为异步过程。例如,如果你的数据库有一个回调式的 API,那么 go-blocks 就是一个明显的胜利者。但由于它是同步的,所以不适合使用。


非常好的回答!感谢分享,我在这个网站上学到了很多。 - tangrammer

3

我认为在这个ETL情况下使用“go”宏来实现非阻塞线程会是一种更好的方法。

我编写了一个非常简单的代码,以实现所涉及的提取转换和加载任务的同步序列。

在repl上键入以下代码:

(require '[clojure.core.async :as async :refer [<! >! <!! timeout chan alt! go]])

(def output(chan))

(defn extract [origin]
  (let [value-extracted (chan)
        value-transformed (chan)
        value-loaded (chan)]
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! value-extracted  (str origin " > extracted  ")))
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! value-transformed  (str (<! value-extracted) " > transformed " )))
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! value-loaded  (str (<! value-transformed) " > loaded " )))
    (go
     (<! (timeout (+ 100 (* 100 (rand-int 20))))) ; wait a little
     (>! output  [origin (<! value-loaded)]))))

(go
 (loop [origins-already-loaded []]
   (let [[id message] (<! output)
         origins-updated (conj origins-already-loaded id)]
     (println message)
     (println origins-updated)
     (recur origins-updated)
     )
   ))

在repl上键入:

(doseq [example (take 10 (range))] (extract example))

1 > extracted   > transformed  > loaded 
[1]
7 > extracted   > transformed  > loaded 
[1 7]
0 > extracted   > transformed  > loaded 
[1 7 0]
8 > extracted   > transformed  > loaded 
[1 7 0 8]
3 > extracted   > transformed  > loaded 
[1 7 0 8 3]
6 > extracted   > transformed  > loaded 
[1 7 0 8 3 6]
2 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2]
5 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2 5]
9 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2 5 9]
4 > extracted   > transformed  > loaded 
[1 7 0 8 3 6 2 5 9 4]

更新:
修复的错误是在已删除的“等一会”函数中使用了 <!! (timeout (+ 100 (* 100 (rand-int 20))))),导致其他非阻塞go进程被阻止。


1
好的,我会试一下。这种方法有什么优势吗?我还不太明白使用 goroutines 和线程在这种情况下的优缺点。 - Sean Geoffrey Pietz
再次你好 @SeanGeoffreyPietz ,关于您的评论,我发现了代码中的错误并进行了修复。不使用阻塞线程的原因是为了提高性能。通过"go"宏,您可以拥有数千个独立进程(伪线程)http://swannodette.github.io/2013/08/02/100000-processes/ ,在上面提供的脚本中,您还可以(doseq [example (take 1000 (range))] (extract example))来观察其行为。 - tangrammer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接