Node.js: splitting stream content for n-parts

2019-02-21 19:56发布

问题:

I'm trying to understand Node streams and their life cycle. To that end, I want to split the content of a stream into n parts. The code below is just to explain my intentions and to show that I have already tried something myself. I have omitted some details.

I have a stream which just generates some data (just a sequence of numbers):

/**
 * Object-mode readable that produces the integers 0 through 29 and then
 * signals end-of-stream. The high-water mark of 1 keeps at most one
 * buffered chunk ahead of the consumer.
 */
class Stream extends Readable {
  constructor() {
    const options = {highWaterMark: 1, objectMode: true}
    super(options)
    this.counter = 0
  }

  /**
   * Pushes the current counter value, or `null` (end-of-stream) once the
   * counter has reached 30, then advances the counter.
   * @param {number} size - advisory amount requested by the stream; unused.
   */
  _read(size) {
    const exhausted = this.counter === 30
    this.push(exhausted ? null : this.counter)
    this.counter += 1
  }
}

// Create the number source and call pause() on it up front, before any
// consumer has been attached to it.
const stream = new Stream()
stream.pause();

A function which tries to take the next n chunks:

/**
 * Reads up to `count` chunks from a readable stream without consuming
 * anything beyond them, so that consecutive calls return consecutive,
 * non-overlapping slices of the stream.
 *
 * @param {import('stream').Readable} stream - stream to read from.
 * @param {number} count - maximum number of chunks to take.
 * @returns {Promise<Array>} resolves with exactly `count` chunks, or with
 *   fewer (possibly none) when the stream ends or closes first; rejects
 *   if the stream emits 'error' while chunks are being collected.
 */
function take(stream, count) {
  return new Promise(function(resolve, reject) {
    const result = []

    // Nothing to wait for: no chunks requested, or the stream is already
    // finished and will never emit 'readable'/'end' again.
    if (count <= 0 || stream.readableEnded || stream.destroyed) {
      resolve(result)
      return
    }

    // Detach every listener added below so repeated calls do not pile up
    // handlers on the same stream.
    function cleanup() {
      stream.removeListener('readable', onReadable)
      stream.removeListener('end', onDone)
      stream.removeListener('close', onDone)
      stream.removeListener('error', onError)
    }

    function onDone() {
      cleanup()
      resolve(result)
    }

    function onError(err) {
      cleanup()
      reject(err)
    }

    function onReadable() {
      // Check the quota BEFORE reading so no extra chunk is pulled out of
      // the stream and thrown away.
      while (result.length < count) {
        const chunk = stream.read()
        // `null` only means the internal buffer is empty right now; keep
        // listening for the next 'readable' (more data) or 'end'.
        if (chunk === null) return
        result.push(chunk)
      }
      onDone()
    }

    stream.on('readable', onReadable)
    stream.on('end', onDone)
    stream.on('close', onDone)
    stream.on('error', onError)
    // Drain whatever is already buffered instead of waiting for an event.
    onReadable()
  })
}

And I want to use it like this:

// Intended usage: each call yields the next three chunks. The Stream
// defined earlier starts counting at 0, so the first batch is 0, 1, 2
// and the second is 3, 4, 5.
take(stream, 3)
  .then(res => {
    assert.deepEqual(res, [0, 1, 2])
    return take(stream, 3)
  })
  .then(res => {
    assert.deepEqual(res, [3, 4, 5])
  })
  .catch(err => {
    // Report failed assertions and stream errors instead of leaving an
    // unhandled rejection.
    console.error(err)
    process.exitCode = 1
  })

What is the idiomatic way to do that?

回答1:

Using ReadableStream, you could use a single function to check whether the elements of the current chunk of data are equal to the expected result.

Create the variables CHUNK and N, where CHUNK is the number of elements to slice or splice from the original array, and N is the offset that is incremented by CHUNK at each .enqueue() call within the pull() call.

// `data` is the source array, `CHUNK` the number of elements per enqueued
// chunk, `result` the accumulator for everything read from the stream.
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

// Producer-side offset into `data`; advanced by `CHUNK` on every `.enqueue()`.
let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

// Validates one read result against `data`, appends it to `result` and
// requests the next chunk; resolves with `result` once the stream is done.
const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  // `result.length` is the consumer-side offset, so the check does not
  // depend on how far `pull()` has already advanced `N`
  const expected = data.slice(result.length, result.length + CHUNK);
  const matches = value.length === expected.length
    && expected.every((n, index) => n === value[index]);
  // fail loudly instead of silently resolving the chain with `undefined`
  if (!matches) throw new Error(`unexpected chunk [${value}], expected [${expected}]`);
  console.log(`N: ${N}, value: [${value}]`)
  result.push(...value);
  return reader.read().then(processData)
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));

Using chained .then()

const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

let N = 0;

const stream = new ReadableStream({
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

const reader = stream.getReader();

const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

const readComplete = res => console.log(`result: [${res}]`);

reader.read()
.then(({value, done}) => {
  if ([1,2,3].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read()
  }
})
.then(({value, done}) => {
  if ([4,5,6].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    // return `result`; `reader.closed` returns a `Promise`
    return reader.closed.then(() => result);
  }
})
.then(readComplete)
.catch(err => console.log(err));

See also Chrome memory issue - File API + AngularJS



回答2:

I think this is something that could help you out - https://github.com/substack/stream-handbook

It's an amazingly detailed handbook with sample code for various streaming scenarios. I'm using it as a reference for my own project and have found it useful so far! It has sample code in /examples as well.