在RxJs 5中,分享Angular Http网络调用的结果的正确方法是什么?

351

通过使用 Http,我们调用执行网络调用并返回 http observable 的方法:

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

如果我们将此可观察对象添加多个订阅者:

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);
我们想要做的是确保这不会导致多个网络请求。
这可能看起来像一个不寻常的场景,但实际上很常见:例如,如果调用者订阅可观察对象以显示错误消息,并使用异步管道将其传递到模板中,则我们已经有了两个订阅者。
在 RxJs 5 中正确的做法是什么?
也就是说,这似乎还不错:
getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

但是在RxJs 5中,这是惯用方法吗?还是我们应该选择其他方法?

注意:根据Angular 5的新HttpClient,所有示例中的.map(res => res.json())部分现在已经无用了,因为默认情况下假定为JSON结果。


1
share()与publish().refCount()并不完全相同。请参考以下讨论:https://github.com/ReactiveX/rxjs/issues/1363 - Christian
1
修改问题,根据问题看起来代码文档需要更新 -> https://github.com/ReactiveX/rxjs/blob/master/src/operator/share.ts - Angular University
我认为“这取决于情况”。但是对于那些无法在本地缓存数据的调用,因为由于参数的更改/组合可能没有意义,使用.share()似乎绝对是正确的选择。但是,如果您可以在本地缓存某些内容,则关于ReplaySubject / BehaviorSubject的其他答案也是不错的解决方案。 - JimB
我认为我们不仅需要缓存数据,还需要更新/修改缓存的数据。这是一个常见的情况。例如,如果我想要向缓存的模型中添加一个新字段或更新字段的值。也许创建一个带有CRUD方法的单例__DataCacheService__是更好的方式?就像Redux的__store__一样。你觉得呢? - Lin Du
你可以简单地使用 ngx-cacheable!它更适合你的场景。请参考我下面的回答。 - Tushar Walzade
请考虑给@Arlo的答案点赞。当我使用多个订阅和combineLatest()时,我只需要在我的管道中添加shareReplay(1)作为最后一个操作符,请求就会在单个“调用堆栈”内共享。 - al-bex
22个回答

3

很好的答案。

或者你可以这样做:

以下来自最新版本的rxjs。 我正在使用5.5.7 版本的RxJS

import {share} from "rxjs/operators";

this.http.get('/someUrl').pipe(share());

3
我们要做的是确保这不会导致多次网络请求。
我的个人偏好是使用异步方法来进行网络请求。这些方法本身不返回值,而是在同一服务中更新一个BehaviorSubject,组件将会订阅它。
那么为什么要使用BehaviorSubject而不是Observable呢?
  • 在订阅时,BehaviorSubject会返回最后一个值,而普通的Observable只有在接收到onnext时才会触发。
  • 如果您想在非Observable代码(即没有订阅)中检索BehaviorSubject的最后一个值,您可以使用getValue()方法。
示例: customer.service.ts
public customers$: BehaviorSubject<Customer[]> = new BehaviorSubject([]);

public async getCustomers(): Promise<void> {
    let customers = await this.httpClient.post<LogEntry[]>(this.endPoint, criteria).toPromise();
    if (customers) 
        this.customers$.next(customers);
}

然后,在需要的地方,我们只需订阅 customers$

public ngOnInit(): void {
    this.customerService.customers$
    .subscribe((customers: Customer[]) => this.customerList = customers);
}

也许你希望直接在模板中使用它。
<li *ngFor="let customer of customerService.customers$ | async"> ... </li>

现在,在您再次调用getCustomers之前,数据将保留在customers$ BehaviorSubject中。

那么,如果您想刷新此数据怎么办?只需调用getCustomers()即可。

public async refresh(): Promise<void> {
    try {
      await this.customerService.getCustomers();
    } 
    catch (e) {
      // request failed, handle exception
      console.error(e);
    }
}

使用这种方法,我们不需要显式地保留网络调用之间的数据,因为 BehaviorSubject 已经处理了它。 PS:通常情况下,当组件被销毁时,最好取消订阅,可以使用答案中建议的方法。

3
我假设@ngx-cache/core对于维护HTTP调用的缓存特性很有用,特别是当HTTP调用在浏览器服务器平台上都进行时。
假设我们有以下方法:
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

