在Spring webFlux Java中从内部订阅flux,请订阅。

5

我使用Spring Reactor库编写了一段逻辑,以异步模式获取所有操作员,然后为每个操作员获取所有设备(分页)。

创建了一个Flux来获取所有操作员,然后对其进行订阅。

    final Flux<List<OperatorDetails>> operatorDetailsFlux = reactiveResourceProvider.getOperators();
    operatorDetailsFlux
        .subscribe(operatorDetailsList -> {
          for (final OperatorDetails operatorDetails : operatorDetailsList) {
            getAndCacheDevicesForOperator(operatorDetails.getId());
          }
        });

现在,对于每个运算符,我正在获取需要多个订阅以获取设备mono的设备,通过订阅MONO异步获取所有页面。
private void getAndCacheDevicesForOperator(final int operatorId) {
    Mono<DeviceListResponseEntity> deviceListResponseEntityMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
        operatorId, 0);

    deviceListResponseEntityMono.subscribe(deviceListResponseEntity -> {
      final PaginatedResponseEntity PaginatedResponseEntity = deviceListResponseEntity.getData();
      final long totalDevicesInOperator = PaginatedResponseEntity.getTotalCount();


      int deviceCount = PaginatedResponseEntity.getCount();
      while (deviceCount < totalDevicesInOperator) {
        final Mono<DeviceListResponseEntity> deviceListResponseEntityPageMono = reactiveResourceProvider.getConnectedDeviceMonoWithRetryAndErrorSpec(
            operatorId, deviceCount);

        deviceListResponseEntityPageMono.subscribe(deviceListResponseEntityPage -> {
          final List<DeviceDetails> deviceDetailsList = deviceListResponseEntityPage.getData()
              .getItems();
          // work on devices
        });

        deviceCount += DEVICE_PAGE_SIZE;
      }
    });
  }

这段代码可以正常运行。但我的问题是从subscribe内部订阅mono是否明智?

你可以调用 flatMap - Nick
请提供一个例子,@Nick。 - quintin
1个回答

3

我将其分为两个流程:首先获取所有运营商,然后获取每个运营商的所有设备。

对于分页,我使用 Flux.expand 提取所有页面。

public Flux<OperatorDetails> getAllOperators() {
  return getOperatorsMonoWithRetryAndErrorSpec(0)
      .expand(paginatedResponse -> {
        final PaginatedEntity operatorDetailsPage = paginatedResponse.getData();
        if (morePagesAvailable(operatorDetailsPage) {
          return getOperatorsMonoWithRetryAndErrorSpec(operatorDetailsPage.getOffset() + operatorDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}


public Flux<Device> getAllDevices(final int opId, final int offset) {
  return getConnectedDeviceMonoWithRetryAndErrorSpec(opId, offset)
      .expand(paginatedResponse -> {
        final PaginatedEntity deviceDetailsPage = paginatedResponse.getData();
        if (morePagesAvailabile(deviceDetailsPage)) {
          return getConnectedDeviceMonoWithRetryAndErrorSpec(opId,
              deviceDetailsPage.getOffset() + deviceDetailsPage.getCount());
        }
        return Mono.empty();
      })
      .flatMap(responseEntity -> fromIterable(responseEntity.getData().getItems()))
      .subscribeOn(apiScheduler);
}

最终我正在创建一个管道并订阅它来触发该管道。

operatorDetailsFlux
    .flatMap(operatorDetails -> {
        return reactiveResourceProvider.getAllDevices(operatorDetails.getId(), 0);
    })
    .subscribe(deviceDetails -> {
      // act on devices
    });


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接