Send data from InputStream over Akka/Spring stream

2019-07-25 13:30发布

问题:

Here is my previous question: Send big file over reactive stream

I have managed to send file over Akka stream using FileIO.fromPath(Paths.get(file.toURI())) and it works fine. However, I would like to compress and encrypt file before sending it. I have created method, that opens FileInputStream, routes it through compression stream and then through encryption stream and now I would like to direct it into socket using Akka stream.

File -> FileInputStream -> CompressedInputStream -> EncryptedInputStream -?> Spring/Akka stream

The point is, I can compress/encrypt file on-the-fly while reading it chunk by chunk (I am not creating additional files on disk), and I do not see how can I send InputStream (compressed and encrypted) over Akka/Spring stream (by Spring stream I mean Akka stream under project Reactor API).

Question is: how to compress, encrypt and send file without saving whole compressed/encrypted file to disk at no point?

回答1:

I have (accidently) found: StreamConverters.fromInputStream!

http://doc.akka.io/docs/akka/2.4.17/java/stream/stages-overview.html#fromInputStream



回答2:

Actually, there is a source specifically for working with resources like input streams. It's called Source.unfoldResource:

Source<ByteString, NotUsed> source = Source.unfoldResource(
    () -> prepareEncryptedStream(),
    is -> readChunk(is, 4096),
    InputStream::close
);

Optional<ByteString> readChunk(InputStream is, int size) throws IOException {
    byte[] data = new byte[size];
    int read = is.read(data);
    if (read < 0) {
        return Optional.empty();
    }
    return Optional.of(ByteString.fromArray(data, 0, read));
}

InputStream prepareEncryptedStream() { ... }

Here prepareCompressedFile() is a method which should return the encrypted stream you want to create a reactive stream from, and readChunk() is a convenience method which reads a ByteString from an InputStream of the specified size.

If you can express compression and encryption routines as ByteString -> ByteString functions, then you don't really need this; all you need to do is to pass these routines to the map() flow:

Source<ByteString, CompletionStage<IOResult>> source =
    FileIO.fromPath(Paths.get("somewhere"))
        .map(bs -> compress(bs))
        .map(bs -> encrypt(bs));

ByteString encrypt(ByteString bs) { ... }

ByteString compress(ByteString bs) { ... }