7得票1回答
BackpressureStrategy.BUFFER和onBackpressureBuffer操作符在RxJava 2中有什么区别?

我是reactive programming的新手,正在尝试使用rxjava 2创建一个简单的反压感知消息处理。 以下是我想要实现的工作流程: 持续字符串流的Flowable。 执行费时操作并将消息更改为另一个字符串。 执行另一个费时操作。 现在我正在使用以下代码: { F...

11得票2回答
Spark结构化流处理如何处理背压?

我正在分析Spark结构化流的背压特性。有人知道详情吗?是否可以通过代码调整处理传入记录的速率?谢谢。

13得票1回答
Node.JS无限并发/ TCP流反压

据我理解,Node的事件驱动IO模型的一个后果是,在您连接了接收事件处理程序(或以其他方式开始监听数据)后,无法告诉正在通过TCP套接字接收数据的Node进程阻塞。如果接收方无法快速处理传入的数据,则可能会产生“无限并发”,Node在底层会继续尽可能快地从套接字读取数据,调度新的数据事件到事件...

9得票1回答
当mapcat在core.async中破坏了反压时,内存泄漏在哪里?

我在Clojure中编写了一些core.async代码,当我运行它时,它消耗了所有可用的内存并以错误失败。似乎在core.async管道中使用mapcat会破坏反压(这是不幸的,原因超出了本问题的范围)。 以下是一些演示问题的代码,通过计算“:x”进入和离开“mapcat”转换器来说明问题:...

30得票5回答
使用BlockingCollection<T>: OperationCanceledException,有更好的方法吗?

我正在使用(非常优秀的)BlockingCollection&lt;T&gt; 类型来开发一个高度多线程、高性能的应用程序。每个“批次”都将通过标记取消令牌来结束,这会导致任何等待的 Take 调用都会抛出异常。虽然这很好,但我更希望返回值或输出参数来表示它,因为a) 异常有明显的开销,b) ...

7得票1回答
Spark Streaming中的背压属性是如何工作的?

我有一个CustomReceiver用于接收单个事件(String)。在Spark应用程序运行时,使用接收到的单个事件从nosql读取数据并应用转换。当每个批次的处理时间大于批处理间隔时,我设置了以下属性: spark.streaming.backpressure.enabled=true ...

58得票1回答
Spring Web-Flux中的反压机制

我是一个Spring Web-Flux的初学者。我编写了一个控制器,如下所示: @RestController public class FirstController { @GetMapping("/first") public Mono&lt;String&gt; ge...

7得票1回答
流处理中的背压是什么意思?

我开始学习NodeJS,流似乎是人们经常使用的东西。在我阅读的大部分文档中都提到了“背压问题”,这个问题在处理大文件时会出现,但我还没有找到一个清晰的解释来说明这个问题究竟是什么。我也读到过使用管道可以帮助解决这个问题,但是管道到底如何解决背压问题呢? 提前感谢任何解释。

18得票3回答
在Node.js中,从可写流中暂停已管道化的可读流的正确方法是什么?

我正在编写一个模块,它是一个可写流。我想为我的用户实现管道接口。 如果发生错误,我需要暂停可读流并发出错误事件。然后,用户将决定 - 如果他可以处理这个错误,他应该能够恢复数据处理。var writeable = new BackPressureStream(); writeable.on(...

8得票5回答
RxJs:zip运算符的有损形式

考虑使用zip运算符将两个无限的Observables压缩在一起,其中一个以双倍频率发出项。 当前实现是无损的,即如果我让这些Observables发出一个小时,然后在它们的发射速率之间切换,第一个Observable最终会赶上另一个Observable。 这将导致内存爆炸,因为缓冲区变得越来...