Producer-consumer multithreading in Java

Posted 2020-07-18 05:39

Question:

I want to write a program using the multithreading wait and notify methods in Java.
This program has a stack (max-length = 5). The producer generates numbers forever and pushes them onto the stack, and the consumers pop them from the stack.

When the stack is full the producer must wait, and when the stack is empty the consumers must wait.
The problem is that it runs just once: once it has produced 5 numbers it stops, even though I put the run methods in a while(true) block so that they run nonstop.
Here is what I have tried so far.
Producer class:

package trail;
import java.util.Random;
import java.util.Stack;

public class Thread1 implements Runnable {
    int result;
    Random rand = new Random();
    Stack<Integer> A = new Stack<>();

    public Thread1(Stack<Integer> A) {
        this.A = A;
    }

    public synchronized void produce()
    {
        while (A.size() >= 5) {
            System.out.println("List is Full");
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        result = rand.nextInt(10);

        System.out.println(result + " produced ");
        A.push(result);
        System.out.println(A);

        this.notify();
    }

    @Override
    public void run() {
        System.out.println("Producer get started");

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            produce();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

And the consumer:

package trail;

import java.util.Stack;

public class Thread2 implements Runnable {
    Stack<Integer> A = new Stack<>();

    public Thread2(Stack<Integer> A) {
        this.A = A;
    }

    public synchronized void consume() {
        while (A.isEmpty()) {
            System.err.println("List is empty" + A + A.size());
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.err.println(A.pop() + " Consumed " + A);
        this.notify();
    }

    @Override
    public void run() {
        System.out.println("New consumer get started");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            consume();
        }
    }
}

and here is the main method:

public static void main(String[] args) {

        Stack<Integer> stack = new Stack<>();

        Thread1 thread1 = new Thread1(stack);// p
        Thread2 thread2 = new Thread2(stack);// c
        Thread A = new Thread(thread1);
        Thread B = new Thread(thread2);
        Thread C = new Thread(thread2);
        A.start();

        B.start();
        C.start();     
    }

Answer 1:

I think synchronisation in general becomes easier to understand and deal with if you separate three things which are currently mixed:

  1. The task which does the actual job. The class names Thread1 and Thread2 are misleading: they are not Thread objects, but jobs or tasks implementing the Runnable interface that you hand to Thread objects.

  2. The Thread object itself, which you create in main.

  3. The shared object which encapsulates the synchronised operations/logic on a queue, a stack, etc. This object is shared between tasks, and inside it you take care of the add/remove operations (either with synchronized blocks or synchronized methods). Currently (as was already pointed out), synchronization is done on the task itself, i.e. each task waits and notifies on its own lock, so the producer and the consumers never wake each other. When you separate concerns, i.e. let one class do one thing properly, it eventually becomes clear where the problem is.
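
The three roles above could be sketched roughly like this. The names BoundedStack and BoundedStackDemo are made up for illustration and do not come from the question, and the loops are limited to 20 items (an assumption) so that the demo terminates. All waiting and notifying happens on the one shared object, so the producer and the consumer actually wake each other.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical shared object: every wait() and notifyAll() uses this one monitor.
class BoundedStack<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedStack(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the stack is full, then pushes and wakes waiting threads.
    synchronized void push(T item) throws InterruptedException {
        while (items.size() >= capacity) {
            wait();
        }
        items.push(item);
        notifyAll();
    }

    // Blocks while the stack is empty, then pops and wakes waiting threads.
    synchronized T pop() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.pop();
        notifyAll();
        return item;
    }

    synchronized int size() {
        return items.size();
    }
}

class BoundedStackDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedStack<Integer> stack = new BoundedStack<>(5);

        // Task 1: produces 20 numbers; it only knows about the shared object.
        Runnable producerTask = () -> {
            try {
                for (int i = 0; i < 20; i++) {
                    stack.push(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Task 2: consumes 20 numbers.
        Runnable consumerTask = () -> {
            try {
                for (int i = 0; i < 20; i++) {
                    System.out.println("Consumed " + stack.pop());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // Threads: created in main, separate from the tasks they run.
        Thread producer = new Thread(producerTask);
        Thread consumer = new Thread(consumerTask);
        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}
```

Because the tasks never call wait() or notify() themselves, the question's original mistake (each task waiting on its own lock) cannot happen in this layout.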



Answer 2:

Your consumer and your producer are synchronized on different objects and do not block each other. If this works, I daresay it's accidental.

Read up on java.util.concurrent.BlockingQueue and java.util.concurrent.ArrayBlockingQueue. These give you a more modern and easier way to implement this pattern.

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
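
As a minimal sketch of that suggestion, assuming a capacity of 5 to match the question: the class name ArrayBlockingQueueDemo and the helper run() are hypothetical, and the loops are finite so the example terminates. put() blocks while the queue is full and take() blocks while it is empty, so no explicit wait() or notify() is needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ArrayBlockingQueueDemo {
    // Runs one producer and one consumer over a queue bounded at 5
    // and returns the sum of everything the consumer took.
    static int run(int count) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        int[] sum = new int[1];

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    queue.put(i); // blocks while the queue already holds 5 elements
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += queue.take(); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join, the consumer's writes to sum are visible here
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum consumed: " + run(10)); // 1 + 2 + ... + 10 = 55
    }
}
```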



Answer 3:

You should synchronize on the stack instead of at the method level. Try the code below.

Also, don't initialize the stack in your thread classes; you are already passing it in through the constructor from the main class, so there is no need for that.

Avoid marking a whole method with the synchronized keyword where you can; put only the critical section in a synchronized block instead, because the larger the synchronized region, the more it costs in performance.

So always put only the code that needs thread safety into the synchronized block.

Producer code:

public void produce() {
    synchronized (A) {
        while (A.size() >= 5) {
            System.out.println("List is Full");
            try {
                A.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        result = rand.nextInt(10);

        System.out.println(result + " produced ");
        A.push(result);
        System.out.println("stack ---"+A);

        A.notifyAll();
    }
}

Consumer code:

public void consume() {
    synchronized (A) {
        while (A.isEmpty()) {
            System.err.println("List is empty" + A + A.size());
            try {
                System.err.println("wait");
                A.wait();

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.err.println(A.pop() + " Consumed " + A);
        A.notifyAll();
    }
}


Answer 4:

Try this:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CircularArrayQueue<T> {

    // These references never change after construction, so final is the right modifier
    private final Lock rwLock = new ReentrantLock();
    private final Condition emptyCond = rwLock.newCondition();
    private final Condition fullCond = rwLock.newCondition();

    private final int size;

    private final Object[] buffer;
    private volatile int front;
    private volatile int rare;

    /**
     * @param size maximum number of elements the queue can hold
     */
    public CircularArrayQueue(int size) {
        this.size = size;
        this.buffer = new Object[size];
        this.front = -1;
        this.rare = -1;
    }

    public boolean isEmpty(){
        return front == -1;
    }

    public boolean isFull(){
        return (front == 0 && rare == size-1) || (front == rare + 1);
    }

    public void enqueue(T item){
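        try {
            // acquire the single lock that guards the whole queue
            rwLock.lock();
            // while the queue is full, wait for a consumer to free a slot;
            // the loop re-checks the condition after every wakeup
            while (isFull())
                fullCond.await();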

            if(rare == -1){
                rare = 0;
                front = 0;
            } else if(rare == size - 1){
                rare = 0;
            } else {
                rare ++;
            }

            buffer[rare] = item;
            //System.out.println("Added\t: " + item);

            // notify the reader
            emptyCond.signal();
        } catch(InterruptedException e){
            e.printStackTrace();
        } finally {
            // unlock the write lock
            rwLock.unlock();
        }

    }

    public T dequeue(){
        T item = null;
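        try{
            // acquire the single lock that guards the whole queue
            rwLock.lock();
            // while the queue is empty, wait for a producer to add an item;
            // the loop re-checks the condition after every wakeup
            while (isEmpty())
                emptyCond.await();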

            item = (T)buffer[front];
            //System.out.println("Deleted\t: " + item);
            if(front == rare){
                front = rare = -1;
            } else if(front == size - 1){
                front = 0;
            } else {
                front ++;
            }

            // notify the writer
            fullCond.signal();

        } catch (InterruptedException e){
            e.printStackTrace();
        } finally{
            // unlock read lock
            rwLock.unlock();
        }
        return item;
    }
}


Answer 5:

You can use Java's awesome java.util.concurrent package and its classes.

You can easily implement the producer-consumer problem using a BlockingQueue. A BlockingQueue already supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

Without BlockingQueue, every time we put data into the queue on the producer side, we need to check whether the queue is full and, if it is, wait for some time, check again and continue. Similarly, on the consumer side we would have to check whether the queue is empty and, if it is, wait for some time, check again and continue. With BlockingQueue we don't have to write any of that extra logic: the producer just calls put() and the consumer calls take().

Read more here:

http://javawithswaranga.blogspot.in/2012/05/solving-producer-consumer-problem-in.html

http://www.javajee.com/producer-consumer-problem-in-java-using-blockingqueue
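
To make the difference between the waiting and non-waiting operations concrete, here is a small sketch. The class name BlockingQueueMethodsDemo, the helper probe() and the capacity of 2 are assumptions chosen for illustration. poll() and offer() return immediately, the timed offer() waits for a bounded time, and put() and take() wait as long as needed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueMethodsDemo {
    // Exercises the non-blocking, timed and blocking calls on a queue of capacity 2.
    static String probe() throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Integer polled = queue.poll();      // empty queue: returns null at once
        queue.put(1);
        queue.put(2);                       // the queue is now full
        boolean offered = queue.offer(3);   // full queue: returns false at once
        // Timed variant: waits up to 50 ms for space, then gives up with false.
        boolean timedOffer = queue.offer(3, 50, TimeUnit.MILLISECONDS);
        Integer taken = queue.take();       // returns the head; would block if empty

        return polled + " " + offered + " " + timedOffer + " " + taken;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(probe()); // prints: null false false 1
    }
}
```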



Answer 6:

Use BlockingQueue with LinkedBlockingQueue; it makes this really simple. http://developer.android.com/reference/java/util/concurrent/BlockingQueue.html



Answer 7:

package javaapplication;

import java.util.Stack;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ProducerConsumer {

    // Single monitor shared by the producer and the consumer
    public static final Object lock = new Object();
    public static final Stack<Integer> stack = new Stack<>();

    public static void main(String[] args) {
        Thread producer = new Thread(new Runnable() {
            int i = 0;

            @Override
            public void run() {
                do {
                    synchronized (lock) {

                        while (stack.size() >= 5) {
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                            }
                        }
                        stack.push(++i);
                        if (stack.size() >= 5) {
                            System.out.println("Released lock by producer");
                            lock.notify();
                        }
                    }
                } while (true);

            }

        });

        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                do {
                    synchronized (lock) {
                        while (stack.empty()) {
                            try {
                                lock.wait();
                            } catch (InterruptedException ex) {
                                Logger.getLogger(ProducerConsumer.class.getName()).log(Level.SEVERE, null, ex);
                            }
                        }

                        while(!stack.isEmpty()){
                            System.out.println("stack : " + stack.pop());
                        }

                        lock.notifyAll();
                    }
                } while (true);
            }
        });

        producer.start();

        consumer.start();

    }

}


Answer 8:

Have a look at this code example:

import java.util.concurrent.*;
import java.util.Random;

public class ProducerConsumerMulti {
    public static void main(String args[]){
        BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

        Thread prodThread  = new Thread(new Producer(sharedQueue,1));
        Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
        Thread consThread2 = new Thread(new Consumer(sharedQueue,2));

        prodThread.start();
        consThread1.start();
        consThread2.start();
    } 
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    private Random rng;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
        this.rng = new Random();
    }
    @Override
    public void run() {
        while(true){
            try {
                int number = rng.nextInt(100);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
                Thread.sleep(100);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }

    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
                Thread.sleep(100);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

Notes:

  1. One Producer and two Consumers are started, as per your problem statement.
  2. The Producer produces random numbers from 0 to 99 in an infinite loop.
  3. The Consumers consume these numbers in an infinite loop.
  4. The Producer and the Consumers share a thread-safe LinkedBlockingQueue. You can drop the wait() and notify() calls if you use these higher-level concurrency constructs.
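
One detail worth checking against the question's max-length of 5: a LinkedBlockingQueue created with the no-argument constructor has a capacity of Integer.MAX_VALUE, so put() practically never blocks the producer. A small sketch of the difference follows; the class name QueueCapacityDemo and the helper capacities() are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueCapacityDemo {
    // Returns the remaining capacity of an unbounded and a bounded queue, both empty.
    static int[] capacities() {
        // No-argument constructor: capacity is Integer.MAX_VALUE.
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        // Capacity 5: put() blocks once five elements are queued.
        BlockingQueue<Integer> bounded = new LinkedBlockingQueue<>(5);
        return new int[] { unbounded.remainingCapacity(), bounded.remainingCapacity() };
    }

    public static void main(String[] args) {
        int[] c = capacities();
        System.out.println("unbounded is Integer.MAX_VALUE: " + (c[0] == Integer.MAX_VALUE)); // true
        System.out.println("bounded: " + c[1]); // 5
    }
}
```

Passing 5 to the constructor in the example above would make the producer wait when the queue is full, which is the behaviour the question asks for.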


Answer 9:

It seems you missed something about wait(), notify() and synchronized. See this example; it should help you.