使用RxJS链接承诺

13

我对RxJS和FRP不太熟悉。 我想将我的ExpressJS应用程序中现有的Promise链转换为Observable进行练习。 我知道这可能不是最好的例子,但也许有人可以帮助解决问题。

我的目标:

  1. 我有两个Promise - prom1和prom2
  2. 我想要在 prom2 运行之前先运行 prom1
  3. 如果 prom1 发送 reject(err),我想在它开始之前取消 prom2。
  4. 我希望 prom1 返回的错误消息能够在观察者的 onError 方法中使用。

var prom1 = new Promise(function(resolve, reject) {
    if (true) {
       reject('reason');
    }
    resolve(true);
});

var prom2 = new Promise(function(resolve, reject) {
    resolve(true);
});

// What do I do here? This is what I've tried so far...
var source1 = Rx.Observable.fromPromise(prom1);
var source2 = source1.flatMap(Rx.Observable.fromPromise(prom2));

var subscription = source2.subscribe(
    function (result) { console.log('Next: ' + result); },

    // I want my error 'reason' to be made available here
    function (err) { console.log('Error: ' + err); },

    function () { console.log('Completed'); });

承诺(Promises)不是被“运行”的,它们也不能被“启动”。你的意思是什么? - Bergi
你使用哪个 Promise 库,原生的 Promise 吗?它如何支持取消操作? - Bergi
@Bergi - 我正在使用 https://github.com/then/promise。当我说“运行”或“启动”时,我想这是我的方式表达可观察对象创建并向订阅者广播的时间点。 - Pathsofdesign
flatMap 确实需要一个 回调函数,而不是一个可观察对象 (或者 promise) ,就像 then 方法一样。 - Bergi
@Bergi - 看起来flatMap()确实接受一个observable。http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_instance_methods/flatmap.html。我有什么遗漏吗?我不是说flatMap()就是答案,只是我知道要尝试的唯一方法。这就是为什么我需要帮助。 - Pathsofdesign
啊,我明白了,不知道那个。我想你应该仍然使用回调函数。 - Bergi
3个回答

19

如果我理解你想做的事情 - 你需要从返回 promises 的函数创建两个延迟 observable 并将它们连接:

var shouldFail = false;

function action1() {
    return new Promise(function (resolve, reject) {    
        console.log('start action1');
        if (shouldFail) {
            reject('reason');
        }
        resolve(true);
    });
}

function action2() {
    return new Promise(function (resolve, reject) {    
        console.log('start action2');
        resolve(true);
    });
}

var source1 = Rx.Observable.defer(action1);
var source2 = Rx.Observable.defer(action2);

var combination = Rx.Observable.concat(source1, source2);

var logObserver = Rx.Observer.create(

function (result) {
    console.log('Next: ' + result);
},

function (err) {
    console.log('Error: ' + err);
},

function () {
    console.log('Completed');
});

然后对于正常情况:

combination.subscribe(logObserver);
// start action1
// Next: true
// start action2
// Next: true
// Completed

如果第一个承诺失败的情况:

shouldFail = true;
combination.subscribe(logObserver);
// start action1
// Error: reason

http://jsfiddle.net/cL37tgva/


谢谢!我发现我不需要使用defer(),而是可以将两个promise函数传递给concat()并获得所需的结果。这样做有什么遗漏吗? - Pathsofdesign
2
在这种情况下,两个Promise将立即创建(在调用concat的点上,但实际订阅时不会创建),第二个Promise在开始之前不会等待第一个Promise解决。 - Bogdan Savluk
使用defer(),如果第二个promise在第一个之前返回怎么办?这时concat()会保持它们的顺序吗? - Pathsofdesign
看起来 concat(a,b,c) 足够聪明,可以同步运行事物。即使 b 先完成,它仍然按照 a、b、c 的顺序运行。 - Pathsofdesign

5
flatMap可以将Observables的Observable转化为一个Observable。在很多Promise示例中会使用它,因为通常你都有一个observable,而在map函数中,你希望每个“item”都可以创建一个promise。由于每个fromPromise调用都创建一个新的Observable,所以它就变成了“Observables的Observable”。而flatMap则将其减少为一个“flat”的observable。
在你的例子中,你执行的是一个不同的操作,即将单个promise转化为observable,并希望将其与另一个observable链接起来(也是由单个promise创建的)。Concat可以实现你需要的功能,它可以将两个observables链接在一起。
错误情况将按照预期工作。

-1

Observable.forkJoin 在这里非常好用,可以接收其他 Observables 的数组。

Rx.Observable.forkJoin([this.http.get('http://jsonplaceholder.typicode.com/posts'), this.http.get('http://jsonplaceholder.typicode.com/albums')]).subscribe((data) => {
      console.log(data);
    });

forkJoin 可能不是这里的正确选择,因为 forkJoin 允许所有可观察对象并行运行。原帖作者想要可观察对象按顺序运行。 - piccy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接