如何从具有Observable订阅的函数中返回值?

153
我不知道如何从 Observable 中提取值并将其返回给存在 Observable 的函数。我只需要从中返回一个值,没有其他要求。

目前可用的版本

function getValueFromObservable() {
    this.store.subscribe(
        (data:any) => {
            console.log(data)
        }
    )
}
getValueFromObservable()

我需要这个工作,函数返回值后执行以下操作:

function getValueFromObservable() {
    this.store.subscribe(
        (data:any) => {
            return data
        }
    )
}
console.log(getValueFromObservable())

我在这里做错了什么?


2
当你的 Observable 被解决时,你应该返回一个 Observable/Promise 并通过它传递数据。 - galvan
2
你能放一些简单的代码吗? - Teddy
8
你试图实现的是一种反模式:你试图"同步"一个异步任务。这不是Observable应该工作的方式。简而言之,在大多数情况下,一个函数如果输入是一个Observable,它应该返回一个Observable,或者什么也不返回。当你需要处理输出时,请订阅它。在这种情况下,如果你想将数据打印到控制台,请在subscribe内部进行。 - Can Nguyen
4
请注意!'解决方案'部分的代码绝对不正确,不要使用它! 只有当以下部分完成执行并且返回之后,它才能正常工作:this.store.subscribe( (data:any) => { output = data } ).unsubscribe()否则它会返回 undefined。 - Rodion Golovushkin
1
我有Java编程背景,但异步编程让我很头疼。例如,我期望从一个函数中返回一个值,但它却永远不会返回。你能推荐一些关于JavaScript异步问题的资料吗? - Muhammed Ozdogan
显示剩余2条评论
13个回答

89

编辑:更新了代码,以反映RXJS更近版本中管道的工作方式的更改。现在所有操作符(例如我的例子中的take)都被包装到pipe()操作符中。

我意识到这个问题已经过去了一段时间,你肯定已经有一个适当的解决方案,但是对于任何正在寻找解决方案的人,我建议使用Promise来保持异步模式。

更详细的版本是创建一个新的Promise:

function getValueFromObservable() {
    return new Promise(resolve=>{
        this.store.pipe(
           take(1) //useful if you need the data once and don't want to manually cancel the subscription again
         )
         .subscribe(
            (data:any) => {
                console.log(data);
                resolve(data);
         })
    })
}

在接收端,您需要像这样“等待”承诺解析:

getValueFromObservable()
   .then((data:any)=>{
   //... continue with anything depending on "data" after the Promise has resolved
})

更精简的解决方案是使用 RxJS 的 .toPromise() 方法:
function getValueFromObservable() {
    return this.store.pipe(take(1))
       .toPromise()   
}

接收方当然与上面保持相同。

你的 getValueFromObservable 函数的返回类型是什么? - hardywang
17
你仍然正在返回一个需要被解析的 Promise,而不是直接返回一个值。 - Rogelio
1
类型 'Observable<>' 上不存在属性 'take'。 - Memmo
1
@Memmo 尝试使用 .pipe(take(1)) 代替。 - Thibault
5
这个答案很令人困惑。问题是关于Observable的,但答案使用了Promise?请进行翻译。 - djangofan
显示剩余4条评论

27

这并不是使用Observable的完全正确的想法。

在组件中,您需要声明一个类成员来保存一个对象(您将在组件中使用的某些东西)。

export class MyComponent {
  name: string = "";
}

接下来,一个Service将会向您返回一个Observable

getValueFromObservable():Observable<string> {
    return this.store.map(res => res.json());
}

组件 应该准备好自己以能够从中检索一个值:

OnInit(){
  this.yourServiceName.getValueFromObservable()
    .subscribe(res => this.name = res.name)
}

您需要将 Observable 中的值赋给一个变量:

然后您的模板将使用变量 name:

<div> {{ name }} </div>

使用 Observable 的另一种方式是通过 async 管道http://briantroncone.com/?p=623

注意:如果这不是你询问的内容,请更新你的问题并提供更多细节。


