配置Spring.codec.max-in-memory-size在使用ReactiveElasticsearchClient时

25

我正在使用spring-data-elasticsearch 3.2.3中的ReactiveElasticsearchClient与spring-boot 2.2.0进行操作。当我升级到spring-boot 2.2.2时,出现了org.springframework.core.io.buffer.DataBufferLimitException:超过了最大缓冲字节数的限制:262144。

建议通过使用spring.codec.max-in-memory-size来修复此问题,但我仍然遇到相同的异常。

以下是完整的异常信息:

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer : 262144
    at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCollect] :
    reactor.core.publisher.Flux.collect(Flux.java:3273)
    org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
Error has been observed at the following site(s):
    |_     Flux.collect ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:553)
    |_      Mono.filter ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:554)
    |_         Mono.map ⇢ at org.springframework.core.io.buffer.DataBufferUtils.join(DataBufferUtils.java:555)
    |_         Mono.map ⇢ at org.springframework.core.codec.AbstractDataBufferDecoder.decodeToMono(AbstractDataBufferDecoder.java:96)
    |_       checkpoint ⇢ Body from POST http://localhost:9200/_bulk?timeout=1m [DefaultClientResponse]
    |_         Mono.map ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:669)
    |_    Mono.doOnNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:670)
    |_     Mono.flatMap ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.readResponseBody(DefaultReactiveElasticsearchClient.java:671)
    |_ Mono.flatMapMany ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.sendRequest(DefaultReactiveElasticsearchClient.java:591)
    |_ Flux.publishNext ⇢ at org.springframework.data.elasticsearch.client.reactive.DefaultReactiveElasticsearchClient.bulk(DefaultReactiveElasticsearchClient.java:448)
    |_     Flux.flatMap ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:32)
    |_         Flux.map ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:33)
    |_      Flux.reduce ⇢ at com.energisme.ds.reactive.aggregation.service.SensorAggregationService.save(SensorAggregationService.java:34)
    |_         Mono.zip ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:178)
    |_         Mono.map ⇢ at com.energisme.ds.reactive.aggregation.service.AggregateSensorFlowService.nonIndexDifferenceAggregateSensorData(AggregateSensorFlowService.java:179)
Stack trace:
        at org.springframework.core.io.buffer.LimitedDataBufferList.raiseLimitException(LimitedDataBufferList.java:101)
        at org.springframework.core.io.buffer.LimitedDataBufferList.updateCount(LimitedDataBufferList.java:94)
        at org.springframework.core.io.buffer.LimitedDataBufferList.add(LimitedDataBufferList.java:59)
        at reactor.core.publisher.MonoCollect$CollectSubscriber.onNext(MonoCollect.java:119)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:571)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:287)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:313)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:427)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
        at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)

有人能告诉我我做错了什么,还是这是一个bug吗?

谢谢


2
嘿Farid,我的问题有点不同:我有一个 Spring Web Flux HTTP 服务器,我把它升级到了2.2.2版本,然后当客户端发出带有大型正文的HTTP POST请求时,我开始收到相同的异常(超过最大字节缓冲限制:262144)。将spring.codec.max-in-memory-size属性(在application.properties中)配置为一个较大的值(5242880,即5 MB)解决了这个问题。你做了同样的改变吗? - Haimke
1
嘿,Haimke,是的,我已经进行了更改,仍然遇到相同的问题。 - farid
6个回答

56

使用原始反应式WebClient时,我遇到了相同的问题(从2.1.9升级到2.2.1)。我尝试设置spring.codec.max-in-memory-size,但没有成功,并且后来发现这并不是正确的方法:

… 在客户端,可以在WebClient.Builder中更改限制。

来源,包括已失效的链接 :-S)

我仍然不知道WebClient.Builder从哪里获取默认的256K限制1。但是,以下方法使我能够将缓冲区大小限制提高到16M:

WebClient.builder()
  .…
  .exchangeStrategies(ExchangeStrategies.builder()
    .codecs(configurer -> configurer
      .defaultCodecs()
      .maxInMemorySize(16 * 1024 * 1024))
    .build())
  .build();

我认为(不了解“spring-data-elasticsearch”的复杂性)如果您能够以某种方式获取从WebClientProvider返回的WebClient,您应该能够对其进行修改,以包含上面提到的ExchangeStrategies

也许您可以提供自己的DefaultWebClientProvider覆盖,如下所示(未经测试!):

class MyDefaultWebClientProvider extends DefaultWebClientProvider {
  @Override
  public WebClient get(InetSocketAddress endpoint) {
    return super.get(endpoint)
      .mutate() // Obtain WebClient.Builder instance.
      .exchangeStrategies(ExchangeStrategies.builder()
        .codecs(configurer -> configurer
          .defaultCodecs()
          .maxInMemorySize(16 * 1024 * 1024))
        .build())
      .build();
  }
}

