How to lazily create tasks for use by a Java threa

2019-07-04 06:51发布

问题:

I'm writing a load-testing application in Java, and have a thread pool that executes tasks against the server under test. So to make 1000 jobs and run them in 5 threads I do something like this:

    ExecutorService pool = Executors.newFixedThreadPool(5);
    List<Runnable> jobs = makeJobs(1000);
    for(Runnable job : jobs){
        pool.execute(job);
    }

However I don't think this approach will scale very well, because I have to make all the 'job' objects ahead of time and have them sitting in memory until they are needed.

I'm looking for a way to have the threads in the pool go to some kind of 'JobFactory' class each time they need a new job, and for the factory to build Runnables on request until the required number of jobs have been run. The factory could maybe start returning 'null' to signal to the threads that there is no more work to do.

I could code something like this up by hand, but it seems like a common enough use-case and was wondering if there was anything in the wonderful but complex 'java.util.concurrent' package that I could use instead?

回答1:

You can do all the work in the executing threads of the thread pools using an AtomicInteger to monitor the number of runnables executed

 int numberOfParties = 5;
 AtomicInteger numberOfJobsToExecute = new AtomicInteger(1000);
 ExecutorService pool = Executors.newFixedThreadPool(numberOfParties );
 for(int i =0; i < numberOfParties; i++){
     pool.submit(new Runnable(){
        public void run(){
            while(numberOfJobsToExecute.decrementAndGet() >= 0){
                makeJobs(1).get(0).run();
            }
        }
     });
 }

You can also store the returned Future's in a List and get() on them to await completion (among other mechanisms)



回答2:

Hrm. You could create a BlockingQueue<Runnable> with a fixed capacity and have each of your worker threads dequeue a Runnable and run it. Then you could have a producer thread which is what puts the jobs into the queue.

Main thread would do something like:

// 100 is the capacity of the queue before blocking
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(100);
// start the submitter thread
new Thread(new JobSubmitterThread(queue)).start();
// make in a loop or something?
new Thread(new WorkerThread(queue)).start();
new Thread(new WorkerThread(queue)).start();
...

The worker would look something like:

public class WorkerThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         // run until the main thread shuts it down using volatile boolean or ...
         while (!shutdown) {
             Runnable job = queue.take();
             job.run();
         }
     }
}

And the job submitter would look something like:

 public class JobSubmitterThread implements Runnable {
     private final BlockingQueue<Runnable> queue;
     public WorkerThread(BlockingQueue<Runnable> queue) {
         this.queue = queue;
     }
     public void run() {
         for (int jobC = 0; jobC < 1000; jobC++) {
             Runnable job = makeJob();
             // this would block when the queue reaches capacity
             queue.put(job);
         }
     }
 }