我需要找出阅读正在被写入一个文件,使用Node.js的实时数据的最佳方式。 麻烦的是,Node是一个快速移动的船,这使得寻找解决问题困难的最佳方法。
我想做的事
我有一个做的东西,然后写的这个东西它到一个文本文件的结果的java程序。 它通常需要5分钟至5小时的任何运行,与写入的整个时间数据,并最多可以得到一些相当沉重的吞吐率(约1000行/秒)。
我想读这个文件,实时,然后,使用节点汇总数据,并将其写入到可以在客户端上绘制一个插座。
在客户端,图表,插座和聚集逻辑都这样做,但我感到困惑的读取文件的最佳方法。
我曾尝试(或至少与饰演)
FIFO
-我可以告诉我的Java程序来写入FIFO和阅读本使用节点,其实这是我们如何有这个目前使用Perl implemted,但因为一切在节点正在运行它的代码在有意义的端口。
Unix Sockets
-正如上面。
fs.watchFile
-将这项工作我们需要什么?
fs.createReadStream
-这是比watchFile更好?
fs
和tail -f
-似乎是一个黑客。
什么,其实,是我的问题
我趋向于使用Unix套接字,这似乎是最快的选择。 但是,确实有节点从实时FS读取文件更好的内置功能?
如果你想保持文件作为数据的持久化存储,以防止系统崩溃或你正在运行的进程去世网络中的成员之一的情况下,流的损失,您仍然可以继续写入文件和读取从中。
如果你不需要这个文件从你的Java程序产生结果的持久存储,然后用Unix套接字会是非常的方便,也都更出色的表现。
fs.watchFile()
是不是你所需要的,因为它的工作原理上的文件统计一个文件系统的报告,并因为要读取的文件,因为它已经被写入,这是不是你想要的。
短的更新:我很遗憾地认识到,虽然我曾指责fs.watchFile()
在上一段中使用文件的统计,我做了我自己同样的事情在我下面的示例代码! 虽然我已经警告过读者“保重!” 因为我在短短几分钟写它,甚至没有测试好; 仍然,这是可以做到通过使用更好fs.watch()
代替watchFile
或fstatSync
如果底层系统支持它。
对于读/从文件写作,我刚才在下面写在我休息的乐趣:
测试-FS-writer.js:因为你在你的Java程序写入文件,您将不再需要这种]
var fs = require('fs'),
lineno=0;
var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});
stream.on('open', function() {
console.log('Stream opened, will start writing in 2 secs');
setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
});
测试-FS-reader.js:小心,这只是演示,检查ERR对象!]
var fs = require('fs'),
bite_size = 256,
readbytes = 0,
file;
fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });
function readsome() {
var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
if(stats.size<readbytes+1) {
console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
setTimeout(readsome, 3000);
}
else {
fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
}
}
function processsome(err, bytecount, buff) {
console.log('Read', bytecount, 'and will process it now.');
// Here we will process our incoming data:
// Do whatever you need. Just be careful about not using beyond the bytecount in buff.
console.log(buff.toString('utf-8', 0, bytecount));
// So we continue reading from where we left:
readbytes+=bytecount;
process.nextTick(readsome);
}
您可以安全地避免使用nextTick
并调用readsome()
而不是直接。 由于我们还在这里工作同步,没有必要在任何意义。 我只是喜欢它。 :p
编辑由奥利弗·劳埃德
以上面的例子,但它延伸到读CSV数据给出:
var lastLineFeed,
lineArray;
function processsome(err, bytecount, buff) {
lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');
if(lastLineFeed > -1){
// Split the buffer by line
lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');
// Then split each line by comma
for(i=0;i<lineArray.length;i++){
// Add read rows to an array for use elsewhere
valueArray.push(lineArray[i].split(','));
}
// Set a new position to read from
readbytes+=lastLineFeed+1;
} else {
// No complete lines were read
readbytes+=bytecount;
}
process.nextTick(readFile);
}
为什么你认为tail -f
是一个黑客?
虽然搞清楚我找到了一个很好的例子,我会做同样的事情。 使用的是Node.js和WebSocket的实时在线活动监视器例如:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket
只是为了让这个完整的答案,我写给你这将0.8.0下运行的示例代码 - (HTTP服务器是一个黑客可能)。
孩子进程产生与尾运行,并且因为一个子进程是三流的EventEmitter(我们在标准输出的情况下使用),你可以只是添加了一个监听器on
文件名:tailServer.js
用法: node tailServer /var/log/filename.log
var http = require("http");
var filename = process.argv[2];
if (!filename)
return console.log("Usage: node tailServer filename");
var spawn = require('child_process').spawn;
var tail = spawn('tail', ['-f', filename]);
http.createServer(function (request, response) {
console.log('request starting...');
response.writeHead(200, {'Content-Type': 'text/plain' });
tail.stdout.on('data', function (data) {
response.write('' + data);
});
}).listen(8088);
console.log('Server running at http://127.0.0.1:8088/');
此模块的原理@hasanyasin建议的实现:
https://github.com/felixge/node-growing-file
我把答案从@hasanyasin包好到模块化的承诺。 其基本思想是,你传递一个文件,并且不与从文件中读取的字符串化的缓冲东西处理函数。 如果处理函数返回true,那么该文件将停止读取。 您还可以设置一个超时,将杀死阅读,如果处理程序不返回true速度不够快。
如果决心()被调用因超时promiser将返回true,否则将返回false。
请参阅使用示例底部。
// https://stackoverflow.com/a/11233045
var fs = require('fs');
var Promise = require('promise');
class liveReaderPromiseMe {
constructor(file, buffStringHandler, opts) {
/*
var opts = {
starting_position: 0,
byte_size: 256,
check_for_bytes_every_ms: 3000,
no_handler_resolution_timeout_ms: null
};
*/
if (file == null) {
throw new Error("file arg must be present");
} else {
this.file = file;
}
if (buffStringHandler == null) {
throw new Error("buffStringHandler arg must be present");
} else {
this.buffStringHandler = buffStringHandler;
}
if (opts == null) {
opts = {};
}
if (opts.starting_position == null) {
this.current_position = 0;
} else {
this.current_position = opts.starting_position;
}
if (opts.byte_size == null) {
this.byte_size = 256;
} else {
this.byte_size = opts.byte_size;
}
if (opts.check_for_bytes_every_ms == null) {
this.check_for_bytes_every_ms = 3000;
} else {
this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms;
}
if (opts.no_handler_resolution_timeout_ms == null) {
this.no_handler_resolution_timeout_ms = null;
} else {
this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms;
}
}
startHandlerTimeout() {
if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) {
var that = this;
this._handlerTimer = setTimeout(
function() {
that._is_handler_timed_out = true;
},
this.no_handler_resolution_timeout_ms
);
}
}
clearHandlerTimeout() {
if (this._handlerTimer != null) {
clearTimeout(this._handlerTimer);
this._handlerTimer = null;
}
this._is_handler_timed_out = false;
}
isHandlerTimedOut() {
return !!this._is_handler_timed_out;
}
fsReadCallback(err, bytecount, buff) {
try {
if (err) {
throw err;
} else {
this.current_position += bytecount;
var buff_str = buff.toString('utf-8', 0, bytecount);
var that = this;
Promise.resolve().then(function() {
return that.buffStringHandler(buff_str);
}).then(function(is_handler_resolved) {
if (is_handler_resolved) {
that.resolve(false);
} else {
process.nextTick(that.doReading.bind(that));
}
}).catch(function(err) {
that.reject(err);
});
}
} catch(err) {
this.reject(err);
}
}
fsRead(bytecount) {
fs.read(
this.file,
new Buffer(bytecount),
0,
bytecount,
this.current_position,
this.fsReadCallback.bind(this)
);
}
doReading() {
if (this.isHandlerTimedOut()) {
return this.resolve(true);
}
var max_next_bytes = fs.fstatSync(this.file).size - this.current_position;
if (max_next_bytes) {
this.fsRead( (this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size );
} else {
setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms);
}
}
promiser() {
var that = this;
return new Promise(function(resolve, reject) {
that.resolve = resolve;
that.reject = reject;
that.doReading();
that.startHandlerTimeout();
}).then(function(was_resolved_by_timeout) {
that.clearHandlerTimeout();
return was_resolved_by_timeout;
});
}
}
module.exports = function(file, buffStringHandler, opts) {
try {
var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts);
return live_reader.promiser();
} catch(err) {
return Promise.reject(err);
}
};
然后用上面这样的代码:
var fs = require('fs');
var path = require('path');
var Promise = require('promise');
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser');
var ending_str = '_THIS_IS_THE_END_';
var test_path = path.join('E:/tmp/test.txt');
var s_list = [];
var buffStringHandler = function(s) {
s_list.push(s);
var tmp = s_list.join('');
if (-1 !== tmp.indexOf(ending_str)) {
// if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms
// by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true
return true;
// you can also return a promise:
// return Promise.resolve().then(function() { return true; } );
}
};
var appender = fs.openSync(test_path, 'a');
try {
var reader = fs.openSync(test_path, 'r');
try {
var options = {
starting_position: 0,
byte_size: 256,
check_for_bytes_every_ms: 3000,
no_handler_resolution_timeout_ms: 10000,
};
liveReadAppendingFilePromiser(reader, buffStringHandler, options)
.then(function(did_reader_time_out) {
console.log('reader timed out: ', did_reader_time_out);
console.log(s_list.join(''));
}).catch(function(err) {
console.error('bad stuff: ', err);
}).then(function() {
fs.closeSync(appender);
fs.closeSync(reader);
});
fs.write(appender, '\ncheck it out, I am a string');
fs.write(appender, '\nwho killed kenny');
//fs.write(appender, ending_str);
} catch(err) {
fs.closeSync(reader);
console.log('err1');
throw err;
}
} catch(err) {
fs.closeSync(appender);
console.log('err2');
throw err;
}