你可以使用@ngx-cache/coreCached装饰器将从发起HTTP调用的方法返回的值存储在缓存存储中(存储可以配置,请查看ng-seed/universal的实现)- 第一次执行时就可以。下一次调用该方法(无论是在浏览器还是服务器平台上),该值将从缓存存储中检索。
import { Cached } from '@ngx-cache/core';

...

@Cached('get-customer') // the cache key/identifier
getCustomer() {
  return this.http.get('/someUrl').map(res => res.json());
}

还有一种可能性是使用缓存方法(hasgetset),使用caching API

anyclass.ts

...
import { CacheService } from '@ngx-cache/core';

@Injectable()
export class AnyClass {
  constructor(private readonly cache: CacheService) {
    // note that CacheService is injected into a private property of AnyClass
  }

  // will retrieve 'some string value'
  getSomeStringValue(): string {
    if (this.cache.has('some-string'))
      return this.cache.get('some-string');

    this.cache.set('some-string', 'some string value');
    return 'some string value';
  }
}

以下是客户端和服务器端缓存的包列表:

2

您可以构建简单的Cacheable<>类,以帮助管理从具有多个订阅者的HTTP服务器检索的数据:

declare type GetDataHandler<T> = () => Observable<T>;

export class Cacheable<T> {

    protected data: T;
    protected subjectData: Subject<T>;
    protected observableData: Observable<T>;
    public getHandler: GetDataHandler<T>;

    constructor() {
      this.subjectData = new ReplaySubject(1);
      this.observableData = this.subjectData.asObservable();
    }

    public getData(): Observable<T> {
      if (!this.getHandler) {
        throw new Error("getHandler is not defined");
      }
      if (!this.data) {
        this.getHandler().map((r: T) => {
          this.data = r;
          return r;
        }).subscribe(
          result => this.subjectData.next(result),
          err => this.subjectData.error(err)
        );
      }
      return this.observableData;
    }

    public resetCache(): void {
      this.data = null;
    }

    public refresh(): void {
      this.resetCache();
      this.getData();
    }

}

使用方法

声明可缓存的<>对象(可能作为服务的一部分):

list: Cacheable<string> = new Cacheable<string>();

以及处理程序:

this.list.getHandler = () => {
// get data from server
return this.http.get(url)
.map((r: Response) => r.json() as string[]);
}

从组件中调用:

//gets data from server
List.getData().subscribe(…)

您可以让多个组件对其进行订阅。

更多详细信息和代码示例请参见:http://devinstance.net/articles/20171021/rxjs-cacheable


1

这是因为Angular Http observables在请求完成后会自动完成(.publishReplay(1).refCount();.publishLast().refCount();)。

这个简单的类可以缓存结果,这样您就可以多次订阅.value,并且只进行一次请求。您还可以使用.reload()进行新请求并发布数据。

您可以像这样使用它:

let res = new RestResource(() => this.http.get('inline.bundleo.js'));

res.status.subscribe((loading)=>{
    console.log('STATUS=',loading);
});

res.value.subscribe((value) => {
  console.log('VALUE=', value);
});

并且源代码:

export class RestResource {

  static readonly LOADING: string = 'RestResource_Loading';
  static readonly ERROR: string = 'RestResource_Error';
  static readonly IDLE: string = 'RestResource_Idle';

  public value: Observable<any>;
  public status: Observable<string>;
  private loadStatus: Observer<any>;

  private reloader: Observable<any>;
  private reloadTrigger: Observer<any>;

  constructor(requestObservableFn: () => Observable<any>) {
    this.status = Observable.create((o) => {
      this.loadStatus = o;
    });

    this.reloader = Observable.create((o: Observer<any>) => {
      this.reloadTrigger = o;
    });

    this.value = this.reloader.startWith(null).switchMap(() => {
      if (this.loadStatus) {
        this.loadStatus.next(RestResource.LOADING);
      }
      return requestObservableFn()
        .map((res) => {
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.IDLE);
          }
          return res;
        }).catch((err)=>{
          if (this.loadStatus) {
            this.loadStatus.next(RestResource.ERROR);
          }
          return Observable.of(null);
        });
    }).publishReplay(1).refCount();
  }

  reload() {
    this.reloadTrigger.next(null);
  }

}

