我需要通过从Java标准输入传递消息,CLI PHP程序。 我想保持在一个池中运行约20 PHP的过程,例如,当我传递消息到池中时,它发送的每个消息,以一个单独的线程,保持消息队列被递送。 我想这些PHP程序尽可能长时间维持生命,带来了一个新的,如果一人死亡。 我看着一个静态线程池这样做,但它似乎更专为执行,只是死任务。 我怎么能做到这一点,一个简单的界面来传递消息到池? 请问我要实现自己的自定义“线程池”?
Answer 1:
我提供一些代码,使这是我认为这将让事情更清晰。 基本上你需要保持一个池的过程中物体周围。 体贴是每个流程都有,你需要以某种方式来管理输入,输出和错误流。 在我的例子我只是重定向错误并输出到主流程控制台。 您可以根据需要设置回调和处理程序来获得PHP程序的输出。 如果你是刚刚处理任务,并不在乎什么PHP接着说保留原样或重定向到一个文件中。
我使用的Apache的共享池库中ObjectPool的。 无需重新发明之一。
你必须运行你的PHP程序20个进程池。 仅凭这一点不会得到你所需要的。 您可能要处理针对这些过程的全部20个任务“在同一时间。” 所以,你还需要一个线程池,将拉过程从ObjectPool的。
您还需要了解的是,如果你杀死,或CTRL-C您的Java程序的init
过程将接管你的PHP程序,他们将只是坐在那里。 你可能想保留日志的PHP的所有PID的进程你产卵,然后清除它们,如果你重新运行Java程序。
public class StackOverflow_10037379 {
private static Logger sLogger = Logger.getLogger(StackOverflow_10037379.class.getName());
public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
private String mProcessToRun;
public CLIPoolableObjectFactory(String processToRun) {
mProcessToRun = processToRun;
}
@Override
public Process makeObject() throws Exception {
ProcessBuilder builder = new ProcessBuilder();
builder.redirectError(Redirect.INHERIT);
// I am being lazy, but really the InputStream is where
// you can get any output of the PHP Process. This setting
// will make it output to the current processes console.
builder.redirectOutput(Redirect.INHERIT);
builder.redirectInput(Redirect.PIPE);
builder.command(mProcessToRun);
return builder.start();
}
@Override
public boolean validateObject(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException ex) {
return true;
}
}
@Override
public void destroyObject(Process process) throws Exception {
// If PHP has a way to stop it, do that instead of destroy
process.destroy();
}
@Override
public void passivateObject(Process process) throws Exception {
// Should really try to read from the InputStream of the Process
// to prevent lock-ups if Rediret.INHERIT is not used.
}
}
public static class CLIWorkItem implements Runnable {
private ObjectPool<Process> mPool;
private String mWork;
public CLIWorkItem(ObjectPool<Process> pool, String work) {
mPool = pool;
mWork = work;
}
@Override
public void run() {
Process workProcess = null;
try {
workProcess = mPool.borrowObject();
OutputStream os = workProcess.getOutputStream();
os.write(mWork.getBytes(Charset.forName("UTF-8")));
os.flush();
// Because of the INHERIT rule with the output stream
// the console stream overwrites itself. REMOVE THIS in production.
Thread.sleep(100);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
} finally {
if (workProcess != null) {
try {
// Seriously.. so many exceptions.
mPool.returnObject(workProcess);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
}
}
}
}
}
public static void main(String[] args) throws Exception {
// Change the 5 to 20 in your case.
// Also change mock_php.exe to /usr/bin/php or wherever.
ObjectPool<Process> pool =
new GenericObjectPool<>(
new CLIPoolableObjectFactory("mock_php.exe"), 5);
// This will only allow you to queue 100 work items at a time. I would suspect
// that if you only want 20 PHP processes running at a time and this queue
// filled up you'll need to implement some other strategy as you are doing
// more work than PHP can keep up with. You'll need to block at some point
// or throw work away.
BlockingQueue<Runnable> queue =
new ArrayBlockingQueue<>(100, true);
ThreadPoolExecutor executor =
new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
// print some stuff out.
executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
executor.shutdown();
executor.awaitTermination(4000, TimeUnit.HOURS);
pool.close();
}
}
程序运行的输出:
12172 - Message 2
10568 - Message 1
4804 - Message 3
11916 - Message 4
11116 - Message 5
12172 - Message 6
4804 - Message 7
10568 - Message 8
11916 - Message 9
11116 - Message 10
12172 - Message 11
的C ++代码程序只是输出什么输入:
#include <windows.h>
#include <iostream>
#include <string>
int main(int argc, char* argv[])
{
DWORD pid = GetCurrentProcessId();
std::string line;
while (true) {
std::getline (std::cin, line);
std::cout << pid << " - " << line << std::endl;
}
return 0;
}
更新
抱歉耽搁了。 这里是一个JDK版本6兴趣的人。 你必须运行一个单独的线程从过程的InputStream读入所有的输入。 我给自己定的代码了产卵沿侧一个新的线程每一个新的进程。 该线程总是从过程只要它是活的阅读。 而不是直接输出到文件中我把它使得它使用的日志框架。 这样,你可以设置一个日志记录配置去一个文件,滚动,去安慰等而不会被硬编码到一个文件中。
你会注意到我只启动一个gobbler之。因为即使一个进程有输出和错误的每个过程。 我stderr重定向到标准输出,只是为了让事情变得更容易。 显然,只有JDK6支持这种类型的重定向。
public class StackOverflow_10037379_jdk6 {
private static Logger sLogger = Logger.getLogger(StackOverflow_10037379_jdk6.class.getName());
// Shamelessy taken from Google and modified.
// I don't know who the original Author is.
public static class StreamGobbler extends Thread {
InputStream is;
Logger logger;
Level level;
StreamGobbler(String logName, Level level, InputStream is) {
this.is = is;
this.logger = Logger.getLogger(logName);
this.level = level;
}
public void run() {
try {
InputStreamReader isr = new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
String line = null;
while ((line = br.readLine()) != null) {
logger.log(level, line);
}
} catch (IOException ex) {
logger.log(Level.SEVERE, "Failed to read from Process.", ex);
}
logger.log(
Level.INFO,
String.format("Exiting Gobbler for %s.", logger.getName()));
}
}
public static class CLIPoolableObjectFactory extends BasePoolableObjectFactory<Process> {
private String mProcessToRun;
public CLIPoolableObjectFactory(String processToRun) {
mProcessToRun = processToRun;
}
@Override
public Process makeObject() throws Exception {
ProcessBuilder builder = new ProcessBuilder();
builder.redirectErrorStream(true);
builder.command(mProcessToRun);
Process process = builder.start();
StreamGobbler loggingGobbler =
new StreamGobbler(
String.format("process.%s", process.hashCode()),
Level.INFO,
process.getInputStream());
loggingGobbler.start();
return process;
}
@Override
public boolean validateObject(Process process) {
try {
process.exitValue();
return false;
} catch (IllegalThreadStateException ex) {
return true;
}
}
@Override
public void destroyObject(Process process) throws Exception {
// If PHP has a way to stop it, do that instead of destroy
process.destroy();
}
@Override
public void passivateObject(Process process) throws Exception {
// Should really try to read from the InputStream of the Process
// to prevent lock-ups if Rediret.INHERIT is not used.
}
}
public static class CLIWorkItem implements Runnable {
private ObjectPool<Process> mPool;
private String mWork;
public CLIWorkItem(ObjectPool<Process> pool, String work) {
mPool = pool;
mWork = work;
}
@Override
public void run() {
Process workProcess = null;
try {
workProcess = mPool.borrowObject();
OutputStream os = workProcess.getOutputStream();
os.write(mWork.getBytes(Charset.forName("UTF-8")));
os.flush();
// Because of the INHERIT rule with the output stream
// the console stream overwrites itself. REMOVE THIS in production.
Thread.sleep(100);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
} finally {
if (workProcess != null) {
try {
// Seriously.. so many exceptions.
mPool.returnObject(workProcess);
} catch (Exception ex) {
sLogger.log(Level.SEVERE, null, ex);
}
}
}
}
}
public static void main(String[] args) throws Exception {
// Change the 5 to 20 in your case.
ObjectPool<Process> pool =
new GenericObjectPool<Process>(
new CLIPoolableObjectFactory("mock_php.exe"), 5);
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(100, true);
ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20, 1, TimeUnit.HOURS, queue);
// print some stuff out.
executor.execute(new CLIWorkItem(pool, "Message 1\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 2\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 3\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 4\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 5\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 6\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 7\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 8\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 9\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 10\r\n"));
executor.execute(new CLIWorkItem(pool, "Message 11\r\n"));
executor.shutdown();
executor.awaitTermination(4000, TimeUnit.HOURS);
pool.close();
}
}
产量
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 3
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 2
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 1
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 4
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 5
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8868 - Message 8
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 10
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 8776 - Message 9
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 10096 - Message 6
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 9440 - Message 7
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: 6100 - Message 11
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.295131993.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.756434719.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.332711452.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1981440623.
Apr 11, 2012 8:41:02 PM stackoverflow.StackOverflow_10037379_jdk6$StreamGobbler run
INFO: Exiting Gobbler for process.1043636732.
Answer 2:
这里最好的办法是使用了pcntl功能派生进程,但进程之间的通信是困难的。 我建议建立一个队列,你的进程可以读出,而不是试图将消息传递给命令行。
魔豆有,你可以用它来处理进程之间的消息传递几个PHP客户。