在了解了Observables之后,我发现它们与Node.js流非常相似。两者都有一个机制,在新数据到达、出现错误或没有更多数据(EOF)时通知消费者。
我很想了解这两者之间的概念/功能差异。谢谢!
在了解了Observables之后,我发现它们与Node.js流非常相似。两者都有一个机制,在新数据到达、出现错误或没有更多数据(EOF)时通知消费者。
我很想了解这两者之间的概念/功能差异。谢谢!
Observables和node.js的Streams都可以用来解决异步处理值序列的问题。我认为它们之间的主要区别在于出现的背景和相关的术语和API。
Observables是EcmaScript的扩展,引入了响应式编程模型,试图用Observer
和Observable
这些简约且可组合的概念填补值生成和异步性之间的差距。
而在node.js和Streams一侧,你想要创建一个接口,用于异步高效地处理网络流和本地文件。术语来源于那个最初的上下文,因此你会得到pipe
、chunk
、encoding
、flush
、Duplex
、Buffer
等。通过提供明确支持特定用例的实用方法,你会失去一些组合能力,因为它并不那么统一。例如,在Readable
流上使用push
,在Writable
上使用write
,虽然从概念上讲,你正在做同样的事情:发布一个值。
因此,在实践中,如果你看一下这些概念,并使用选项{ objectMode: true }
,你可以将Observable
与Readable
流匹配,并将Observer
与Writable
流匹配。甚至可以在两种模型之间创建一些简单的适配器。
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on('data', nextFn);
readable.on('end', returnFn);
readable.on('error', throwFn);
return new Subscription(function() {
readable.removeListener('data', nextFn);
readable.removeListener('end', returnFn);
readable.removeListener('error', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
您可能已经注意到我更改了一些名称,并使用了此处介绍的更简单的Observer
和Subscription
概念,以避免在 Generator
中由Observables负责的过载。基本上,Subscription
允许您取消对 Observable
的订阅。无论如何,通过上述代码,您可以使用 pipe
。
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
与 process.stdin.pipe(process.stdout)
相比,你可以使用一种方式来组合、过滤和转换流,并适用于任何其他数据序列。你可以通过 Readable
、Transform
和 Writable
流实现它,但 API 更倾向于子类化而不是链接 Readable
并应用函数。例如,在 Observable
模型中,转换值对应于将转换器函数应用于流。这不需要 Transform
的新子类型。
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
结论?在任何地方介绍反应式模型和Observable
概念都很容易。但是要围绕这个概念实现整个库就比较困难了。所有这些小函数都需要始终协同工作。毕竟,ReactiveX 项目仍在继续开发。但如果您确实需要将文件内容发送到客户端、处理编码并将其压缩,则在NodeJS中提供了支持,并且它的效果非常好。
read()
方法按需从流中读取。
而drain事件
可以表示可写流可以接收更多数据。 - Buggy