Extract binary values from stream with low memory consumption

I'm building a NodeJS server with ExpressJS that receives data (50KB to >100MB) via POST request from a desktop app, processes it, and returns the result. The desktop app gzip compresses the data prior to sending (50KB becomes roughly 4KB).

I want the server to decompress the data, extract the values it contains (strings, integers, chars, arrays, JSON, etc.), process those values, and then respond with the processed data.

I started with this:

apiRoute.route("/convert").post(bodyParser.raw({limit: '100Mb'}), (req, res) =>{
    let outputData;
    //extract values from req.body Buffer and do math on them.
    //save processed data in outputData

    res.json({
        status: true,
        data: outputData
    });
});

This works because body-parser decompresses the data into a Buffer (req.body) held entirely in memory. That is my main issue: memory usage. I do not want to store the entire dataset in memory.


To resolve this I removed body-parser and instead piped the request stream directly into a zlib transform stream:

apiRoute.route("/convert").post((req, res) =>{
    req.pipe(zlib.createGunzip());
});

The issue now is that I don't know how to extract binary values from the stream.


This is what I would LIKE to be able to do:

apiRoute.route("/convert").post((req, res) =>{
    let binaryStream = new stream.Transform();

    req
    .pipe(zlib.createGunzip())
    .pipe(binaryStream);

    let aValue = binaryStream.getBytes(20);//returns 20 bytes
    let bValue = binaryStream.getBytes(20000);//returns the next 20000 bytes
    //etc...

});

However, I don't know of any way to accomplish this. Modules like Dissolve are close, but they require you to set up the parsing logic ahead of time, and all of the grabbed values are stored in memory.

Plus I don't know how to respond with the outputData without also loading it all into memory.


So my question is, how do I...

  • Read the data from the stream at my own rate asynchronously and extract the values within
  • Send the processed data back to the desktop app without putting it all in memory
Longrange answered 26/3, 2019 at 19:44 Comment(0)

I solved my own problem. I'm not 100% confident this is the best way to accomplish this, so I'm open to suggestions.

I made a subclass of stream.Transform and implemented the _transform method. I discovered that the next data chunk is only passed in after the previous _transform callback has been called. Knowing this, I store that callback function as a property and only call it when I need the next chunk.

getBytes(size) is a method that grabs a specified number of bytes from the current chunk (also saved as a property) and calls the previously saved callback if the next chunk is needed. This is done recursively to account for varying chunk sizes and varying numbers of requested bytes.

Then with a mix of async/await and promises, I was able to keep this entire process asynchronous (afaik) and backpressured.

const {Transform} = require('stream');
const events = require('events');

class ByteStream extends Transform{

    constructor(options){
        super(options);

        this.event_emitter = new events.EventEmitter();
        this.hasStarted = false;
        this.hasEnded = false;
        this.currentChunk = null;   //the chunk currently being read from
        this.nextCallback = null;   //the pending _transform callback, held until the next chunk is needed
        this.pos = 0;               //read position within currentChunk

        //'finish' fires once the writable side has received all data;
        //emit 'chunkGrabbed' so any pending getBytes() call can resolve
        this.on('finish', ()=>{
            this.hasEnded = true;
            this.event_emitter.emit('chunkGrabbed');
        });
    }

    _transform(chunk, enc, callback){
        this.pos = 0;
        this.currentChunk = chunk;
        this.nextCallback = callback;   //hold the callback; calling it later requests the next chunk

        if(!this.hasStarted){
            this.hasStarted = true;
            this.event_emitter.emit('started');
        }
        else{
            this.event_emitter.emit('chunkGrabbed');
        }
    }

    doNextCallback(){
        return new Promise((resolve) =>{
            //resolve once _transform (or 'finish') signals that new data is available
            this.event_emitter.once('chunkGrabbed', ()=>{resolve();});
            this.nextCallback();    //release the held callback so the next chunk flows in
        });
    }

    async getBytes(size){
        if(this.pos + size > this.currentChunk.length)
        {
            //take what's left of the current chunk...
            let bytes = this.currentChunk.slice(this.pos, this.currentChunk.length);

            if(!this.hasEnded)
            {
                let newSize = size - (this.currentChunk.length - this.pos);
                //...then wait for the next chunk and recurse for the remaining bytes
                await this.doNextCallback();
                if(!this.hasEnded){
                    this.pos = 0;
                    let recurseBytes = await this.getBytes(newSize);
                    bytes = Buffer.concat([bytes, recurseBytes]);
                }
            }

            return bytes;
        }
        else{
            //the current chunk has enough data; just advance the read position
            let bytes = this.currentChunk.slice(this.pos, this.pos + size);
            this.pos += size;
            return bytes;
        }
    }
}

module.exports = {
    ByteStream : ByteStream 
}

My express route is now:

apiRoute.route("/convert").post((req, res)=>{

    let bStream = new ByteStream({});
    let gStream = zlib.createGunzip();

    bStream event_emitter.on('started', async () => {
        console.log("started!");

        let myValue; await bStream.getBytes(60000).then(bytes => {myValue = bytes});
        console.log(myValue.length);
    });

    req
    .pipe(gStream)
    .pipe(bStream);
});

By listening for the started event I can know when the first chunk has been streamed into bStream. From there, it's just a matter of calling getBytes() with my desired byte count and assigning the promised value to a variable. It does just what I need, although I haven't done any rigorous testing yet.
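The route above only covers the reading side. For the second part of my original question (sending the processed data back without holding it all in memory), here is a minimal, untested sketch of one approach: treat res as the writable stream it is and write each processed piece as soon as it's ready, gzipping it on the way out. The processValue() helper, the field sizes, and the require path are placeholders, not part of the code above.

const zlib = require('zlib');
const {ByteStream} = require('./ByteStream');   //hypothetical path to the module above

apiRoute.route("/convert").post((req, res) => {
    let bStream = new ByteStream({});
    let gStream = zlib.createGunzip();

    //gzip the response as it is written so it never sits in memory whole
    let gzipOut = zlib.createGzip();
    res.set('Content-Encoding', 'gzip');
    gzipOut.pipe(res);

    bStream.event_emitter.on('started', async () => {
        //field sizes and processValue() are placeholders for the real format/math;
        //the Buffer slices can be decoded with the usual Buffer methods (readUInt32LE, toString, etc.)
        let aValue = await bStream.getBytes(20);
        gzipOut.write(processValue(aValue));    //write each result as soon as it is ready

        let bValue = await bStream.getBytes(20000);
        gzipOut.write(processValue(bValue));

        gzipOut.end();  //finish the response once all values are processed
    });

    req
    .pipe(gStream)
    .pipe(bStream);
});

For very large outputs you would also want to respect the return value of gzipOut.write() (or await the 'drain' event when it returns false) so the output side stays backpressured like the input side.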

Longrange answered 29/3, 2019 at 0:28 Comment(0)
