Spring Boot 2 Reactor Flux API的Angular客户端

9
如何为Java Project Reactor反应式Flux API创建Angular 4客户端?以下示例有两个API:一个Mono API和一个Flux API。两者都可以使用curl工作,但在Angular 4(4.1.2)中,只有Mono API可以工作;有什么想法可以让Angular 4与Flux API一起工作吗?
这是一个微不足道的Spring Boot 2.0.0-SNAPSHOT应用程序,它具有Mono API和Flux API:
@SpringBootApplication
@RestController
public class ReactiveServiceApplication {

    @CrossOrigin
    @GetMapping("/events/{id}")
    public Mono<Event> eventById(@PathVariable long id) {
        return Mono.just(new Event(id, LocalDate.now()));
    }

    @CrossOrigin
    @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Event> events() {
        Flux<Event> eventFlux = Flux.fromStream(
            Stream.generate(
                ()->new Event(System.currentTimeMillis(), LocalDate.now()))
            );

        Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));

        return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveServiceApplication.class);
    }
}

使用Lombok的事件:
@Data
@AllArgsConstructor
public class Event {
    private final long id;
    private final LocalDate when;
}

这些响应式API与我期望的一样可以从curl中使用:

jan@linux-6o1s:~/src> curl -s http://localhost:8080/events/123
{"id":123,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}

对于非终止的Flux API同样适用:

jan@linux-6o1s:~/src> curl -s http://localhost:8080/events
data:{"id":1494887783347,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}

data:{"id":1494887784348,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}

data:{"id":1494887785347,"when":{"year":2017,"month":"MAY","monthValue":5,"dayOfMonth":15,"dayOfWeek":"MONDAY","era":"CE","dayOfYear":135,"leapYear":false,"chronology":{"calendarType":"iso8601","id":"ISO"}}}

...

使用RxJS的同样简单的Angular 4客户端:

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent  implements OnInit, OnDestroy {
  title = 'app works!';
  event: Observable<Event>;
  subscription: Subscription;

  constructor(
    private _http: Http
    ) {
  }

  ngOnInit() {
    this.subscription = this._http
      .get("http://localhost:8080/events/322")
      .map(response => response.json())
      .subscribe(
        e => { 
          this.event = e;
        }
      );
  }

  ngOnDestroy() {
    this.subscription.unsubscribe();
  }
}

对于 Mono API,运作良好:

"http://localhost:8080/events/322"

但是 Flux API:
"http://localhost:8080/events"

不像 curl,它从不触发事件处理程序。


我很乐意澄清,特别是对于投票关闭的选民,因为它是“不清楚你在问什么” - 帮助我理解在评论中不清楚的地方? - Jan Nielsen
没有一个解决方案是令人满意的。我们需要的是一个新版本的Angular! - ieXcept
3个回答

5

以下是一个可用的 Angular 4 SSE 示例,就像 Simon 在他的回答中所描述的那样。这花了一些时间才拼凑起来,所以也许对其他人有用。关键在于 Zone -- 没有 Zone,SSE 更新将不会触发 Angular 的变更检测。

import { Component, NgZone, OnInit, OnDestroy } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Subscription } from 'rxjs/Subscription';
import 'rxjs/add/operator/map';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent  implements OnInit {
  event: Observable<MyEvent>;
  private _eventSource: EventSource;
  private _events: BehaviorSubject<MyEvent> = new BehaviorSubject<MyEvent>(null);
  constructor(private _http: Http, private _zone: NgZone) {}
  ngOnInit() {
    this._eventSource = this.createEventSource();
    this.event = this.createEventObservable();
  }

  private createEventObservable(): Observable<MyEvent> {
    return this._events.asObservable();
  }

  private createEventSource(): EventSource {
      const eventSource = new EventSource('http://localhost:8080/events');
      eventSource.onmessage = sse => {
        const event: MyEvent = new MyEvent(JSON.parse(sse.data));
        this._zone.run(()=>this._events.next(event));
      };
      eventSource.onerror = err => this._events.error(err);
      return eventSource;
  }
}

相应的HTML代码很简单,如下所示:

<b>Observable of sse</b>
<div *ngIf="(event | async); let evt; else loading">
  <div>ID: {{evt.id}} </div>
</div>
<ng-template #loading>Waiting...</ng-template>

该事件是微不足道的:
export class MyEvent {
  id: number;
  when: any;

  constructor(jsonData) {
    Object.assign(this, jsonData);
  }
}

由于我的TS没有包括EventSourceCallback,所以我对它们进行了桩测试:

interface Callback { (data: any): void; }

declare class EventSource {
    onmessage: Callback;
    onerror: Callback;
    addEventListener(event: string, cb: Callback): void;
    constructor(name: string);
    close: () => void;
}

1
基于Flux的控制器正在生成服务器发送事件(SSE)。我认为Angular2的Http客户端不允许您消费SSE...
编辑:看起来EventSource是您需要的,参见这个类似的问题/答案:https://dev59.com/8FoV5IYBdhLWcg3wA62x#36815231

啊,当然!curl 输出中的 data: 表示这是一个 SSE 文本流。谢谢,Simon! - Jan Nielsen
谢谢你的指点,Simon。请看下面的回答获取详细信息。如果需要,可以将我的回答复制到你的回答中,我会接受你的回答。 - Jan Nielsen

-5

我猜测这里的问题在于/events的URL,因为它应该生成JSON以便处理。

@SpringBootApplication
@RestController
public class ReactiveServiceApplication {

    @CrossOrigin
    @GetMapping("/events/{id}")
    public Mono<Event> eventById(@PathVariable long id) {
        return Mono.just(new Event(id, LocalDate.now()));
    }

    @CrossOrigin
    @GetMapping(value = "/events", produces = MediaType.APPLICATION_JSON_VALUE)
    public Flux<Event> events() {
        Flux<Event> eventFlux = Flux.fromStream(
            Stream.generate(
                ()->new Event(System.currentTimeMillis(), LocalDate.now()))
            );

        Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(1));

        return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveServiceApplication.class);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接