"tee" Scala流/迭代器

4
我有一个表示为简单迭代器(或流)的顺序数据源。 数据非常庞大,无法放入内存中。此外,该源只能遍历一次,获取数据的成本很高。 这个源在某些重要过程(黑盒子)中被使用,该过程以Iterator(或Stream)作为其参数来线性消耗数据。 好的,很简单。但是如果我有两个不同的这样的消费过程怎么办?如我所说,我不想像List一样将输入数据读入集合中。我还可以通过从其非常开始重新读取源两次来完成我的任务,但我不喜欢这种方法,因为它并不有效。 事实上,我需要“tee”(一种克隆)Iterator(或Stream)来两次并行处理单个源,而不将其缓存到内存集合中。我认为这种方法应该对兄弟姐妹们进行反压或限制速度,如果他们太快地消耗源流,则应采用有效的解决方案,可能具有某种并行安全队列缓冲区。 是否有人知道如何在Scala(或使用任何外部流库/框架)上执行此操作?

PS:我发现了一个四年前类似的问题:一个上游流向多个下游流的问题。不同之处在于我询问如何使用标准的Scala迭代器(或流),或者更好地使用一些现有的库来执行它。

1个回答

0

你应该看一下fs2 streams。这个例子使用恒定的内存从一个文件读取并增量地写入另一个文件。以下是如何修改他们的示例以写入两个文件:

...

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
  .through(text.utf8Decode)
  .through(text.lines)
  .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
  .map(line => fahrenheitToCelsius(line.toDouble).toString)
  .intersperse("\n")
  .through(text.utf8Encode)
  .observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
  .through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC))

...

不错,看起来我需要它!:-) 注意,我已经有Iterator[MyStructure]。你能给我一个示例,如何提供两个函数,这两个函数以Iterator或Stream作为参数 - 而不是两个文件(希望不太复杂)? 是否可以将简单的Scala Streams转换为fs2 streams,并进行还原? - Michael Shestero
有很多种方法可以创建fs2流,其中许多方法在指南中有概述。如果其他方法都失败了,您始终可以通过unfold构建一个流。 - codenoodle
尝试中: import fs2._ import cats.effect.{IO, Sync} val st2 = Stream.fromIterator(it0) // 从我的源迭代器[NginxRec]创建fs2流 // 先尝试的临时消费者(只打印前两条记录): def sink1: Sink[IO, NginxRec] = _.take(2).evalMap(nr => IO.delay(println(nr.datetime))) def sink2: Sink[IO, NginxRec] = _.take(2).evalMap(nr => IO.delay(println(nr.datetime))) st2.observe(sink1).observe(sink2) // 我这里对吗?我在Stream.fromIterator处遇到了错误:Error: diverging implicit expansion for type cats.effect.Sync[F] ... - Michael Shestero
我在这里找到了类似于我想要的东西:https://fs2.io/concurrency-primitives.html#single-publisher--multiple-subscriber(部分“单发布者/多订阅者”)。但是我仍然很难看出这如何解决我的任务。 - Michael Shestero
你能否发布另一个问题,并附上你正在处理的代码示例? - codenoodle
大家好。 我发明了自行车,想分享我的解决方案。但不幸的是,由于“过去发布了许多低质量答案”,我被StackOverflow禁止回答问题。我真的不记得以前在这里发布过任何答案...因为“自动封禁永远不会过期或超时”,对不起我的朋友们! :-( - Michael Shestero

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接