How to solve the producer-consumer problem using semaphores

Posted 2020-02-10 10:32

Question:

I need to code a problem similar to producer-consumer, and it must use semaphores. I tried a couple of solutions and none of them worked. First I tried a solution from Wikipedia, and it didn't work. My current code looks like this:

The consumer's run method:

    public void run() {
    int i=0;
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    String s = new String();
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        try {
            Thread.sleep(1000);///10000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.encheBuffer.down();
        this.mutex.down();
        // critical section (RC) begins
        i=0;
        while (i<buffer.length) {
            if (buffer[i] == null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            QuantidadeBuffer.quantidade--;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            int identificador;
            identificador=buffer[i].getIdentificador()[0];
            s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
            //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            buffer[i]= null;
        }
        // critical section (RC) ends
        this.mutex.up();
        //this.esvaziaBuffer.up();
        System.out.println(s);
  //            lock.up();
    }
}

The producer's run method:

    public void run() {
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    int i=0;
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        // Produce an item
        try {
            Thread.sleep(500);//50000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.esvaziaBuffer.down();
        this.mutex.down();
        // critical section (RC) begins
        i=0;
        while (i<buffer.length) {
            if (buffer[i]!=null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            int identificador[]=new int[Pedido.getTamanho_identificador()];
            identificador[0]=i;
            buffer[i]=new Pedido();
            Produtor.buffer[i].setIdentificador(identificador);
            Produtor.buffer[i].setTexto("pacote de dados");
            QuantidadeBuffer.quantidade++;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            i++;
        }
        // critical section (RC) ends
        this.mutex.up();
        //this.encheBuffer.up();
    }
    //this.encheBuffer.up();
}

With the code above, a consumer thread sometimes reads a position and then another consumer thread reads the same position before a producer has filled it again, like this:

Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1

Answer 1:

It seems that you are using a mutex, not a semaphore?

With a mutex you have only binary synchronisation: locking and unlocking one resource. Semaphores have a value that you can signal or acquire.
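To make the difference concrete, here is a minimal sketch using `java.util.concurrent.Semaphore`. The class and method names (`PermitCountDemo`, `acquiresAfterSignals`) are made up for illustration. It shows that a semaphore remembers how many times it was signalled, which a plain lock cannot do.

```java
import java.util.concurrent.Semaphore;

class PermitCountDemo {

    // Signals the semaphore `signals` times, then counts how many
    // non-blocking acquires succeed before the permits run out.
    static int acquiresAfterSignals(int signals) {
        Semaphore items = new Semaphore(0);   // starts with no permits
        for (int i = 0; i < signals; i++) {
            items.release();                  // each signal adds one permit
        }
        int acquired = 0;
        while (items.tryAcquire()) {          // takes a permit if one is available
            acquired++;
        }
        return acquired;
    }

    public static void main(String[] args) {
        System.out.println(acquiresAfterSignals(3)); // prints 3
    }
}
```

A semaphore created with a single permit behaves like the mutex in the question; the counting behaviour only matters once the initial value or the number of signals can exceed one.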

You are trying to lock and unlock the entire buffer, but that is the wrong way to go because, as you are seeing, only one of the producer or consumer can hold the lock at a time: while a consumer holds it, the producer can't fill the buffer (because it has to lock first).

You should instead create a semaphore; when the producer writes one packet or block of data, it signals the semaphore. The consumers try to acquire the semaphore, so they wait until the producer has signalled that a packet has been written. When a written packet is signalled, one of the consumers is woken and knows it can read one packet. It reads the packet, then goes back to trying to acquire the semaphore. If in the meantime the producer has written another packet, it has signalled again, and either of the consumers will go on to read that packet, and so on.

For example:

(Producer) - Write one packet - Semaphore.release(1)

(Consumer xN) - Semaphore.acquire(1) - Read one packet

If you have multiple consumers, they should lock the buffer while reading a packet (but not while waiting to acquire the semaphore) to prevent race conditions. In the example below the producer also locks the list, because it modifies the same in-memory list that the consumers read.

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Semaphores {

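    // Shared queue of produced items; every access is guarded by `mutex`.
    static LinkedList<Integer> list = new LinkedList<>();
    // Counts the items available to consumers; starts at 0 so consumers block until the producer signals.
    static Semaphore sem = new Semaphore(0);
    // Binary semaphore used as a lock around list access.
    static Semaphore mutex = new Semaphore(1);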

    static class Consumer extends Thread {
        String name;
        public Consumer(String name) {
            this.name = name;
        }
        public void run() {
            try {

                while (true) {
                    sem.acquire(1);
                    mutex.acquire();
                    System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
                    mutex.release();
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    static class Producer extends Thread {
        public void run() {
            try {

                int N = 0;

                while (true) {
                    mutex.acquire();
                    list.add(N++); // autoboxed to Integer; the Integer(int) constructor is deprecated
                    mutex.release();
                    sem.release(1);
                    Thread.sleep(500);
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        new Producer().start();
        new Consumer("Alice").start();
        new Consumer("Bob").start();
    }
}
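The example above uses an unbounded list. For a fixed-size array like the one in the question, a second counting semaphore can track the free positions so that the producer blocks when the array is full. The following is only a sketch under that assumption: `BoundedBuffer`, `emptySlots` and `filledSlots` are illustrative names, not taken from the question, and it stores plain integers instead of `Pedido` objects.

```java
import java.util.concurrent.Semaphore;

class BoundedBuffer {
    private final Integer[] slots;
    private final Semaphore emptySlots;                      // counts free positions
    private final Semaphore filledSlots = new Semaphore(0);  // counts occupied positions
    private final Semaphore mutex = new Semaphore(1);        // guards the array scan

    BoundedBuffer(int capacity) {
        slots = new Integer[capacity];
        emptySlots = new Semaphore(capacity);
    }

    void put(int value) {
        emptySlots.acquireUninterruptibly();   // wait until a position is free
        mutex.acquireUninterruptibly();
        try {
            for (int i = 0; i < slots.length; i++) {
                if (slots[i] == null) { slots[i] = value; break; }
            }
        } finally {
            mutex.release();
        }
        filledSlots.release();                 // tell consumers one item is ready
    }

    int take() {
        filledSlots.acquireUninterruptibly();  // wait until an item exists
        mutex.acquireUninterruptibly();
        int value = 0;
        try {
            for (int i = 0; i < slots.length; i++) {
                if (slots[i] != null) { value = slots[i]; slots[i] = null; break; }
            }
        } finally {
            mutex.release();
        }
        emptySlots.release();                  // hand the position back to producers
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedBuffer buffer = new BoundedBuffer(4);
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) buffer.put(i);
        });
        producer.start();
        int sum = 0;
        for (int i = 0; i < 10; i++) sum += buffer.take();
        producer.join();
        System.out.println("sum = " + sum); // 1 + 2 + ... + 10 = 55
    }
}
```

Because each consumer must win a `filledSlots` permit before scanning, two consumers can never both claim the same position, which is the symptom described in the question. Items are not necessarily consumed in FIFO order, since `take` returns the first occupied position it finds.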


Answer 2:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 *
 * @author sakshi
 */
public class SemaphoreDemo {

    static Semaphore producer = new Semaphore(1);
    static Semaphore consumer = new Semaphore(0);
    static List<Integer> list = new ArrayList<Integer>();

    static class Producer extends Thread {

        List<Integer> list;

        public Producer(List<Integer> list) {
            this.list = list;
        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    producer.acquire();

                } catch (InterruptedException ex) {
                    Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                }
                System.out.println("produce=" + i);

                list.add(i);
                consumer.release();

            }
        }
    }

    static class Consumer extends Thread {

        List<Integer> list;

        public Consumer(List<Integer> list) {
            this.list = list;
        }

        public void run() {
            for (int i = 0; i < 10; i++) {
                try {
                    consumer.acquire();
                } catch (InterruptedException ex) {
                    Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                }

                System.out.println("consume=" + list.get(i));

                producer.release();
            }
        }
    }

    public static void main(String[] args) {
        Producer produce = new Producer(list);

        Consumer consume = new Consumer(list);

        produce.start();
        consume.start();
    }
}


Output:

produce=0
consume=0
produce=1
consume=1
produce=2
consume=2
produce=3
consume=3
produce=4
consume=4
produce=5
consume=5
produce=6
consume=6
produce=7
consume=7
produce=8
consume=8
produce=9
consume=9


Answer 3:

import java.util.concurrent.Semaphore;


public class ConsumerProducer{

    public static void main(String[] args) {

           Semaphore semaphoreProducer=new Semaphore(1);
           Semaphore semaphoreConsumer=new Semaphore(0);
           System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0");

           new Producer(semaphoreProducer,semaphoreConsumer).start();
           new Consumer(semaphoreConsumer,semaphoreProducer).start();

    }


/**
 * Producer Class.
 */
static class Producer extends Thread{

    Semaphore semaphoreProducer;
    Semaphore semaphoreConsumer;


    public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) {
           this.semaphoreProducer=semaphoreProducer;
           this.semaphoreConsumer=semaphoreConsumer;
    }

    public void run() {
           for(;;){
                  try {
                      semaphoreProducer.acquire();
                      System.out.println("Produced : "+Thread.currentThread().getName());
                      semaphoreConsumer.release();

                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }          
    }
}

/**
 * Consumer Class.
 */
static class Consumer extends Thread{

    Semaphore semaphoreConsumer;
    Semaphore semaphoreProducer;

    public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) {
           this.semaphoreConsumer=semaphoreConsumer;
           this.semaphoreProducer=semaphoreProducer;
    }

    public void run() {

           for(;;){
                  try {
                      semaphoreConsumer.acquire();
                      System.out.println("Consumed : "+Thread.currentThread().getName());
                      semaphoreProducer.release();
                  } catch (InterruptedException e) {
                        e.printStackTrace();
                  }
           }
    }

}
}


Answer 4:

One of the most common usage patterns in multithreaded applications is asynchronous communication between threads. Several real-world applications require this. There are two ways of achieving it:

  1. The producer and consumer are tightly coupled. This is not asynchronous: each producer waits for a consumer and vice versa, so the throughput of the application is limited to that of the slower of the two. This is rarely a good design.
  2. The better (and more complicated) way is to introduce a shared buffer between the producer and the consumer. That way a faster producer or a faster consumer is not throttled by a slower counterpart until the buffer fills up or runs empty. It also allows multiple producers and multiple consumers to connect via the shared buffer.

There are several ways to create a Producer-Consumer pattern.

  1. Using wait/notify/notifyAll, which was covered in the earlier module on "Locking Fundamentals".
  2. Using the API provided by Java: java.util.concurrent.BlockingQueue. We will cover more on this in a subsequent module.
  3. Using semaphores: this is a very convenient way of creating the producer-consumer pattern.

    import java.util.Random;
    import java.util.Stack;
    import java.util.concurrent.Semaphore;

    public class ProducerConsumerSemaphore {
    
    private static final int BUFFER_SIZE = 10;
    private static final int MAX_VALUE = 10000;
    private final Stack<Integer> buffer = new Stack<Integer>();
    private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
    private final Semaphore readPermits = new Semaphore(0);
    private final Random random = new Random();
    
    class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                writePermits.acquireUninterruptibly();
                buffer.push(random.nextInt(MAX_VALUE));
                readPermits.release();
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                readPermits.acquireUninterruptibly();
                System.out.println(buffer.pop());
                writePermits.release();
            }
        }
    }
    
    public static void main(String[] args) {
    
        ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
        Producer p1 = obj.new Producer();
        Producer p2 = obj.new Producer();
        Producer p3 = obj.new Producer();
        Consumer c1 = obj.new Consumer();
        Consumer c2 = obj.new Consumer();
        Consumer c3 = obj.new Consumer();
        Thread t1 = new Thread(p1);
        Thread t2 = new Thread(p2);
        Thread t3 = new Thread(p3);
        Thread t4 = new Thread(c1);
        Thread t5 = new Thread(c2);
        Thread t6 = new Thread(c3);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();
    }
    } // closes ProducerConsumerSemaphore

We use 2 semaphores - 1 for consumers and 1 for producers.

The number of permits available to the producers is set to the maximum buffer size.

Each producer consumes 1 write permit and releases 1 read permit on producing 1 message.

Each consumer consumes 1 read permit and releases 1 write permit for consumption of each message.

Imagine each permit as piggybacked on the actual message. A write permit flows from the producer to the consumer (and back to the producer). A read permit flows from the consumer to the producer (and back to the consumer). Whenever no thread is in the middle of a push or pop, the number of messages in the buffer equals the number of read permits available. If messages are produced faster than they are consumed, then at some point the write permits are exhausted and all producer threads block until a consumer reads from the buffer and releases a write permit. The same logic applies the other way round.

The description above traces the flow of messages and permits through the system. By using semaphores, we are only abstracting away the gory details and care required to write this code with wait/notify/notifyAll. The code above can be compared with the wait/notify approach:
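The permit accounting can be checked in a single thread. This is only an illustrative sketch: `PermitFlowDemo` and `fillUntilBlocked` are hypothetical names, the buffer size is reduced to 3, and `tryAcquire` stands in for the point where a real producer thread would block.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

class PermitFlowDemo {
    static final int BUFFER_SIZE = 3;

    // Pushes messages until no write permit is left and reports the final
    // counts as {messages in buffer, read permits, write permits}.
    static int[] fillUntilBlocked() {
        Deque<Integer> buffer = new ArrayDeque<>();
        Semaphore writePermits = new Semaphore(BUFFER_SIZE);
        Semaphore readPermits = new Semaphore(0);
        int next = 0;
        while (writePermits.tryAcquire()) {   // a real producer would block here instead
            buffer.push(next++);
            readPermits.release();            // one read permit per message written
        }
        return new int[] {
            buffer.size(),
            readPermits.availablePermits(),
            writePermits.availablePermits()
        };
    }

    public static void main(String[] args) {
        int[] counts = fillUntilBlocked();
        // messages, read permits, write permits
        System.out.println(counts[0] + " " + counts[1] + " " + counts[2]); // prints 3 3 0
    }
}
```

Once the write permits reach zero, the number of messages and the number of read permits both equal the buffer size, so the two semaphores together always account for every slot.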

When a thread is blocked for lack of permits, it is equivalent to a wait() call on that semaphore.

When a thread releases a permit, it is equivalent to a notifyAll() call on that particular semaphore.
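To make the comparison tangible, here is a minimal sketch of a counting semaphore built on wait/notifyAll. `SimpleSemaphore` is a hypothetical class written for illustration; it is not how `java.util.concurrent.Semaphore` is actually implemented, and it ignores fairness.

```java
class SimpleSemaphore {
    private int permits;

    SimpleSemaphore(int permits) {
        this.permits = permits;
    }

    // Blocks while no permit is available; this is the wait() side.
    synchronized void acquire() throws InterruptedException {
        while (permits == 0) {
            wait();          // releases the monitor until another thread notifies
        }
        permits--;
    }

    // Non-blocking variant: takes a permit only if one is available.
    synchronized boolean tryAcquire() {
        if (permits == 0) return false;
        permits--;
        return true;
    }

    // Returns a permit and wakes waiting threads; this is the notifyAll() side.
    synchronized void release() {
        permits++;
        notifyAll();
    }

    synchronized int availablePermits() {
        return permits;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleSemaphore sem = new SimpleSemaphore(0);
        Thread waiter = new Thread(() -> {
            try {
                sem.acquire();
                System.out.println("acquired");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        sem.release();       // lets the waiter proceed
        waiter.join();
        System.out.println(sem.availablePermits()); // prints 0
    }
}
```

The `while` loop around `wait()` matters: after `notifyAll()` every waiter wakes up, but only the ones that still find a permit may continue, and the rest go back to waiting.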