不完全是这样。问题在于数据被捕获在可观察对象内,我只能在控制台记录它。我想通过调用所在函数中的函数来返回该值并进行console.log或其他操作。 - Teddy
Andrei指出了如何通过将其分配给组件的“name”变量,使“name”在回调之外可用。在您的情况下,无法同步返回“name”。 - Jan B.
@Matt:我不能在 Oninit 中像那样使用它,如果我需要显式返回怎么办?我的调用代码看起来像这样 this.actions$.ofType(SearchActions.SEARCH_MULTIPLE_NEW_QUERY).map(toPayload).fnWithMultipleAsyncReturns() - ishandutta2007
@ishandutta2007 你好。你最好在SO上创建一个关于你的问题的新问题。 - Jan B.
@Matt:已创建,如果您想查看(http://stackoverflow.com/questions/43381922/return-subject-from-inside-map) - ishandutta2007
显示剩余3条评论

7
问题在于数据被捕获在可观察对象内,我只能将其记录在控制台上。我想通过调用它所在的函数来返回该值并进行console.log或其他操作。

看起来你正在寻找一个observable内的“当前值”getter,在它发出后和发出之后。

SubjectObservable没有这样的东西。当一个值被发出时,它被传递给它的订阅者,Observable就完成了它的任务。

你可以使用BehaviorSubject,它存储最后发出的值,并立即将其发出到新的订阅者。

它还有一个getValue()方法来获取当前值;

进一步阅读:

RxJS BehaviorSubject

如何获取RxJS Subject或Observable的当前值?


7
如果你想预订相同的 Observable 并返回它,只需要使用 `.do()`。
function getValueFromObservable() {
    return this.store.do(
        (data:any) => {
            console.log("Line 1: " +data);
        }
    );
}

getValueFromObservable().subscribe(
        (data:any) => {
            console.log("Line 2: " +data)
        }
    );

3
你还可以使用其他操作符,比如 .map(data => data),它的作用相同,并且可以在你期望得到结果的任何地方进行订阅。 - ashok_khuman
我同意ashok_khuman的观点。这里是指南https://angular.io/guide/pipes - Armando Perea
1
这可能是一个好答案,但事实上你没有解释任何关于它的内容,使它成为一个糟糕的答案。"预订"是什么意思?它应该解决线程发起者的问题吗? - Florian Leitgeb
请注意,在 RxJS 6 中,do 现在被称为 tap,您必须在管道中使用它。此外,请注意,tap 接受多个参数以处理不同的处理程序,例如 nextcompleteerror - Simon_Weaver

3

虽然之前的回答也可能行得通,但如果您想继续使用可观察对象,我认为使用BehaviorSubject是正确的方法。

示例:

    this.store.subscribe(
        (data:any) => {
            myService.myBehaviorSubject.next(data)
        }
    )

服务中:

let myBehaviorSubject = new BehaviorSubjet(value);

在 component.ts 中:

this.myService.myBehaviorSubject.subscribe(data => this.myData = data)

我希望这可以帮助你!

2

可观察值可以从任何位置获取。源序列首先被“推送”到一个特殊的“观察者”,该观察者能够在其他地方发出信号。这是通过Reactive Extensions(RxJS)中的Subject类实现的。

var subject = new Rx.AsyncSubject();  // store-last-value method

将值存储到观察者中。

subject.next(value); // store value
subject.complete(); // publish only when sequence is completed

要从其他地方检索值,请像这样订阅观察者:

subject.subscribe({
  next: (response) => {
      //do stuff. The property name "response" references the value
  }
});

Subjects既是Observables也是Observers。还有其他Subject类型,例如BehaviourSubject和ReplaySubject,适用于其他使用场景。

别忘了导入RxJS。

var Rx = require('rxjs');

1

我想确认数据是否存储在客户端,或者我是否需要通过API调用从服务器获取它。然后返回一个新的观察者并订阅它就可以了。第一次从服务器获取数据,之后就会返回this.allUsers,因为我在调用API并返回数据后设置了这个数据。

    private _allUsers: EnconUser[] = undefined;
    get allUsers(): EnconUser[]
    {
      return this._allUsers;
    }
    set allUsers(value: EnconUser[])
    {
      this.storage.store('allUsers', value);
      this._allUsers = value;
    }

    public getAllUsers() : Observable<EnconUser[]>
    {
      if (this.allUsers == undefined)
      {
        return new Observable((obs) => {this.httpClient.get<EnconUser[]>(`http://api/getallusers`).subscribe(data=>{this.allUsers=data; obs.next(data); })});
      }
      else
      {
       return new Observable((obs) => {
        obs.next(this.allUsers);
        });
      }
    }

1
合适的方法是从函数中返回可观察对象并在需要时进行订阅,因为可观察对象是惰性的,只有在订阅时才会开始发出值。
这里我还有一个有趣的事件驱动解决方案,最初我用它来玩。下面的示例使用Node.js的“events”模块来实现。您可以在其他类似模块存在的框架中使用它(注意:语法和样式可能会根据所使用的模块而变化)。
var from =require("rxjs").from;
var map = require("rxjs/operators").map;
var EventEmitter = require("events");

function process(event) {
    from([1,2,3]).pipe(
        map(val => `The number is:: ${val}`)
    ).subscribe((data) => {
       event.emit("Event1", data); //emit value received in subscribe to the "Event1" listener
    });
}

function main() {
   class Emitter extends EventEmitter{};
    var event = new Emitter(); //creating an event
    event.on("Event1", (data)=>{ //listening to the event of name "Event1" and callback to log returned result
        console.log(data); //here log, print, play with the data you receive
    });
    process(event); //pass the event to the function which returns observable.
}

main(); //invoke main function

这只是一个示例,展示了我们可以通过发射和监听的方式从不同的地方传递数据的想法。这也被称为事件驱动的代码。


这对我有用,不使用node,而是在管道内使用map记录值。然后可以订阅上游的Observable - Chris Ritchie

0
例如,这是我的 HTML 模板:

<select class="custom-select d-block w-100" id="genre" name="genre"
                  [(ngModel)]="film.genre"
                  #genreInput="ngModel"
                  required>
            <option value="">Choose...</option>
            <option *ngFor="let genre of genres;" [value]="genre.value">{{genre.name}}</option>
          </select>

这是与我的组件模板绑定的字段:

  // Genres of films like action or drama that will populate dropdown list.
  genres: Genre[];

我动态地从服务器获取电影类型。为了与服务器通信,我创建了FilmService

这是与服务器通信的方法:

 fetchGenres(): Observable<Genre[]> {
    return this.client.get(WebUtils.RESOURCE_HOST_API + 'film' + '/genre') as Observable<Genre[]>;
  }

为什么这个方法返回的是Observable<Genre[]>而不是像Genre[]这样的东西?
JavaScript是异步的,它不会等待一个方法在执行昂贵的操作后返回值。所谓昂贵的操作是指需要一定时间才能返回值的操作,比如从服务器获取数据。因此,你必须返回Observable的引用并订阅它。
例如,在我的组件中:
ngOnInit() {
    this.filmService.fetchGenres().subscribe(
      val => this.genres = val
    );
  }

0

从函数中返回一个可观察对象。

rxjsFunction.ts

import { Observable } from 'rxjs'

export function getValueFromObservable() {

    return new Observable( (obs) => {
        obs.next(5);
    })
}

main.ts

import { getValueFromObservable } from "./rxjsFunction";

getValueFromObservable().subscribe((value) => {
    next: console.log(value);
});

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接