如何在Clojure中执行同步并行函数?

3
我有一个应用程序,其初始流程可以并行完成:
  1. 获取两个JSON文档(我使用clj-http来完成)
  2. 解析这些文档(仅提取所需数据)
  3. 合并结果
  4. 将它们转储到一个文件中
因此,大致如下:
some-entry-point
      /\
     /  \
    /    \
   /      \
fetchA   fetchB
  |         |
  |         |
parseA   parseB
  \        /
   \      /
    \    /
     \  /
      \/
     join
      |
      |
     dump

如何正确且最新地实现这一目标?

我所发现的方法有:

1个回答

4

既然你在这里只有两个分支,最好使用future函数将并行作业分派到单独的线程中。future将返回一个未来对象(一个特殊的承诺,当作业完成时将自动解决)。

以下是它的样子:

(defn some-entry-point
  [obja objb]
  (let [pa (-> (fetchA obja)
               parseA
               future)
        pb (-> (fetchB objb)
               parseB
               future)]
    (-> (join @pa @pb)
        dump)))

@deref 函数的快捷方式(读取宏)。

当然,您可以在主线程中执行其中一个分支,仅创建一个 future 对象:

(defn some-entry-point
  [obja objb]
  (let [p (-> (fetchB objb)
              parseB
              future)]
    (-> (fetchA obja)
        parseA
        (join @p)
        dump)))

你可以推广这种方法,使用一些通用的fetchparse函数来获取和解析多个对象,使用pmap进行并行处理:

(defn some-entry-point
  "Fetch and parse multiple objects in parallel"
  [objs]
  (->>  objs
        (pmap (comp parse fetch))
        (apply join)
        dump))

好的,看起来它正在工作,但是为什么我需要(shutdown-agents)让我的程序立即退出?我在这里描述了一个问题:https://dev59.com/bYXca4cB1Zd3GeqPGlGN - Kamil Lelonek
1
@squixy 我在我的Ubuntu 14.04.2 LTS上用Clojure 1.6.0Leiningen 2.5.1 on Java 1.8.0_45 Java HotSpot(TM) 64-Bit Server VM测试了一下,没有使用(shutdown-agents)一切都正常。看起来是某个平台/JVM特定的问题。尝试更新你的JVM。如果不行,那么你将不得不显式地调用(shutdown-agents),虽然这对我来说似乎不是一个大问题。 - Leonid Beschastny
lein -v => Leiningen 2.5.1在Java 1.8.0_40 Java HotSpot(TM) 64位服务器虚拟机上运行,所以我不知道为什么会这样。 - Kamil Lelonek
1
更正一下:future 不会返回一个 promise,它会返回一个 future。请参考 future? 谓词。 - noisesmith
我以为他们已经改变了这个,但Clojure agents使用非守护线程(在这里:http://clojure.org/agents 搜索“daemon”)。如果存在非守护线程运行,JVM将不会关闭。 - BillRobertson42
@Bill,关于它有一个问题(http://dev.clojure.org/jira/browse/CLJ-124),不过由于某种原因,并不是所有系统都受到影响。我的JVM能正常关闭。 - Leonid Beschastny

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接