强制完成一个 RxJS 观察者

5
我有一个rxjs观察者(实际上是Subject),可以像tail -f一样无限制地读取文件。这对于监控日志文件非常有用。
这种“无限制”的行为对我的应用程序非常有用,但对测试来说却很糟糕。目前我的应用程序可以正常工作,但我的测试会永远挂起。
我想强制观察者提前完成变化,因为我的测试代码知道文件中应该有多少行。我该怎么做?
我尝试在返回的Subject句柄上调用onCompleted,但此时它基本上被转换为观察者,你无法强制关闭它,错误信息如下:
“对象#没有方法'onCompleted'”
以下是源代码:
function ObserveTail(filename) {

source = new Rx.Subject();

if (fs.existsSync(filename) == false) {
    console.error("file doesn't exist: " + filename);
}

var lineSep = /[\r]{0,1}\n/;
tail = new Tail(filename, lineSep, {}, true);

tail.on("line", function(line) {
        source.onNext(line);
});
tail.on('close', function(data) {
    console.log("tail closed");
    source.onCompleted();
});     
tail.on('error', function(error) {
    console.error(error);
});     

this.source = source;
}           

以下是无法强制 forever 结束(录音带样式测试)的测试代码。请注意 "ILLEGAL" 行:

test('tailing a file works correctly', function(tid) {

var lines = 8;
var i = 0;
var filename = 'tape/tail.json';
var handle = new ObserveTail(filename);
touch(filename);

handle.source
.filter(function (x) {
    try {
        JSON.parse(x);
        return true;
    } catch (error) {
        tid.pass("correctly caught illegal JSON");
        return false;
    }
})
.map(function(x) { return JSON.parse(x) })
.map(function(j) { return j.name })
.timeout(10000, "observer timed out")
.subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
        i++;
        if (i >= lines) {
            handle.onCompleted();   // XXX ILLEGAL
        }
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

})

使用您喜欢的调试器并查看“handle”。您应该找出自己的错误。您花费的时间越多,您在调试方面变得越好,这是一项非常有价值的技能。否则,JavaScript不是一种类型化的语言,这是一种遗憾,因此您必须注意从一个地方传递到另一个地方的对象,因为编译器在您使用它们时不会发出警告。需要一些实践和很多错误才能习惯它,所以经验越丰富越好。 - user3743222
但这是否是解决此问题的正确模式? - Paul S
好的,就是handle.source.onCompleted()。但愿我能够在JavaScript中得到异常处理程序,并默认完全打印出相关对象。 - Paul S
但总体问题仍然存在:这是 RXJS 中“告诉上游源完成”的正确模式吗?我有另一个上游源,它在 setTimeout 循环中(轮询网站),我必须设置一个神奇的布尔值来告诉它退出循环...我扩展了 Subject 来实现这一点,但那可能是糟糕的 hackery...我在这里发布的原因是缺乏使用 RXJS 的好的真实世界示例,希望这能帮助其他人。 - Paul S
我不明白你的问题。无论如何,主题将在两种情况下完成:1. 如果您自己调用 onCompleted,2. 如果该主题被用作观察者,并且该观察者的源已完成。使用更合适的那个。 - user3743222
显示剩余2条评论
1个回答

5

听起来你已经解决了你的问题,但是关于你最初的问题:

我想强制让观察者早早地完成更改,因为我的测试代码知道文件中应该有多少行。我该如何做?

一般来说,当有更好的替代方案时,使用 Subject 是不鼓励的,因为它们往往是人们用熟悉的编程风格的救生圈。相反,建议您考虑在 Observable 生命周期中每个事件的含义。

包装事件发射器

已经存在 Observable.fromEvent 的形式作为 EventEmitter#on/off 模式的包装器。它处理清理和仅在有侦听器时保持订阅有效。因此,可以将 ObserveTail 重构为

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var close = Rx.Observable.fromEvent(tail, "close");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line.takeUntil(close).merge(error).subscribe(observer);
  });
} 

相比于原始使用 Subjects 的方式,这种方法有几个优点:一、您现在可以看到下游的错误;二、当您完成事件处理时,它将处理事件清理。

避免使用 *Sync 方法

