Thread-safe InputStream in Java

Posted 2020-06-11 13:42

Question:

How can I create a thread-safe InputStream? During multithreaded operations the InputStream data gets corrupted, so how can I make my InputStream thread-safe? Will the following code work?

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;

public class SynchronizedInputStream extends InputStream {

    private final InputStream in;

    private SynchronizedInputStream(InputStream in) {
        this.in = in;
    }

    /* ... method for every InputStream type to use */
    public static InputStream createInputStream(InputStream in) {
        return new SynchronizedInputStream(in);
    }

    public static InputStream createPushBackInputStream(InputStream in, int BUFSIZE) {
        return new SynchronizedInputStream(new PushbackInputStream(in, BUFSIZE));
    }

    /* Wrap all InputStream methods used */

    // Let IOException propagate: returning 0 after a failure would hand the caller a bogus byte.
    @Override
    public synchronized int read() throws IOException {
        return in.read();
    }

    @Override
    public synchronized int available() throws IOException {
        return in.available();
    }
}

In the NanoHTTPD file:

public HTTPSession(TempFileManager tempFileManager, InputStream inputStream, OutputStream outputStream, InetAddress inetAddress) {
    this.inputStream = (PushbackInputStream) SynchronizedInputStream.createPushBackInputStream(inputStream);
    /*lines of code..........*/
}

Then I call it like this:

String Data = readStream(session.getInputStream());//session is HTTPSession
/*.....code....*/
private String readStream(InputStream in) {
    synchronized (in) {
        PushbackInputStream inputStream = (PushbackInputStream) in;
        StringBuffer outputBuffer = null;
        try {
            // Reading the InputStream here
        } catch (IOException ioe) {
            // error msg
        }
        return outputBuffer.toString();
    }
}

Answer 1:

The short answer is that the class you have shown us is thread-safe, but the code that uses your class probably won't be thread-safe!

What you have implemented are operations that atomically read one byte and atomically test whether there is something to be read. The implementation of those operations is thread-safe if (and only if) all threads use the same SynchronizedInputStream object to access a given InputStream, and nothing apart from your wrapper accesses the InputStream directly.

However, it is most likely that this will not be sufficient to make your application's use of the streams thread-safe in the larger sense.

I expect that the "corruption" that you are observing is actually happening at a higher level; e.g. two threads that are simultaneously making a sequence of read calls to read (say) messages are interleaving, so that some bytes of a message are going to the wrong thread. Assuming that that is your problem, then this does not fix it. Your read method only locks the stream while a thread reads a single byte. After unlocking, there is nothing to stop a different thread from reading the next byte.

There are a few ways to solve this. For example:

  • A simple way is to restructure your code so that only one thread ever reads from a given InputStream. That thread reads the messages and turns them into objects that can be handed off to other threads, for example via a queue.

  • Another way is to replace your wrapper class with one that reads an entire message atomically. Don't extend InputStream. Instead design your API in terms of the larger scale operations, and synchronize at that level of granularity.
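The second option could look roughly like the following sketch. It assumes a hypothetical wire format in which every message is a 4-byte big-endian length followed by that many payload bytes; the names MessageReader and MessageReaderDemo and the framing itself are illustrative assumptions, not part of the question's code or of NanoHTTPD.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical wrapper: it does not extend InputStream and only exposes whole-message reads. */
final class MessageReader {
    private final DataInputStream in;

    MessageReader(InputStream in) {
        this.in = new DataInputStream(in);
    }

    /**
     * Reads one complete message while holding the lock, so the bytes of a
     * message can never be split between two threads.
     * Assumed framing: 4-byte big-endian length, then the payload.
     * Returns null when the stream ends before another length header can be read.
     */
    synchronized byte[] readMessage() throws IOException {
        int length;
        try {
            length = in.readInt();
        } catch (EOFException e) {
            return null; // no more messages
        }
        if (length < 0) {
            throw new IOException("negative message length: " + length);
        }
        byte[] payload = new byte[length];
        in.readFully(payload);
        return payload;
    }
}

class MessageReaderDemo {
    /** Builds an in-memory stream holding the given messages in the assumed framing. */
    static InputStream frame(String... messages) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (String m : messages) {
            byte[] payload = m.getBytes(StandardCharsets.UTF_8);
            out.writeInt(payload.length);
            out.write(payload);
        }
        return new ByteArrayInputStream(bytes.toByteArray());
    }

    public static void main(String[] args) throws Exception {
        MessageReader reader = new MessageReader(frame("hello", "world"));
        // Both threads share ONE MessageReader, so they also share one lock.
        Runnable worker = () -> {
            try {
                byte[] msg;
                while ((msg = reader.readMessage()) != null) {
                    System.out.println(Thread.currentThread().getName() + " got "
                            + new String(msg, StandardCharsets.UTF_8));
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
        Thread a = new Thread(worker, "A");
        Thread b = new Thread(worker, "B");
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```

Which thread receives which message is not deterministic, but each message arrives intact because the lock is held for the whole message rather than for a single byte.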

UPDATE

Re the extra code you added.

It looks like only one thread (the current request thread) should ever be reading from the input stream. If you are only using one thread, there should be no issues with multi-threading or thread safety. (And besides, that is the way that the NanoHTTPD code is designed to work.)

Supposing that there were multiple threads, your synchronized (in) { ... } block in readStream would normally be sufficient to make the code thread-safe, provided that all of the threads were using the same in object.

The problem is that your hacked HTTPSession class is creating a separate SynchronizedInputStream for each "session", and THAT is what your code synchronizes on. So if (somehow) two threads created HTTPSession objects using the same socket input stream, they would synchronize on different objects, and there would be no mutual exclusion.
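The point about different lock objects can be checked with a small experiment. In this sketch, LockIdentityDemo, canEnterWhileHeld and wrap are hypothetical names, and the anonymous FilterInputStream wrappers merely stand in for two separate SynchronizedInputStream instances built around one underlying stream.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

class LockIdentityDemo {
    /**
     * Holds the monitor of 'first' and reports whether another thread managed
     * to enter synchronized (second) during that time (waits up to 500 ms).
     */
    static boolean canEnterWhileHeld(Object first, Object second) throws InterruptedException {
        AtomicBoolean entered = new AtomicBoolean(false);
        Thread other = new Thread(() -> {
            synchronized (second) {
                entered.set(true);
            }
        });
        boolean enteredWhileHeld;
        synchronized (first) {
            other.start();
            other.join(500); // join waits on the Thread object, so 'first' stays locked
            enteredWhileHeld = entered.get();
        }
        other.join(); // after 'first' is released a blocked thread can finish
        return enteredWhileHeld;
    }

    /** Stand-in for creating a new per-session wrapper around a shared stream. */
    static InputStream wrap(InputStream in) {
        return new FilterInputStream(in) { };
    }

    public static void main(String[] args) throws Exception {
        InputStream socketStream = new ByteArrayInputStream(new byte[] {1, 2, 3});
        InputStream sessionA = wrap(socketStream);
        InputStream sessionB = wrap(socketStream);

        // Two wrappers are two monitors: the second thread is not kept out.
        System.out.println("separate wrappers exclude each other: "
                + !canEnterWhileHeld(sessionA, sessionB));

        // One shared object is one monitor: the second thread has to wait.
        System.out.println("shared object excludes: "
                + !canEnterWhileHeld(socketStream, socketStream));
    }
}
```

Mutual exclusion only exists when every thread synchronizes on the same object, which is why wrapping the same socket stream twice defeats the synchronized (in) block.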

But this is all conjecture. So far, you have not demonstrated that there are multiple threads attempting to use the same input stream.



Answer 2:

You need to think about what thread safety would even mean here. Imagine several people reading a magic book that erases each character the first time anyone sees it. So only one person can read any given character. That's kind of how streams are.

This makes it really hard to read the book in a useful manner. Done naively, each person will just get some random subset of the characters, which is not very useful information.

One straightforward solution is to let one person read it and copy it into a book that doesn't erase characters when someone reads it. This way everyone can read the book. In some situations you don't need everyone to understand the whole book, and people can do their work as long as they are given a sentence. In this case, the one reader can post each sentence to a queue from which everyone takes one sentence at a time.
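A minimal sketch of the one-reader-plus-queue idea follows. It assumes that a "sentence" is one line of UTF-8 text; SingleReaderDemo, process and the END marker are hypothetical names chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SingleReaderDemo {
    /** Marker object telling one worker to stop; compared by identity, never by content. */
    static final String END = new String("<end>");

    /** One thread reads lines from the stream; 'workers' threads take them from a queue. */
    static List<String> process(InputStream in, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = Collections.synchronizedList(new ArrayList<>());

        // The only thread that ever touches the InputStream.
        Thread reader = new Thread(() -> {
            try (BufferedReader lines = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = lines.readLine()) != null) {
                    queue.put(line);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                for (int i = 0; i < workers; i++) {
                    queue.add(END); // one stop marker per worker
                }
            }
        });

        List<Thread> pool = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    String sentence;
                    while ((sentence = queue.take()) != END) {
                        handled.add(sentence); // real work on the sentence would go here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.add(t);
            t.start();
        }

        reader.start();
        reader.join();
        for (Thread t : pool) {
            t.join();
        }
        return handled;
    }

    public static void main(String[] args) throws Exception {
        InputStream in = new ByteArrayInputStream(
                "one\ntwo\nthree\n".getBytes(StandardCharsets.UTF_8));
        // Each line is handled exactly once; the order across workers is not guaranteed.
        System.out.println(process(in, 2));
    }
}
```

Because only the reader thread calls into the stream, the stream itself needs no synchronization; the BlockingQueue is the only shared structure.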

Other approaches include having a buffer where each thread stores the characters it reads, and then checking each time whether they form a word, and if so emitting the word for downstream processing. For an example, see Netty's codec package.
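The buffering idea can be sketched in plain Java. This is not Netty's API; WordFramer and feed are hypothetical names, and treating a space or newline as the end of a word is an assumption made for the example.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical framer: accumulates bytes and emits a word once a delimiter completes it. */
final class WordFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Adds a chunk and returns the words it completed; an unfinished word stays buffered. */
    synchronized List<String> feed(byte[] chunk) {
        List<String> words = new ArrayList<>();
        for (byte b : chunk) {
            if (b == ' ' || b == '\n') {
                if (pending.size() > 0) {
                    words.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                    pending.reset();
                }
            } else {
                pending.write(b);
            }
        }
        return words;
    }
}

class WordFramerDemo {
    public static void main(String[] args) {
        WordFramer framer = new WordFramer();
        // Chunk boundaries do not line up with word boundaries.
        System.out.println(framer.feed("hel".getBytes(StandardCharsets.UTF_8)));    // []
        System.out.println(framer.feed("lo wor".getBytes(StandardCharsets.UTF_8))); // [hello]
        System.out.println(framer.feed("ld\n".getBytes(StandardCharsets.UTF_8)));   // [world]
    }
}
```

The framer only produces correct words if chunks are fed in stream order, so in practice a single thread (or a lock spanning both the read and the feed call) still has to own the reading side.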

These approaches are, however, usually implemented on top of a stream rather than inside it. You could well have a stream that does this internally, but it will probably confuse people.