如何使用Spring WebClient进行同步调用?

4

Spring FrameworkRestTemplate文档中有一个注意事项:

注意:自5.0版本起,该类进入维护模式,只接受对变更和错误的较小请求。请考虑使用具有更现代化API并支持同步、异步和流式场景的org.springframework.web.reactive.client.WebClient

当我尝试使用WebClient并进行同步调用时:

@RestController
@RequestMapping("/rating")
public class Controller {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private WebClient.Builder webClientBuilder;

    @RequestMapping("/{userId}")
    public UserRating getUserRating(@PathVariable("userId") String userId) {

//      return restTemplate.getForObject("http://localhost:8083/ratingsdata/user/" + userId, UserRating.class);
        return webClientBuilder.build()
                .get().uri("http://localhost:8083/ratingsdata/user/" + userId)
                .retrieve()
                .bodyToMono(UserRating.class)
                .block();

    }
}

项目 pom.xml 文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.daren.tutorial.movie</groupId>
    <artifactId>catalog</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>catalog</name>
    <description>Movie catalog</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

我遇到了以下错误:
    2020-11-15 10:40:15.359 ERROR 7175 --- [or-http-epoll-2] a.w.r.e.AbstractErrorWebExceptionHandler : [8776f4a6-1]  500 Server Error for HTTP GET "/rating/15"

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0.jar:3.4.0]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP GET "/rating/15" [ExceptionHandlingWebHandler]
Stack trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.block(Mono.java:1679) ~[reactor-core-3.4.0.jar:3.4.0]
        at com.daren.tutorial.movie.catalog.resources.Controller.getUserRating(Controller.java:39) ~[classes/:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:564) ~[na:na]
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:146) ~[spring-webflux-5.3.1.jar:5.3.1]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1784) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:251) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:336) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:99) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:191) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3972) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:154) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2346) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2154) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2028) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:152) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.Mono.subscribe(Mono.java:3987) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.0.jar:3.4.0]
        at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:611) ~[reactor-netty-http-1.0.1.jar:1.0.1]
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:612) ~[reactor-netty-core-1.0.1.jar:1.0.1]
        at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:453) ~[reactor-netty-core-1.0.1.jar:1.0.1]
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:510) ~[reactor-netty-http-1.0.1.jar:1.0.1]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.1.jar:1.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:185) ~[reactor-netty-http-1.0.1.jar:1.0.1]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.54.Final-linux-x86_64.jar:4.1.54.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.54.Final.jar:4.1.54.Final]
        at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

使用带有RestTemplate对象的注释行,应用程序正常工作。 我必须使用过时的RestTemplate来进行同步REST调用吗?


1
实际上它显示服务器错误500。如果您使用浏览器发出相同的请求,它是否有效? - Vlad L
是的,从浏览器调用http://localhost:8083/ratingsdata/user/的请求可以正常工作。当我使用RestTemplate时,它也能正常工作(请参见编辑后的问题)。 - Openroad
1个回答

5
我找到了问题的原因并解决了它:
因为类路径上只有webflux,所以应用程序在Netty Web服务器上启动。在这种配置下,请求是由reactor-http-epoll-2线程处理的,该线程不是reactor.core.scheduler.NonBlocking的实例。这导致了IllegalStateException的出现。
为了修复这个问题,我不得不添加这个依赖项:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

应用程序现在默认在Tomcat上启动,处理请求的线程http-nio-8081-exec-1NonBlocking的实例。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接