然后,您可以将此功能整合到文件存在性检查中,而不需要使用 readSync

//If it doesn't exist then we are done here
//You could also throw from the filter if you want an error tracked
var source = Rx.Observable.fromNodeCallback(fs.exists)(filename)
    .filter(function(exists) { return exists; })
    .flatMap(ObserveTail(filename));

接下来,您可以使用flatMap来简化您的过滤器/映射/映射序列。
var result = source.flatMap(function(x) {
  try {
    return Rx.Observable.just(JSON.parse(x));
  } catch (e) {
    return Rx.Observable.empty();
  }
}, 
//This allows you to map the result of the parsed value
function(x, json) {
  return json.name;
})
.timeout(10000, "observer timed out");

不要发信号,取消订阅

当流只在一个方向上传输时,如何停止“发信号”?我们很少想让观察者直接与可观测对象通信,因此更好的模式是不实际“发信号”而是从可观测对象中简单地取消订阅,并将其留给可观测对象的行为来确定它应该做什么。

基本上,您的观察者确实不应该关心可观测对象,只需要说“我在这里完成了”。

要做到这一点,您需要声明停止时要达到的条件。

在这种情况下,由于你仅仅是在测试用例中设置了一个固定数字后停止,你可以使用take来取消订阅。因此,最终的订阅块看起来像:

result
 //After lines is reached this will complete.
 .take(lines)
 .subscribe (
    function(name) {
        tid.equal(name, "AssetMgr", "verified name field is AssetMgr");
    },
    function(err) {  
        console.error(err)
        tid.fail("err leaked through to subscriber");
    },
    function() {
        tid.end();
        console.log("Completed");
    }
);

编辑1

正如评论中指出的那样,在这个特定的API中,实际上不存在真正的“关闭”事件,因为Tail基本上是一个无限操作。在这种意义上,它与鼠标事件处理程序没有什么不同,当人们停止监听时,我们将停止发送事件。因此,您的代码块可能会像下面这样:

function ObserveTail(filename) {

  return Rx.Observable.create(function(observer) {
    var lineSep = /[\r]{0,1}\n/;
    tail = new Tail(filename, lineSep, {}, true);
    var line = Rx.Observable.fromEvent(tail, "line");
    var error = Rx.Observable.fromEvent(tail, "error")
                  .flatMap(function(err) { return Rx.Observable.throw(err); });
    //Only take events until close occurs and wrap in the error for good measure
    //The latter two are terminal events in this case.
    return line
            .finally(function() {  tail.unwatch(); })
            .merge(error).subscribe(observer);
  }).share();
} 

添加finallyshare操作符将创建一个对象,该对象将在新的订阅者到达时附加到尾部,并且只要至少有一个订阅者仍在侦听,就会保持附加。 但是,一旦所有订阅者都完成,我们可以安全地unwatch尾巴。

非常感谢,我找不到如何正确执行任何这些操作的示例。一个问题是'Tail'没有关闭,就像tail -f一样,你必须使用ctrl-c退出它。这对于永远监听例如日志文件的程序很方便,但对于测试来说不方便。我会尝试您的示例,但我怀疑它是否会正确退出测试。基本上,所有示例(包括此示例)都假定发射器自行结束事物,但实际上我发现有很多情况下我希望使用发射器的程序强制结束。另请参阅定时器轮询网站的示例。 - Paul S
事实上,在包括上述在内的所有示例中,普遍的假设是“源代码会自我清理”。但是,在轮询网站或在文件上执行tail -f时,情况并非如此。close()方法实际上只在侦听器控制下发生。这违反了RXJS的意图,但真实世界的工作方式也经常如此。那么正确的处理方式是什么?像我一样使用Subject扩展一个close API,还是通过扩展可观察对象来添加API?还是其他方法?我认为这是问题的根源。 - Paul S
我假设你正在使用这个库?我认为解决方案将涉及到在所有观察者都离开后调用unwatch。我会更新我的建议的解决方案。 - paulpdaniels
是的,请调用unwatch。但这只适用于这种情况。那么周期性回调呢?我注意到https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/callbacks.md仅处理单个回调,而不处理周期性回调。提供一个周期性计时器的示例将会很有帮助。 - Paul S

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接