One Producer with Multiple Consumer, using PipedIn

2019-08-28 12:20发布

问题:

NOTE: I have this goal and I have many doubts, but I take the opportunity to do them all because I do not know if it would be advisable to open different posts with the same code / problem.

I have a producer (RunnableProducer) that has multiple consumers that are called RunnableWorker. The RunnableWorker may have several consumers of its same class.. Finally, its information is consumed by simple consumers RunnableConsumer.

The information can change in size, increasing or decreasing (that is called resampling), and in turn can be modified by filters (amplifying or attenuating). These operations are performed by RunnableWorker.

I have tried to perform an interaction modeling between Runnable interfaces.

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

public class TestPipedRunnable {

  public static void main(String... args) {
    int numBytes = 8;

    RunnableProducer runnableProducer = new RunnableProducer("runnableProducer", numBytes);

    RunnableWorker runnableWorkerStage0Unique = new RunnableWorker("runnableWorkerStage0Unique", numBytes, runnableProducer.getPipedInputStream());

    RunnableWorker runnableWorkerStage1Number1 = new RunnableWorker("runnableWorkerStage1Number1", numBytes);
    RunnableWorker runnableWorkerStage1Number2 = new RunnableWorker("runnableWorkerStage1Number2", numBytes);

    runnableWorkerStage0Unique.addPipedOutputStream(runnableWorkerStage1Number1.getPipedOutputStream());
    runnableWorkerStage0Unique.addPipedOutputStream(runnableWorkerStage1Number2.getPipedOutputStream());

    RunnableConsumer runnableConsumer1 = new RunnableConsumer("runnableConsumer1", numBytes);
    runnableWorkerStage1Number1.addPipedOutputStream(runnableConsumer1.getPipedOutputStream());

    RunnableConsumer runnableConsumer2 = new RunnableConsumer("runnableConsumer2", numBytes);
    runnableWorkerStage1Number2.addPipedOutputStream(runnableConsumer2.getPipedOutputStream());

    ExecutorService executorService = Executors.newCachedThreadPool();

    executorService.execute(runnableProducer);
    executorService.execute(runnableWorkerStage0Unique);
    executorService.execute(runnableWorkerStage1Number1);
    executorService.execute(runnableWorkerStage1Number2);
    executorService.execute(runnableConsumer1);
    executorService.execute(runnableConsumer2);
    executorService.shutdown();
  }

