Possible to make an event handler wait until async / Promise-based code is done?

I am using the excellent Papa Parse library in Node.js mode to stream a large (500 MB) CSV file of over 1 million rows into a slow persistence API that can only take one request at a time. The persistence API is based on Promises, but Papa Parse hands me each parsed CSV row in a synchronous event, like so: parseStream.on("data", row => { ... })

The challenge I am facing is that Papa Parse dumps its CSV rows from the stream so fast that my slow persistence API can't keep up. Because Papa is synchronous and my API is Promise-based, I can't just call await doDirtyWork(row) in the on event handler, because sync and async code don't mix.

Or can they mix and I just don't know how?

My question is: can I make Papa's event handler wait for my API call to finish? Can I do the persistence API request directly in the on("data") event, making the on() function linger around somehow until the dirty API work is done?

The solution I have so far is not much better than using Papa's non-streaming mode, in terms of memory footprint. I actually need to queue up the torrent of on("data") events, in the form of generator function iterations. I could also have queued up promise factories in an array and worked it off in a loop (sketched below, after the code). Either way, I end up keeping almost the entire CSV file in memory as a huge collection of future Promises (promise factories), until my slow API calls have worked all the way through.

async importCSV(filePath) {
    let parsedNum = 0, processedNum = 0;

    // Async generator that receives promise factories via next() and runs
    // them strictly one at a time, in the order they were queued.
    async function* gen() {
        let pf = yield;
        do {
            pf = yield await pf();
        } while (typeof pf === "function");
    }

    const g = gen();
    g.next();  // prime the generator so it waits at the first yield

    await new Promise((resolve, reject) => {
        try {
            const dataStream = fs.createReadStream(filePath);
            const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
            dataStream.pipe(parseStream);

            parseStream.on("data", row => {

                // Received a CSV row from Papa.parse()

                try {
                    console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
                    );
                    parsedNum++;

                    // Simulate some really slow async/await dirty work here, for example 
                    // send requests to a one-at-a-time persistence API

                    g.next(() => {  // don't execute now, call in sequence via the generator above
                        return new Promise((res, rej) => {
                            console.log(
                                "DW#", processedNum, ": dirty work START",
                                row.filter((e, i) => i <= 2 ? e : undefined)
                            );
                            setTimeout(() => {
                                console.log(
                                    "DW#", processedNum, ": dirty work STOP ",
                                    row.filter((e, i) => i <= 2 ? e : undefined)
                                );
                                processedNum++;
                                res();
                            }, 1000)
                        });
                    });
                } catch (err) {
                    console.log(err.stack);
                    reject(err);                    
                }
            });
            parseStream.on("finish", () => {
                console.log(`Parsed ${parsedNum} rows`);
                resolve();
            });

        } catch (err) {
            console.log(err.stack);
            reject(err);                    
        }
    });
    while(!(await g.next()).done);
}
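
For reference, the array-of-promise-factories variant I mentioned would look roughly like this (reusing the same parseStream as above, with doDirtyWork standing in for the slow Promise-based persistence call) -- it has exactly the same memory problem:

const promiseFactories = [];

parseStream.on("data", row => {
    // Queue a factory instead of starting the request right away.
    promiseFactories.push(() => doDirtyWork(row));
});

parseStream.on("finish", async () => {
    // Work the queue off strictly one request at a time.
    for (const factory of promiseFactories) {
        await factory();
    }
});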

So why the rush, Papa? Why not let me work down the file a bit slower -- the data in the original CSV file isn't gonna run away, we have hours to finish the streaming, so why hammer me with on("data") events that I can't seem to slow down?

So what I really need is for Papa to become more of a grandpa, and minimize or eliminate any queuing or buffering of CSV rows. Ideally I would be able to completely sync Papa's parsing events with the speed (or lack thereof) of my API. So if it weren't for the dogma that async code can't make sync code "sleep", I would ideally send each CSV row to the API inside the Papa event, and only then return control to Papa.

Suggestions? Some kind of "loose coupling" of the event handler with the slowness of my async API is fine too. I don't mind if a few hundred rows get queued up. But when tens of thousands pile up, I will run out of heap fast.

Bein answered 5/9, 2020 at 2:12 Comment(3)
Use this: microservices.io/patterns/data/transactional-outbox.html – Araxes
Thanks, if I am understanding your suggestion correctly, I should write Papa's CSV row flood to some "outbox", for example a Redis key, which can certainly take 1000s of transactions per second. Though Redis again eats working memory. So the only "outbox" I have that doesn't eat memory is fs.writeFileSync() to intermediate JSON files that I will import slowly later. Quite the acrobatics for what should be a "simple" streamed CSV import. LMK if I understood your suggestion correctly. – Bein
The transactional outbox is a design pattern commonly used for reliable message delivery to multiple systems. A robust implementation gives you flexibility in how you do the dirtyWork: you can resume from wherever you left off in case of errors and do error correction. It also decouples your receiving and sending logic, which has its own advantages. Agreed that it is not simple to implement, but neither is anything robust. – Araxes
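
To make that idea concrete, here is a rough sketch of the file-based outbox variant described in the comments above: append each parsed row to an NDJSON file during parsing, then replay it line by line against the slow API afterwards. doDirtyWork is again a stand-in for the persistence call, and outbox.ndjson is just an example filename.

const fs = require("fs");
const readline = require("readline");

// Phase 1: during parsing, append each row to an "outbox" file (cheap, no heap growth).
function writeToOutbox(row) {
    fs.appendFileSync("outbox.ndjson", JSON.stringify(row) + "\n");
}

// Phase 2: later, replay the outbox one line at a time against the slow API.
async function drainOutbox() {
    const rl = readline.createInterface({
        input: fs.createReadStream("outbox.ndjson"),
        crlfDelay: Infinity
    });
    for await (const line of rl) {
        await doDirtyWork(JSON.parse(line));   // strictly one request at a time
    }
}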

Why hammer me with on("data") events that I can't seem to slow down?

You can; you just weren't asking Papa to stop. You do this by calling stream.pause() and later stream.resume(), which makes use of Node streams' built-in backpressure.
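
For illustration, a minimal sketch of the pause()/resume() approach, reusing the question's parseStream and treating doDirtyWork as the Promise-returning persistence call:

parseStream.on("data", row => {
    parseStream.pause();                        // stop further "data" events for now
    doDirtyWork(row)                            // slow, one-at-a-time persistence call
        .then(() => parseStream.resume())       // only then ask Papa for the next row
        .catch(err => parseStream.destroy(err));
});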

However, there's a much nicer API for this than dealing with it on your own in callback-based code: use the stream as an async iterator! When you await in the body of a for await loop, the stream has to pause as well. So you can write:

const fs = require("fs");
const Papa = require("papaparse");

async function importCSV(filePath) {
    let parsedNum = 0;

    const dataStream = fs.createReadStream(filePath);
    const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
    dataStream.pipe(parseStream);

    for await (const row of parseStream) {
        // Received a CSV row from Papa.parse()
        const data = row.filter((e, i) => i <= 2 ? e : undefined);
        console.log("PA#", parsedNum, ": parsed", data);
        parsedNum++;
        await dirtyWork(data);
    }
    console.log(`Parsed ${parsedNum} rows`);
}

importCSV('sample.csv').catch(console.error);

let processedNum = 0;
function dirtyWork(data) {
    // Simulate some really slow async/await dirty work here,
    // for example send requests to a one-at-a-time persistence API
    return new Promise((res, rej) => {
        console.log("DW#", processedNum, ": dirty work START", data)
        setTimeout(() => {
             console.log("DW#", processedNum, ": dirty work STOP ", data);
             processedNum++;
             res();
        }, 1000);
    });
}
Meggie answered 5/9, 2020 at 12:26 Comment(8)
Thanks, I will try this out right now. If it works, we're lucky that we're dealing with a stream that can be paused/resumed. Though I have run into this problem before where the events did not originate from a stream; they are just created "somehow". Any tips on what to do in such cases? – Bein
@Bein If the events are generated somehow and there's no way to block them, all you can do is buffer them - or find a better API that does support backpressure. – Meggie
Thanks - on a scale of 1 to 10, how certain are we that the backpressure doesn't simply pile up the entire CSV in Node's stream buffer, putting us back to square one? Because some streams just can't be kept open for hours or days -- imagine a stream that comes in from the Internet. Do you know of any documentation on that? I will test your solution with a 2 GB file and see if backpressure translates into memory pressure... – Bein
@Bein Node's file read stream will respect backpressure for sure, and I'd expect Papa's CSV parser to be implemented correctly. If your stream source doesn't support backpressure (note that TCP, in fact, does) and you cannot process data as fast as it comes in, you've simply lost, and at some point your app will crash. – Meggie
It works, pure genius, thank you! Yay for streams. I tested spooling 100s of 1000s of rows and memory use stayed around 100 MB no matter how long it ran. One idea I will try for slowing down non-streamed on() events: the Fibers library. It's kind of cheating since it's compiled C++, but hey, better than creating a zillion temp files and writing another queue loop to work through them... – Bein
@Bein I don't think using fibers would help or even simplify anything. If the source of asynchronous events is under your control, you need to change it to support backpressure; if it isn't under your control, you need to improve your processing speed. – Meggie
@Meggie Does the for await iterate only on 'data' events? Say I have a custom 'validation' event, is there a way to for-await on those too? – Vanquish
@Vanquish Not to my knowledge, no. You might want to file an issue for this, maybe suggesting that stream.iterator() take an event: "validation" in its options bag? – Meggie

Async code in JavaScript can sometimes be a little hard to grok. It's important to remember how Node handles concurrency.

The node process is single-threaded, but it uses a concept called an event loop. The consequence of this is that async code and callbacks are essentially equivalent representations of the same thing.

Of course, you need an async function to use await, but your callback from Papa Parse can be an async function:

parse.on("data", async row => {
  await sync(row)
})

Once the await operation completes, the arrow function ends, and all references to row will be eliminated, so the garbage collector can successfully collect row, releasing that memory.

The effect this has is that sync executes concurrently every time a row is parsed, so if you can only sync one record at a time, I would recommend wrapping the sync function in a debouncer.

Chastitychasuble answered 5/9, 2020 at 4:33 Comment(6)
Just keep in mind that with this solution you will end up creating a lot of promises, one for each data event, which will slow down your app drastically. A better way to handle this is the transactional outbox pattern: microservices.io/patterns/data/transactional-outbox.html – Araxes
No matter what, you're going to be creating 1 million promises. – Chastitychasuble
Creating 1 million promises is not a problem; the problem is having most of them pending at once. Unbounded code like the above should use promises in a controlled manner. – Araxes
Secondly, simply putting an async keyword on the handler and awaiting inside it won't do the trick, because the stream will not wait for the handler's promise to resolve. – Araxes
Thanks for replying. I tried refactoring the on handler to async, but as commented here, that makes no corresponding await happen on the Papa Parse side -- its code is set in stone. We still have to deal with 1 million simultaneously executing Promises that will hammer my persistence API. In my code I am already serializing promise execution with the generator -- it could have been an array of promise factories just as well -- but regardless, the whole point of a streaming CSV processor is that it NOT load the whole file into memory, be it as arrays or as Promises waiting to be executed. Ideas? – Bein
There must be a super common design pattern that I am missing, to somehow loosely couple the timing of incoming events with whatever async "slow dirty work" needs to be done on each such event. There must be a legit way to stem the event flow so that my async processing code has time to stay more or less on top of it. I can't be the first person who has this problem...? I can think of lame workarounds such as writing each CSV row via fs.writeFileSync() to another (JSON?) buffer file, or perhaps to hundreds of smaller files to handle later, but then why use Papa in the first place? – Bein