Node.js Streams与Observables的区别

87

在了解了Observables之后,我发现它们与Node.js流非常相似。两者都有一个机制,在新数据到达、出现错误或没有更多数据(EOF)时通知消费者。

我很想了解这两者之间的概念/功能差异。谢谢!


1
@BenjaminGruenbaum,我想知道你为什么在这个标签上加了rxjs和bacon?OP似乎是指来自[标签:ecmascript-harmony]的observable。 - Bergi
@Bergi 先前关于 OP 和问题的了解。基本上。 - Benjamin Gruenbaum
1
恭喜你获得了赞,但我不知道为什么这个问题没有被关闭。这怎么是一个真正的问题或者适合放在SO上呢? - Alexander Mills
6
@AlexanderMills 这不是一个不适合在SO上提问的问题吗?这不是一个 "你最喜欢哪个" 的问题,而是在询问JS/Node中两种常用反应式模式的差异。 - Michael Martin-Smucker
1个回答

116

Observables和node.js的Streams都可以用来解决异步处理值序列的问题。我认为它们之间的主要区别在于出现的背景和相关的术语和API。

Observables是EcmaScript的扩展,引入了响应式编程模型,试图用ObserverObservable这些简约且可组合的概念填补值生成和异步性之间的差距。

而在node.js和Streams一侧,你想要创建一个接口,用于异步高效地处理网络流和本地文件。术语来源于那个最初的上下文,因此你会得到pipechunkencodingflushDuplexBuffer等。通过提供明确支持特定用例的实用方法,你会失去一些组合能力,因为它并不那么统一。例如,在Readable流上使用push,在Writable上使用write,虽然从概念上讲,你正在做同样的事情:发布一个值。

因此,在实践中,如果你看一下这些概念,并使用选项{ objectMode: true },你可以将ObservableReadable流匹配,并将ObserverWritable流匹配。甚至可以在两种模型之间创建一些简单的适配器。

var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

var Observable = function(subscriber) {
    this.subscribe = subscriber;
}

var Subscription = function(unsubscribe) {
    this.unsubscribe = unsubscribe;
}

Observable.fromReadable = function(readable) {
    return new Observable(function(observer) {
        function nop() {};

        var nextFn = observer.next ? observer.next.bind(observer) : nop;
        var returnFn = observer.return ? observer.return.bind(observer) : nop;
        var throwFn = observer.throw ? observer.throw.bind(observer) : nop;

        readable.on('data', nextFn);
        readable.on('end', returnFn);
        readable.on('error', throwFn);

        return new Subscription(function() {
            readable.removeListener('data', nextFn);
            readable.removeListener('end', returnFn);
            readable.removeListener('error', throwFn);
        });
    });
}

var Observer = function(handlers) {
    function nop() {};

    this.next = handlers.next || nop;
    this.return = handlers.return || nop;
    this.throw = handlers.throw || nop;
}

Observer.fromWritable = function(writable, shouldEnd, throwFn) {
    return new Observer({
        next: writable.write.bind(writable), 
        return: shouldEnd ? writable.end.bind(writable) : function() {}, 
        throw: throwFn
    });
}

您可能已经注意到我更改了一些名称,并使用了此处介绍的更简单的ObserverSubscription概念,以避免在 Generator 中由Observables负责的过载。基本上,Subscription 允许您取消对 Observable 的订阅。无论如何,通过上述代码,您可以使用 pipe

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

process.stdin.pipe(process.stdout) 相比,你可以使用一种方式来组合、过滤和转换流,并适用于任何其他数据序列。你可以通过 ReadableTransformWritable 流实现它,但 API 更倾向于子类化而不是链接 Readable 并应用函数。例如,在 Observable 模型中,转换值对应于将转换器函数应用于流。这不需要 Transform 的新子类型。

Observable.just = function(/*... arguments*/) {
    var values = arguments;
    return new Observable(function(observer) {
        [].forEach.call(values, function(value) {
            observer.next(value);
        });
        observer.return();
        return new Subscription(function() {});
    });
};

Observable.prototype.transform = function(transformer) {
    var source = this;
    return new Observable(function(observer) {
        return source.subscribe({
            next: function(v) {
                observer.next(transformer(v));
            },
            return: observer.return.bind(observer),
            throw: observer.throw.bind(observer)
        });
    });
};

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
  .subscribe(Observer.fromWritable(process.stdout))

结论?在任何地方介绍反应式模型和Observable概念都很容易。但是要围绕这个概念实现整个库就比较困难了。所有这些小函数都需要始终协同工作。毕竟,ReactiveX 项目仍在继续开发。但如果您确实需要将文件内容发送到客户端、处理编码并将其压缩,则在NodeJS中提供了支持,并且它的效果非常好。


8
我对“Ecmascript扩展”的整个概念并不确定。RxJS和RxJava等只是库而已。在ES7或ES8中可能会有一些与Observables相关的关键字,但它们肯定不是该语言的一部分,特别是当你在2015年回答问题时更不是。 - Alexander Mills
1
RX实现是否支持无损背压? 例如,如果Node.js读取流处于暂停模式,则可以使用read()方法按需从流中读取。 而drain事件可以表示可写流可以接收更多数据。 - Buggy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接