管道JSONStream.parsed()通过es.map数据()和JSONStream.strin

2019-09-30 10:01发布

我试图通过管道JSONStream.parse()的输入流(从一个巨大的GeoJSON的文件创建)打破流进的对象,然后通过event-stream.map(),让我变换对象,然后通过JSONStream .stringify()创建一个字符串出来,最后到一个可写的输出流。 随着进程运行时,我可以看到节点的内存占用量继续增长,直到它最终耗尽堆。 下面是重现该问题的最简单的脚本(test.js):

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb) { 
        cb(null, data);
        return;
    } ))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)

该喷出JSON源源不断地为节点的process.stdin一个小bash脚本(barf.sh)会造成节点的堆逐步成长:

#!/bin/bash

echo '{"type":"FeatureCollection","features":['
while :
do
    echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done

通过运行它像这样:

barf.sh | node test.js

有几个好奇的方式来回避这个问题:

  • 卸下fs.createWriteStream()和更改从最后一个管道阶段“.pipe(下)”,以“.pipe(process.stdout)”,然后管节点的标准输出到/ dev / null的
  • 改变异步es.map()到同步es.mapSync()

无论是前两个动作中的一个将允许脚本一直运行下去,随着节点的内存占用低和不变的。 我使用节点v6.3.1,事件流v3.3.4,并且JSONStream 1.1.4八个核机器上的运行Ubuntu 16.04 RAM 8GB。

我希望有人可以帮我更正一下,我敢肯定,是对我而言明显的错误。

Answer 1:

JSONStream不是streams2流,所以它不支持背压。 (有一个关于streams2一个简要这里 。)

这意味着数据将出来的parse在流data的事件和流将会保持抽水出来,无论消费流是否已为他们准备好。 如果两者之间的速度有多快的东西可以读取和写入的管道有些出入的地方,就会有缓冲 - 这是你所看到的。

barf.sh线束认为通过泵入功能stdin 。 相反,如果你在读一个巨大的文件,你应该能够通过暂停文件的读取流来管理流量。 所以,如果你要插入一些pause/resume逻辑到您的map回调,你应该能够得到它的处理大量文件; 它只是需要一点时间。 我会用这样的实验:

let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
    .pipe(js.parse("features.*"))
    .pipe(es.map(function(data, cb) {
        // This is just an example; a 10-millisecond wait per feature would be very slow.
        if (!in.isPaused()) {
            in.pause();
            global.setTimeout(function () { in.resume(); }, 10);
        }
        cb(null, data);
        return;
    }))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out);

顺便说一句,使用mapSync使得小到没有我的电脑上的区别(这是又老又慢)。 但是,除非你有一些异步操作中执行map ,我想一起去mapSync



文章来源: node heap exhausted when piping JSONStream.parsed() data through es.map() and JSONStream.stringify() to file stream