如何将Node.js可读流转换为RX Observable

9
如果我有一个Node.js流,例如来自process.stdinfs.createReadStream,我该如何使用RxJs5将其转换为RxJs可观察流? 我看到 RxJs-Node有一个fromReadableStream 方法,但这似乎已经近一年没有更新了。

它能工作吗?如果它能工作,那更新频率有多高又有谁在乎呢? - smnbbrv
@smnbbrv 毫无疑问,它可以正常工作,但它是RxJS4,不兼容RxJS5。 - cartant
3
你可以查看源代码,了解如何自行转换 - 实现非常简单。 - cartant
RxJS在过去的几个版本中发生了很多变化,如果有人正在阅读这篇文章,请尝试下面的答案。[https://dev59.com/yJ7ha4cB1Zd3GeqPdAMt#74925985] - Andrew Philips
7个回答

18

对于任何寻求此内容的人,根据Mark的建议,我为rxjs5修改了rx-node的fromStream实现。

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}

请注意,它本质上破坏了流的所有背压功能。Observables是一种推送技术。所有输入块都将尽可能快地被读取并推送到观察者中。根据您的情况,这可能不是最佳解决方案。


我已经不在我之前的工作中测试过这个,但如果其他人测试过并且有效,我会接受这个答案 :) - JuanCaicedo
我正在使用它,目前它运行良好。不过我还没有进行单元测试。 - Quentin Roy

5

3

上面的答案虽然能够正常工作,但不支持反压(backpressure)。如果您想使用createReadStream读取大文件,它们将在内存中读取整个文件。

这是我带有反压支持的实现: rxjs-stream


3
以下内容适用于v4和v5(免责声明:未经测试):
fromStream: function (stream, finishEventName, dataEventName) {
    stream.pause();

    finishEventName || (finishEventName = 'end');
    dataEventName || (dataEventName = 'data');

    return Observable.create(function (observer) {

      // This is the "next" event
      const data$ = Observable.fromEvent(stream, dataEventName);

      // Map this into an error event
      const error$ = Observable.fromEvent(stream, 'error')
        .flatMap(err => Observable.throw(err));

      // Shut down the stream
      const complete$ = Observable.fromEvent(stream, finishEventName);

      // Put it all together and subscribe
      const sub = data$
        .merge(error$)
        .takeUntil(complete$)
        .subscribe(observer);

      // Start the underlying node stream
      stream.resume();

      // Return a handle to destroy the stream
      return sub;
    })

    // Avoid recreating the stream on duplicate subscriptions
    .share();
  },

1

谢谢,所以听起来你需要实现一个转换? - JuanCaicedo
目前似乎还没有rxjs5版本,所以现在是的。 - Mark van Straten

1

最近来这里的人(RxJS 7&Node 18+)应该使用以下代码。

为什么这个代码有效? RxJS已更新以处理类似流的对象。 当您将ReadStream传递给RxJS时,它会测试它是否为ReadableStreamLike,然后将其转换为AsyncGenerator

import { from } from 'rxjs';

const file = fs.createReadStream(fileName);

const file$ = from(file).subscribe({
  next:  (dat) => { ... },
  error: (err) => { ... },
  complete: () => { ... }
});


1

假设您正在尝试从csv文件中读取数据,以下方法是我发现的最干净的实现方式,可以将数据解析为对象并返回observable。

在此示例中,我使用了一个制表符分隔的文件,但您也可以将此方法用于csv文件。它使用csv-parse将数据映射到正确的接口上。

import * as fs from 'fs';
import { parse } from 'csv-parse';
import type { Parser } from 'csv-parse';
import { Observable } from 'rxjs';

interface Columns {
  columnA: string;
  columnB: string;
}

function readTabFile(): Observable<Columns[]> {
  const parser: Parser = parse({
    delimiter: '\t',
    columns: ['columnA', 'columnB'],
  });
  return new Observable((observer) => {
    const lines: Columns[] = [];
    const stream = fs.createReadStream('./file.TAB', {
      encoding: 'utf8',
    });

    parser.on('data', (row: Columns) => {
      lines.push(row);
    });

    parser.on('end', () => {
      observer.next(lines);
      observer.complete();
    });

    stream.pipe(parser);
  });
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接