使用RxJava限制吞吐量

6

我现在遇到的案例很难解释,因此我将写一个更简单的版本来解释这个问题。

我有一个通过Observable.from()发出文件序列的方法,这些文件由文件ArrayList定义。所有这些文件都应该上传到服务器上。为此,我有一个执行任务并返回Observable的函数。

Observable<Response> uploadFile(File file);

当我运行这段代码时,它变得很疯狂,Observable.from() 发出了所有的文件,它们一次性上传或者最多只有线程池能够处理的数量。
我想要最多同时上传2个文件。是否有任何操作符可以为我处理这个问题?
我尝试了bufferwindow和其他一些操作符,但它们似乎只会一起发出两个项目,而不是保持两个并行文件上传。我还尝试在上传部分设置了一个最大线程池,但在我的情况下无法使用。
应该有一个简单的操作符来解决这个问题,对吗?我错过了什么吗?
1个回答

7

我认为所有文件都是并行上传的,因为您使用了flatMap(),它会同时执行所有转换。相反,您应该使用concatMap(),它会一个接一个地运行转换。如果要运行两个并行上传,则需要在文件可观察对象上调用window(2),然后像在代码中那样调用flatMap()

Observable<Response> responses = 
    files
      .window(2)
      .concatMap(windowFiles ->
        windowFiles.flatMap(file -> uploadFile(file));
      );

更新:

我找到了一个更好的解决方案,完全符合您的要求。有一个重载的flatMap()函数,接受最大并发线程数。

Observable<Response> responses = 
    files
      .onBackpressureBuffer()
      .flatMap(index -> {
        return uploadFile(file).subscribeOn(Schedulers.io());
      }, 2);

这听起来很完美!我会尝试一下并告诉你。 - Ben Groot
很好,窗口操作符现在完美运作!我该如何让窗口移动呢?目前,如果窗口发出文件1和文件2进行上传,它会等待两个文件都完成。如果文件2已经完成而文件1仍在进行中,那么文件3的上传是否可以立即执行呢? - Ben Groot
我不确定使用默认运算符是否可能,因此您可能需要编写自己的运算符。 - Michael
1
窗口(2,1)运算符似乎正在发挥作用。这定义了在跳过1个项目后将创建一个新窗口。您能否验证以确认此操作? - Ben Groot
不,window(2, 1) 不会按照你的期望工作。但是我找到了一个更好的解决方案并更新了答案。 - Michael
非常感谢!太棒了! - Ben Groot

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接