Angular 2 和 Spring Boot 服务器端事件

17

请问有人能为我提供一个Spring Boot服务器端事件的示例吗?

基本上我需要将服务器端事件推送到浏览器。我正在使用Angular 2和Spring Boot后端。 请为我提供一个示例,我找不到好的示例。

@Controller
public class SSEController {

    private final List<SseEmitter> emitters = new ArrayList<>();

    @RequestMapping(path = "/stream", method = RequestMethod.GET)
    public SseEmitter stream() throws IOException {

        SseEmitter emitter = new SseEmitter();

        emitters.add(emitter);
        emitter.onCompletion(() -> emitters.remove(emitter));

        return emitter;
    }
}

如何从服务器实时推送数据,并在Angular 2中订阅此事件?

4个回答

31

编写一个Spring Rest控制器

SseController.java

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@RestController
public class SSEController {

    public static final List<SseEmitter> emitters = Collections.synchronizedList( new ArrayList<>());

    @RequestMapping(path = "/stream", method = RequestMethod.GET)
    public SseEmitter stream() throws IOException {

        SseEmitter emitter = new SseEmitter();

        emitters.add(emitter);
        emitter.onCompletion(() -> emitters.remove(emitter));

        return emitter;
    }
}

服务类(ServiceClass).java

public void sendSseEventsToUI(Notification notification) { //your model class
        List<SseEmitter> sseEmitterListToRemove = new ArrayList<>();
        SSEController.emitters.forEach((SseEmitter emitter) -> {
            try {
                emitter.send(notification, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                emitter.complete();
                sseEmitterListToRemove.add(emitter);
                e.printStackTrace();
            }
        });
        SSEController.emitters.removeAll(sseEmitterListToRemove);
    }

最后在Angular2组件中执行以下操作:

notification.component.ts

import {Component, OnInit} from '@angular/core';

declare let EventSource:any;

@Component({
    selector: 'notification-cmp',
    templateUrl: 'notification.component.html'  
})

export class NotificationComponent implements OnInit {
   connect(): void {
        let source = new EventSource('http://localhost:8080/stream');
        source.addEventListener('message', message => {
            let n: Notification; //need to have this Notification model class in angular2
            n = JSON.parse(message.data);
            console.log(message.data); 
        });
    }
}

1
你有任何可用的工作示例吗?我正在使用RabbitMQ进行基于队列的重操作,但遇到了问题。现在,当消费者完成消息消费时,我想向UI发送事件。我找不到创建serviceImpl或serviceComponent的方法,以便自动装配并发送事件。 - Nagendra Singh
@NagendraSingh,您可以为此发布一个新问题。不清楚您的期望是什么。 - Pratap A.K

3

Pratap A.K的回答非常好。但为了使它更加简洁,你应该创建一个实现接口的NotificationService。就像这样:

NotificationServiceImpl.java

public class NotificationServiceImpl implements NotificationService {

public static final List<SseEmitter> emitters = Collections.synchronizedList(new ArrayList<>());

@Override
public SseEmitter initSseEmitters() {

    SseEmitter emitter = new SseEmitter();
    emitters.add(emitter);
    emitter.onCompletion(() -> emitters.remove(emitter));

    return emitter;
}

@Override
public void sendSseEventsToUI(WebSource notification) {
    List<SseEmitter> sseEmitterListToRemove = new ArrayList<>();
    this.emitters.forEach((SseEmitter emitter) -> {
        try {
            emitter.send(notification, MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            emitter.complete();
            sseEmitterListToRemove.add(emitter);
            e.printStackTrace();
        }
    });
    this.emitters.removeAll(sseEmitterListToRemove);
  }
}

NotificationService.java

public interface NotificationService {

public SseEmitter initSseEmitters();
public void sendSseEventsToUI(WebSource notification);

}

SSEController.java

@RestController
@RequestMapping("/mystream")
public class SSEController {

@Autowired
NotificationServiceImpl INotificationServiceImpl;

@CrossOrigin
@RequestMapping(path = "/streamsource", method = RequestMethod.GET)
public SseEmitter stream() throws IOException {

    return INotificationServiceImpl.initSseEmitters();
  }
}

sendSSeEvetnsToUI 应该在哪里调用?使用这段代码,如果我调用 /streamsource 控制器,什么也不会发生!我该如何创建消息并发送它?你能更清楚地解释一下吗? - CoderJammer

1
上面的回答非常有帮助。
并且..
要接收实际推送的数据..
代码应该是
source.onmessage = (message)=>{
   let n:Notification = JSON.parse(message.data);
}


source.addEventListener('message', message => {
// There is no data property available on 'message' here
   let n: Notification; 
   n = JSON.parse(message.data);
   console.log(message.data); 
});

0

现在使用Spring Webflux更容易完成这项任务,只需使用MediaTypes即可:

    @GetMapping(value = "/queue/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<EventNotification> streamEvents() {
        return managerService.streamEvents();
    }

因此,您可以创建以下架构: {{link1:输入图像描述}}

您可以在https://github.com/htenjo/vqueue中检查工作实现,还有一个RSocket示例。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接