如何在Node中逐行从标准输入读取数据

266

我希望使用类似以下命令行调用的方式,在node中处理文本文件:

node app.js < input.txt

需要逐行处理文件内容,但处理完一行后,就可以忘记输入行。

使用stdin的on-data监听器,我将输入流分块为字节大小,因此我进行了设置。

process.stdin.resume();
process.stdin.setEncoding('utf8');

var lingeringLine = "";

process.stdin.on('data', function(chunk) {
    lines = chunk.split("\n");

    lines[0] = lingeringLine + lines[0];
    lingeringLine = lines.pop();

    lines.forEach(processLine);
});

process.stdin.on('end', function() {
    processLine(lingeringLine);
});

但这样做似乎很不简洁。需要围绕行数组的第一个和最后一个项目进行调整。难道没有更优雅的方法吗?

11个回答

311

您可以使用readline模块逐行从stdin读取内容:

const readline = require('readline');

const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
  terminal: false
});

rl.on('line', (line) => {
    console.log(line);
});

rl.once('close', () => {
     // end of input
 });

6
在控制台手动输入时,这似乎很有效,但是当我将文件传递到命令时,文件会被发送到标准输出。这是一个错误吗?目前readline被认为是不稳定的。 - Matt R. Wilson
1
我认为你只需要将process.stdout更改为不同的可写流 - 这可能很简单,例如output: new require('stream').Writable() - Jeff Sisson
5
很遗憾,我需要标准输出(stdout)。我在问题中遗漏了它,但我正尝试让应用程序可以像node app.js < input.txt > output.txt一样使用。 - Matt R. Wilson
显然这是“按设计要求”的https://github.com/joyent/node/issues/4243#issuecomment-10133900。因此,我最终按照您所说的方式提供了输出选项的虚拟可写流,然后直接写入了stdout流。我不喜欢它,但它确实有效。 - Matt R. Wilson
22
看起来,如果您将参数 terminal: false 传递给 createInterface 方法,它可以解决这个问题。 - jasoncrawford
显示剩余4条评论

139
// Work on POSIX and Windows
var fs = require("fs");
var stdinBuffer = fs.readFileSync(0); // STDIN_FILENO = 0
console.log(stdinBuffer.toString());

4
可以提供一些细节吗?已经有一个受欢迎的采纳答案了。 - jhhoff02
3
这对我不起作用(Node v9.2.0,Windows)。错误:EISDIR:目录上的非法操作,fstat于'tryStatSync(fs.js:534:13)' - AlexChaffee
5
在Windows系统中(截至v9.10.1仍然存在),如果没有标准输入或标准输入已关闭,则似乎存在一个错误 - 请参见此GitHub问题。但是,除此之外,该解决方案在Windows上确实有效。 - mklement0
8
功能非常好,而且迄今为止最短,可以通过执行 fs.readFileSync(0).toString() 来进一步缩短。 - localhostdotdev
20
请注意,“魔术数”0可以替换为更清晰的 process.stdin.fd(只是硬编码为0,但可以更明显地表达你正在做什么)。 - Dave
显示剩余10条评论

69

readline是专门设计用于终端的(也就是说,process.stdin.isTTY === true)。有很多模块提供了通用流分割功能,比如split。它使得事情变得超级简单:

process.stdin.pipe(require('split')()).on('data', processLine)

function processLine (line) {
  console.log(line + '!')
}

7
不,这不是真的。如果你不想逐行阅读,那么你根本不需要它。 - vkurchatkin
9
提示:如果你想在处理完所有行后运行一些代码,请在第一个 .on() 后面添加 .on('end', doMoreStuff)。请记住,如果你只是在 .on() 语句之后正常编写代码,那么该代码将在读取任何输入之前运行,因为 JavaScript 不是同步的。 - Rory O'Kane

23
#!/usr/bin/env node

const EventEmitter = require('events');

function stdinLineByLine() {
  const stdin = new EventEmitter();
  let buff = '';

  process.stdin
    .on('data', data => {
      buff += data;
      lines = buff.split(/\r\n|\n/);
      buff = lines.pop();
      lines.forEach(line => stdin.emit('line', line));
    })
    .on('end', () => {
      if (buff.length > 0) stdin.emit('line', buff);
    });

  return stdin;
}

const stdin = stdinLineByLine();
stdin.on('line', console.log);

5
Node.js自从接受的答案发布以来发生了很多变化,所以这里提供一个现代示例,使用readline将流分割成行,使用for await从流中读取,并且使用ES模块
import { createInterface } from "node:readline"

for await (const line of createInterface({ input: process.stdin })) {
  // Do something with `line` here.
  console.log(line)
}

3

旧问题的新答案。

自Node 10(2018年4月)以来,由于添加了Symbol.asyncIterator方法(ReadableStream documentationSymbol.asyncIterator documentation),可读流(例如process.stdin)支持for-await-of循环。

使用这个方法,我们可以创建一个适配器,从遍历数据块转换为遍历行。这个逻辑是从this answer中改编而来的。

function streamByLines(stream) {
  stream.setEncoding('utf8');
  return {
    async *[Symbol.asyncIterator]() {
      let buffer = '';

      for await (const chunk of stream) {
        buffer += chunk;
        const lines = buffer.split(/\r?\n/);
        buffer = lines.pop();
        for (const line of lines) {
          yield line;
        }
      }
      if (buffer.length > 0) yield buffer;
    },
  };
}

您可以在允许使用 await 的上下文中像这样使用它

for await (const line of streamByLines(process.stdin)) {
  console.log('Current line:', line)
}

