如何使用Spring测试服务器发送事件?

8

我已经实现了一个控制器,其中包含返回SseEmitter的方法,现在我想对其进行测试。目前我找到的唯一方法是:

@RunWith(SpringJUnit4ClassRunner.class)
@SpringApplicationConfiguration(classes = {SsePaymentReceivedController.class, AutomatBackendContextInitializer.class, EventBusImpl.class})
@WebAppConfiguration
public class SsePaymentReceivedControllerIntegrationTest {

@Inject
WebApplicationContext context;
@Inject
SsePaymentReceivedController sseCoinController;
@Inject
EventBusImpl eventBus;

MockMvc mockMvc;

@Before
public void setUpMockMvc() {
    this.mockMvc = MockMvcBuilders.webAppContextSetup(this.context).build();
}

@Test
public void subscriptionToSseChannelIsFine() throws Exception {
    MvcResult result = mockMvc.perform(get("/sse/payment"))
            .andExpect(status().isOk())
            .andReturn();
    eventBus.fireNotification(new PaymentReceivedNotification("50", Currency.EURO));
    LinkedHashSet<ResponseBodyEmitter.DataWithMediaType> emitters =
            (LinkedHashSet<ResponseBodyEmitter.DataWithMediaType>)Whitebox.getInternalState(((SseEmitter)result.getModelAndView().getModel().get("sseEmitter")), "earlySendAttempts");
    final Iterator<ResponseBodyEmitter.DataWithMediaType> iterator = emitters.iterator();

    ResponseBodyEmitter.DataWithMediaType dataField = iterator.next();
    assertEquals("data:", dataField.getData());

    ResponseBodyEmitter.DataWithMediaType valueField = iterator.next();
    assertEquals("{\"remainingAmount\":\"50\",\"currency\":\"EURO\"}", valueField.getData());

    ResponseBodyEmitter.DataWithMediaType lastField = iterator.next();
    assertEquals("\n\n", lastField.getData());
}
}

一定有比检查返回的模型内部更好的方法,我正在寻找这种方法-有任何想法吗?

4个回答

1

1
只能在响应式 Web 堆栈中使用(Spring WebFlux)。 - da-sha1
@da-sha1 不是的。WebFlux中的WebClientWebTestClient可以作为通用的HTTP客户端在任何地方使用。 - Hantsy

1

Spring 5.0 提供 WebTestClient,ProjectReactor 提供步骤验证器用于测试服务器推送事件 (SSE)。

需要创建 WebTestClient 对象并调用端点。以下是一个示例:

//Example in Kotlin
        val testResult = webTestClient
                .get()
                .uri(URI.create("/api/sse"))
                .accept(MediaType.TEXT_EVENT_STREAM)
                .exchange()
                .expectStatus().isOk
                .expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
                .returnResult(ServerSentEvent::class.java)
                .responseBody  //It results FluxExchangeResult<ServerSentEvent<*>>

        StepVerifier
                .create(testResult.map { it.id() })
                .expectSubscription()
                .expectNext("1", "2", "3")
                .verifyComplete()


StepVerifier提供了丰富的方法集,可用于使用Flux发布器进行事件流处理的不同情况。
注意:ServerSentEvent是SpringWebFlux框架提供的模型,用于实现SSE。它具有与SSE规范相对应的字段,如ID、数据等。

0
一个好的想法可能是使用SSE客户端。您可以使用JAX WS RS Client和Jersey EventSource来实现这一点。
使用javax.ws.rs-api2.0.1jersey-media-sse2.25.1,以下是如何打开SSE连接的方法:
Client client = ClientBuilder
        .newBuilder()
        .register(SseFeature.class)
        .build();

EventSource eventSource = EventSource
        .target(client.target("http://localhost:8080/feed/123456"))
        .reconnectingEvery(1, TimeUnit.SECONDS)
        .build();

EventListener listener = inboundEvent -> {

    log.info("Event received: id -> {}, name -> {}, data -> {}", inboundEvent.getId(),
             inboundEvent.getName(), inboundEvent.readData(String.class));

    // Process events ...
};
eventSource.register(listener);
eventSource.open();

所以,如果你想创建用于测试的工具,你可以编写一个像这样的小助手类:

package org.demo.service.sse;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;

import org.glassfish.jersey.media.sse.EventListener;
import org.glassfish.jersey.media.sse.EventSource;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.media.sse.SseFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SseTestClient {

    private static final Logger log = LoggerFactory.getLogger(SseTestClient.class);

    private final String targetUrl;
    private final List<InboundEvent> receivedEvents = new ArrayList<>();

    private String clientName = "SSE client";
    private EventSource eventSource;
    private int currentEventPos = 0;

    public SseTestClient( String targetUrl ) {

        this.targetUrl = targetUrl;
    }

    public SseTestClient setClientName( String clientName ) {

        this.clientName = clientName;
        return this;
    }

    public SseTestClient connectFeed() {

        log.info("[{}] Initializing EventSource...", clientName);

        // Create SSE client and server

        Client client = ClientBuilder
                .newBuilder()
                .register(SseFeature.class)
                .build();

        eventSource = EventSource
                .target(client.target(targetUrl))
                .reconnectingEvery(300, TimeUnit.SECONDS) // Large number so that any disconnection will break tests
                .build();

        EventListener listener = inboundEvent -> {

            log.info("[{}] Event received: name -> {}, data -> {}",
                     clientName, inboundEvent.getName(), inboundEvent.readData(String.class));

            receivedEvents.add(inboundEvent);
        };
        eventSource.register(listener);
        eventSource.open();

        log.info("[{}] EventSource connection opened", clientName);

        return this;
    }

    public int getTotalEventCount() {

        return receivedEvents.size();
    }

    public int getNewEventCount() {

        return receivedEvents.size() - currentEventPos;
    }

    public boolean hasNewEvent() {

        return currentEventPos < receivedEvents.size();
    }

    public InboundEvent getNextEvent() {

        if (currentEventPos >= receivedEvents.size()) {
            return null;
        }
        InboundEvent currentEvent = receivedEvents.get(currentEventPos);
        ++currentEventPos;
        return currentEvent;
    }

    public SseTestClient closeFeed() {

        if (eventSource != null) {
            eventSource.close();
        }
        return this;
    }
}

这使得编写测试变得更加容易:

SseTestClient sseClient = new SseTestClient("http://localhost:8080/feed/123456").connectFeed();

Thread.sleep(1000);

assertTrue(sseClient.hasNewEvent());
assertEquals(2, sseClient.getNewEventCount());
// ...

希望能对你有所帮助!


很好,但是依赖关系问题会阻碍它的实现... java.lang.NoClassDefFoundError: jersey/repackaged/com/google/common/util/concurrent/ThreadFactoryBuilder编辑:看起来你还需要添加org.glassfish.jersey.media:jersey-media-sse:2.25.1... - spyro
仍然没有收到任何事件或错误提示... - spyro

0
我发现最好的选择是获取响应的内容:获取内容
String str1 = result.getResponse().getContentAsString()
// remove data: and \n\n
String str2 = str1.substring(5, str1.length() - 2);
new JsonPathExpectationsHelper("$.remainingAmount").assertValue(str2, "50");
new JsonPathExpectationsHelper("$.currency").assertValue(str2, "EURO");

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接