Send big file over reactive stream

Posted 2019-07-15 08:10

Question:

Part of an application I am writing requires transferring arbitrarily large files (for this question, assume 100-200 GB) from client to server. The important thing is that the receiver (the server) does not store the file - it just reads/examines the stream and passes it on to the next point. Because I never need the whole file, but expect multiple simultaneous transfers, I would like to minimize RAM usage and eliminate disk usage. I would like to process files in chunks of 1 MB.

Right now, the server uses Spring Boot and Akka.

My first attempt was to open a buffered file input stream on the client side, read it in 1 MB chunks, and send the chunks as messages from a separate thread. It works, but the problem is that the client sends messages one after another without checking whether the server has buffer space to store them (there is no back pressure).
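For illustration, a minimal sketch of that first attempt (the send(...) helper and the file name are hypothetical placeholders; note that nothing throttles the writes, which is exactly the missing back pressure):

import java.io.{ BufferedInputStream, FileInputStream }
import java.util.Arrays

// Hypothetical transport call - stands in for whatever ships one message to the server.
def send(chunk: Array[Byte]): Unit = ???

val in = new BufferedInputStream(new FileInputStream("bigfile.bin"))
val buffer = new Array[Byte](1024 * 1024) // 1 MB read buffer
try {
  var read = in.read(buffer)
  while (read != -1) {
    send(Arrays.copyOf(buffer, read)) // fires chunks as fast as the disk allows
    read = in.read(buffer)
  }
} finally in.close()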

My second idea was to use akka-streams like this:

How to use Reactive Streams for NIO binary processing?

with an ActorPublisher, like this:

akka-streams with akka-cluster

however, as stated here:

http://doc.akka.io/docs/akka/2.4.16/scala/stream/stream-integrations.html#Implementing_Reactive_Streams_Publisher_or_Subscriber

"Warning ActorPublisher and ActorSubscriber will probably be deprecated in future versions of Akka.

Warning ActorPublisher and ActorSubscriber cannot be used with remote actors, because if signals of the Reactive Streams protocol (e.g. request) are lost the stream may deadlock."

it doesn't look like a good idea.

I do not want to save the file with any storage provider (Dropbox, Google Drive, ...) because I want to analyze the data on the fly. I have Spring 5 and Akka on board, but I can use any other software that solves this. A raw socket lacks back pressure, and torrents do not guarantee the sequential/ordered reads and writes that I need.

The main question is: how do I stream a big file from client to server, assuming the server cannot hold the whole file at once, either on disk or in RAM?

The bonus question is: how do I calculate the "correct" chunk size for such a transfer?

I have been looking for an answer for days, and it looks like I am not the only one with a problem like this; however, there are either no answers, or answers like "don't do it" that do not point to an adequate alternative.

Answer 1:

Akka Streams provides functionality specifically for this use case: streaming file IO. From the documentation (with the imports and materializer setup needed to actually run it):

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, IOResult }
import akka.stream.scaladsl._
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("file-transfer")
implicit val materializer: ActorMaterializer = ActorMaterializer()

val file = Paths.get("example.csv")

// Materializes to a Future[IOResult] reporting how many bytes were read.
val foreach: Future[IOResult] =
  FileIO.fromPath(file)
        .to(Sink.ignore)
        .run()
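FileIO.fromPath only covers the reading side. To move the bytes to the server with back pressure, the same source can be plugged into a streaming transport; here is a sketch using Akka HTTP (the upload endpoint is an assumption, and Akka HTTP is just one option, chosen because it already fits your Akka stack). TCP flow control then propagates the server's demand all the way back to the file reads:

import akka.http.scaladsl.Http
import akka.http.scaladsl.model._

// Stream the file as a chunked request entity; the server's demand flows back to the disk reads.
val upload: Future[HttpResponse] = Http().singleRequest(
  HttpRequest(
    method = HttpMethods.POST,
    uri = "http://localhost:8080/upload", // hypothetical endpoint
    entity = HttpEntity(ContentTypes.`application/octet-stream`, FileIO.fromPath(file))
  )
)

On the server side, the request entity arrives as a Source of ByteString that you consume chunk by chunk, so nothing larger than the stream's internal buffers is ever held in memory.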

Regarding your bonus question about the "correct" chunk size: this is highly dependent on your hardware and software configuration. Your best bet is to write a test client and vary the chunk size until you find a "sweet spot" for your server.
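If you go with Akka Streams, the chunk size is a single parameter to vary during that benchmarking: FileIO.fromPath accepts it directly (the default is 8 KB; the 1 MB value below is just the starting point from the question, not a recommendation):

// Read the file in 1 MB chunks instead of the default 8 KB.
val source = FileIO.fromPath(file, chunkSize = 1024 * 1024)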