58得票1回答
Spring Web-Flux中的反压机制

我是一个Spring Web-Flux的初学者。我编写了一个控制器,如下所示: @RestController public class FirstController { @GetMapping("/first") public Mono<String> ge...

30得票5回答
使用BlockingCollection<T>: OperationCanceledException,有更好的方法吗?

我正在使用(非常优秀的)BlockingCollection&lt;T&gt; 类型来开发一个高度多线程、高性能的应用程序。每个“批次”都将通过标记取消令牌来结束,这会导致任何等待的 Take 调用都会抛出异常。虽然这很好,但我更希望返回值或输出参数来表示它,因为a) 异常有明显的开销,b) ...

28得票3回答
在RxJava中,“backpressure”一词是什么意思?

我是RxJava的初学者,对"backpressure"的含义很好奇。 这是否意味着生产者在消费者背后施加压力? 还是指消费者对生产者施加压力?(相反的方向)

18得票3回答
在Node.js中,从可写流中暂停已管道化的可读流的正确方法是什么?

我正在编写一个模块,它是一个可写流。我想为我的用户实现管道接口。 如果发生错误,我需要暂停可读流并发出错误事件。然后,用户将决定 - 如果他可以处理这个错误,他应该能够恢复数据处理。var writeable = new BackPressureStream(); writeable.on(...

15得票2回答
Kafka中的反压机制

我在Kafka中遇到这样一个情况,生产者发布消息的速度比消费者的消费速度高得多。我需要为后续的消费和处理实现Kafka的反向压力机制。 请告诉我如何在Spark和普通Java API中实现。

13得票1回答
Node.JS无限并发/ TCP流反压

据我理解,Node的事件驱动IO模型的一个后果是,在您连接了接收事件处理程序(或以其他方式开始监听数据)后,无法告诉正在通过TCP套接字接收数据的Node进程阻塞。如果接收方无法快速处理传入的数据,则可能会产生“无限并发”,Node在底层会继续尽可能快地从套接字读取数据,调度新的数据事件到事件...

11得票2回答
Spark结构化流处理如何处理背压?

我正在分析Spark结构化流的背压特性。有人知道详情吗?是否可以通过代码调整处理传入记录的速率?谢谢。

9得票1回答
当mapcat在core.async中破坏了反压时,内存泄漏在哪里?

我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并以错误失败。似乎在core.async管道中使用mapcat会破坏反压(这是不幸的,原因超出了本问题的范围)。 以下是一些演示问题的代码,通过计算“:x”进入和离开“mapcat”转换器来说明问题:...

8得票5回答
RxJs:zip运算符的有损形式

考虑使用zip运算符将两个无限的Observables压缩在一起,其中一个以双倍频率发出项。 当前实现是无损的,即如果我让这些Observables发出一个小时,然后在它们的发射速率之间切换,第一个Observable最终会赶上另一个Observable。 这将导致内存爆炸,因为缓冲区变得越来...

7得票1回答
流处理中的背压是什么意思?

我开始学习NodeJS,流似乎是人们经常使用的东西。在我阅读的大部分文档中都提到了“背压问题”,这个问题在处理大文件时会出现,但我还没有找到一个清晰的解释来说明这个问题究竟是什么。我也读到过使用管道可以帮助解决这个问题,但是管道到底如何解决背压问题呢? 提前感谢任何解释。