使用Node.js实时读取文件

16

我需要找出使用node.js实时读取正在写入文件的数据的最佳方法。问题是,Node是一个快速发展的技术,这使得解决问题变得困难。

我的目标
我有一个Java进程正在执行某些操作,然后将其结果写入文本文件中。通常需要5分钟到5小时不等的时间来运行,数据将在整个运行过程中不断写入,并且可能达到相当高的吞吐量(约1000行/秒)。

我想实时读取此文件,然后使用node.js对数据进行聚合并将其写入套接字,在客户端上可以在图表上显示。

客户端、图表、套接字和聚合逻辑都已完成,但是我对于读取文件的最佳方法感到困惑。

我尝试过的方法(或者至少探索过)
FIFO - 我可以告诉我的Java进程写入FIFO并使用node.js读取它,这实际上是我们当前使用Perl实现的方式,但由于其他所有内容都在node.js中运行,因此将代码移植过来是有意义的。

Unix Sockets - 如上所述。

fs.watchFile - 这个方法能够满足我们的需求吗?

fs.createReadStream - 这比watchFile更好吗?

fs & tail -f - 看起来像一个hack。

实际上,我的问题是什么
我倾向于使用Unix套接字,这似乎是最快的选项。但是,node.js是否有更好的内置功能,用于实时从文件系统中读取文件?

4个回答

9
如果您希望将文件作为数据的持久存储,以防系统崩溃或运行进程中的任一成员死亡导致流丢失,您仍然可以继续写入和读取文件。
如果您不需要该文件作为Java进程产生结果的持久存储,则使用Unix套接字更加易于操作和性能更好。 fs.watchFile() 不适用于您的需求,因为它基于文件系统报告的文件状态工作,而您要读取已经被写入的文件,这并不是您想要的。
短更新:非常抱歉,在前面的段落中指责了 fs.watchFile() 使用文件状态,而我在下面的示例代码中也做了同样的事情!虽然我已经警告读者要“小心!”,因为我只花了几分钟时间编写,并没有进行充分测试;但如果底层系统支持,使用 fs.watch() 而不是 watchFilefstatSync 可以更好地完成任务。
关于从文件读取/写入,以下是我在休息时写的一个有趣的示例: test-fs-writer.js:[您不需要此文件,因为您在Java进程中写入文件]
var fs = require('fs'),
    lineno=0;

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});

stream.on('open', function() {
    console.log('Stream opened, will start writing in 2 secs');
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
});

test-fs-reader.js: 【注意,这只是演示,务必检查错误对象!】

var fs = require('fs'),
    bite_size = 256,
    readbytes = 0,
    file;

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });

function readsome() {
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
    if(stats.size<readbytes+1) {
        console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
        setTimeout(readsome, 3000);
    }
    else {
        fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
    }
}

function processsome(err, bytecount, buff) {
    console.log('Read', bytecount, 'and will process it now.');

    // Here we will process our incoming data:
        // Do whatever you need. Just be careful about not using beyond the bytecount in buff.
        console.log(buff.toString('utf-8', 0, bytecount));

    // So we continue reading from where we left:
    readbytes+=bytecount;
    process.nextTick(readsome);
}

你可以安全地避免使用nextTick,直接调用readsome()即可。由于我们仍在同步工作,因此从任何意义上讲都不必要。我只是喜欢它。 :p

编辑者:Oliver Lloyd

将上面的示例扩展到读取CSV数据:

var lastLineFeed,
    lineArray;
function processsome(err, bytecount, buff) {
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');

    if(lastLineFeed > -1){

        // Split the buffer by line
        lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');

        // Then split each line by comma
        for(i=0;i<lineArray.length;i++){
            // Add read rows to an array for use elsewhere
            valueArray.push(lineArray[i].split(','));
        }   

        // Set a new position to read from
        readbytes+=lastLineFeed+1;
    } else {
        // No complete lines were read
        readbytes+=bytecount;
    }
    process.nextTick(readFile);
}

