流编程:订阅者和发布者如何跟踪计数?

11

我看到一篇关于Java9中新的与Flow相关的接口的文章。以下是其中的示例代码:

public class MySubscriber<T> implements Subscriber<T> {  
  private Subscription subscription;    
  @Override  
  public void onSubscribe(Subscription subscription) {  
    this.subscription = subscription;  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
  }        
  @Override  
  public void onNext(T item) {  
    System.out.println("Got : " + item);  
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded  
   }  

正如您所见,onNext()请求推送一个新项目。现在我想知道:
  • 如果onSubscribe()请求了5个项目
  • 并且在第一个项目传递后,像上面那样调用request(1)
服务器现在是否需要发送:
  • 五个项目(请求5个,发送1个,请求1个)
  • 还是一个项目(因为该新请求使先前的请求“被丢弃”)
换句话说:当多次调用request()时,这些数字会相加吗?还是先前的请求被“丢弃”了?
这导致了问题标题 - 订阅者是否需要跟踪接收到的项目,以避免在某些时候请求“太多”的项目。

6
似乎会增加:[将给定数量 n 的项目添加到当前未满足的此订阅的需求中]. - Sotirios Delimanolis
@SotiriosDelimanolis 我认为你应该写更多的回答,单独的评论已经非常有帮助了。我在其他地方表达了我的感激之情;-) - GhostCat
2个回答

7
正如Sotirios所指出的,request方法的Javadoc(重点是我的)声明:将给定数量n的项目添加到此订阅的当前未满足的需求中。如果n小于或等于零,则订阅者将收到一个带有IllegalArgumentException参数的onError信号。否则,订阅者将收到多达n个附加的onNext调用(如果终止,则少于n个)。
因此,答案显然是“是的,订阅者需要跟踪项目”。实际上,这就是该机制的全部意义。一些背景信息:request方法旨在允许订阅者应用反压力,通知上游组件它已经超载并且“需要休息”。因此,仔细审核何时以及要请求多少新项目是订阅者(而且只有订阅者)的任务。在这条线上,它不能“重新考虑”并降低接收的项目数。
降低数量也会使发布者和订阅者之间的通信“非单调”,因为完全请求的项目数可能会突然减少(因为它只能增加)。这不仅在抽象意义上很烦人,而且还会带来具体的一致性问题:当订阅者突然将请求的项目数量降至1时,发布者可能正在传递一些项目-现在该怎么办?

4
尽管Java 9没有实现Reactive Streams API,但它提供了几乎相同的API并定义了相应的行为。在响应流规范中,以下内容定义了添加操作:对于1.1中的发布者:发布者发送给订阅者的onNext信号总数必须始终小于或等于订阅者订阅时请求的元素总数。对于3.8中的订阅者:只要订阅未被取消,Subscription.request(long n)就必须向相应的订阅者注册给定数量的附加元素。因此,Java遵循了在Reactive Streams API中给出的规范。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接