I am working on a Node project that needs to submit thousands of images for processing. Before these images are uploaded to the processing server they need to be resized, so I have something along the lines of this:
imageList
.map(image => loadAndResizeImage(image))
.merge(3)
.map(image => uploadImage(image))
.merge(3)
.subscribe();
Image resizing typically takes a few tenths of a second, uploading and processing takes around 4 seconds.
How can I prevent thousands of resized images building up in memory as I wait for the upload queue to clear? I probably want to have 5 images resized and waiting to go so that as soon as an image upload finishes the next resized image is pulled from the queue and uploaded and a new image is resized and added to the 'buffer'.
An illustration of the issue can be found here:
https://jsbin.com/webaleduka/4/edit?js,console
Here there is a load step (taking 200ms) and a process step (taking 4 seconds). Each process is limited to a concurrency of 2. We can see that with 25 initial items we get to 20 images in memory.
I did look at the buffer options but neither seemed to do what I wanted to do.
At the moment I have just combined the load, resize and upload into one deferred observable that I merge with a max concurrency. I would like to have the images waiting for upload though and I am sure that it must be possible.
I am using RxJS 4 but I imagine the principles will be the same for 5.
Many Thanks.
In RxJS 5 I'd do it like this:
With the bufferCount operator I split the input array into batches of 5 items. Each batch is first processed in the first concatMap() (I'm using concat on purpose because I want to wait until the nested Observable completes). The processed data is then passed to another concatMap() that sends it to your server.

I'm using two delay() operators to simulate that the different tasks take different amounts of time. In our case processing images is very quick, so the first concatMap() emits items faster than the second concatMap() is able to send them to the server, which is all right. The processed images are queued inside the second concatMap() and are sent in batches one after another.

Output from this demo will look like this:
See live demo: https://jsbin.com/mileqa/edit?js,console
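The ordering described above can also be sketched without RxJS. The following is only a plain-JavaScript illustration of the two decoupled stages, not the code from the demo: `toBatches`, `processThenSend`, the callbacks and the delays are all hypothetical stand-ins, and error handling is left out.

```javascript
// Plain-JavaScript illustration (not RxJS). All names here are hypothetical.
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Split a list into consecutive batches, similar in spirit to bufferCount(size).
function toBatches(items, size) {
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Stage 1 processes batches one after another; stage 2 sends them one after
// another. The stages are decoupled, so processed batches pile up while an
// upload is still in flight.
async function processThenSend(images, batchSize, processImage, sendBatch) {
  let sendChain = Promise.resolve(); // tail of the sequential upload queue
  for (const batch of toBatches(images, batchSize)) {
    const processed = [];
    for (const image of batch) {
      processed.push(await processImage(image));
    }
    // Queue the upload, but do not wait for it before processing the next batch.
    sendChain = sendChain.then(() => sendBatch(processed));
  }
  await sendChain; // resolves once the last batch has been sent
}

// Usage with simulated work: processing is fast, sending is slow.
const events = [];
processThenSend(
  [1, 2, 3, 4, 5, 6],
  2,
  async (image) => { await wait(1); events.push(`process ${image}`); return image; },
  async (batch) => {
    events.push(`send batch ${batch}`);
    await wait(20);
    events.push(`response ${batch}`);
  }
).then(() => console.log(events.join('\n')));
```

Note that nothing limits how many processed batches can wait in the upload queue here, which is the same trade-off as in the description above.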
However, if you wanted to always process a batch first, then send it, and only continue with the next batch once it has been sent, you'd have to move the second inner Observable out of its concatMap() and chain it after the toArray() in the first concatMap() call.

See live demo: https://jsbin.com/sabena/2/edit?js,console
This produces output like the following:
You can see that "process", "send batch" and "response" logs are in order.
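The strictly ordered variant is easy to picture in plain JavaScript as well. This is a hedged sketch of the behaviour only, not the RxJS code from the demo; `processSendInOrder`, the callbacks and the delays are hypothetical.

```javascript
// Plain-JavaScript illustration (not RxJS). All names here are hypothetical.
const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Process one batch, send it, wait for the response, and only then move on.
async function processSendInOrder(images, batchSize, processImage, sendBatch) {
  for (let i = 0; i < images.length; i += batchSize) {
    const batch = images.slice(i, i + batchSize);
    const processed = [];
    for (const image of batch) {
      processed.push(await processImage(image));
    }
    // The next batch is not touched until this upload has completed.
    await sendBatch(processed);
  }
}

// Usage with simulated work.
const orderedEvents = [];
processSendInOrder(
  [1, 2, 3, 4],
  2,
  async (image) => { await pause(1); orderedEvents.push(`process ${image}`); return image; },
  async (batch) => {
    orderedEvents.push(`send batch ${batch}`);
    await pause(10);
    orderedEvents.push(`response ${batch}`);
  }
).then(() => console.log(orderedEvents.join(', ')));
// prints: process 1, process 2, send batch 1,2, response 1,2,
//         process 3, process 4, send batch 3,4, response 3,4 (on one line)
```

At most one batch is held in memory at a time with this variant, at the cost of no image being processed while an upload is in flight.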
An implementation in RxJS 4 should be almost identical (only the operator names might be slightly different).
In RxJS 4 there's also the controlled() operator, which doesn't exist in RxJS 5 (yet?). It may do something very similar to what you need.

I think that I have managed to solve this by using the controlled() RxJS operator:

Initially 4 images are requested. These are loaded from disc and then processed. As each image is loaded and then processed, the number of images in memory is tracked using the queuedImages variable. When this number drops below 4, more images are requested.

A jsbin of this can be seen here:
https://jsbin.com/webaleduka/11/edit?js,console
This method means that there are never more than 6 or so images in the cache and ensures that there are always enough images in the cache waiting to be uploaded.
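The same bookkeeping can be sketched without RxJS. The following plain-JavaScript version illustrates the idea and is not the code from the jsbin: `runPipeline`, `load`, `upload` and the option names are hypothetical, and `queuedImages` here is assumed to count images that have been requested but have not yet started uploading.

```javascript
// Plain-JavaScript illustration (not RxJS). All names here are hypothetical.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function runPipeline(ids, { maxQueued, uploadConcurrency, load, upload }) {
  return new Promise((resolve, reject) => {
    const ready = [];      // resized images waiting for a free upload slot
    let nextIndex = 0;     // next image to request
    let queuedImages = 0;  // requested (loading or ready) but not yet uploading
    let uploading = 0;     // uploads currently in flight
    let finished = 0;      // uploads completed
    let peakInMemory = 0;  // highest value of queuedImages + uploading seen

    if (ids.length === 0) {
      resolve({ uploaded: 0, peakInMemory: 0 });
      return;
    }

    // Request more images whenever the queue has dropped below maxQueued.
    function requestMore() {
      while (queuedImages < maxQueued && nextIndex < ids.length) {
        const id = ids[nextIndex++];
        queuedImages++;
        peakInMemory = Math.max(peakInMemory, queuedImages + uploading);
        load(id).then((image) => { ready.push(image); startUploads(); }, reject);
      }
    }

    // Fill free upload slots from the ready queue, then top the queue up again.
    function startUploads() {
      while (uploading < uploadConcurrency && ready.length > 0) {
        const image = ready.shift();
        queuedImages--;
        uploading++;
        upload(image).then(() => {
          uploading--;
          finished++;
          if (finished === ids.length) {
            resolve({ uploaded: finished, peakInMemory });
          } else {
            startUploads();
          }
        }, reject);
        requestMore();
      }
    }

    requestMore();
  });
}

// Usage with simulated work: loading is fast, uploading is slow.
runPipeline(Array.from({ length: 25 }, (_, i) => i), {
  maxQueued: 4,
  uploadConcurrency: 2,
  load: async (id) => { await sleep(1); return id; },
  upload: async () => { await sleep(10); },
}).then((result) => console.log(result));
```

In this sketch the number of images held at once can never exceed maxQueued + uploadConcurrency, so 4 queued images and 2 concurrent uploads give an upper bound of 6.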