Producer Consumer using threads

Posted 2019-12-16 19:42

Question:

I’m writing a Java program that implements the producer-consumer problem using multithreading. Below are the details of how I’m supposed to do it:

1) The main thread should create a buffer whose capacity is specified as a command line argument. The numbers of producer and consumer threads are also specified as command line arguments. I’m supposed to assign a unique number to each producer and consumer thread. How do I assign a unique number to producer and consumer threads?

2) The producer thread operates in an infinite loop. It produces a data item (a string) with the following format: <producer number>_<data item number>. For example, the 1st data item from thread number 1 will be 1_1 and the second data item from thread number 3 will be 3_2. How do I create data items in such a format?

3) Then the producer thread writes an entry into the producer log file (<producer number> “Generated” <data item>). After writing the log entry, it attempts to insert the item into the buffer. If the insertion is successful, it writes another entry into the log file (<producer number> <data item> “Insertion successful”). How do I write such code?

Below is the Java code I wrote.

import java.util.*;
import java.util.logging.*;

public class PC2
{
    public static void main(String args[])
    {
            ArrayList<Integer> queue = new ArrayList<Integer>();

            int size = Integer.parseInt(args[2]);
            Thread[] prod = new Thread[Integer.parseInt(args[0])];
            Thread[] cons = new Thread[Integer.parseInt(args[1])];

            for(int i=0; i<prod.length; i++)
            {
                    prod[i] = new Thread(new Producer(queue, size));
                    prod[i].start();
            }

            for(int i=0; i<cons.length; i++)
            {
                    cons[i] = new Thread(new Consumer(queue, size));
                    cons[i].start();
                }

    }
}

class Producer extends Thread
{
    private final ArrayList<Integer> queue;
    private final int size;

    public Producer(ArrayList<Integer> queue, int size)
    {
            this.queue = queue;
            this.size = size;
    }

    public void run()
    {
            while(true){
            for(int i=0; i<size; i++)
            {
                    System.out.println("Produced: "+i+" by id " +Thread.currentThread().getId());
                    try
                    {
                            produce(i);
                            Thread.sleep(3000);
                    }
                    catch(Exception e)
                    {
                            Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
                    }
            }}
    }


    public void produce(int i) throws InterruptedException
    {
            // Check the condition and wait under the same lock, so no other
            // thread can fill the queue between the check and the add.
            synchronized(queue)
            {
                    while(queue.size() == size)
                    {
                            System.out.println("Queue is full "+Thread.currentThread().getName() +" is waiting, size: "+queue.size());
                            queue.wait();
                    }
                    queue.add(i);
                    queue.notifyAll();
            }
    }
}

class Consumer extends Thread
{
    private final ArrayList<Integer> queue;
    private final int size;

    public Consumer(ArrayList<Integer> queue, int size)
    {
            this.queue = queue;
            this.size = size;
    }

    public void run()
    {
            while(true)
            {
                    try
                    {       System.out.println("Consumed: "+consume());
                            Thread.sleep(1000);
                    }
                    catch(Exception e)
                    {
                            Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e);
                    }
            }
    }

    public int consume() throws InterruptedException
    {
            // Check for emptiness and remove under the same lock, so two
            // consumers cannot both try to take the last element.
            synchronized (queue)
            {
                    while(queue.isEmpty())
                    {
                            System.out.println("Queue is empty "+Thread.currentThread().getName()+" is waiting, size: "+queue.size());
                            queue.wait();
                    }
                    queue.notifyAll();
                    System.out.println("Consumed by id "+Thread.currentThread().getId());
                    return queue.remove(0);
            }
    }
}

How can I carry out the above steps?

Answer 1:

I’m supposed to assign a unique number to each producer and consumer thread. How do I assign a unique number to producer and consumer threads?

Add an instance (non-static) variable to the Producer/Consumer classes. When you initialize the new Producer/Consumer Objects, pass in the unique number. You can keep track of what number you're on with an int counter in your main class.
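A minimal sketch of that idea. The class names `NumberedWorker` and `UniqueIdDemo` are made up for illustration and are not part of the question's code; the loop counter in the main class serves as the unique number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical worker: the id field holds the unique number passed in at construction.
class NumberedWorker implements Runnable {
    private final String role;
    private final int id;

    NumberedWorker(String role, int id) {
        this.role = role;
        this.id = id;
    }

    int getId() {
        return id;
    }

    @Override
    public void run() {
        System.out.println(role + " " + id + " running on " + Thread.currentThread().getName());
    }
}

class UniqueIdDemo {
    // Builds 'count' workers numbered 1..count; the loop counter is the unique number.
    static List<NumberedWorker> createWorkers(String role, int count) {
        List<NumberedWorker> workers = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            workers.add(new NumberedWorker(role, i));
        }
        return workers;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (NumberedWorker w : createWorkers("Producer", 2)) {
            threads.add(new Thread(w));
        }
        for (NumberedWorker w : createWorkers("Consumer", 3)) {
            threads.add(new Thread(w));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}
```

In the real program the counts would come from the command line arguments rather than the literals 2 and 3.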

2) The producer thread operates in an infinite loop. It produces a data item (a string) with the following format: <producer number>_<data item number>. For example, the 1st data item from thread number 1 will be 1_1 and the second data item from thread number 3 will be 3_2. How do I create data items in such a format?

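Give each producer its own item counter and concatenate the producer number, an underscore, and the counter. For any state that is shared between threads, use synchronized methods and/or atomic variables. Look into Java concurrency.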
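As a rough sketch of the item format from the question, assuming each producer owns its item counter (so the counter itself needs no synchronization). `ItemGenerator` is a hypothetical helper name, not something from the question's code.

```java
// Hypothetical helper that builds items of the form <producer number>_<data item number>.
class ItemGenerator {
    private final int producerNumber;
    private int itemNumber = 0; // touched only by the producer thread that owns this generator

    ItemGenerator(int producerNumber) {
        this.producerNumber = producerNumber;
    }

    // Returns the next item for this producer, counting from 1.
    String next() {
        itemNumber++;
        return producerNumber + "_" + itemNumber;
    }
}

class ItemFormatDemo {
    public static void main(String[] args) {
        ItemGenerator g1 = new ItemGenerator(1);
        ItemGenerator g3 = new ItemGenerator(3);
        System.out.println(g1.next()); // prints 1_1
        g3.next();                     // first item of producer 3 (3_1)
        System.out.println(g3.next()); // prints 3_2
    }
}
```

If the counter were shared across producers instead, an `AtomicInteger` or a synchronized method would be needed, which is where the advice above applies.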

3) Then the producer thread writes an entry into the producer log file (<producer number> “Generated” <data item>). After writing the log entry, it attempts to insert the item into the buffer. If the insertion is successful, it writes another entry into the log file (<producer number> <data item> “Insertion successful”). How do I write such code?

My answer is the same as the previous question: read about Java concurrency. Spend an hour reading about synchronization, locks, and atomic variables and I guarantee you will easily write your program.
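For what it's worth, here is one possible sketch of the log-then-insert step. It assumes a `BlockingQueue<String>` as the buffer (the question uses an `ArrayList`) and a `PrintWriter` as the log. `LoggingProducer` is a hypothetical name, and the demo writes to a `StringWriter` as a stand-in for the real producer log file.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical producer step: log "Generated", insert, then log "Insertion successful".
class LoggingProducer {
    private final int producerNumber;
    private final BlockingQueue<String> buffer;
    private final PrintWriter log; // may be shared by several producers
    private int itemNumber = 0;

    LoggingProducer(int producerNumber, BlockingQueue<String> buffer, PrintWriter log) {
        this.producerNumber = producerNumber;
        this.buffer = buffer;
        this.log = log;
    }

    // Produces one item; blocks while the buffer is full.
    String produceOne() throws InterruptedException {
        String item = producerNumber + "_" + (++itemNumber);
        writeLog(producerNumber + " Generated " + item);
        buffer.put(item); // returns only once the item is in the buffer
        writeLog(producerNumber + " " + item + " Insertion successful");
        return item;
    }

    private void writeLog(String line) {
        synchronized (log) { // keep each entry on its own line when the log is shared
            log.println(line);
            log.flush();
        }
    }
}

class LoggingProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);
        StringWriter out = new StringWriter(); // stand-in for a FileWriter on the log file
        LoggingProducer p = new LoggingProducer(1, buffer, new PrintWriter(out));
        p.produceOne();
        System.out.print(out);
    }
}
```

Because `put` blocks until there is room, reaching the second log call means the insertion succeeded. With a non-blocking `offer` you would check its boolean result instead.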



Answer 2:

For the producer-consumer problem, the best solution is a BlockingQueue. I had written a similar program while testing a few things, and I have modified it to fit your needs.

See if it helps.

import java.util.concurrent.*;
public class ThreadingExample {

    public static void main(String args[]){
        BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(100);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(blockingQueue));
        exec.execute(new Consumer(blockingQueue));
    }

}
class Message{
    private static int count=0;
    int messageId;
    Message(){
        this.messageId=count++;
        System.out.print("message Id"+messageId+" Created ");
    }
}
class Producer implements Runnable{

    private BlockingQueue<Message> blockingQueue;
    Producer(BlockingQueue<Message> blockingQueue){
        this.blockingQueue=blockingQueue;
    }

    @Override
    public void run(){
        while(!Thread.interrupted()){
            System.out.print("Producer Started");
            try {
                blockingQueue.put(new Message());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the while condition sees it and the loop exits.
                Thread.currentThread().interrupt();
            }
            System.out.println("Producer Done");
        }
    }
}

class Consumer implements Runnable{
    private BlockingQueue<Message> blockingQueue;
    Consumer(BlockingQueue<Message> blockingQueue){
        this.blockingQueue=blockingQueue;
    }

    @Override
    public void run(){
        while(!Thread.interrupted()){
            System.out.print("Consumer Started");
            try{
                Message message  = blockingQueue.take();
                System.out.print("message Id"+message.messageId+" Consumed ");
            }
            catch(InterruptedException e){
                // Restore the interrupt flag so the while condition sees it and the loop exits.
                Thread.currentThread().interrupt();
            }
            System.out.println("Consumer Done");
        }
    }
}


Answer 3:

I tried the following, which might work for you. It does not cover the buffer condition in point 3; you can add that part of the code yourself. Hope this helps.

public class Message {

    private String msg;

    public Message(String msg) {
        super();
        this.msg = msg;
    }

    public String getMsg(){
        return msg;
    }
}




import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue<Message> queue;
    private volatile boolean run = true; // volatile so a setRun() call from another thread is seen

    public Producer(BlockingQueue<Message> queue) {
        super();
        this.queue = queue;
    }

    public void setRun(boolean val) {
        this.run = val;
    }

    @Override
    public void run() {
        int i = 0;
        while (run) {
            Message msg = new Message(Thread.currentThread().getName() + "_"+ i);
            try {
                Thread.sleep(i * 100);
                queue.put(msg);
                System.out.println("Producer: "+Thread.currentThread().getName()+" produced and added to the queue: "+msg.getMsg());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
            if(i==10){
                setRun(false);
                System.out.println(Thread.currentThread().getName()+" stopped");
            }
        }

    }
}



import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable{

    private BlockingQueue<Message> queue;
    private volatile boolean run = true; // volatile so a setRun() call from another thread is seen

    public Consumer(BlockingQueue<Message> queue) {
        super();
        this.queue = queue;
    }

    public void setRun(boolean val){
        this.run = val;
    }

    @Override
    public void run() {
        while(run){
            try {
                Thread.sleep(100);
                Message msg = queue.take();
                System.out.println("Consumer: "+Thread.currentThread().getName()+" consumed "+msg.getMsg());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }

        }

    }
  }




import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerMain {

    public static void main(String[] args) {
        System.out
                .println("please enter the number of producer:consumer:size of the queue in order");

        Scanner scan = new Scanner(System.in);

        Thread[] prodThreads = new Thread[scan.nextInt()];
        Thread[] consThreads = new Thread[scan.nextInt()];
        BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(scan.nextInt());

        for (int i = 0; i < prodThreads.length; i++) {
            prodThreads[i] = new Thread(new Producer(queue), "" + i);
            prodThreads[i].start();
        }

        for (int i = 0; i < consThreads.length; i++) {
            consThreads[i] = new Thread(new Consumer(queue), "" + i);
            consThreads[i].start();
        }


    }

}


Answer 4:

Please refer to the code below. You can set the constant values from the command line arguments. I have tested the code, and it works as per your requirement.

import java.util.LinkedList;
import java.util.Queue;

public class ProducerConsumerProblem {
    public static int CAPACITY = 10; // At a time maximum of 10 tasks can be
                                        // produced.
    public static int PRODUCERS = 2;
    public static int CONSUMERS = 4;

    public static void main(String args[]) {
        Queue<String> mTasks = new LinkedList<String>();
        for (int i = 1; i <= PRODUCERS; i++) {
            Thread producer = new Thread(new Producer(mTasks));
            producer.setName("Producer " + i);
            producer.start();
        }
        for (int i = 1; i <= CONSUMERS; i++) {
            Thread consumer = new Thread(new Consumer(mTasks));
            consumer.setName("Consumer " + i);
            consumer.start();
        }

    }

}

class Producer implements Runnable {

    Queue<String> mSharedTasks;
    int taskCount = 1;

    public Producer(Queue<String> mSharedTasks) {
        super();
        this.mSharedTasks = mSharedTasks;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }

            String produceHere = Thread.currentThread().getName()
                    + "_Item number_" + taskCount++;

            synchronized (mSharedTasks) {
                // Re-check in a loop while holding the lock: this guards against
                // spurious wakeups and against another producer filling the queue first.
                while (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY) {
                    System.out.println("Producer Waiting!!");
                    try {
                        mSharedTasks.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                mSharedTasks.add(produceHere);
                System.out.println(produceHere);
                mSharedTasks.notifyAll(); // Informs consumers that there
                                            // is something to consume.
            }
        }
    }
}

class Consumer implements Runnable {
    Queue<String> mSharedTasks;

    public Consumer(Queue<String> mSharedTasks) {
        super();
        this.mSharedTasks = mSharedTasks;
    }

    @Override
    public void run() {
        while (true) {
            String task;
            synchronized (mSharedTasks) {
                // Wait in a loop while holding the lock, so the emptiness check
                // and the poll() cannot be separated by another consumer.
                while (mSharedTasks.isEmpty()) {
                    try {
                        mSharedTasks.wait(); // Waits for producer to produce!
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                task = mSharedTasks.poll();
                mSharedTasks.notifyAll(); // Informs producers that there is free space.
            }
            System.out.println(Thread.currentThread().getName()
                    + " consumed " + task);
            try {
                // Consumer consumes late hence producer has to wait...!
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

}
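The wait/notify handshake in this answer can also be packaged into a small buffer class, so producers and consumers never touch the lock directly. The sketch below is only an illustration: `BoundedBuffer` and `BoundedBufferDemo` are made-up names, and the capacity of 2 is arbitrary. The size check, the `wait()` and the mutation all happen under one lock, and the check sits in a `while` loop to guard against spurious wakeups.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical bounded buffer built on intrinsic locks (synchronized / wait / notifyAll).
class BoundedBuffer<T> {
    private final Queue<T> items = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Blocks while the buffer is full, then appends the item.
    public synchronized void put(T item) throws InterruptedException {
        while (items.size() == capacity) {
            wait();
        }
        items.add(item);
        notifyAll(); // wake consumers waiting for an item
    }

    // Blocks while the buffer is empty, then removes the oldest item.
    public synchronized T take() throws InterruptedException {
        while (items.isEmpty()) {
            wait();
        }
        T item = items.remove();
        notifyAll(); // wake producers waiting for free space
        return item;
    }

    public synchronized int size() {
        return items.size();
    }
}

class BoundedBufferDemo {
    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer<String> buffer = new BoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put("1_" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("consumed " + buffer.take());
        }
        producer.join();
    }
}
```

This is essentially what `ArrayBlockingQueue` already provides, so in production code the library class is usually the simpler choice.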


Answer 5:

public class ProducerConsumerTest {

    public static void main(String[] args) {
        CubbyHole c = new CubbyHole();
        Producer p1 = new Producer(c, 1);
        Consumer c1 = new Consumer(c, 1);
        p1.start();
        c1.start();
    }
}

class CubbyHole {

    private int contents;
    private boolean available = false;

    public synchronized int get() {
        while (available == false) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        available = false;
        notifyAll();
        return contents;
    }

    public synchronized void put(int value) {
        while (available == true) {
            try {
                wait();
            } catch (InterruptedException e) {
            }
        }
        contents = value;
        available = true;
        notifyAll();
    }
}

class Consumer extends Thread {

    private CubbyHole cubbyhole;
    private int number;

    public Consumer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    public void run() {
        int value = 0;
        for (int i = 0; i < 10; i++) {
            value = cubbyhole.get();
            System.out.println("Consumer #"
                    + this.number
                    + " got: " + value);
        }
    }
}

class Producer extends Thread {

    private CubbyHole cubbyhole;
    private int number;

    public Producer(CubbyHole c, int number) {
        cubbyhole = c;
        this.number = number;
    }

    public void run() {
        for (int i = 0; i < 10; i++) {
            cubbyhole.put(i);
            System.out.println("Producer #" + this.number
                    + " put: " + i);
            try {
                sleep((int) (Math.random() * 100));
            } catch (InterruptedException e) {
            }
        }
    }
}