使用Clojure core.async管道处理错误

7

我正在尝试了解使用core.async/pipeline处理错误的正确方法,我的流程如下:

input     --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out

xf-run-computation将执行Http调用并返回响应。然而,其中一些响应会返回错误。如何处理这些错误是最好的方法?我的解决方案是将输出通道拆分为success-valueserror-values,然后将它们合并回一个通道中:

(let [[success-values1 error-values1] (split fn-to-split first-out)
      [success-values2 error-values2] (split fn-to-split last-out)
      errors (merge [error-values1 error-values2])]
(pipeline 4 first-out xf-run-computation input)
(pipeline 4 last-out  xf-run-computation success-values1)
[last-out errors])

因此,我的函数将返回最后的结果和错误。

谢谢,那是一个错误,应该是:errors (merge [error-values error-values2] - Michel Uncini
你的问题还有点混淆。你把所有的 first-out 都放入到第二阶段处理中,包括成功和不成功的结果。难道第二阶段的输入应该只有 success-values1 吗? - Alan Thompson
我将 first-out 设置为输出通道,因此每当 xf-run-computation 发布某些结果时,谓词 fn-to-split 将被评估并将结果发布到 success-values1error-values1 - Michel Uncini
2个回答

7
一般来说,“正确”的方式可能取决于您的应用需求,但鉴于您的问题描述,我认为您需要考虑三件事情:
1. `xf-run-computation` 返回业务逻辑视为错误的数据, 2. `xf-run-computation` 抛出异常,以及 3. 鉴于涉及 HTTP 调用,某些运行 `xf-run-computation` 的操作可能永远不会完成(或在时间上无法完成)。
关于第三点,您应该首先考虑使用 `pipeline-blocking` 而不是 `pipeline`。
我认为您的问题主要与第一点有关。基本思路是,`xf-run-computation` 的结果需要返回一个数据结构(例如 map 或记录),其中清楚地标记了结果是错误还是成功,例如 `{:title nil:body nil:status“error”}`。这将为您提供一些处理此情况的选项:
1. 所有后续代码都简单地忽略具有 `:status“error”` 的输入数据。即,您的 `xf-run-computation` 将包含类似 `(when (not (= (:status input) "error")) (run-computation input))` 的代码。 2. 您可以对 `pipeline` 调用和 `filter` 之间的所有结果运行过滤器,并根据需要对它们进行过滤(请注意,`filter` 也可以在管道中用作转换器,从而消除了 core.async 的旧 `filter>` 和 `filter<` 函数)。 3. 您可以像您建议的/Alan Thompson 在他的回答中所示的那样使用 `async/split` 来将错误值过滤到单独的错误通道。如果您打算合并这些值,那么您的第二个管道没有真正需要第二个错误通道,您可以简单地重用您的错误通道。
对于第二点,问题在于 `xf-run-computation` 中的任何异常都发生在另一个线程中,并且不会简单地传播回调用代码。但是,您可以利用 `pipeline`(和 `pipeline-blocking`)的 `ex-handler` 参数。您可以过滤掉所有异常,将结果放在单独的异常通道上,或者尝试捕获它们并将它们转换为错误(可能将它们放回结果或另一个错误通道),后者仅在异常提供足够信息时才有意义,例如允许将异常与导致异常的输入相关联的 ID 或其他内容。您可以在 `xf-run-computation` 中安排此操作(即从第三方库(如 HTTP 调用)捕获抛出的任何异常)。
对于第三点,在core.async中的标准答案是指向一个timeout通道,但这与pipeline关系不大。更好的做法是确保在您的http调用中设置超时时间,例如http-kit的:timeout选项或clj-http的:socket-timeout:conn-timeout选项。请注意,这些选项通常会在超时时引发异常。

说实话,我没有任何应用需求,只是想找出最好的方法来传播错误-指示失败。我将尝试使用映射/记录而不是拆分通道,另外关于第2点和第3点,肯定需要添加!谢谢! - Michel Uncini
我将补充第三点 - 您可以为任何网络故障添加重试机制(Diehard?)。在尝试了一些重试后,它会将异常抛出到您的管道异常处理程序。 - danfromisrael

2
这里有一个例子可以实现你所建议的功能。从(range 10)开始,它首先过滤掉5的倍数,然后再过滤掉3的倍数。
(ns tst.clj.core
  (:use clj.core
        clojure.test )
  (:require
    [clojure.core.async :as async]
    [clojure.string :as str]
  )
)

(defn err-3 [x]
  "'fail' for multiples of 3"
  (if (zero? (mod x 3))
    (+ x 300)       ; error case
    x))             ; non-error

(defn err-5 [x]
  "'fail' for multiples of 5"
  (if (zero? (mod x 5))
    (+ x 500)       ; error case
    x))             ; non-error

(defn is-ok?
  "Returns true if the value is not 'in error' (>=100)"
  [x]
  (< x 100))

(def ch-0  (async/to-chan (range 10)))
(def ch-1  (async/chan 99))
(def ch-2  (async/chan 99))

(deftest t-2
  (let [
        _                         (async/pipeline 1 ch-1 (map err-5) ch-0)
        [ok-chan-1 fail-chan-1]   (async/split is-ok? ch-1 99 99)
        _                         (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
        [ok-chan-2 fail-chan-2]   (async/split is-ok? ch-2 99 99)

        ok-vec-2                  (async/<!! (async/into [] ok-chan-2))
        fail-vec-1                (async/<!! (async/into [] fail-chan-1))
        fail-vec-2                (async/<!! (async/into [] fail-chan-2))
  ]
    (is (= ok-vec-2 [1 2 4 7 8]))
    (is (= fail-vec-1 [500 505]))
    (is (= fail-vec-2 [303 306 309]))))

我会在检测到错误后立即将其记录下来,而不是返回错误信息,然后忘记它们。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接