1
大多数上面的答案适用于不需要输入的http请求。每次使用一些输入来进行api调用时,都需要重新创建请求。唯一可以处理这种情况的是@Arlo's reply。我创建了一个稍微简单的装饰器,您可以使用它来共享对于具有相同输入的每个调用者的响应。与Arlo的回复不同,这不会向延迟的订阅者重放响应,但会将同时请求处理为一个请求。如果目标是向延迟的观察者重放响应(即缓存的响应),则可以修改下面的代码并将share()替换为shareReplay(1)

https://gist.github.com/OysteinAmundsen/b97a2359292463feb8c0e2270ed6695a

import { finalize, Observable, share } from 'rxjs';

export function SharedObservable(): MethodDecorator {
  const obs$ = new Map<string, Observable<any>>();
  return (target: any, propertyKey: string | symbol, descriptor: PropertyDescriptor) => {
    const originalMethod = descriptor.value;
    descriptor.value = function (...args: any[]) {
      const key = JSON.stringify(args);
      if (!obs$.has(key)) {
        // We have no observable for this key yet, so we create one
        const res = originalMethod.apply(this, args).pipe(
          share(), // Make the observable hot
          finalize(() => obs$.delete(key)) // Cleanup when observable is complete
        );
        obs$.set(key, res);
      }
      // Return the cached observable
      return obs$.get(key);
    };
    return descriptor;
  };
}

用法:
@SharedObservable()
myFunc(id: number): Observable<any> {
  return this.http.get<any>(`/api/someUrl/${id}`);
}

1

rxjs 5.3.0

我对 .map(myFunction).publishReplay(1).refCount() 不满意。

在某些情况下,.map() 在多个订阅者中执行 myFunction 两次(我希望它只执行一次)。一种解决方法似乎是 publishReplay(1).refCount().take(1)

你可以做的另一件事是,不使用 refCount() 并立即使 Observable 成为热的:

let obs = this.http.get('my/data.json').publishReplay(1);
obs.connect();
return obs;

这将启动HTTP请求,无论订阅者是否存在。我不确定在HTTP GET完成之前取消订阅是否会取消请求。

0
你可以简单地使用ngx-cacheable!它更适合你的场景。

使用它的好处:

  • 仅调用REST API一次,缓存响应并返回相同的内容以供后续请求使用。
  • 在创建/更新/删除操作后,可以根据需要调用API。
因此,你的服务类将如下所示 -
import { Injectable } from '@angular/core';
import { Cacheable, CacheBuster } from 'ngx-cacheable';

const customerNotifier = new Subject();

@Injectable()
export class customersService {

    // relieves all its caches when any new value is emitted in the stream using notifier
    @Cacheable({
        cacheBusterObserver: customerNotifier,
        async: true
    })
    getCustomer() {
        return this.http.get('/someUrl').map(res => res.json());
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    addCustomer() {
        // some code
    }

    // notifies the observer to refresh the data
    @CacheBuster({
        cacheBusterNotifier: customerNotifier
    })
    updateCustomer() {
        // some code
    }
}

这里是更多参考的链接。


0

我写了一个缓存类,

/**
 * Caches results returned from given fetcher callback for given key,
 * up to maxItems results, deletes the oldest results when full (FIFO).
 */
export class StaticCache
{
    static cachedData: Map<string, any> = new Map<string, any>();
    static maxItems: number = 400;

    static get(key: string){
        return this.cachedData.get(key);
    }

    static getOrFetch(key: string, fetcher: (string) => any): any {
        let value = this.cachedData.get(key);

        if (value != null){
            console.log("Cache HIT! (fetcher)");
            return value;
        }

        console.log("Cache MISS... (fetcher)");
        value = fetcher(key);
        this.add(key, value);
        return value;
    }

    static add(key, value){
        this.cachedData.set(key, value);
        this.deleteOverflowing();
    }

    static deleteOverflowing(): void {
        if (this.cachedData.size > this.maxItems) {
            this.deleteOldest(this.cachedData.size - this.maxItems);
        }
    }

    /// A Map object iterates its elements in insertion order — a for...of loop returns an array of [key, value] for each iteration.
    /// However that seems not to work. Trying with forEach.
    static deleteOldest(howMany: number): void {
        //console.debug("Deleting oldest " + howMany + " of " + this.cachedData.size);
        let iterKeys = this.cachedData.keys();
        let item: IteratorResult<string>;
        while (howMany-- > 0 && (item = iterKeys.next(), !item.done)){
            //console.debug("    Deleting: " + item.value);
            this.cachedData.delete(item.value); // Deleting while iterating should be ok in JS.
        }
    }

