Fast-csv read several files synchronously

2019-07-20 05:05发布

I'm trying to read several files synchronously (one after another) with fast-csv; it should look like this:

read file 1
execute something while reading
read file 2
execute something while reading (it must be executed after the first file's execution has finished, which is why I need to do this synchronously)
...

Here is my code simplified:

const csv = require('fast-csv');
// Files to process, in the order they are listed.
const PROCEDURES = [
    { "name": "p1", "file": "p1.csv" },
    { "name": "p2", "file": "p2.csv" },
];

/**
 * Starts parsing one CSV file with fast-csv, skipping the file's first line.
 *
 * The function returns (undefined) as soon as the 'readable' listener is
 * attached; all reading and parsing happens later inside stream event
 * callbacks. Callers therefore get nothing to wait on, so calling it in a
 * loop starts every file at once.
 *
 * @param {string} name - procedure name (not referenced in this body).
 * @param {string} file - path of the CSV file to read.
 */
const launchProcedure = (name, file) => {

    // NOTE(review): this try/catch only sees synchronous exceptions; errors
    // raised later inside the event callbacks bypass it. No 'error' listener
    // is attached to `stream` itself.
    try {   
        const fs = require("fs");
        const stream = fs.createReadStream(file, {
            encoding: 'utf8'
        });
        console.log('launching parsing...');

        // This callback runs later, once the stream first has data to read.
        stream.once('readable', () => {
        // ignore first line
        // (consume one character at a time until '\n', or until
        // stream.read(1) returns null because nothing more is buffered)
        let chunk;
        while (null !== (chunk = stream.read(1))) {
            if (chunk == '\n') {
                break;
            }
        }

            // CSV parsing of the remaining stream content.
            // `csvStream` is not used after this statement.
            const csvStream = csv.fromStream(stream, {
                renameHeaders: false,
                headers: true,
                delimiter: ',',
                rowDelimiter: '\n',
                quoteHeaders: false,
                quoteColumns: false
            }).on("data", data => {
                console.log('procedure execution...');
                // I execute a procedure...

            }).on("error", error => {
                // NOTE(review): `logger` is not defined in this snippet;
                // presumably it is created elsewhere — confirm.
                logger.error(error);
            }).on("end", data => {
                logger.info(data);
            });
         });

    }
    catch (e) {
        logger.info(e);
    }
}

// forEach does not wait between calls, and launchProcedure returns nothing to
// wait on, so every file starts parsing immediately (hence the interleaved
// "launching parsing..." lines in the output).
PROCEDURES.forEach(procedure => {
    launchProcedure(procedure.name, procedure.file);
});

Output will be:

launching parsing...
launching parsing...
procedure execution...
procedure execution...

The problem is with stream.once, but I use it to ignore the first line. I tried to promisify my function and use async/await... (I had a similar problem when executing my procedure and solved it by using csvStream.pause() and csvStream.resume().)

Any ideas?

3条回答
萌系小妹纸
2楼-- · 2019-07-20 05:17

Using promises seems like a good way to solve this. Be aware, though, that when you create a new Promise with new Promise(executor), the executor runs immediately. So you need to delay creating the next Promise until the previous one has settled.

To "promisify" the launchProcedure function, you need to return a new Promise at the start of the function:

const launchProcedure = (name, file) => {
    return new Promise((resolve, reject) => {

And then you need to call resolve (for success) and reject (for failure) whenever the parsing has finished.

Finally we need to string together the promises:

// Start from an already-resolved Promise so an empty PROCEDURES list needs no
// special case, and defer each launchProcedure() call inside .then() so a file
// is only opened after the previous one has finished.
let promise = Promise.resolve();
for (const { name, file } of PROCEDURES) {
    promise = promise.then(() => launchProcedure(name, file));
}

Note that I use a lambda function within the then to delay the creation of the Promise. (By the way, there are also nicer ways to string together promises.)

The final code looks like this:

const csv = require('fast-csv');
const fs = require('fs');

const PROCEDURES = [
    { "name": "p1", "file": "p1.csv" },
    { "name": "p2", "file": "p2.csv" },
];

/**
 * Parse one CSV file, skipping its first line, and report completion through
 * the returned Promise so files can be processed strictly one after another.
 *
 * @param {string} name - procedure name (not used by the parsing itself).
 * @param {string} file - path of the CSV file to parse.
 * @returns {Promise<void>} resolves when the CSV parser emits "end"; rejects
 *   when the file cannot be read or the parser emits "error".
 */
const launchProcedure = (name, file) => {
    return new Promise((resolve, reject) => {
        try {
            const stream = fs.createReadStream(file, {
                encoding: 'utf8'
            });
            console.log('launching parsing...');

            // Read errors (e.g. a missing file) are emitted asynchronously, so the
            // surrounding try/catch never sees them. Without this listener the
            // Promise would never settle and the unhandled "error" event would
            // crash the process.
            stream.once('error', error => {
                logger.error(error);
                reject(error);
            });

            stream.once('readable', () => {
                // Ignore the first line: consume one character at a time until the
                // first newline (or until stream.read(1) returns null because
                // nothing more is buffered).
                let chunk;
                while (null !== (chunk = stream.read(1))) {
                    if (chunk === '\n') {
                        break;
                    }
                }

                // CSV parsing of the remaining content; the Promise settles here.
                csv.fromStream(stream, {
                    renameHeaders: false,
                    headers: true,
                    delimiter: ',',
                    rowDelimiter: '\n',
                    quoteHeaders: false,
                    quoteColumns: false
                }).on("data", data => {
                    console.log('procedure execution...');
                    // I execute a procedure...

                }).on("error", error => {
                    logger.error(error);
                    reject(error);
                }).on("end", data => {
                    logger.info(data);
                    resolve();
                });
            });
        }
        catch (e) {
            // Only synchronous failures (e.g. an invalid `file` argument) land here.
            logger.info(e);
            reject(e);
        }
    });
};

// Start from an already-resolved Promise (so an empty PROCEDURES list works) and
// create each launchProcedure() Promise lazily inside .then(), so a file is only
// opened once the previous one has been fully parsed.
let promise = Promise.resolve();
for (const { name, file } of PROCEDURES) {
    promise = promise.then(() => launchProcedure(name, file));
}
promise
    .then(() => { console.log('all files parsed'); })
    .catch(error => { logger.error(error); });
查看更多
Luminary・发光体
3楼-- · 2019-07-20 05:31

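The problem here is that launchProcedure has to be async in order to use await. Another problem is that using async/await together with Array.forEach is not the best choice: forEach ignores the Promise returned by an async callback, so it does not wait for one iteration to finish before starting the next (see here).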

You can do this using the "for-of" loop and await inside the loop body:

const csv = require('fast-csv');
const fs = require('fs');
const PROCEDURES = [
    { "name": "p1", "file": "p1.csv" },
    { "name": "p2", "file": "p2.csv" },
];

/**
 * Wait until `stream` emits its first "readable" (resolve) or "error" (reject).
 *
 * Both listeners are removed as soon as either fires. This matters: while a
 * "readable" listener is attached it controls the flow of the stream, so the
 * "data" events the CSV parser relies on would only arrive when someone calls
 * stream.read() — and nobody does once parsing has been handed to fast-csv.
 *
 * @param {import('stream').Readable} stream - the file stream to watch.
 * @returns {Promise<void>}
 */
const waitUntilReadable = (stream) => new Promise((resolve, reject) => {
    const onReadable = () => {
        stream.removeListener('error', onError);
        resolve();
    };
    const onError = (error) => {
        stream.removeListener('readable', onReadable);
        reject(error);
    };
    stream.once('readable', onReadable);
    stream.once('error', onError);
});

/**
 * Parse one CSV file (skipping its first line) and resolve only once the whole
 * file has been parsed, so awaiting it really serialises the files.
 *
 * Failures are logged and not rethrown, so a caller looping over files moves
 * on to the next one.
 *
 * @param {string} name - procedure name (not used by the parsing itself).
 * @param {string} file - path of the CSV file to parse.
 * @returns {Promise<void>}
 */
const launchProcedure = async (name, file) => {

    try {
        // fs is required once at the top of the file rather than on every call.
        const stream = fs.createReadStream(file, {
            encoding: 'utf8'
        });

        console.log('launching parsing...');

        // wait for the readable (or error) event
        let ready = true;
        let readError;
        try {
            await waitUntilReadable(stream);
        } catch (error) {
            ready = false;
            readError = error;
        }

        console.log('file is ready: ', ready);

        if (!ready) {
            throw new Error(`Unable to read file (file-name: "${file}")`, { cause: readError });
        }

        // ignore first line
        let chunk;
        while (null !== (chunk = stream.read(1))) {
            if (chunk === '\n') {
                break;
            }
        }

        // CSV parsing: wait for the parser's "end" so the next file only starts
        // after this one has been completely processed.
        await new Promise((resolve, reject) => {
            // read errors that happen after the file became readable
            stream.once('error', reject);
            csv.fromStream(stream, {
                renameHeaders: false,
                headers: true,
                delimiter: ',',
                rowDelimiter: '\n',
                quoteHeaders: false,
                quoteColumns: false
            }).on("data", data => {
                console.log('procedure execution...');
                // I execute a procedure...

            }).on("error", reject).on("end", data => {
                logger.info(data);
                resolve();
            });
        });

        console.log(`Done reading file (file-name: "${file}")`);
    }
    catch (e) {
        logger.error(e);
    }
};


// Wrap the iteration in an async function so `await` can be used, and use
// for...of rather than forEach: forEach ignores the Promise returned by an
// async callback, so it would not wait between files.
const runProcedures = async (procedures) => {
    for (const procedure of procedures) {
        await launchProcedure(procedure.name, procedure.file);
    }
};

runProcedures(PROCEDURES).catch(error => logger.error(error));

OUTPUT:

launching parsing...
file is ready:  true
Done reading file (file-name: "p1.csv")
launching parsing...
file is ready:  true
Done reading file (file-name: "p2.csv")
查看更多
仙女界的扛把子
4楼-- · 2019-07-20 05:38

Hey, I found a solution in the meantime, before seeing your answers (sorry for posting late)!

/**
 * Parse one CSV file (skipping its first line) and signal completion through a
 * Node-style callback, as async.eachSeries expects: callback() once the parser
 * emits "end", callback(error) if the file cannot be read or parsing fails.
 *
 * NOTE(review): like the question's code, this assumes `csv` (fast-csv),
 * `logger` and the `async` library are required elsewhere in the file.
 *
 * @param {string} name - procedure name (not used by the parsing itself).
 * @param {string} file - path of the CSV file to parse.
 * @param {function(Error=)} callback - invoked exactly once.
 */
const launchProcedure = (name, file, callback) => {
    // Several events (stream error, parser error, end) may report completion;
    // async.eachSeries must be called back only once.
    let finished = false;
    const done = (error) => {
        if (finished) {
            return;
        }
        finished = true;
        if (error) {
            callback(error);
        } else {
            callback();
        }
    };

    try {
        const fs = require("fs");
        const stream = fs.createReadStream(file, {
            encoding: 'utf8'
        });
        console.log('launching parsing...');

        // A read error (e.g. a missing file) is emitted asynchronously; without
        // this listener it would crash the process and eachSeries would never
        // move on.
        stream.once('error', error => {
            logger.error(error);
            done(error);
        });

        stream.once('readable', () => {
            // ignore first line
            let chunk;
            while (null !== (chunk = stream.read(1))) {
                if (chunk === '\n') {
                    break;
                }
            }

            // CSV parsing
            csv.fromStream(stream, {
                renameHeaders: false,
                headers: true,
                delimiter: ',',
                rowDelimiter: '\n',
                quoteHeaders: false,
                quoteColumns: false
            }).on("data", data => {
                console.log('procedure execution...');
                // I execute a procedure...

            }).on("error", error => {
                logger.error(error);
                done(error);
            }).on("end", data => {
                logger.info(data);
                done();
            });
        });

    }
    catch (e) {
        logger.info(e);
        done(e);
    }
};

// launchProcedure takes (name, file, callback). The previous call also passed a
// leading `db` argument, which shifted every parameter by one (the procedure
// name was opened as the file and the file path was used as the callback).
// NOTE(review): if the procedure needs a database handle, add it as a parameter
// of launchProcedure rather than prepending it here.
async.eachSeries(PROCEDURES, (procedure, callback) => {
    launchProcedure(procedure.name, procedure.file, callback);
}, error => {
    if (error) {
        logger.error(error);
    }
    else {
        logger.info("done");
    }
});
查看更多
登录 后发表回答