这是一个很好的例子,直接回答了我的问题。不过需要改进的是,它只能一次处理一行,但可以说这是一件好事;Node缺乏现有的fs接口意味着它是完全可定制的,因此即使我必须编写额外的代码,我也可以实现我所需的功能。 - Oliver Lloyd
当以node <test-fs-reader.js>运行时,这个代码绝对有效,但我该如何将这段代码放入app.js并在HTML页面中获得结果? - usersam

7

为什么你认为tail -f是一个hack?

在研究过程中,我发现了一个很好的例子,我会做类似的事情。使用node.js和WebSocket实现实时在线活动监视器的示例:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

为了使这个答案完整,我写了一个示例代码,可以在0.8.0下运行(http服务器可能是一个hack)。

生成一个子进程,运行tail命令,由于子进程是一个EventEmitter,有三个流(我们在这里使用stdout),所以你可以使用on添加监听器。

文件名:tailServer.js

用法:node tailServer /var/log/filename.log

var http = require("http");
var filename = process.argv[2];


if (!filename)
    return console.log("Usage: node tailServer filename");

var spawn = require('child_process').spawn;
var tail = spawn('tail', ['-f', filename]);

http.createServer(function (request, response) {
    console.log('request starting...');

    response.writeHead(200, {'Content-Type': 'text/plain' });

    tail.stdout.on('data', function (data) {
      response.write('' + data);                
    });
}).listen(8088);

console.log('Server running at http://127.0.0.1:8088/');

我对tail -f的担忧在于它要求读取进程在文件被写入之前处于活动状态,否则数据将会丢失。我的使用情况是,在数据被写入后很长一段时间才进行读取。虽然更新到0.8是一个好的解决方案,但这只适用于写入和读取来自同一源的情况。 - Oliver Lloyd
watchFile也是事件驱动的,但根据文档,不太稳定。上面的示例通过在高级代码中轮询来处理文件更改。对我来说,这看起来像是一个hack。但只要它对你有用,那么这样做就很好。否则,如果文件不存在,您可以使用touch命令,并且不会丢失任何数据,您可以使用wc -l message.text | awk '{print $1}'计算文件的行数,并将其传递给tail -f -n - vik
我认为这段代码在Windows机器上不会运行。 - Krunal Sonparate
我还没有在Windows上测试过。尽管tail应该可以在Win10上工作,否则可以使用WSL。但现在有更好的解决方案。也没有可用且维护良好的tail npm依赖项。 - vik

1

谢谢,这看起来在这里很适用,而且felixge的其他项目也很可靠,所以我很高兴尝试这个模块。 - Oliver Lloyd

0

我从@hasanyasin那里找到了答案,并将其封装成了一个模块化的Promise。基本思路是你传递一个文件和一个处理函数,该函数对从文件中读取的字符串缓冲执行某些操作。如果处理函数返回true,则文件将停止被读取。您还可以设置一个超时时间,如果处理程序不足够快地返回true,则会终止读取。

如果resolve()由于超时而被调用,则promiser将返回true,否则它将返回false。

请参见下面的使用示例。

// https://dev59.com/Nmgu5IYBdhLWcg3wln8O#11233045

var fs = require('fs');
var Promise = require('promise');

class liveReaderPromiseMe {
    constructor(file, buffStringHandler, opts) {
        /*
            var opts = {
                starting_position: 0,
                byte_size: 256,
                check_for_bytes_every_ms: 3000,
                no_handler_resolution_timeout_ms: null
            };
        */

        if (file == null) {
            throw new Error("file arg must be present");
        } else {
            this.file = file;
        }

        if (buffStringHandler == null) {
            throw new Error("buffStringHandler arg must be present");
        } else {
            this.buffStringHandler = buffStringHandler;
        }

        if (opts == null) {
            opts = {};
        }

        if (opts.starting_position == null) {
            this.current_position = 0;
        } else {
            this.current_position = opts.starting_position;
        }

        if (opts.byte_size == null) {
            this.byte_size = 256;
        } else {
            this.byte_size = opts.byte_size;
        }

        if (opts.check_for_bytes_every_ms == null) {
            this.check_for_bytes_every_ms = 3000;
        } else {
            this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms;
        }

        if (opts.no_handler_resolution_timeout_ms == null) {
            this.no_handler_resolution_timeout_ms = null;
        } else {
            this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms;
        }
    }


