Input and Output Stream Pipe in Java

2019-01-25 10:45发布

问题:

Does anyone have any good suggestions for creating a Pipe object in Java which is both an InputStream and and OutputStream since Java does not have multiple inheritance and both of the streams are abstract classes instead of interfaces?

The underlying need is to have a single object that can be passed to things which need either an InputStream or an OutputStream to pipe output from one thread to input for another.

回答1:

It seems the point of this question is being missed. If I understand you correctly, you want an object that functions like an InputStream in one thread, and an OutputStream in another to create a means of communicating between the two threads.

Perhaps one answer is to use composition instead of inheritance (which is recommended practice anyway). Create a Pipe which contains a PipedInputStream and a PipedOutputStream connected to each other, with getInputStream() and getOutputStream() methods.

You can't directly pass the Pipe object to something needing a stream, but you can pass the return value of it's get methods to do it.

Does that work for you?



回答2:

java.io.PipedOutputStream and java.io.PipedInputStream look to be the classes to use for this scenario. They are designed to be used together to pipe data between threads.

If you really want some single object to pass around it would need to contain one of each of these and expose them via getters.



回答3:

This is a pretty common thing to do, I think. See this question.

Easy way to write contents of a Java InputStream to an OutputStream



回答4:

You can't create a class which derives both from InputStream and OutputStream because these aren't interfaces and they have common methods and Java doesn't allow multiple inheritance (the compiler doesn't know whether to call InputStream.close() or OutputStream.close() if you call close() on your new object).

The other problem is the buffer. Java wants to allocate a static buffer for the data (which doesn't change). This means when you use the `java.io.PipedXxxStream', the writing data to it will eventually block unless you use two different threads.

So the answer from Apocalisp is correct: You must write a copy loop.

I suggest that you include Apache's commons-io in your project which contains many helper routines just for tasks like this (copy data between streams, files, strings and all combinations thereof).



回答5:

See http://ostermiller.org/utils/CircularBuffer.html



回答6:

I had to implement a filter for slow connections to Servlets so basically I wrapped the servlet output stream into a QueueOutputStream which will add every byte (in small buffers), into a queue, and then output those small buffers to a 2nd output stream, so in a way this acts as input/output stream, IMHO this is better than JDK pipes which won't scale that well, basically there is too much context switching in the standard JDK implementation (per read/write), a blocking queue is just perfect for a single producer/consumer scenario:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }