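I'm writing an ETL process that reads event-level data from a product database, transforms/aggregates it, and writes it to an analytics data warehouse. I'm using Clojure's core.async library to split these processes into components that run concurrently. Here is the main part of my code as it stands: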
```clojure
(ns data-staging.main
  (:require [clojure.core.async :as async]
            [clojure.core.match :refer [match]]
            [data-staging.map-vecs :refer :all]
            [data-staging.tables :refer :all])
  (:gen-class))

(def submissions (make-table "Submission" "Valid"))
(def photos (make-table "Photo"))
(def videos (make-table "Video"))
(def votes (make-table "Votes"))

;; define channels used for sequential data processing
(def chan-in (async/chan 100))
(def chan-out (async/chan 100))

(defn write-thread
  "Infinitely loops between reading the next 10000 rows from `table`
  and outputting a vector of the rows (maps) into `chan-in`."
  [table]
  (while true
    (let [next-rows (get-rows table)]
      (async/>!! chan-in next-rows)
      (set-max table (:max-id (last next-rows))))))

(defn aggregator
  "Takes output from `chan-in` and aggregates it by coupon_id and date,
  then adds/drops any fields that are/aren't needed and puts the result
  onto `chan-out`."
  []
  (while true
    (->> (async/<!! chan-in)
         aggregate
         (async/>!! chan-out))))

(defn read-thread
  "Reads data from `chan-out` and inserts it into the Analytics DB."
  []
  (while true
    (upsert (async/<!! chan-out))))

(defn -main []
  (async/thread (write-thread submissions))
  (async/thread (write-thread photos))
  (async/thread (write-thread videos))
  (async/thread-call aggregator)
  ;; core.async's `thread` pool uses daemon threads, so block here on the
  ;; writer's channel; otherwise the JVM may exit as soon as -main returns
  (async/<!! (async/thread-call read-thread)))
```
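Stripped of the database calls, the same thread-per-stage design with blocking `>!!`/`<!!` can be run on its own as a toy. This is a hypothetical sketch: `fake-rows`, `aggregate-rows` and the `sink` atom are made-up stand-ins for `get-rows`, `aggregate` and `upsert`, and unlike the real loaders the producer stops after a fixed number of batches and closes its channel, so every stage shuts down once `<!!` returns `nil`.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-ins for the real data-staging functions.
(defn fake-rows
  "Returns a batch of n fake event rows whose ids start at `start`."
  [start n]
  (vec (for [id (range start (+ start n))]
         {:id id :coupon_id (mod id 3) :date "2015-01-01"})))

(defn aggregate-rows
  "Counts rows per [coupon_id date] pair (stand-in for `aggregate`)."
  [rows]
  (frequencies (map (juxt :coupon_id :date) rows)))

(def sink (atom [])) ; stand-in for the analytics DB

(defn run-toy-pipeline
  "Runs producer -> aggregator -> writer, each on its own thread with
  blocking channel ops, and returns the contents of `sink` when done."
  [batches batch-size]
  (reset! sink [])
  (let [chan-in  (async/chan 100)
        chan-out (async/chan 100)]
    ;; producer: like write-thread, but finite; closes chan-in when done
    (async/thread
      (doseq [b (range batches)]
        (async/>!! chan-in (fake-rows (* b batch-size) batch-size)))
      (async/close! chan-in))
    ;; aggregator: exits once chan-in is closed and drained (<!! returns nil)
    (async/thread
      (loop []
        (when-some [rows (async/<!! chan-in)]
          (async/>!! chan-out (aggregate-rows rows))
          (recur)))
      (async/close! chan-out))
    ;; writer: the calling thread blocks until the writer thread finishes
    (async/<!!
      (async/thread
        (loop []
          (when-some [agg (async/<!! chan-out)]
            (swap! sink conj agg)
            (recur)))))
    @sink))
```

For example, `(run-toy-pipeline 2 6)` returns a vector of two maps, each equal to `{[0 "2015-01-01"] 2, [1 "2015-01-01"] 2, [2 "2015-01-01"] 2}`, one per batch.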
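As you can see, I put each component on its own thread and use the blocking `>!!`/`<!!` calls on the channels. It feels like the non-blocking (parking) `>!`/`<!` calls inside go blocks might be a better fit here, especially for the database reads, which spend most of their time doing I/O and waiting for new rows to appear in the product database. If that's the case, what is the best way to implement it? I'm still not entirely clear on the trade-offs between the two approaches, or on how to use go blocks effectively. Any suggestions for improving the overall architecture would also be much appreciated!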
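For comparison, this is roughly what moving only the aggregation stage onto a go block would look like. It is a sketch, assuming `aggregate` is pure, CPU-only work with no I/O; the `aggregate` below is a hypothetical placeholder. The parking `<!`/`>!` operations are only valid inside `go`, and `go-loop` is core.async's shorthand for `(go (loop ...))`. The database-facing stages are deliberately not converted here, since go blocks are meant for code that parks on channel operations rather than making blocking calls such as JDBC queries.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical placeholder for the real `aggregate`: pure, CPU-only work.
(defn aggregate [rows]
  (frequencies (map :coupon_id rows)))

(defn start-go-aggregator
  "Starts a go block that parks (rather than blocks) on both channels.
  Returns the go block's channel, which closes once `in` is closed and
  drained; `out` is closed at the same time."
  [in out]
  (async/go-loop []
    (if-some [rows (async/<! in)]
      (do (async/>! out (aggregate rows))
          (recur))
      (async/close! out))))
```

In the original `-main`, this would stand in for `(async/thread-call aggregator)` as `(start-go-aggregator chan-in chan-out)`; because `chan-in` is never closed there, the loop simply runs forever, like the `while true` version.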