Clojure消息处理/异步、多线程

5

我有一个小的Clojure消费者/发布者,通过RabbitMQ接收消息、处理它们并将其发送到其他消费者。

我定义了一个消息处理程序,它在一个单独的线程(与主线程分离)中处理消息。
如下面的代码所示,该线程同步接收和发送消息,在由函数启动的事件循环中发生。

那么问题是,什么是“Clojure方式”来创建一个N大小的此类同步消息处理程序的线程池呢?我想非Clojure方式可能是通过Java互操作手动生成一些线程。

此外,考虑到处理不是非常CPU密集型,这样做是否会加速消息处理过程?再次考虑到发布时间比处理时间更长,将这些消息处理程序设置为异步是否更好?

最后,我该如何衡量这些竞争方法的性能(我来自Ruby / JavaScript世界,在那里没有多线程)?

注意:我知道所有这些都可以通过水平扩展和生成更多监听消息总线的JVM进程来避免,但由于应用程序将部署在Heroku上,我希望尽可能地利用每个dyno/process的资源。

(defn message-handler
  [ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)] 
    (lb/publish ch "e.events" "" processed-message)))

(defn -main
  [& args]
  (let [conn          (rmq/connect {:uri (System/getenv "MSGQ")})
        ch            (lch/open conn)
        q-name        "q.events.tagger"
        e-sub-name    "e.events.preproc"
        e-pub-name    "e.events"
        routing-key   "tasks.taggify"]
    (lq/declare ch q-name :exclusive false :auto-delete false)
    (le/declare ch e-pub-name "fanout" :durable false)
    (lq/bind ch q-name e-sub-name :routing-key routing-key)
    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name message-handler :auto-ack true))))))

更基础的问题是...我该如何重构这段代码以支持使用额外参数注册消息处理程序回调函数,就像这样:
    (.start (Thread. (fn []
                       (lcm/subscribe ch q-name (message-handler pub-name) :auto-ack true))))))

然后带有一个引用发布:

    (lb/publish ch pub-name "" processed-message)))

使用替代文本而不是字面量:

    (lb/publish ch "e.events" "" processed-message)))
2个回答

2

对于第二个问题,您可以使用部分应用程序,如下所示:

(defn message-handler
  [pub-name ch metadata ^bytes payload]
  (let [msg (json/parse-string (String. payload "UTF-8"))
        processed-message (process msg)] 
    (lb/publish ch pub-name "" processed-message)))



(.start 
  (Thread. 
     (fn []
       (lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))

1
这是一个非常广泛的话题,你可能需要将这个问题分成几个不同的问题来考虑,但简洁的答案是:使用代理

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接