Producer-Consumer using Executor Service

Published 2019-08-18 07:18

Question:

I am learning about ExecutorService and trying to build a producer-consumer scenario with it. I have defined a producer and a consumer, each with a run method that keeps looping until the flag "isStopped" is set to true.

My producer class has the following run method:

@Override
public void run() {
    while(!isStopped){
        MyTask task = new MyTask();
        System.out.println("I am thread "+this+" entering task "+ task);
        try {
            myBlockingQueue.addTask(task);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

And the consumer looks like this:

@Override
public void run() {
    while(!isStopped){
        System.out.println("I am thread "+this);
        try {
            MyTask task =  myBlockingQueue.removeTask();
            System.out.println("I am thread "+this+" retrieved task "+ task);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
}

Both have a method that sets the flag:

public void stopThread(){
    isStopped=true;
    this.interrupt();
}

I create 2 producers and 1 consumer as follows:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyExecutorTester {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        MyBlockingQueue mbq = new MyBlockingQueue(5);
        ExecutorService es = Executors.newFixedThreadPool(4);
        MyProducerThread p1 = new MyProducerThread(mbq);
        MyProducerThread p2 = new MyProducerThread(mbq);
        MyConsumerThread c1 = new MyConsumerThread(mbq);
        es.execute(p1);
        es.execute(p2);
        es.execute(c1);

        es.shutdownNow();

        p1.stopThread();
        p2.stopThread();
        c1.stopThread();
    }
}

Even though es.shutdownNow() is called, the producers and the consumer keep running, so I have to call stopThread() explicitly. In that case the producers stop, but the consumer, which is waiting for the queue to have a task, remains stuck.
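
A likely reason the consumer stays stuck can be shown in isolation. The sketch below is not the code from this question: BlockingTask is a hypothetical stand-in for MyConsumerThread, and a CountDownLatch that never opens stands in for removeTask() on an empty queue. It assumes the producer and consumer classes extend Thread (they call this.interrupt()). When such an object is passed to execute(), its run() is executed by a pool worker, so calling interrupt() on the object itself does not reach the thread that is actually blocked, whereas shutdownNow() does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InterruptTargetDemo {

    // Hypothetical stand-in for MyConsumerThread: a Thread subclass handed to an executor.
    static class BlockingTask extends Thread {
        final CountDownLatch entered = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void run() {
            entered.countDown();
            try {
                // Blocks forever unless interrupted, like removeTask() on an empty queue.
                new CountDownLatch(1).await();
            } catch (InterruptedException e) {
                // Reached only when the thread actually running run() is interrupted.
            } finally {
                done.countDown();
            }
        }
    }

    /** Returns { finished after task.interrupt(), finished after shutdownNow() }. */
    static boolean[] demo() {
        ExecutorService es = Executors.newFixedThreadPool(1);
        BlockingTask task = new BlockingTask();
        es.execute(task); // run() executes on a pool worker; task itself is never started
        try {
            task.entered.await();
            task.interrupt(); // targets the unstarted Thread object, not the pool worker
            boolean afterOwnInterrupt = task.done.await(300, TimeUnit.MILLISECONDS);
            es.shutdownNow(); // interrupts the pool worker that is running run()
            boolean afterShutdownNow = task.done.await(5, TimeUnit.SECONDS);
            return new boolean[] { afterOwnInterrupt, afterShutdownNow };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            es.shutdownNow(); // make sure the pool is stopped on every path
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("finished after task.interrupt(): " + r[0]);
        System.out.println("finished after shutdownNow():    " + r[1]);
    }
}
```

In the original run() methods the interrupt sent by shutdownNow() does arrive, but the catch block only prints the stack trace, so the loop simply continues and blocks again.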

Now my questions are:

1) Why does shutdownNow() not stop all the threads?

2) If I need to call the stop method myself, what is the advantage of ExecutorService?

3) Can you please point me to a better implementation of producer-consumer using an Executor, where the producers and the consumer keep running until stopped?

EDIT: Based on the comments below, I now handle InterruptedException in both run() methods as follows. The consumer:

@Override
public void run() {
    while(!isStopped ){
        System.out.println("I am thread "+this);
        try {
            MyTask task =  myBlockingQueue.removeTask();
            System.out.println("I am thread "+this+" retrieved task "+ task);
        } catch (InterruptedException e) {
            isStopped=true;
        }

    }
}

And the producer:

@Override
public void run() {
    while(!isStopped){
        MyTask task = new MyTask();
        System.out.println("I am thread "+this+" entering task "+ task);
        try {
            myBlockingQueue.addTask(task);
        } catch (InterruptedException e) {
            isStopped=true;
        }

    }
}

Now I do not need to call the stopThread() method. Calling es.shutdownNow() interrupts each pool thread, the catch block sets the flag to true, and so all the threads stop.

public static void main(String[] args) throws InterruptedException, ExecutionException {
    MyBlockingQueue mbq = new MyBlockingQueue(5);
    ExecutorService es = Executors.newFixedThreadPool(4);
    MyProducerThread p1 = new MyProducerThread(mbq);
    MyProducerThread p2 = new MyProducerThread(mbq);
    MyConsumerThread c1 = new MyConsumerThread(mbq);
    es.execute(p1);
    es.execute(p2);
    es.execute(c1);

    es.shutdownNow();
}
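
For question 3, one possible shape is sketched below. It is an illustration under assumptions, not a definitive implementation: it swaps the custom MyBlockingQueue for java.util.concurrent.ArrayBlockingQueue, uses plain Runnable tasks instead of Thread subclasses, and treats the pool thread's interrupt status as the only stop signal, so no separate isStopped flag is needed. The class names, the Integer payload, and the nextId and consumed counters are hypothetical and exist only to make the sketch observable.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerSketch {

    // Plain Runnable: the interrupt status of the pool thread is the stop signal.
    static class Producer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final AtomicInteger nextId; // hands out ids for the items to enqueue

        Producer(BlockingQueue<Integer> queue, AtomicInteger nextId) {
            this.queue = queue;
            this.nextId = nextId;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.put(nextId.incrementAndGet()); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and let run() return
            }
        }
    }

    static class Consumer implements Runnable {
        private final BlockingQueue<Integer> queue;
        private final AtomicInteger consumed; // counts items actually taken

        Consumer(BlockingQueue<Integer> queue, AtomicInteger consumed) {
            this.queue = queue;
            this.consumed = consumed;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    queue.take(); // blocks while the queue is empty
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Runs 2 producers and 1 consumer for the given time; returns true if the pool terminated. */
    static boolean runFor(long millis, AtomicInteger nextId, AtomicInteger consumed) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        ExecutorService es = Executors.newFixedThreadPool(3);
        es.execute(new Producer(queue, nextId));
        es.execute(new Producer(queue, nextId));
        es.execute(new Consumer(queue, consumed));
        try {
            Thread.sleep(millis);
            es.shutdownNow(); // interrupts all three workers
            return es.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger nextId = new AtomicInteger();
        AtomicInteger consumed = new AtomicInteger();
        boolean terminated = runFor(200, nextId, consumed);
        System.out.println("pool terminated: " + terminated
                + ", ids handed out: " + nextId.get() + ", consumed: " + consumed.get());
    }
}
```

Calling awaitTermination() after shutdownNow() lets the caller confirm that the workers really exited, which the main method above does not check.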