如何处理Java中多个数据流?(How do I handle multiple streams i

2019-09-23 02:28发布

我试图运行一个过程,做的东西与它的输入,输出和错误流。 最显而易见的方法做,这是使用类似select()但我能找到在Java中,做的唯一的事情是Selector.select()这需要一个Channel 。 这似乎并不可能得到一个ChannelInputStreamOutputStreamFileStreamgetChannel()方法,但是,这并不在这里帮助)

所以,相反我写了一些代码来查询所有流:

while( !out_eof || !err_eof )
{
    while( out_str.available() )
    {
        if( (bytes = out_str.read(buf)) != -1 )
        {
            // Do something with output stream
        }
        else
            out_eof = true;
    }
    while( err_str.available() )
    {
        if( (bytes = err_str.read(buf)) != -1 )
        {
            // Do something with error stream
        }
        else
            err_eof = true;
    }
    sleep(100);
}

它的工作原理,但它永远不会终止。 当一个流到达文件的末尾, available()返回零,以便read()不被调用,我们永远无法得到回报-1将指示EOF。

一个解决办法是检测EOF非阻塞方式。 我看不出一个在任何地方的文档。 或者有没有做我想要做的更好的办法?

我看到这个问题在这里: 链接文本 ,虽然它并不完全做我想做的,我也许可以使用这个想法,产卵单独的线程为每个数据流,为特定的问题,我现在有。 但可以肯定是不这样做的唯一途径? 当然,必须有来自多个流,不会使用每个线程读取的方式?

Answer 1:

正如你所说,该解决方案在这个答案概括是从进程读取标准输出和标准错误的传统方式。 一个线程每个流是要走的路,即使它是有点恼人。



Answer 2:

事实上,你将不得不去产卵一个线程要监视每个流的路线。 如果你的使用情况允许合并标准输出和问题,你只需要一个线程进程的标准错误,否则需要两个。

我花了相当长的一段时间,让它在我们的项目中,我必须启动一个外部进程,采取其产量和用它做的东西,而在同一时间寻找错误和进程终止,也能够终止它的一个正确当Java应用程序的用户取消该操作。

我创建了一个相当简单的类来封装的鉴赏部分,其run()方法看起来是这样的:

public void run() {
    BufferedReader tStreamReader = null;
    try {
        while (externalCommand == null && !shouldHalt) {
            logger.warning("ExtProcMonitor("
                           + (watchStdErr ? "err" : "out")
                           + ") Sleeping until external command is found");
            Thread.sleep(500);
        }
        if (externalCommand == null) {
            return;
        }
        tStreamReader =
                new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                        : externalCommand.getInputStream()));
        String tLine;
        while ((tLine = tStreamReader.readLine()) != null) {
            logger.severe(tLine);
            if (filter != null) {
                if (filter.matches(tLine)) {
                    informFilterListeners(tLine);
                    return;
                }
            }
        }
    } catch (IOException e) {
        logger.logExceptionMessage(e, "IOException stderr");
    } catch (InterruptedException e) {
        logger.logExceptionMessage(e, "InterruptedException waiting for external process");
    } finally {
        if (tStreamReader != null) {
            try {
                tStreamReader.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

在通话方面,它看起来是这样的:

    Thread tExtMonitorThread = new Thread(new Runnable() {

        public void run() {
            try {
                while (externalCommand == null) {
                    getLogger().warning("Monitor: Sleeping until external command is found");
                    Thread.sleep(500);
                    if (isStopRequested()) {
                        getLogger()
                                .warning("Terminating external process on user request");
                        if (externalCommand != null) {
                            externalCommand.destroy();
                        }
                        return;
                    }
                }
                int tReturnCode = externalCommand.waitFor();
                getLogger().warning("External command exited with code " + tReturnCode);
            } catch (InterruptedException e) {
                getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
            }
        }
    }, "ExtCommandWaiter");

    ExternalProcessOutputHandlerThread tExtErrThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
    ExternalProcessOutputHandlerThread tExtOutThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
    tExtMonitorThread.start();
    tExtOutThread.start();
    tExtErrThread.start();
    tExtErrThread.setFilter(new FilterFunctor() {

        public boolean matches(Object o) {
            String tLine = (String)o;
            return tLine.indexOf("Error") > -1;
        }
    });

    FilterListener tListener = new FilterListener() {
        private boolean abortFlag = false;

        public boolean shouldAbort() {
            return abortFlag;
        }

        public void matched(String aLine) {
            abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
        }

    };

    tExtErrThread.addFilterListener(tListener);
    externalCommand = new ProcessBuilder(aCommand).start();
    tExtErrThread.setProcess(externalCommand);
    try {
        tExtMonitorThread.join();
        tExtErrThread.join();
        tExtOutThread.join();
    } catch (InterruptedException e) {
        // when this happens try to bring the external process down 
        getLogger().severe("Aborted because auf InterruptedException.");
        getLogger().severe("Killing external command...");
        externalCommand.destroy();
        getLogger().severe("External command killed.");
        externalCommand = null;
        return -42;
    }
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();

    externalCommand = null;
    try {
        getLogger().warning("command exit code: " + tRetVal);
    } catch (IllegalThreadStateException ex) {
        getLogger().warning("command exit code: unknown");
    }
    return tRetVal;

不幸的是,我没有为一个自包含的可运行的例子,但也许这会有所帮助。 如果我不得不做一遍我会再看看使用了Thread.interrupt()方法,而不是自制的停车标志(头脑声明它挥发!),但我留到另一个时间。 :)



文章来源: How do I handle multiple streams in Java?
标签: java process io