我有一个rxjs观察者(实际上是Subject),可以像tail -f一样无限制地读取文件。这对于监控日志文件非常有用。
这种“无限制”的行为对我的应用程序非常有用,但对测试来说却很糟糕。目前我的应用程序可以正常工作,但我的测试会永远挂起。
我想强制观察者提前完成变化,因为我的测试代码知道文件中应该有多少行。我该怎么做?
我尝试在返回的Subject句柄上调用onCompleted,但此时它基本上被转换为观察者,你无法强制关闭它,错误信息如下:
“对象#没有方法'onCompleted'”
以下是源代码:
这种“无限制”的行为对我的应用程序非常有用,但对测试来说却很糟糕。目前我的应用程序可以正常工作,但我的测试会永远挂起。
我想强制观察者提前完成变化,因为我的测试代码知道文件中应该有多少行。我该怎么做?
我尝试在返回的Subject句柄上调用onCompleted,但此时它基本上被转换为观察者,你无法强制关闭它,错误信息如下:
“对象#没有方法'onCompleted'”
以下是源代码:
function ObserveTail(filename) {
source = new Rx.Subject();
if (fs.existsSync(filename) == false) {
console.error("file doesn't exist: " + filename);
}
var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);
tail.on("line", function(line) {
source.onNext(line);
});
tail.on('close', function(data) {
console.log("tail closed");
source.onCompleted();
});
tail.on('error', function(error) {
console.error(error);
});
this.source = source;
}
以下是无法强制 forever 结束(录音带样式测试)的测试代码。请注意 "ILLEGAL" 行:
test('tailing a file works correctly', function(tid) {
var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);
handle.source
.filter(function (x) {
try {
JSON.parse(x);
return true;
} catch (error) {
tid.pass("correctly caught illegal JSON");
return false;
}
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
function(name) {
tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
i++;
if (i >= lines) {
handle.onCompleted(); // XXX ILLEGAL
}
},
function(err) {
console.error(err)
tid.fail("err leaked through to subscriber");
},
function() {
tid.end();
console.log("Completed");
}
);
})
onCompleted
,2. 如果该主题被用作观察者,并且该观察者的源已完成。使用更合适的那个。 - user3743222