如何在Clojure中分段生成惰性序列?

4
我有一个数据库服务器,从中获取数据。有时数据有数百万行以上,因此我使用惰性下载。我使用clojure.jdbc库的服务器端游标https://funcool.github.io/clojure.jdbc/latest/#cursor-queries来惰性地获取数据。
现在我遇到了一个问题。我需要从惰性序列中生成前500个元素,然后程序必须等待10分钟以获取报告信号,该信号通知程序生成下一个500个元素,依此类推,直到接收到所有来自服务器的数据。但是,如果10分钟内没有收到报告,程序必须关闭连接。
我写了一个示例:
(def lazyseq_maps (atom {:seq_1 {:next_500 false :data nil} :seq_2 {:next_500 false :data nil}})) ; here is a collection of all unfinished lazy sequences waiting for signal to continue produce elements

(jdbc/atomic conn
 (with-open [cursor (jdbc/fetch-lazy conn sql]
   (let [lazyseq (jdbc/cursor->lazyseq cursor)]
     (swap! lazyseq_maps assoc seq_id {:next_500 true :data nil})
     (loop [lazyseq_rest lazyseq
            count 1]
            (if (:next_500 (seq_id @lazyseq_maps))
              (do
                (swap! lazyseq_maps update-in [seq_id :data] conj (first lazyseq_rest))
                (when (= 0 (mod count 500))
                  (swap! lazyseq_maps assoc-in [seq_id :next_500] false))
                (recur (rest lazyseq) (inc count)))
              ;
              (func-for-waiting-signal)))) ; here I don`t know how to create function waiting signal to continue fetching data
   (seq_id @lazyseq_maps)))

你能帮我确定我应该使用哪些Clojure工具来解决我的问题吗?我认为我应该使用core.async来创建循环通道。我是对的吗?如果我收到适当的信号,我应该如何创建停止执行循环10分钟或继续执行的函数?


十分钟对于保持连接开放且未使用来说确实很长,您可能需要考虑使用类似键集分页的东西。 - Mark Rotteveel
1个回答

1
确实,你应该使用core.async来完成这个任务,他们有timeout通道来处理超时,而使用alt!可以从任意数量的通道中"等待"一个值弹出。完整的代码会有点复杂,输出行将被"推送"到一个输出通道中。
据我所知,with-open和惰性序列不能很好地组合,因为游标会过早关闭。但是我不熟悉clojure.jdbc库。

你能解释一下你所说的“with-open和lazy sequences不太好组合”的意思吗?我在这个问题https://stackoverflow.com/questions/47323454/clojure-core-async-how-to-download-lazily-with-go-block-inside-with-open中提出了一个问题。这是关于你之前所说的内容吗? - Verbery

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接