    startHandlerTimeout() {
        if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) {
            var that = this;
            this._handlerTimer = setTimeout(
                function() {
                    that._is_handler_timed_out = true;
                },
                this.no_handler_resolution_timeout_ms
            );
        }
    }

    clearHandlerTimeout() {
        if (this._handlerTimer != null) {
            clearTimeout(this._handlerTimer);
            this._handlerTimer = null;
        }
        this._is_handler_timed_out = false;
    }

    isHandlerTimedOut() {
        return !!this._is_handler_timed_out;
    }


    fsReadCallback(err, bytecount, buff) {
        try {
            if (err) {
                throw err;
            } else {
                this.current_position += bytecount;
                var buff_str = buff.toString('utf-8', 0, bytecount);

                var that = this;

                Promise.resolve().then(function() {
                    return that.buffStringHandler(buff_str);
                }).then(function(is_handler_resolved) {
                    if (is_handler_resolved) {
                        that.resolve(false);
                    } else {
                        process.nextTick(that.doReading.bind(that));
                    }
                }).catch(function(err) {
                    that.reject(err);
                });
            }
        } catch(err) {
            this.reject(err);
        }
    }

    fsRead(bytecount) {
        fs.read(
            this.file,
            new Buffer(bytecount),
            0,
            bytecount,
            this.current_position,
            this.fsReadCallback.bind(this)
        );
    }

    doReading() {
        if (this.isHandlerTimedOut()) {
            return this.resolve(true);
        } 

        var max_next_bytes = fs.fstatSync(this.file).size - this.current_position;
        if (max_next_bytes) {
            this.fsRead( (this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size );
        } else {
            setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms);
        }
    }


    promiser() {
        var that = this;
        return new Promise(function(resolve, reject) {
            that.resolve = resolve;
            that.reject = reject;
            that.doReading();
            that.startHandlerTimeout();
        }).then(function(was_resolved_by_timeout) {
            that.clearHandlerTimeout();
            return was_resolved_by_timeout;
        });
    }
}


module.exports = function(file, buffStringHandler, opts) {
    try {
        var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts);
        return live_reader.promiser();
    } catch(err) {
        return Promise.reject(err);
    }
};

然后像这样使用上述代码:

var fs = require('fs');
var path = require('path');
var Promise = require('promise');
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser');

var ending_str = '_THIS_IS_THE_END_';
var test_path = path.join('E:/tmp/test.txt');

var s_list = [];
var buffStringHandler = function(s) {
    s_list.push(s);
    var tmp = s_list.join('');
    if (-1 !== tmp.indexOf(ending_str)) {
        // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms
        // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true
        return true;
        // you can also return a promise:
        //  return Promise.resolve().then(function() { return true; } );
    }
};

var appender = fs.openSync(test_path, 'a');
try {
    var reader = fs.openSync(test_path, 'r');
    try {
        var options = {
            starting_position: 0,
            byte_size: 256,
            check_for_bytes_every_ms: 3000,
            no_handler_resolution_timeout_ms: 10000,
        };

        liveReadAppendingFilePromiser(reader, buffStringHandler, options)
        .then(function(did_reader_time_out) {
            console.log('reader timed out: ', did_reader_time_out);
            console.log(s_list.join(''));
        }).catch(function(err) {
            console.error('bad stuff: ', err);
        }).then(function() {
            fs.closeSync(appender);
            fs.closeSync(reader);
        });

        fs.write(appender, '\ncheck it out, I am a string');
        fs.write(appender, '\nwho killed kenny');
        //fs.write(appender, ending_str);
    } catch(err) {
        fs.closeSync(reader);
        console.log('err1');
        throw err;
    }
} catch(err) {
    fs.closeSync(appender);
        console.log('err2');
    throw err;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接