  public static String getRandomString(int size) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < size; i++) {
      char c = (char) (new Random().nextInt(25) + 'a');
      sb.append(c);
    }
    return sb.toString();
  }

  public static class RunnableProducer implements Runnable {

    private PipedInputStream pipedInputStream;
    private PipedOutputStream pipedOutputStream;
    private String name;
    private final int bufferSize;

    public RunnableProducer(String name, int bufferSize) {
      this.name = name;
      this.bufferSize = bufferSize;
      pipedInputStream = new PipedInputStream();
      pipedOutputStream = new PipedOutputStream();
      try {
        pipedOutputStream.connect(pipedInputStream);
      } catch (IOException e) {
        System.out.println("Exception:" + e.getMessage());
      }
    }

    public RunnableProducer(String name, int bufferSize, PipedOutputStream pipedOutputStream) {
      this.name = name;
      this.bufferSize = bufferSize;
      this.pipedOutputStream = pipedOutputStream;
    }

    @Override
    public void run() {
      while (true) {
        try {
          //          byte[] outBytesSamples = new byte[bufferSize];
          //          new Random().nextBytes(outBytesSamples);
          byte[] outBytesSamples = getRandomString(bufferSize).getBytes();
          pipedOutputStream.write(outBytesSamples);
          pipedOutputStream.flush();
          System.out.println(name + ".Produced " + new String(outBytesSamples));
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        } catch (Exception e) {
          System.out.println("Exception with " + name);
          System.out.println("Exception:" + e.getMessage());
        }
      }
    }

    public PipedInputStream getPipedInputStream() {
      return pipedInputStream;
    }

  }

  public static class RunnableConsumer implements Runnable {

    private PipedInputStream pipedInputStream;
    private PipedOutputStream pipedOutputStream;
    private String name;
    private final int bufferSize;

    public RunnableConsumer(String name, int bufferSize) {
      this.name = name;
      this.bufferSize = bufferSize;
      pipedInputStream = new PipedInputStream();
      pipedOutputStream = new PipedOutputStream();
      try {
        pipedInputStream.connect(pipedOutputStream);
      } catch (IOException e) {
        System.out.println("Exception:" + e.getMessage());
      }
    }

    public RunnableConsumer(String name, int bufferSize, PipedInputStream pipedInputStream) {
      this.name = name;
      this.bufferSize = bufferSize;
      this.pipedInputStream = pipedInputStream;
    }

    @Override
    public void run() {
      while (true) {
        try {
          byte[] inBytesSamples = new byte[bufferSize];
          pipedInputStream.read(inBytesSamples);
          System.out.println(name + ".Consume " + new String(inBytesSamples));
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);
          Thread.sleep(timeSleeping);
        } catch (Exception e) {
          System.out.println("Exception with " + name);
          System.out.println("Exception:" + e.getMessage());
        }
      }
    }

    public PipedOutputStream getPipedOutputStream() {
      return pipedOutputStream;
    }

  }

  public static class RunnableWorker implements Runnable {

    private String name;
    private final int bufferSize;
    private PipedInputStream pipedInputStream;
    private PipedOutputStream pipedOutputStream;
    private final List<PipedOutputStream> listPipedOutputStream;

    public RunnableWorker(String name, int bufferSize, PipedInputStream pipedInputStream) {
      this.name = name;
      this.bufferSize = bufferSize;
      this.pipedInputStream = pipedInputStream;
      this.listPipedOutputStream = new ArrayList<>();
    }

    public RunnableWorker(String name, int bufferSize, PipedOutputStream pipedOutputStream) {
      this.name = name;
      this.bufferSize = bufferSize;
      this.pipedOutputStream = pipedOutputStream;
      this.listPipedOutputStream = new ArrayList<>();
    }

    public RunnableWorker(String name, int bufferSize) {
      this.name = name;
      this.bufferSize = bufferSize;
      this.pipedInputStream = new PipedInputStream();
      try {
        this.pipedOutputStream = new PipedOutputStream(pipedInputStream);
      } catch (IOException e) {
        System.out.println("Exception with " + name);
        System.out.println("Exception:" + e.getMessage());
      }
      this.listPipedOutputStream = new ArrayList<>();
    }

    @Override
    public void run() {
      int qtyBytes;
      byte[] incomingBytes = new byte[bufferSize * 2];

      if (pipedInputStream != null) {
        try {
          while ((qtyBytes = pipedInputStream.read(incomingBytes)) != -1) {
            byte[] inBytesSamples = new byte[qtyBytes];
            System.arraycopy(incomingBytes, 0, inBytesSamples, 0, qtyBytes);
            // Begin Simulate Change of Size and Change information 
            // applying FFT and others complex and heavy process
            // 
            double change = 0.0;//ThreadLocalRandom.current().nextDouble(-0.5, 0.5);
            int newSize = (int) (qtyBytes * (1.0 + change));
            byte[] outBytesSamples = new byte[newSize];
            int minSize = newSize < qtyBytes ? newSize : qtyBytes;
            System.arraycopy(inBytesSamples, 0, outBytesSamples, 0, minSize);
            if (newSize > qtyBytes) {
              byte[] restBytesSamples = getRandomString(newSize - qtyBytes).getBytes();//
              //byte[] restBytesSamples = new byte[newSize - qtyBytes];
              //new Random().nextBytes(restBytesSamples);
              System.arraycopy(restBytesSamples, 0, outBytesSamples, minSize, restBytesSamples.length);
            }
            //new Random().nextBytes(outBytesSamples);
            // Finish Simulate Change of Size and Change information
            writepipedOutputStream(outBytesSamples);
            //writepipedOutputStream(inBytesSamples); // For Test Porpouse (not changes applied to information)

          }
        } catch (IOException e) {
          System.out.println("Exception with " + name);
          System.out.println("Exception:" + e.getMessage());
        }
      }
    }

    public PipedInputStream getPipedInputStream() {
      return pipedInputStream;
    }

    public PipedOutputStream getPipedOutputStream() {
      return pipedOutputStream;
    }

    public void addPipedOutputStream(PipedOutputStream pipedOutputStream) {
      listPipedOutputStream.add(pipedOutputStream);
    }

    public void delPipedOutputStream(PipedOutputStream pipedOutputStream) {
      listPipedOutputStream.remove(pipedOutputStream);
    }

    private void writepipedOutputStream(byte[] buffer) {
      listPipedOutputStream.forEach(pipedOutputStream -> {
        try {
          pipedOutputStream.write(buffer);
        } catch (IOException e) {
          System.out.println("Exception with " + name);
          System.out.println("Exception:" + e.getMessage());
        }
      });
    }
  }

}

When I use in RunnableWorker the line writepipedOutputStream(inBytesSamples); // For Test Porpouse (not changes applied to information) I got the output (It seems that everything it is OK).

--- exec-maven-plugin:1.5.0:exec (default-cli) @ MPMC ---
runnableProducer.Produced nbtsavuf
runnableProducer.Produced pwrnttbi
runnableConsumer1.Consume nbtsavuf
runnableConsumer2.Consume nbtsavuf
runnableProducer.Produced rsuaffdo
runnableConsumer1.Consume pwrnttbi
runnableConsumer2.Consume pwrnttbi
runnableProducer.Produced clmldcfx
runnableConsumer1.Consume rsuaffdo
runnableConsumer2.Consume rsuaffdo
runnableProducer.Produced tnkosyrv

But, I got this Exceptions when I use in RunnableWorker the line writepipedOutputStream(outBytesSamples);

--- exec-maven-plugin:1.5.0:exec (default-cli) @ MPMC ---
runnableProducer.Produced bniqphsw
runnableProducer.Produced vfjmwqai
Exception with runnableConsumer2
Exception with runnableConsumer1
java.io.StreamCorruptedException: invalid stream header: 77087666
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:354)
    at org.joseluisbz.mpmc.TestPipedRunnable$RunnableConsumer.run(TestPipedRunnable.java:142)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
java.io.StreamCorruptedException: invalid stream header: 77087666
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:354)
    at org.joseluisbz.mpmc.TestPipedRunnable$RunnableConsumer.run(TestPipedRunnable.java:142)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
java.io.StreamCorruptedException: invalid stream header: 6A6D7771
Exception with runnableConsumer2
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
    at java.io.ObjectInputStream.<init>(ObjectInputStream.java:354)
    at org.joseluisbz.mpmc.TestPipedRunnable$RunnableConsumer.run(TestPipedRunnable.java:142)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Exception with runnableConsumer1    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)

Some time I got:

java.io.StreamCorruptedException: invalid type code: C4

Questions

Is it possible translate my modeling to using BlockingQueue? BlockingQueue vs PipedOutputStream and PipedInputStream

What improvements could be made to avoid exceptions? (taking into account that worker will take a long time making the indicated changes before passing the information to their own consumers).

Will there be a pattern of development for this raised problem?