运行事件的NodeJS循环/等待子进程结束(Run NodeJS event loop / wait

2019-09-02 00:10发布

我第一次尝试对问题的总体描述,然后一些更详细的原因通常方法不起作用。 如果你想读这些抽象的解释下去。 最后,我解释了更大的问题和特定的应用程序,所以如果你宁愿读了,跳转到“实际应用”。

我使用的是Node.js的儿童过程中做一些计算密集型工作。 父进程做它的工作,但在执行过程中的某一点达到一个地步,必须有继续之前的子进程的信息。 因此,我正在寻找一种方式来等待子进程结束。

我目前的设置看起来有点像这样:

importantDataCalculator = fork("./runtime");
importantDataCalculator.on("message", function (msg) {
    if (msg.type === "result") {
        importantData = msg.data;
    } else if (msg.type === "error") {
        importantData = null;
    } else {
        throw new Error("Unknown message from dataGenerator!");
    }
});

和其他地方

function getImportantData() {
    while (importantData === undefined) {
        // wait for the importantDataGenerator to finish
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

所以,父进程启动时,它执行的代码中的第一位,产卵一个子进程来计算数据并继续做自己的工作位。 一旦时机成熟,它需要从子进程的结果,继续调用getImportantData() 这样的想法是, getImportantData()块直到数据进行计算。

然而,我所用的方式是行不通的。 我想这是因为我阻止事件循环从使用while循环执行。 而且,由于事件循环不从子进程执行没有消息可以被接收,因此while循环的条件不能改变,使之成为一个无限循环。

当然,我真的不希望使用这种while循环的。 我宁愿做的是告诉node.js的“执行事件循环的一个迭代,然后打电话给我。” 我会反复做,直到我需要的数据接收,然后继续我从吸气返回离开的地方执行。

我意识到他的姿势输入相同的功能几次的危险,但模块我想用这确实对事件循环几乎没有,除了等待子进程此消息,并发出其它消息报告它的进度,所以这不应该是一个问题。

有没有办法在执行Node.js的事件循环的只是一个迭代? 还是有另一种方式来实现类似的东西? 或者是有一个完全不同的方法来实现什么,我想在这里做什么?

我能想到的迄今唯一的解决办法是改变我介绍的另一种方法这样的方式计算。 在这种情况下,将有过程计算中的重要数据,过程计算这两个,这只是等待数据从两个子流程和联合不需要的重要数据,该数据的比特和父进程当他们到达的作品。 因为它没有做任何计算密集型工作本身,它只需等待事件循环事件(=信息),并做反应,在必要时转发合并后的数据并存储暂时还无法合并数据块。 然而,这又引入了另一个进程,并更进程间通信,它引入了更多的开销,这是我想避免的。

编辑

我看到,需要更多的细节。

父进程(我们称之为处理1)本身是另一个进程催生了一个进程(process 0)做了一些计算密集型工作。 实际上,它只是执行一些代码了,我就不用管了,所以我不能让它异步工作。 我能做什么(并且已经完成)是使该定期执行调用一个函数来报告它的进度和提供部分结果的代码。 这一进展报告,然后通过IPC发送回原本的处理。

但在少数情况下,部分结果是不正确的,所以他们不得不进行修改。 要做到这一点,我需要一些数据,我可以从正常计算独立计算。 但是,这种计算可能需要几秒钟; 因此,我开始另一个处理(处理2)执行此操作的计算,并提供结果的过程1,通过IPC消息。 现在处理1和2计算愉快那里的东西,并希望通过过程2中计算出的校正数据之前处理1需要它完成。 但有时的过程需要1早期的结果之一加以纠正,并在这种情况下,我不得不等待过程2完成其计算。 阻断过程1的事件循环理论上没有问题的,由于主进程(process 0)不会被受到它的影响。 唯一的问题是,通过防止代码执行进一步在过程1我也阻断事件循环,从而防止它从不断从过程2接收的结果。

所以,我需要以某种方式暂停的过程1代码的进一步执行,而不会阻塞事件循环。 我希望有一个像电话process.runEventLoopIteration执行事件循环的迭代,然后返回。

然后我会改变这样的代码:

function getImportantData() {
    while (importantData === undefined) {
        process.runEventLoopIteration();
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

从而执行所述的事件循环,直到已经收到的必要数据,但不能继续所述代码的执行调用getImportantData()。

基本上我在做的过程1是这样的:

function callback(partialDataMessage) {
    if (partialDataMessage.needsCorrection) {
        getImportantData();
        // use data to correct message
        process.send(correctedMessage); // send corrected result to main process
    } else {
        process.send(partialDataMessage); // send unmodified result to main process
    }
}

function executeCode(code) {
    run(code, callback); // the callback will be called from time to time when the code produces new data
    // this call is synchronous, run is blocking until the calculation is finished
    // so if we reach this point we are done
    // the only way to pause the execution of the code is to NOT return from the callback 
}

实际应用/实施/问题

我需要这种行为于以下应用。 如果你有更好的方法来实现这个随时提出它。

我想执行任意代码和什么变量它的变化,什么是所谓的功能,有什么异常发生等。我还需要在代码中的这些事件的位置,以便能够在下显示在界面上收集的信息的通知原代码。

要做到这一点,我的仪器代码,并插入回调到它。 然后我执行的代码,包裹在一个try-catch块的执行。 每当调用回调有关执行的一些数据(例如一个变量的变化),我将消息发送到主进程告诉它有关的变化。 通过这种方式,通知用户有关代码的执行,它运行时。 通过这些回调发生的事件的位置信息被添加到仪器中的回拨电话,所以这不是一个问题。

出现问题,发生异常时。 我也想通知有关的测试代码中的异常的用户。 因此,我包裹在一个try-catch代码的执行,并且走出执行的被捕获,并且发送到用户界面的任何异常。 但错误的位置不正确。 通过node.js中创建了一个错误的对象有一个完整的调用堆栈因此它知道它发生在哪里。 但这个位置,如果相对于改动的代码,所以我不能直接使用这些位置信息,显示错误的原代码旁边。 我需要在改动的代码将这一位置为位置的原代码。 要做到这一点,插装的代码后,我计算源地图在仪表代码位置映射到原代码的位置。 但是,这种计算可能需要几秒钟。 所以,我想,我会启动一个子进程来计算源图,而改动的代码的执行已经开始。 然后,当发生异常时,我检查是否源地图已经被计算,如果它不是我等计算到结束才能够正确的位置。

由于要执行的,看着完全可以任意的代码,我不能平凡重写它是异步的。 我只知道它调用提供的回调,因为我仪表的代码这样做。 我也不能只保存信息并返回到继续代码的执行,在下次调用源地图是否已经结束时检查回来,因为继续代码的执行也将阻止事件循环,防止计算源在执行过程中被接收到过地图。 或者,如果它被接收,然后后才执行的代码已经完全结束,这可能是相当晚婚或不(如果执行代码包含一个无限循环)。 但我收到sourceMap之前,我不能发送关于执行状态进一步更新。 相结合,这意味着我只能够要执行的代码完成(这可能是从来没有),这完全违背了程序的目的(使程序员观看这段代码的功能后,发送修正的进度消息,而它执行)。

暂时放弃控制权事件循环将解决这个问题。 然而,这似乎并不可能。 另一个想法我是引入的第三方法,它同时控制执行过程和sourceMapGeneration过程。 它从执行过程接收进度消息,并且如果所有消息的需要校正它等待sourceMapGeneration过程。 由于过程是独立的,所述控制方法可以存储所接收的消息,并等待sourceMapGeneration过程,同时执行处理继续执行,并且一旦它接收到的源映射,它校正所述消息,并把它们全部关闭。

然而,这不仅需要的另一种方法(开销),这也意味着我有更多的进程之间,并且由于代码可以有成千上万行的,这本身可能需要一些时间,一旦传输的代码,所以我想将它移到围绕尽可能少。

我希望这可以解释,为什么我不能和没有使用通常的“异步回调”的方法。

Answer 1:

添加第三个 (:))解决您的问题,您澄清后,你追求什么样的行为我建议使用光纤 。

纤维让你做协同例程中的NodeJS。 协程功能,允许多个入口/出口点。 这意味着你将能够得到控制,并恢复它,请你。

这里是一个sleep不同于正是这么做的,睡眠对于给定的时间量,并执行行动的官方文档功能。

function sleep(ms) {
    var fiber = Fiber.current;
    setTimeout(function() {
        fiber.run();
    }, ms);
    Fiber.yield();
}

Fiber(function() {
    console.log('wait... ' + new Date);
    sleep(1000);
    console.log('ok... ' + new Date);
}).run();
console.log('back in main');

你可以把该做的等待资源的功能的代码,使其产生,然后当任务完成后再次运行。

例如,从适应的问题你的例子:

var pausedExecution, importantData;
function getImportantData() {
    while (importantData === undefined) {
        pausedExecution = Fiber.current;
        Fiber.yield();
        pausedExecution = undefined;
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have proper data now
        return importantData;
    }
}

function callback(partialDataMessage) {
    if (partialDataMessage.needsCorrection) {
        var theData = getImportantData();
        // use data to correct message
        process.send(correctedMessage); // send corrected result to main process
    } else {
        process.send(partialDataMessage); // send unmodified result to main process
    }
}

function executeCode(code) {
    // setup child process to calculate the data
    importantDataCalculator = fork("./runtime");
    importantDataCalculator.on("message", function (msg) {
        if (msg.type === "result") {
            importantData = msg.data;
        } else if (msg.type === "error") {
            importantData = null;
        } else {
            throw new Error("Unknown message from dataGenerator!");
        }

        if (pausedExecution) {
            // execution is waiting for the data
            pausedExecution.run();
        }
    });


    // wrap the execution of the code in a Fiber, so it can be paused
    Fiber(function () {
        runCodeWithCallback(code, callback); // the callback will be called from time to time when the code produces new data
        // this callback is synchronous and blocking,
        // but it will yield control to the event loop if it has to wait for the child-process to finish
    }).run();
}

祝好运! 我总是说最好是在比解决3个问题以同样的方式3种方式来解决一个问题。 我很高兴我们能够制定出一些适合你的工作。 Admittingly,这是一个非常有趣的问题。



Answer 2:

异步编程的规则是,一旦你进入异步代码,您必须继续使用异步代码。 虽然可以继续通过一遍又一遍调用函数setImmediate或诸如此类的东西,你仍然有你想的问题return从一个异步过程。

不知道更多关于你的计划,我不能告诉你到底应该如何构建它,但总的来说,一直到“回归”的数据来自涉及异步代码是在回调中传递的过程; 或许这将让你在正确的轨道上:

function getImportantData(callback) {
    importantDataCalculator = fork("./runtime");
    importantDataCalculator.on("message", function (msg) {
        if (msg.type === "result") {
            callback(null, msg.data);
        } else if (msg.type === "error") {
            callback(new Error("Data could not be generated."));
        } else {
            callback(new Error("Unknown message from sourceMapGenerator!"));
        }
    });
}

那么你可以使用这样的此功能:

getImportantData(function(error, data) {
    if (error) {
        // handle the error somehow
    } else {
        // `data` is the data from the forked process
    }
});

我讲这一点更详细的我的截屏之一, 异步思考 。



Answer 3:

你正在运行到是很常见的场景,谁是开始熟练的NodeJS程序员经常挣扎。

你是正确的。 你不能做到这一点,你尝试的方式(循环)。

在node.js中的主要过程是单线程的,你阻塞事件循环。

解决这个最简单的方法是这样的:

function getImportantData() {
    if(importantData === undefined){ // not set yet
        setImmediate(getImportantData); // try again on the next event loop cycle
        return; //stop this attempt
    }

    if (importantData === null) {
        throw new Error("Data could not be generated.");
    } else {
        // we should have a proper data now
        return importantData;
    }
}

我们在做什么,是函数重新尝试处理使用事件循环的下一次迭代的数据setImmediate

这虽然引入了一个新的问题,你的函数返回一个值。 因为它没有准备好,你返回值是不确定的。 所以,你必须 被动编码。 你需要告诉你的代码,当数据到达该怎么办。

这是在节点通常完成了一个回调

function getImportantData(err,whenDone) {
    if(importantData === undefined){ // not set yet
        setImmediate(getImportantData.bind(null,whenDone)); // try again on the next event loop cycle
        return; //stop this attempt
    }

    if (importantData === null) {
        err("Data could not be generated.");
    } else {
        // we should have a proper data now
        whenDone(importantData);
    }
}

这可以通过以下方式使用

getImportantData(function(err){
    throw new Error(err); // error handling function callback
}, function(data){ //this is whenDone in our case
    //perform actions on the important data
})


Answer 4:

你的问题(更新)是非常有趣的,这似乎是密切相关的一个问题,我与异步捕获异常。 (也布兰登和Ihad我关于它的有趣的讨论!小小世界)

见这个问题如何捕获异常异步。 关键的概念是,你可以使用(假设的NodeJS 0.8及更高版本) 的NodeJS域限制一个例外的范围。

这将让您轻松获得异常的位置,因为你可以环绕异步块与atry/catch 。 我想,这应该在这里解决更大的问题。

你可以找到链接的问题相关的代码。 用法是这样的:

atry(function() {
    setTimeout(function(){
        throw "something";
    },1000);
}).catch(function(err){
    console.log("caught "+err);
});

既然你可以访问的范围atry ,你可以得到的堆栈跟踪有这将让你跳过更复杂的源地图的使用。

祝好运!



文章来源: Run NodeJS event loop / wait for child process to finish