我有一个表示为简单迭代器(或流)的顺序数据源。 数据非常庞大,无法放入内存中。此外,该源只能遍历一次,获取数据的成本很高。
这个源在某些重要过程(黑盒子)中被使用,该过程以Iterator(或Stream)作为其参数来线性消耗数据。
好的,很简单。但是如果我有两个不同的这样的消费过程怎么办?如我所说,我不想像List一样将输入数据读入集合中。我还可以通过从其非常开始重新读取源两次来完成我的任务,但我不喜欢这种方法,因为它并不有效。
事实上,我需要“tee”(一种克隆)Iterator(或Stream)来两次并行处理单个源,而不将其缓存到内存集合中。我认为这种方法应该对兄弟姐妹们进行反压或限制速度,如果他们太快地消耗源流,则应采用有效的解决方案,可能具有某种并行安全队列缓冲区。
是否有人知道如何在Scala(或使用任何外部流库/框架)上执行此操作?
PS:我发现了一个四年前类似的问题:一个上游流向多个下游流的问题。不同之处在于我询问如何使用标准的Scala迭代器(或流),或者更好地使用一些现有的库来执行它。