    static clear(): void {
        this.cachedData = new Map<string, any>();
    }

}

由于我们的使用方式,它都是静态的,但你可以随意将其变成普通类和服务。不过我不确定 Angular 是否会在整个时间内保持单个实例(对 Angular2 不太熟悉)。

这就是我的使用方式:

            let httpService: Http = this.http;
            function fetcher(url: string): Observable<any> {
                console.log("    Fetching URL: " + url);
                return httpService.get(url).map((response: Response) => {
                    if (!response) return null;
                    if (typeof response.json() !== "array")
                        throw new Error("Graph REST should return an array of vertices.");
                    let items: any[] = graphService.fromJSONarray(response.json(), httpService);
                    return array ? items : items[0];
                });
            }

            // If data is a link, return a result of a service call.
            if (this.data[verticesLabel][name]["link"] || this.data[verticesLabel][name]["_type"] == "link")
            {
                // Make an HTTP call.
                let url = this.data[verticesLabel][name]["link"];
                let cachedObservable: Observable<any> = StaticCache.getOrFetch(url, fetcher);
                if (!cachedObservable)
                    throw new Error("Failed loading link: " + url);
                return cachedObservable;
            }

我认为可能有更聪明的方法,可以使用一些Observable技巧,但对于我的目的来说,这已经足够了。


0

只需使用此缓存层,它可以执行您所需的所有操作,甚至管理 Ajax 请求的缓存。

http://www.ravinderpayal.com/blogs/12Jan2017-Ajax-Cache-Mangement-Angular2-Service.html

使用起来非常简单

@Component({
    selector: 'home',
    templateUrl: './html/home.component.html',
    styleUrls: ['./css/home.component.css'],
})
export class HomeComponent {
    constructor(AjaxService:AjaxService){
        AjaxService.postCache("/api/home/articles").subscribe(values=>{console.log(values);this.articles=values;});
    }

    articles={1:[{data:[{title:"first",sort_text:"description"},{title:"second",sort_text:"description"}],type:"Open Source Works"}]};
}

该层(作为可注入的Angular服务)是

import { Injectable }     from '@angular/core';
import { Http, Response} from '@angular/http';
import { Observable }     from 'rxjs/Observable';
import './../rxjs/operator'
@Injectable()
export class AjaxService {
    public data:Object={};
    /*
    private dataObservable:Observable<boolean>;
     */
    private dataObserver:Array<any>=[];
    private loading:Object={};
    private links:Object={};
    counter:number=-1;
    constructor (private http: Http) {
    }
    private loadPostCache(link:string){
     if(!this.loading[link]){
               this.loading[link]=true;
               this.links[link].forEach(a=>this.dataObserver[a].next(false));
               this.http.get(link)
                   .map(this.setValue)
                   .catch(this.handleError).subscribe(
                   values => {
                       this.data[link] = values;
                       delete this.loading[link];
                       this.links[link].forEach(a=>this.dataObserver[a].next(false));
                   },
                   error => {
                       delete this.loading[link];
                   }
               );
           }
    }

    private setValue(res: Response) {
        return res.json() || { };
    }

    private handleError (error: Response | any) {
        // In a real world app, we might use a remote logging infrastructure
        let errMsg: string;
        if (error instanceof Response) {
            const body = error.json() || '';
            const err = body.error || JSON.stringify(body);
            errMsg = `${error.status} - ${error.statusText || ''} ${err}`;
        } else {
            errMsg = error.message ? error.message : error.toString();
        }
        console.error(errMsg);
        return Observable.throw(errMsg);
    }

    postCache(link:string): Observable<Object>{

         return Observable.create(observer=> {
             if(this.data.hasOwnProperty(link)){
                 observer.next(this.data[link]);
             }
             else{
                 let _observable=Observable.create(_observer=>{
                     this.counter=this.counter+1;
                     this.dataObserver[this.counter]=_observer;
                     this.links.hasOwnProperty(link)?this.links[link].push(this.counter):(this.links[link]=[this.counter]);
                     _observer.next(false);
                 });
                 this.loadPostCache(link);
                 _observable.subscribe(status=>{
                     if(status){
                         observer.next(this.data[link]);
                     }
                     }
                 );
             }
            });
        }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接