Node.js: splitting stream content for n-parts
Asked Answered
H

2

9

I'm trying to understand Node.js streams and their life-cycle. So, I want to split the content of a stream into n parts. The code below is just to explain my intentions and to show that I have already tried something myself; I omitted some details.

I have a stream which just generates some data(just a sequence of numbers):

// Readable stream that produces the numbers 1..30, one chunk per read.
// Object mode with a high-water mark of 1 keeps buffering minimal so
// chunks are pulled lazily, one at a time.
class Stream extends Readable {
  constructor() {
    super({objectMode: true, highWaterMark: 1})
    // Last value emitted; 0 means nothing has been pushed yet.
    this.counter = 0
  }

  // Called by the stream machinery whenever more data is wanted.
  _read(size) {
    if (this.counter >= 30) {
      // Signal end-of-stream after 30 numbers; counter is no longer
      // advanced past this point (the original kept incrementing it).
      this.push(null)
    } else {
      // Increment before pushing so the sequence starts at 1, matching
      // the `[1, 2, 3]` expectation in the usage example below
      // (the original pushed 0 first).
      this.counter += 1
      this.push(this.counter)
    }
  }
}

// Instantiate the generator stream and start it in paused mode so data
// is only consumed through explicit `stream.read()` calls in `take`.
const stream = new Stream()
stream.pause();

a function which tries to take the next n chunks:

// Resolve with up to `count` chunks read from `stream`.
//
// Waits for the next 'readable' event, then synchronously drains at
// most `count` chunks from the internal buffer. Resolves with fewer
// chunks if the buffer empties (read() returns null) before `count`
// is reached; any extra buffered chunks are left on the stream for a
// subsequent call.
function take(stream, count) {
  const result = []
  return new Promise(function(resolve) {
    stream.once('readable', function() {
      let chunk
      // Stop *before* reading chunk number count+1, so no data is
      // discarded (the original checked `result.length > count` after
      // reading, which collected count+1 chunks and dropped one).
      // `chunk === null` replaces the lodash `_.isNull` call — no
      // third-party dependency needed for a null check.
      while (result.length < count && (chunk = stream.read()) !== null) {
        result.push(chunk)
      }
      stream.pause()
      resolve(result)
    })
  })
}

and want to use it like this:

// Consume the stream in two batches of three chunks each. The second
// `take` only starts after the first batch has resolved, so the batches
// do not compete for the same 'readable' event.
// NOTE(review): these asserts expect the sequence to start at 1 —
// verify the source stream's first chunk matches.
take(stream, 3)
  .then(res => {
    assert.deepEqual(res, [1, 2, 3])
    return take(stream, 3)
  })
  .then(res => {
    assert.deepEqual(res, [4, 5, 6])
  })

What is the idiomatic way to do that?

Housebound answered 26/4, 2017 at 10:17 Comment(0)
I
8

Using ReadableStream you could use a single function to check whether the elements of the current chunk of data are equal to the expected result.

Create variables CHUNK and N, where CHUNK is the number of elements to slice from the original array, and N is a variable incremented by CHUNK at each .enqueue() call within pull().

// Source array, slice size, and accumulator for the verified chunks.
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

// Offset of the next slice into `data`; advanced by CHUNK inside pull().
let N = 0;

const stream = new ReadableStream({
  // pull() is invoked by the stream whenever its internal queue has
  // room; each call enqueues the next CHUNK-sized slice of `data`.
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

// Lock the stream to a single reader; chunks are pulled via reader.read().
const reader = stream.getReader();

// Recursively consume one chunk per call: verify the chunk equals the
// expected slice of `data`, append it to `result`, then read the next.
const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    // The `data` parameter below shadows the outer `data` array; it is
    // the `{value, done}` result object from reader.read().
    return reader.read().then(data => processData(data))
  }
  // NOTE(review): if the comparison above fails, nothing is returned,
  // so the chain silently resolves with `undefined` — consider an
  // explicit else branch that rejects.
}

// Log the accumulated result once the stream has been fully consumed.
const readComplete = res => console.log(`result: [${res}]`);

// Kick off the recursive read loop and report the final result.
reader.read()
.then(processData)
.then(readComplete)
.catch(err => console.log(err));

Using chained .then()

// Same setup as the first snippet: source array, slice size, and
// accumulator for the verified chunks.
const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];

// Offset of the next slice into `data`; advanced by CHUNK inside pull().
let N = 0;

const stream = new ReadableStream({
  // pull() is invoked by the stream whenever its internal queue has
  // room; each call enqueues the next CHUNK-sized slice of `data`.
  pull(controller) {
    if (N < data.length)
      // slice `N, N += CHUNK` elements from `data`
      controller.enqueue(data.slice(N, N += CHUNK))
    else
      // if `N` is equal to `data.length` call `.close()` on stream
      controller.close()
  }
});

// Lock the stream to a single reader; chunks are pulled via reader.read().
const reader = stream.getReader();

// NOTE(review): this `processData` is defined but never used in this
// chained variant — the `.then()` chain below uses inline handlers
// instead. It is kept here only to mirror the first snippet.
const processData = ({value, done}) => {
  // if stream is closed return `result`; `reader.closed` returns a `Promise`
  if (done) return reader.closed.then(() => result);
  if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read().then(data => processData(data))
  }
}

// Log the accumulated result once both chunks have been consumed.
const readComplete = res => console.log(`result: [${res}]`);

// Explicitly chained version: one .then() per expected chunk, with the
// expected values ([1,2,3] then [4,5,6]) hard-coded in each handler.
reader.read()
.then(({value, done}) => {
  if ([1,2,3].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    return reader.read()
  }
  // NOTE(review): if the check fails, `undefined` flows to the next
  // .then(), where destructuring `{value, done}` still succeeds but
  // `value[index]` throws — caught by the .catch() at the end.
})
.then(({value, done}) => {
  if ([4,5,6].every((n, index) => n === value[index])) {
    console.log(`N: ${N}, value: [${value}]`)
    result.push(...value);
    // return `result`; `reader.closed` returns a `Promise`
    return reader.closed.then(() => result);
  }
})
.then(readComplete)
.catch(err => console.log(err));

See also Chrome memory issue - File API + AngularJS

Isoclinal answered 13/5, 2017 at 17:13 Comment(4)
can you give a link for some documentation about ReadableStream? the actual question is about Node.js streams which is probably something differentHousebound
@Housebound See Streams Standard, web-streams-polyfillIsoclinal
it's not the expected answer, but I'll probably give you the reward. At least I found something new in itHousebound
@Housebound Not certain if nodejs implements Streams Standard, though that is the specification. You can ask one of the maintainers of the standard or nodejs maintainers if the standard is implemented by nodejs.Isoclinal
D
-1

I think this is something that could help you out - https://github.com/substack/stream-handbook

It's an amazingly detailed handbook with sample code for various streaming scenarios and I'm using the same as a reference for my own project and have found it useful so far! It has sample code in /examples as well

Diandiana answered 9/5, 2017 at 14:29 Comment(2)
I didn't find the answer hereHousebound
It references the npm package called split, that could be applied in your case - npmjs.com/package/split npmjs.com/package/split2Diandiana

© 2022 - 2024 — McMap. All rights reserved.