你的体验可能会有所不同。


更新 #1:

1)现在我找到了它。 并且 它解释了为什么设置spring.codec.max-in-memory-size没有任何效果;该属性在所有默认编解码器中使用的基类中硬编码为256K,参见BaseDefaultCodecs


是的,你说得对,这就是我发现的问题,我也不知道该如何解决。 - farid
它是硬编码的,但可以被配置文件覆盖。“每当输入流需要聚合时,可以缓冲的字节数限制。默认情况下未设置,此时适用于各个编解码器的默认值。大多数编解码器默认限制为256K。” https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-application-properties.html - amertkara
以上解决方案并未解决我的问题,仍然出现org.springframework.core.io.buffer.DataBufferLimitException异常:超过最大缓冲字节数限制:262144。 - Abdul Basith
我正在使用以下代码:@Bean public WebClient webClient(ReactorClientHttpConnector reactorClientHttpConnector) { return WebClient.builder().defaultHeader("accept-encoding", "gzip, deflate") .exchangeStrategies(ExchangeStrategies.builder() .codecs(codecConfigurer -> { ClientCodecConfigurer.ClientDefaultCodecs clientDefaultCodecs = codecConfigurer.defaultCodecs(); clientDefaultCodecs.maxInMemorySize(1024 * 1024 * 10); clientDefaultCodecs.enableLoggingRequestDetails(true); }) .build()) .clientConnector(reactorClientHttpConnector).build(); } - Abdul Basith
1
这解决了我的问题。在我的批处理作业中,属性文件设置根本不起作用。编解码器设置确实解决了它。 - Brent Thoenen
显示剩余2条评论

6

几天前,我实现了自定义 WebClient 的可能性,请查看相应的Jira问题. 这将在Spring Data Elasticsearch 3.2.4中提供,并已经存在于当前的主分支中。

配置代码如下:

@Configuration
public class ReactiveRestClientConfig extends AbstractReactiveElasticsearchConfiguration {
    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder() //
                .connectedTo("localhost:9200") //
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);

    }
}

保持“-1”有什么缺点吗?通常情况下,我看到它被设置为10-20 MB,而不是“-1”。 - Satish Patro

5

从Spring Boot 2.3.0开始,现在有一个专门的配置属性用于Reactive Elasticsearch REST客户端。

您可以使用以下配置属性为客户端设置特定的内存限制。

spring.data.elasticsearch.client.reactive.max-in-memory-size=

已经存在的spring.codec.max-in-memory-size属性是独立的,并且仅影响应用程序中的其他WebClient实例。


我使用2.5版本进行测试。已经存在的spring.codec.max-in-memory-size不起作用。只有使用构建器配置才有效。有什么想法吗? - Amir Choubani
只有构建器配置起作用的原因可能是因为该设置通过注入Webclient.Builder来传播自身。它是一个原型范围,而不是单例范围。所以如果你自己使用WebClient.builder()方法,你将无法应用通用设置。但是如果你从DI容器中注入WebClient.Builder,那么它将起作用。 - Dave Ankin

4
或者:
    final Consumer<ClientCodecConfigurer> consumer = configurer -> {
        final ClientCodecConfigurer.ClientDefaultCodecs codecs = configurer.defaultCodecs();
        codecs.maxInMemorySize(maxBufferMb * 1024 * 1024);
    };

    WebClient.builder().codecs(consumer).build();

1
简洁明了的解决方案。我认为你甚至可以这样做:返回WebClient.builder() .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(maxBufferMb * 1024 * 1024)) .build(); - Akah

0

.withWebClientConfigurer已被弃用。我使用了.withClientConfigurer,这对我起作用了。以下是代码 -

.withClientConfigurer(
                        ReactiveRestClients.WebClientConfigurationCallback.from(webClient -> {
                                                ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                        }))

参考资料


0
在我的情况下,使用Spring Boot 2.5.6,我不得不同时使用两者来解决这个问题。
创建了一个配置类;
@Configuration
public class WebfluxConfig implements WebFluxConfigurer {

    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.defaultCodecs().maxInMemorySize(5000 * 1024);
    }
    @Bean("webClient")
    public WebClient getSelfWebClient(WebClient.Builder builder) {
        return builder.baseUrl("url").build();
    }
}

在.properties文件中;

spring.codec.max-in-memory-size=5MB

我使用WebClient的类;

@Autowired
@Qualifier("webClient")
private WebClient webClient;

private void doSomething() {
   String response = webClient.post()
                .uri(uri)
                .accept(MediaType.APPLICATION_JSON)
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(requestJson)
                .retrieve()
                .bodyToMono(String.class).block();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接