Here is my previous question: Send big file over reactive stream
I have managed to send file over Akka stream using FileIO.fromPath(Paths.get(file.toURI())) and it works fine. However, I would like to compress and encrypt file before sending it. I have created method, that opens FileInputStream, routes it through compression stream and then through encryption stream and now I would like to direct it into socket using Akka stream.
File -> FileInputStream -> CompressedInputStream -> EncryptedInputStream -?> Spring/Akka stream
The point is, I can compress/encrypt file on-the-fly while reading it chunk by chunk (I am not creating additional files on disk), and I do not see how can I send InputStream (compressed and encrypted) over Akka/Spring stream (by Spring stream I mean Akka stream under project Reactor API).
Question is: how to compress, encrypt and send file without saving whole compressed/encrypted file to disk at no point?
I have (accidently) found: StreamConverters.fromInputStream!
http://doc.akka.io/docs/akka/2.4.17/java/stream/stages-overview.html#fromInputStream
Actually, there is a source specifically for working with resources like input streams. It's called Source.unfoldResource
:
Source<ByteString, NotUsed> source = Source.unfoldResource(
() -> prepareEncryptedStream(),
is -> readChunk(is, 4096),
InputStream::close
);
Optional<ByteString> readChunk(InputStream is, int size) throws IOException {
byte[] data = new byte[size];
int read = is.read(data);
if (read < 0) {
return Optional.empty();
}
return Optional.of(ByteString.fromArray(data, 0, read));
}
InputStream prepareEncryptedStream() { ... }
Here prepareCompressedFile()
is a method which should return the encrypted stream you want to create a reactive stream from, and readChunk()
is a convenience method which reads a ByteString
from an InputStream
of the specified size.
If you can express compression and encryption routines as ByteString -> ByteString
functions, then you don't really need this; all you need to do is to pass these routines to the map()
flow:
Source<ByteString, CompletionStage<IOResult>> source =
FileIO.fromPath(Paths.get("somewhere"))
.map(bs -> compress(bs))
.map(bs -> encrypt(bs));
ByteString encrypt(ByteString bs) { ... }
ByteString compress(ByteString bs) { ... }