RxJS并行队列与并发工作者?

4
假设我想下载 10,000 个文件。我可以轻松地建立一个包含这些文件的队列(如果有更好的方法,欢迎提供建议)。
import request from 'request-promise-native';
import {from} from 'rxjs';

let reqs = [];
for ( let i = 0; i < 10000; i++ ) {
  reqs.push(
    from(request(`http://bleh.com/${i}`))
  )
};

现在我有一个由Promise创建的Rx.JS observable数组,代表我的队列。现在我想要实现以下行为:
  • 向服务器发出三个并发请求
  • 在一个请求完成之后,我希望发出一个新的请求。
我可以解决这个问题,但是考虑到诸如我从未使用过的Rxjs队列等因素,我想知道最正确的Rxjs方法是什么。

1
请查看 https://github.com/cartant/rxjs-etc/blob/master/source/observable/forkJoinConcurrent.ts 。 - cartant
@cartant 那段代码真是太棒了,你能否稍微注释一下并解释一下其中的运作原理——如果不在 Github 上的话?我想我可以从中学到很多东西。 - Evan Carroll
2个回答

9
看起来你想要一个等效于支持调用者指定的最大并发订阅数的forkJoin
可以使用mergeMap重新实现forkJoin,并公开concurrent参数,像这样
import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

export function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  // Convert the array of observables to a higher-order observable:
  return from(observables).pipe(
    // Merge each of the observables in the higher-order observable
    // into a single stream:
    mergeMap((observable, observableIndex) => observable.pipe(
      // Like forkJoin, we're interested only in the last value:
      last(),
      // Combine the value with the index so that the stream of merged
      // values - which could be in any order - can be sorted to match
      // the order of the source observables:
      map(value => ({ index: observableIndex, value }))
    ), concurrent),
    // Convert the stream of last values to an array:
    toArray(),
    // Sort the array of value/index pairs by index - so the value
    // indices correspond to the source observable indices and then
    // map the pair to the value:
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}

干得好。现在正在审核以确保我理解你所做的一切。 - Evan Carroll

2

我在2021年也遇到了同样的问题,并且能够利用@cartant的答案解决,所以我想分享一下:

index.ts

import request from 'request-promise-native';
import { from, defer } from "rxjs";
import { forkJoinConcurrent } from './forkJoinConcurrent';

const handleRequest = async (id: string) => await request.get(`http://bleh.com/${id}`, { json: true });

const ids: string[] = [...Array(10000).keys()].map((k: number) => k.toString());

const concurrent: number = 3;

/* use `defer` instead of `from` to generate the Observables. 
 `defer` uses a factory to generate the promise and it will execute 
 the factory only when it is subscribed to */

const observables = ids.map((id: string) => defer(() => from(handleRequest(id))))

forkJoinConcurrent<any>(observables, concurrent).subscribe(value => console.log(value));

forkJoinConcurrent.ts

import { from, Observable } from "rxjs";
import { last, map, mergeMap, toArray } from "rxjs/operators";

export function forkJoinConcurrent<T>(
  observables: Observable<T>[],
  concurrent: number
): Observable<T[]> {
  // Convert the array of observables to a higher-order observable:
  return from(observables).pipe(
    // Merge each of the observables in the higher-order observable
    // into a single stream:
    mergeMap((observable, observableIndex) => observable.pipe(
      // Like forkJoin, we're interested only in the last value:
      last(),
      // Combine the value with the index so that the stream of merged
      // values - which could be in any order - can be sorted to match
      // the order of the source observables:
      map(value => ({ index: observableIndex, value }))
    ), concurrent),
    // Convert the stream of last values to an array:
    toArray(),
    // Sort the array of value/index pairs by index - so the value
    // indices correspond to the source observable indices and then
    // map the pair to the value:
    map(pairs => pairs.sort((l, r) => l.index - r.index).map(pair => pair.value))
  );
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接