Clojure异步/Go如何挂起阻塞代码

3

我使用一些Java库来进行非异步的get和post请求。我过去会将这样的请求包装成Future,这解决了我的“等待问题”(即等待响应)。

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [res (atom [])]
    (dotimes [i n]
      (future (swap! res conj (unchangeable-lib-request i))))
    (loop []
      (if (> n (count @res))
        (recur)
        @res))))

(time (process 9))

;; "Elapsed time: 1000.639079 msecs"
;; => [8 7 5 6 4 3 2 1 0]

但是我需要创建数百个请求,这会导致性能问题。我了解了core.async和go块。但如果我将使用这个库中的go块,它不会解决“等待问题”。

(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (unchangeable-lib-request i))))
  (loop [result []]
    (if (> n (count result))
      (recur (conj result (async/<!! c)))
      result))))

(time (process 9))

;; "Elapsed time: 2001.770183 msecs"
;; => [0 4 1 6 7 2 5 3 8]

Go块只能同时处理8个请求。是否有可能编写一些异步包装器,将go-block停放并提供异步进行100多个请求的能力,而不会相互阻塞?

(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (magic-async-parking-wrapper
                      (unchangeable-lib-request i))))
  (loop [result []]
    (if (> n (count result))
      (recur (conj result (async/<!! c)))
      result))))

(time (process 9))

;; "Elapsed time: 1003.2563 msecs"

我了解异步/线程,但似乎这与(future ...)相同。

这可能吗?


你读过这个了吗? - akond
你指的是什么?我在那里没有找到任何有用的东西。 你的意思是使用(Thread/sleep _)不是一个好主意?所以,这就是关键。我已经有一个阻塞操作,无法更改它。而且我需要对它做些什么。 - Defake
2个回答

2
我建议:
  • 使用 futures 来创建线程,并让它们从任何 go 块之外使用 put! 将结果放回到核心 async 通道中,类似于:(future (put! chan (worker-function)))
  • 然后使用一个 go 块来等待(单个)通道,并在获取结果时将其放入。

嗯,为什么要用put!而不是>!!呢?我并不完全理解这两者之间的区别,但我认为put!的重点是允许您在通道没有缓冲空间时进行更高级的错误处理。既然你只想等待空间,我本来期望使用>!! - amalloy
我的第一次实现和你的答案有什么区别?如果我想要同时发起100个请求,程序将会创建100个future,并等待它们自己的请求响应。这不是异步的。我有什么地方理解不了吗? - Defake
@amalloy,区别在于如果通道c没有空间,>!!将阻塞线程并等待机会将值放入c,但put!不会阻塞线程,并且将在有机会时异步地放置一个值。但是,是的,在这种情况下没有区别。 - Defake
“put!” 可以在“go”块之外使用,而箭头函数则不能。 - tarmes
@tarmes >!<! 箭头函数无法在 go 块之外使用。但是 >!!<!! 可以使用,但会阻塞线程。 - Defake

1
这是你使用 clojure.core.async/pipeline-blocking 的地方。
(require '[clojure.core.async :as a :refer [chan pipeline-blocking]])


(let [output-chan (chan 100)
      input-chan (chan 1000)]
  (pipeline-blocking 4 ; parallelism knob
                     output-chan 
                     (map unchangeable-lib-request) 
                     input-chan)
  ;; Consume results from output-chan, put operations on input-chan
  [output-chan input-chan]
  )

这将产生 n 个线程(在本例中为 4),它们忙于执行“unchangeable-lib-request”。使用“output-chan”的缓冲区大小来微调您想要提前发生多少请求。使用“input-chan”的缓冲区大小来微调您想要安排多少请求,而不进行反向传播(阻塞的“input-chan”)。

1
太好了!很棒的解决方案,谢谢!但是使用这种方法和使用 futures 之间有性能差异吗?据我所知,将创建 N 个线程(=futures),来处理阻塞操作。只使用 1-2 个线程并停放等待阻塞请求是不可能的吗? - Defake
@Defake 不确定我是否理解了你的问题,但是你可以将“4”更改为“2”或“1”,然后只会有2个线程或1个线程。 - Leon Grapenthin
1
如果你想的话,可以添加一个回传通道。将请求放入 input-chan 中,像这样:{:request-data "whatever-the-java-thing-needs", :response (promise-chan)}。然后让 unchangeable-lib-request 通过 >!! 将响应放在 :response 上。接着执行 (go (let [resp (promise-chan)] (>! input-chan {:request-data ..., :response resp}) (<! resp))。这个块会一直等待结果计算完成。确保为 output-chan 使用 dropping-buffer - Leon Grapenthin
你的解决方案会创建与正在处理的请求数量相同的线程(不完全是这样,但就这么说吧)。我的问题是:这种解决方案在性能上是否等同于单线程异步解决方案?我只知道JVM会在每个现有线程上花费一定的内存和CPU资源。因此,如果你创建了很多线程,程序将会出现延迟。我错了吗? - Defake
1
示例程序运行4个线程,不能多也不能少。一旦关闭input-chan,它们就会被关闭。这在JVM方面是完全可以的。有问题的是每个请求一个线程,因为它们会相互阻塞。 - Leon Grapenthin
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接