一般来说,“正确”的方式可能取决于您的应用需求,但鉴于您的问题描述,我认为您需要考虑三件事情:
1. `xf-run-computation` 返回业务逻辑视为错误的数据,
2. `xf-run-computation` 抛出异常,以及
3. 鉴于涉及 HTTP 调用,某些运行 `xf-run-computation` 的操作可能永远不会完成(或在时间上无法完成)。
关于第三点,您应该首先考虑使用 `pipeline-blocking` 而不是 `pipeline`。
我认为您的问题主要与第一点有关。基本思路是,`xf-run-computation` 的结果需要返回一个数据结构(例如 map 或记录),其中清楚地标记了结果是错误还是成功,例如 `{:title nil:body nil:status“error”}`。这将为您提供一些处理此情况的选项:
1. 所有后续代码都简单地忽略具有 `:status“error”` 的输入数据。即,您的 `xf-run-computation` 将包含类似 `(when (not (= (:status input) "error")) (run-computation input))` 的代码。
2. 您可以对 `pipeline` 调用和 `filter` 之间的所有结果运行过滤器,并根据需要对它们进行过滤(请注意,`filter` 也可以在管道中用作转换器,从而消除了 core.async 的旧 `filter>` 和 `filter<` 函数)。
3. 您可以像您建议的/Alan Thompson 在他的回答中所示的那样使用 `async/split` 来将错误值过滤到单独的错误通道。如果您打算合并这些值,那么您的第二个管道没有真正需要第二个错误通道,您可以简单地重用您的错误通道。
对于第二点,问题在于 `xf-run-computation` 中的任何异常都发生在另一个线程中,并且不会简单地传播回调用代码。但是,您可以利用 `pipeline`(和 `pipeline-blocking`)的 `ex-handler` 参数。您可以过滤掉所有异常,将结果放在单独的异常通道上,或者尝试捕获它们并将它们转换为错误(可能将它们放回结果或另一个错误通道),后者仅在异常提供足够信息时才有意义,例如允许将异常与导致异常的输入相关联的 ID 或其他内容。您可以在 `xf-run-computation` 中安排此操作(即从第三方库(如 HTTP 调用)捕获抛出的任何异常)。
对于第三点,在core.async中的标准答案是指向一个
timeout
通道,但这与
pipeline
关系不大。更好的做法是确保在您的http调用中设置超时时间,例如http-kit的
:timeout
选项或clj-http的
:socket-timeout
和
:conn-timeout
选项。请注意,这些选项通常会在超时时引发异常。
errors (merge [error-values error-values2]
。 - Michel Uncinifirst-out
都放入到第二阶段处理中,包括成功和不成功的结果。难道第二阶段的输入应该只有success-values1
吗? - Alan Thompsonfirst-out
设置为输出通道,因此每当xf-run-computation
发布某些结果时,谓词fn-to-split
将被评估并将结果发布到success-values1
或error-values1
。 - Michel Uncini