AWS Lambda & Node: Write data while streaming - en

2019-08-14 16:09发布

问题:

I've got a Lambda function that is triggered by a write to an S3 bucket. It reads the JSON file that is written to the bucket, parses out the individual records, and writes them to a database.

Problem is; I'm not sure what I'm doing wrong, because the stream ends and the Lambda exits before all the data is written.

I'm in "flowing mode" on my readable stream, and I'm pausing/resuming during the db write. According to the docs, this should do the trick, but it's not working as expected.

Lambda handler:

exports.handler = async (event, context) => {
    let result = false;
    try {
        result = await parseData(event);
    } catch (e) {
        console.error(e);
    }
    return result;
};

Promise:

const StreamArray = require("stream-json/streamers/StreamArray");

async parseData(event) {
    try {
        let objectStream = s3.getObject(params).createReadStream();
        const streamParser = StreamArray.withParser();
        return new Promise((resolve, reject) => {
            objectStream.pipe(streamParser).on("data", async streamData => {
                objectStream.pause();
                let result = await writeData(streamData);
                objectStream.resume();
            }).on("finish", () => {
                console.log("STREAM FINISH!");
                resolve(true);
            }).on("error", e => {
                console.error("Stream error:", e);
                reject(e);
            });
        });
    } catch (e) {
        console.error(e);
    }
}

回答1:

Got it working by simply swapping-out stream-json with JSONStream, which is a more widely-used package anyhow. Works like a charm now!

const JSONStream = require("JSONStream");

async parseData(event) {
    try {
        let objectStream = s3.getObject(params).createReadStream();
        const streamParser = JSONStream.parse("*");
        return new Promise((resolve, reject) => {
            objectStream.pipe(streamParser).on("data", async streamData => {
                streamParser.pause();
                let result = await writeData(streamData);
                streamParser.resume();
            }).on("finish", () => {
                console.log("STREAM FINISH!");
                resolve(true);
            }).on("error", e => {
                console.error("Stream error:", e);
                reject(e);
            });
        });
    } catch (e) {
        console.error(e);
    }
}