1
只是将https://dev59.com/gmIj5IYBdhLWcg3wmmJE#76743097所说的转换为普通的JavaScript,使用require并添加一个async function回调来允许我们进行测试。

readlines.js

const readline = require('readline');

(async function () {
for await (const line of readline.createInterface({ input: process.stdin })) {
  console.log(line)
}
})()

我们可以用以下方式进行测试:
(echo asdf; sleep 1; echo qwer; sleep 1; echo zxcv) | node  readlines.js

并且它输出:
asdf
qwer
zxcv

每读取一行stdin,就立即打印出来,相隔1秒。这证实了每行都是逐行读取的,并且在其可用后立即处理。
在Node.js v16.14.2和Ubuntu 23.04上进行了测试。

0

逐行读取流,适用于大文件通过stdin管道输入的情况,我的版本:

var n=0;
function on_line(line,cb)
{
    ////one each line
    console.log(n++,"line ",line);
    return cb();
    ////end of one each line
}

var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;
readStream.pause();
readStream.setEncoding('utf8');

var buffer=[];
readStream.on('data', (chunk) => {
    const newlines=/[\r\n]+/;
    var lines=chunk.split(newlines)
    if(lines.length==1)
    {
        buffer.push(lines[0]);
        return;
    }   
    
    buffer.push(lines[0]);
    var str=buffer.join('');
    buffer.length=0;
    readStream.pause();

    on_line(str,()=>{
        var i=1,l=lines.length-1;
        i--;
        function while_next()
        {
            i++;
            if(i<l)
            {
                return on_line(lines[i],while_next);
            }
            else
            {
                buffer.push(lines.pop());
                lines.length=0;
                return readStream.resume();
            }
        }
        while_next();
    });
  }).on('end', ()=>{
      if(buffer.length)
          var str=buffer.join('');
          buffer.length=0;
        on_line(str,()=>{
            ////after end
            console.error('done')
            ////end after end
        });
  });
readStream.resume();

解释:

  • 为了正确地在 utf8 字母上进行切割,而不是在中间字节集编码上,将编码设置为 utf8,确保每次发出完整的多字节字母。
  • 当接收到数据时,输入会被暂停。它用于阻止输入,直到所有行都被使用完。如果行处理函数比输入慢,则可以防止缓冲区溢出。
  • 如果每次都有一行没有换行符。需要为所有调用累积它并且什么也不做,返回。一旦有多于一行,还要附加它并使用累积的缓冲区。
  • 在消耗了所有分裂的行之后。在最后一行上推送最后一行到缓冲区并恢复暂停的流。

ES6 代码

var n=0;
async function on_line(line)
{
    ////one each line
    console.log(n++,"line ",line);
    ////end of one each line
}

var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;
readStream.pause();
readStream.setEncoding('utf8');

var buffer=[];
readStream.on('data', async (chunk) => {
    
    const newlines=/[\r\n]+/;
    var lines=chunk.split(newlines)
    if(lines.length==1)
    {
        buffer.push(lines[0]);
        return;
    }
    readStream.pause();

    // let i=0;
    buffer.push(lines[0]); // take first line
    var str=buffer.join('');
    buffer.length=0;//clear array, because consumed
    await on_line(str);
    
    for(let i=1;i<lines.length-1;i++)
       await on_line(lines[i]);
    buffer.push(lines[lines.length-1]);
    lines.length=0; //optional, clear array to hint GC.
    return readStream.resume();
  }).on('end', async ()=>{
      if(buffer.length)
          var str=buffer.join('');
          buffer.length=0;
          await on_line(str);
  });
  readStream.resume();

我没有测试es6代码


1
这个答案中发生了什么? - activedecay
1
@activedecay 添加了一个解释和一个ES6代码。 - Shimon Doodkin

-1
在我的情况下,程序(elinks)返回了看起来是空的行,但实际上含有特殊的终端字符、颜色控制代码和退格符,因此其他答案中介绍的grep选项对我无效。因此,我用Node.js编写了这个小脚本。我把文件叫做tight,但这只是一个随机的名称。
#!/usr/bin/env node

function visible(a) {
    var R  =  ''
    for (var i = 0; i < a.length; i++) {
        if (a[i] == '\b') {  R -= 1; continue; }  
        if (a[i] == '\u001b') {
            while (a[i] != 'm' && i < a.length) i++
            if (a[i] == undefined) break
        }
        else R += a[i]
    }
    return  R
}

function empty(a) {
    a = visible(a)
    for (var i = 0; i < a.length; i++) {
        if (a[i] != ' ') return false
    }
    return  true
}

var readline = require('readline')
var rl = readline.createInterface({ input: process.stdin, output: process.stdout, terminal: false })

rl.on('line', function(line) {
    if (!empty(line)) console.log(line) 
})

-2
如果你想要先询问用户行数:

    //array to save line by line 
    let xInputs = [];

    const getInput = async (resolve)=>{
            const readline = require('readline').createInterface({
                input: process.stdin,
                output: process.stdout,
            });
            readline.on('line',(line)=>{
            readline.close();
            xInputs.push(line);
            resolve(line);
            })
    }

    const getMultiInput = (numberOfInputLines,callback)=>{
        let i = 0;
        let p = Promise.resolve(); 
        for (; i < numberOfInputLines; i++) {
            p = p.then(_ => new Promise(resolve => getInput(resolve)));
        }
        p.then(()=>{
            callback();
        });
    }

    //get number of lines 
    const readline = require('readline').createInterface({
        input: process.stdin,
        output: process.stdout,
        terminal: false
    });
    readline.on('line',(line)=>{
        getMultiInput(line,()=>{
           //get here the inputs from xinputs array 
        });
        readline.close();
    })


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接