使用RxJs和Angular 2来处理服务器推送事件

12

我正在尝试在Angular 2 / RxJs应用程序中显示服务器发送的事件发出的值。

后端通过服务器发送的事件定期向客户端发送单个字符串。

我不确定如何在Angular 2 / RxJs方面处理检索到的值。

这是我的客户端(一个ng组件):

import {Component, OnInit} from 'angular2/core';
import {Http, Response} from 'angular2/http';
import 'rxjs/Rx';
import {Observable}     from 'rxjs/Observable';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings | async">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    constructor(private http:Http) {
    }

    errorMessage:string;
    someStrings:string[];

    ngOnInit() {
        this.getSomeStrings()
            .subscribe(
                aString => this.someStrings.push(aString),
                error => this.errorMessage = <any>error);
    }

    private getSomeStrings():Observable<string> {
        return this.http.get('interval-sse-observable')
            .map(this.extractData)
            .catch(this.handleError);
    }

    private extractData(res:Response) {
        if (res.status < 200 || res.status >= 300) {
            throw new Error('Bad response status: ' + res.status);
        }
        let body = res.json();
        return body || {};
    }

    private handleError(error:any) {
        // In a real world app, we might send the error to remote logging infrastructure
        let errMsg = error.message || 'Server error';
        console.error(errMsg); // log to console instead
        return Observable.throw(errMsg);
    }
}

后端的方法如下(并使用了RxJava):

   @ResponseStatus(HttpStatus.OK)
   @RequestMapping(method = RequestMethod.GET, path = "interval-sse-observable")
    public SseEmitter tickSseObservable() {
        return RxResponse.sse(
                Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                        .map(tick -> randomUUID().toString())
        );
    }

我注意到该应用在请求时停止响应,并且页面上没有任何内容显示。

我怀疑我的map方法使用存在问题,即.map(this.extractData)

我只想将传入的字符串添加到数组中,并在模板中显示该数组,随着字符串的到来而更新该数组。

有人能帮忙吗?

编辑:以下是一个可行方案(感谢下面Thierry的回答):

import {Component, OnInit} from 'angular2/core';
import 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>My second Angular 2 App</h1>
    <ul>
        <li *ngFor="#s of someStrings">
           a string: {{ s }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit {

    someStrings:string[] = [];

    ngOnInit() {
        let source = new EventSource('/interval-sse-observable');
        source.addEventListener('message', aString => this.someStrings.push(aString.data), false);
    }
}

请问您能否告诉我您是否已经成功地在Angular 4中实现了这个功能?我使用了import { Component, OnInit } from '@angular/core';,但它无法找到EventSource类。请问您是否安装了其他自定义包?如果您能在GitHub或其他平台上分享完整的代码以供参考,我将不胜感激。谢谢。 - csharpnewbie
3个回答

28

这里有一个可工作的示例:

SseService

import {Injectable} from '@angular/core';
import {Observable} from 'rxjs/Observable';

declare var EventSource;

@Injectable()
export class SseService {

    constructor() {
    }

    observeMessages(sseUrl: string): Observable<string> {
        return new Observable<string>(obs => {
            const es = new EventSource(sseUrl);
            es.addEventListener('message', (evt) => {
                console.log(evt.data);
                obs.next(evt.data);
            });
            return () => es.close();
        });
    }
}

AppComponent

import {Component, OnDestroy, OnInit} from '@angular/core';
import {SseService} from './shared/services/sse/sse.service';
import {Observable, Subscription} from 'rxjs/Rx';

@Component({
    selector: 'my-app',
    template: `<h1>Angular Server-Sent Events</h1>
    <ul>
        <li *ngFor="let message of messages">
             {{ message }}
        </li>
    </ul>
    `
})
export class AppComponent implements OnInit, OnDestroy {
    private sseStream: Subscription;
    messages:Array<string> = [];

    constructor(private sseService: SseService){
    }

    ngOnInit() {
        this.sseStream = this.sseService.observeMessages('https://server.com/mysse')
                        .subscribe(message => {
                            messages.push(message);
                        });
    }

    ngOnDestroy() {
        if (this.sseStream) {
            this.sseStream.unsubscribe();
        }
    }
}

一点解释会更好,但我无论如何都会点赞 :) - pulsejet
3
非常性感!不言而喻,非常漂亮的abahet。完美,谢谢! - user3777549
你会如何将这个添加到NGRX Effects中? - FussinHussin
如何使用返回的“close()”函数,有任何示例吗? - Ben Racicot

11

1
嗨,Thierry,这是一个略微偏题的问题:今天是否鼓励使用SSE?SSE将被http2取代吗?有比SSE更好的替代品吗?我注意到一些浏览器根本不支持SSE... - balteo

1

补充Thierry的回答,默认情况下事件类型为“message”。但是基于服务器端实现,事件类型可以是任何东西,例如('chat','log'等)。 在我的情况下,服务器的前两个事件是“message”,其余的都是“log”。我的代码如下:

var source = new EventSource('/...');
source.addListener('message', message => {
  (...)
});
source.addListener('log', log => {
  (...)
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接