Understanding back-pressure in rxjs - only cache 5

Posted 2019-04-09 08:34

I am working on a Node project that needs to submit thousands of images for processing. Before these images are uploaded to the processing server they need to be resized, so I have something along these lines:

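imageList
    .map(image => loadAndResizeImage(image)) // returns an Observable per image
    .merge(3)                                // at most 3 resizes at once
    .map(image => uploadImage(image))        // returns an Observable per upload
    .merge(3)                                // at most 3 uploads at once
    .subscribe();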

Image resizing typically takes a few tenths of a second, uploading and processing takes around 4 seconds.

How can I prevent thousands of resized images building up in memory as I wait for the upload queue to clear? I probably want to have 5 images resized and waiting to go, so that as soon as an image upload finishes, the next resized image is pulled from the queue and uploaded, and a new image is resized and added to the 'buffer'.

An illustration of the issue can be found here:

https://jsbin.com/webaleduka/4/edit?js,console

Here there is a load step (taking 200ms) and a process step (taking 4 seconds). Each process is limited to a concurrency of 2. We can see that with 25 initial items we get to 20 images in memory.

I did look at the buffer options but neither seemed to do what I wanted to do.

At the moment I have just combined the load, resize and upload into one deferred observable that I merge with a max concurrency. I would like to have the images waiting for upload though and I am sure that it must be possible.

I am using RxJS 4 but I imagine the principles will be the same for 5.

Many Thanks.

2 Answers
乱世女痞
#2 · 2019-04-09 08:44

In RxJS 5 I'd do it like this:

Observable.range(1, 25)
    .bufferCount(5)
    .concatMap(batch => { // process images
        console.log('process', batch);
        return Observable.from(batch)
            .mergeMap(val => Observable.of('p' + val).delay(300))
            .toArray();
    })
    .concatMap(batch => { // send images
        console.log('send batch', batch);
        return Observable.from(batch)
            .mergeMap(val => Observable.of('s' + val).delay(500))
            .toArray();
    })
    .subscribe(val => {
        // console.log('response');
        console.log('response', val);

    });

With the bufferCount operator I split the input array into batches of 5 items. Each batch is then processed by the first concatMap() (I'm using concat on purpose because I want to wait until the nested Observable completes). The processed data is then passed to a second concatMap() that sends it to your server.
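To make the batching step concrete, here is a plain JavaScript sketch of what bufferCount(5) does to a finite list. It does not use RxJS at all, and the helper name chunk is made up for illustration. Like bufferCount, it emits a shorter final batch when the item count is not a multiple of the batch size.

```javascript
// Hypothetical helper (not part of RxJS): split an array into batches of `size`.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size)); // the last slice may be shorter
  }
  return batches;
}

// 12 ids split into three batches: 1-5, 6-10 and the partial batch 11-12.
const ids = Array.from({ length: 12 }, (_, i) => i + 1);
console.log(chunk(ids, 5));
```

The difference from the Observable version is that bufferCount produces these batches lazily as items arrive, whereas this helper needs the whole array up front.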

I'm using two delay() operators to simulate that the two tasks take different amounts of time. In our case processing images is very quick, so the first concatMap will emit items faster than the second concatMap is able to send them to the server, which is all right. The processed images will be queued inside the second concatMap and will be sent in batches, one after another.

Output from this demo will look like this:

process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
process [ 6, 7, 8, 9, 10 ]
process [ 11, 12, 13, 14, 15 ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
process [ 16, 17, 18, 19, 20 ]
process [ 21, 22, 23, 24, 25 ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]

See live demo: https://jsbin.com/mileqa/edit?js,console

However, if you always wanted to process a batch first, then send it, and only continue with the next batch once it has been sent, you'd have to move the second concatMap so that it is chained directly after toArray() inside the first concatMap() call.

.concatMap(batch => { // process images
    console.log('process', batch);
    return Observable.from(batch)
        .mergeMap(val => Observable.of('p' + val).delay(100))
        .toArray()
        .concatMap(batch => { // send images
            console.log('send batch', batch);
            return Observable.from(batch)
                .mergeMap(val => Observable.of('s' + val).delay(500))
                .toArray();
        });
})

See live demo: https://jsbin.com/sabena/2/edit?js,console

This produces the following output:

process [ 1, 2, 3, 4, 5 ]
send batch [ 'p1', 'p2', 'p3', 'p4', 'p5' ]
response [ 'sp1', 'sp2', 'sp3', 'sp4', 'sp5' ]
process [ 6, 7, 8, 9, 10 ]
send batch [ 'p6', 'p7', 'p8', 'p9', 'p10' ]
response [ 'sp6', 'sp7', 'sp8', 'sp9', 'sp10' ]
process [ 11, 12, 13, 14, 15 ]
send batch [ 'p11', 'p12', 'p13', 'p14', 'p15' ]
response [ 'sp11', 'sp12', 'sp13', 'sp14', 'sp15' ]
process [ 16, 17, 18, 19, 20 ]
send batch [ 'p16', 'p17', 'p18', 'p19', 'p20' ]
response [ 'sp16', 'sp17', 'sp18', 'sp19', 'sp20' ]
process [ 21, 22, 23, 24, 25 ]
send batch [ 'p21', 'p22', 'p23', 'p24', 'p25' ]
response [ 'sp21', 'sp22', 'sp23', 'sp24', 'sp25' ]

You can see that "process", "send batch" and "response" logs are in order.
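For comparison, the same strictly ordered flow can be sketched without RxJS using async/await. This is only an illustration under assumptions: processItem, sendItem, wait and runInOrder are hypothetical names, and the delays are shortened stand-ins for the real resize and upload work.

```javascript
// Hypothetical stand-ins for the real resize and upload calls.
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const processItem = (val) => wait(3).then(() => 'p' + val);
const sendItem = (val) => wait(5).then(() => 's' + val);

// Process one batch, send it, and only then move on to the next batch.
async function runInOrder(items, batchSize) {
  const log = [];
  const responses = [];
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    log.push('process ' + batch.join(','));
    // Roughly like mergeMap + toArray: run the batch concurrently, wait for all.
    const processed = await Promise.all(batch.map((val) => processItem(val)));
    log.push('send batch ' + processed.join(','));
    const sent = await Promise.all(processed.map((val) => sendItem(val)));
    log.push('response ' + sent.join(','));
    responses.push(sent);
  }
  return { responses, log };
}

runInOrder([1, 2, 3, 4, 5, 6, 7], 3).then(({ log }) => {
  log.forEach((line) => console.log(line));
});
```

Because each batch is awaited before the loop advances, at most one batch of processed items is held in memory at a time, at the cost of the resize step sitting idle while a batch uploads.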

The implementation in RxJS 4 should be almost identical (only the operator names might be slightly different).

In RxJS 4 there's also the controlled() operator, which doesn't exist in RxJS 5 (yet?). It may do something very similar to what you need.

Juvenile、少年°
#3 · 2019-04-09 08:48

I think that I have managed to solve this by using the RxJS 4 controlled() operator:

var queuedImages = 0;

var imageSource = Rx.Observable.range(1, 25)
  .map(index => "image_" + index)
  .controlled();

imageSource
  .map(image => loadImage(image))
  .merge(2)
  .do((image) => {
    queuedImages++;
    console.log(`Images waiting for processing: ${queuedImages}`);
  })
  .map(image => processImage(image))
  .merge(2)
  .do( () => {
    queuedImages--;
    console.log(`Images waiting for processing: ${queuedImages}`);

    if(queuedImages < 4){
      console.log(`requesting more image loads`);
      imageSource.request(4-queuedImages);
    }
  })
  .subscribe( 
    (item) => {}, null, 
    () => console.log(`All Complete`) );

imageSource.request(4);

Initially 4 images are requested. These are loaded from disk and then processed. As each image is loaded and then processed, the number of images in memory is tracked using the queuedImages variable. When this number drops below 4, more images are requested.

A jsbin of this can be seen here:

https://jsbin.com/webaleduka/11/edit?js,console

This method means that there are never more than 6 or so images in the cache and ensures that there are always enough images in the cache waiting to be uploaded.
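The same "request more when the buffer drains" idea can also be sketched without controlled(), using plain Promises. This is a minimal sketch under assumptions, not RxJS API: runPipeline, wait, maxBuffered, loadConcurrency and uploadConcurrency are all hypothetical names, and load and upload are assumed to return Promises.

```javascript
// Dependency-free sketch; every name here is hypothetical, not RxJS API.
const wait = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

function runPipeline(items, load, upload, options = {}) {
  const { maxBuffered = 5, loadConcurrency = 2, uploadConcurrency = 2 } = options;
  return new Promise((resolve, reject) => {
    const pending = items.slice(); // not loaded yet
    const buffer = [];             // loaded, waiting for a free upload slot
    const results = [];
    let loading = 0;
    let uploading = 0;
    let peakBuffered = 0;

    function pump() {
      // Hand buffered images to the uploader while it has free slots.
      while (uploading < uploadConcurrency && buffer.length > 0) {
        uploading++;
        upload(buffer.shift()).then((response) => {
          uploading--;
          results.push(response);
          pump();
        }, reject);
      }
      // Load more only while loaded + loading images stay within the cap.
      while (
        pending.length > 0 &&
        loading < loadConcurrency &&
        buffer.length + loading < maxBuffered
      ) {
        loading++;
        load(pending.shift()).then((image) => {
          loading--;
          buffer.push(image);
          peakBuffered = Math.max(peakBuffered, buffer.length);
          pump();
        }, reject);
      }
      // Everything has been loaded and uploaded.
      if (pending.length + loading + buffer.length + uploading === 0) {
        resolve({ results, peakBuffered });
      }
    }

    pump();
  });
}

// Usage with simulated work: fast loads, slow uploads.
runPipeline(
  Array.from({ length: 25 }, (_, i) => 'image_' + (i + 1)),
  (name) => wait(2).then(() => name + ':resized'),
  (image) => wait(20).then(() => image + ':uploaded'),
  { maxBuffered: 5 }
).then(({ results, peakBuffered }) => {
  console.log(results.length, 'uploaded; peak buffer size', peakBuffered);
});
```

In this sketch the images held in memory are bounded by maxBuffered plus uploadConcurrency, because a new load only starts when the count of loaded and loading images is below the cap. It assumes maxBuffered is at least 1.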
