How to get streamed data from fetch() inside of page.evaluate() in puppeteer? (node.js)

Here is a simplified version of my code:

var page;
var launched = false;

app.post("/test", async(req, res) => {

    if ( launched == false ) {
        const browser = await puppeteer.launch({
            headless: true, /* I've tried with "new" and false too */
        });

        page = await browser.newPage();

        var desiredUrl = "url here";
        await page.goto(desiredUrl);

        /* Stream data from the page */
        await page.exposeFunction('writeData', (data) => {
            console.log("Writing data");
            res.write(data);
        });

        /* End stream */
        await page.exposeFunction('endStream', () => {
                console.log("End stream");
                res.end();
        });

        launched = true;
    }

    await page.evaluate(async ()=>{
        var output = await fetch("/endpoint_here", {
            "headers": {
                /* headers here */
            },
        });

        var reader = output.body.getReader();

        while (true) {
            var { done, value } = await reader.read();
            if (done) {
                window.endStream();
                return;
            }
            
            var decoder = new TextDecoder();
            var decodedData = decoder.decode(value);
            window.writeData(decodedData);
        }
    });

})

However, this doesn't work. What I've tried is listed below:

res doesn't work inside of page.evaluate(). I've tried sending res into the page.evaluate(), but it breaks the code.

I've tried using page.exposeFunction() and doing the res.write() (and res.end()) there, and it worked, but only the first time. On the second request (and every request after that), the code ran properly (the console.logs in those functions fired), but res.write() and res.end() did nothing.

I've even tried making it update a global variable inside the page.evaluate() using an exposed function, detecting the changes in that variable using a proxy and doing res.write() to write the data, but that also broke after the first post request.

The only fix to this strange problem of it only working the first time is restarting the program, which obviously isn't a solution.

I've also tried logging the stream data to the console in the page and used page.on('console') to res.write() the data back to the client. This worked perfectly with one request at a time. However, when there were multiple simultaneous requests to the endpoint "/test", it would write the response to both clients instead of just the one that initiated the request.

The only thing that DID work was just returning the response from the fetch after it ended without streaming it. However, I want it to be streamed.

I'm stuck and have no idea what to do, so any help would be greatly appreciated.
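
For what it's worth, the reader loop inside my page.evaluate() is standard web-streams usage, so here's a standalone version of the same pattern (Node 18+, no Puppeteer; the chunk contents are made up) that does work on its own:

```javascript
// Stand-in for fetch(...).body: a ReadableStream that emits three text chunks.
const encoder = new TextEncoder();
const body = new ReadableStream({
  start(controller) {
    for (const chunk of ["data: 1\n\n", "data: 2\n\n", "data: 3\n\n"]) {
      controller.enqueue(encoder.encode(chunk));
    }
    controller.close();
  },
});

// Same shape as the loop inside page.evaluate(): read until done,
// decode each chunk, then signal the end of the stream.
async function consume(stream, writeData, endStream) {
  const reader = stream.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const {done, value} = await reader.read();
    if (done) {
      return endStream();
    }
    // {stream: true} handles multi-byte characters split across chunks.
    writeData(decoder.decode(value, {stream: true}));
  }
}

const chunks = [];
consume(body, data => chunks.push(data), () => console.log(chunks.join("")));
```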

Prosimian answered 5/4 at 15:50 Comment(8)
res is only available in Node, not in the browser. Return the data back to Node with exposeFunction and call res.end() and res.write() in Node. I guess you tried this--but this is the correct approach, so if you could update your code to show that attempt, it's better to debug that than trying to call res.write() in the browser, which is definitely not going to work. Please also share the POST endpoint, or a reproducible representation of the page you're automating, so that it's possible to debug the "not working after the first time" situation using runnable code. Thanks.Consentaneous
Okay, I updated my post to have the page.exposeFunction() code. I should note that the console.logs in the function run and work all the time, it's just that the res.write() and res.end() don't do anything after the first request. Also, the only thing that worked was just returning the response from the fetch after it ended without streaming it, even with multiple simultaneous requests. However, I want it to be streamed.Prosimian
Thanks. Code looks fine now, so action is still required to determine what's going wrong in your particular case: 'Please also share the POST endpoint, or a reproducible representation of the page you're automating, so that it's possible to debug the "not working after the first time" situation using runnable code'. Thanks.Consentaneous
Can you create a simple express server that streams this? I can create such an example, but I can pretty much guarantee it works, so all this does is prove that your code is OK and the problem is elsewhere. Here's a simple server sent events stream if you need a starter. You mention the format is "plain text" but I bet it's a text/event-stream or similar, or there'd be no way to stream it.Consentaneous
Okay, however I forgot to mention that the page goes to a desired url first (same domain as the fetch). I updated the code to show this change. Also, when trying to fetch the "/stream" from "/" in the console in the example you sent, it gives me this error in the console: Refused to connect to 'url_here/stream' because it violates the following Content Security Policy directive: "default-src 'none'". Note that 'connect-src' was not explicitly set, so 'default-src' is used as a fallback.Prosimian
That shouldn't make a significant difference. I posted a proof of concept showing that the general setup should be valid. Any problems are due to context you haven't provided yet. If you can't share the URL, please try to recreate it locally.Consentaneous
I'm trying to recreate it locally but it says this when I try to fetch it (it doesn't say this when I do it with my desired url): Refused to connect to 'url_here/stream' because it violates the following Content Security Policy directive: "default-src 'none'". Note that 'connect-src' was not explicitly set, so 'default-src' is used as a fallback.Prosimian
Did you try my example? See also Refused to load the script because it violates the following Content Security Policy directive.Consentaneous

I'm unable to reproduce the problem. The issue seems to be related to the endpoint you're hitting and/or your server configuration. I suggest sharing that information or trying to build a repro of your own.

Here's my replication attempt in case it helps you. You can see the code works if you run

$ node -v
v20.11.1
$ npm i
$ node sse-endpoint &
$ node server &
$ curl localhost:3001/stream
data: {"chunk":0}

data: {"chunk":1}

data: {"chunk":2}

data: {"chunk":3}

# ... and so on, streamed every second ...

package.json:

{
  "dependencies": {
    "express": "^4.19.2",
    "puppeteer": "^22.6.0"
  }
}

sse-endpoint.js (this is a mock of the remote API you're intercepting):

const express = require("express");
const app = express();

app.use((req, res, next) => {
  res.setHeader("Access-Control-Allow-Origin", "*");
  next();
});

app.get("/stream", (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let counter = 0;
  const interval = setInterval(() => {
    const chunk = JSON.stringify({chunk: counter++});
    res.write(`data: ${chunk}\n\n`);
  }, 1000);

  res.on("close", () => {
    clearInterval(interval);
    res.end();
  });
});

const listener = app.listen(process.env.PORT || 3000, () =>
  console.log(`SSE endpoint is listening on port ${listener.address().port}`)
);

server.js (this is your API):

const express = require("express");
const puppeteer = require("puppeteer");
const app = express();

app.use(express.static("public"));
const browserReady = puppeteer.launch();

app.get("/stream", async (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });
  let page;

  try {
    page = await (await browserReady).newPage();
    await page.goto("about:blank");
    await page.exposeFunction("writeData", data => {
      res.write(data);
    });
    await page.exposeFunction("endStream", () => {
      res.end();
    });
    await page.evaluate(async () => {
      const output = await fetch(
        "http://localhost:3000/stream"
      );
      const reader = output.body.getReader();

      while (!window._aborted) {
        const {done, value} = await reader.read();

        if (done) {
          return window.endStream();
        }

        const decoder = new TextDecoder();
        const decodedData = decoder.decode(value);
        window.writeData(decodedData);
      }
    });
    res.on("close", async () => {
      await page.evaluate("window._aborted = true");
    });
  } catch (err) {
    console.error(err);
    res.end();
  } finally {
    await page?.close();
  }
});

const listener = app.listen(process.env.PORT || 3001, () =>
  console.log(
    `Proxy server is listening on port ${listener.address().port}`
  )
);

Note: this code is for demonstration as a POC and doesn't necessarily demonstrate best practices.

If you want to reuse the page, the following should get you started, although you'll want to be prepared to add some page restart logic in case it crashes, which can happen pretty easily.

I generally recommend avoiding premature optimization and creating a new page per request if possible--they're pretty lightweight. Also, you will likely run into trouble with multiple clients manipulating the same page simultaneously. Each client will need their own request, result and fetch handling context and there are a lot of edge cases to handle.

const express = require("express");
const puppeteer = require("puppeteer");
const app = express();

app.use(express.static("public"));
const browserReady = puppeteer.launch();
const pageReady = (async () => {
  const browser = await browserReady;
  const [page] = await browser.pages();
  await page.exposeFunction("writeData", data => {
    writeData(data);
  });
  await page.exposeFunction("endStream", () => {
    endStream();
  });
  return page;
})();
let writeData = () => {};
let endStream = () => {};

app.get("/stream", async (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  try {
    const page = await pageReady;
    await page.goto("about:blank");
    writeData = data => res.write(data);
    endStream = () => res.end();
    res.on("close", async () => {
      await page.evaluate("window._aborted = true");
    });
    await page.evaluate(async () => {
      try {
        const output = await fetch(
          "http://localhost:3000/stream"
        );
        const reader = output.body.getReader();

        while (!window._aborted) {
          const {done, value} = await reader.read();

          if (done) {
            return window.endStream();
          }

          const decoder = new TextDecoder();
          const decodedData = decoder.decode(value);
          window.writeData(decodedData);
        }
      } catch (err) {
        window.writeData(err.message);
      }
    });
  } catch (err) {
    console.error(err);
    res.end();
  }
});

const listener = app.listen(process.env.PORT || 3001, () =>
  console.log(
    `Proxy server is listening on port ${listener.address().port}`
  )
);

If the SSE endpoint is streamed by another page, that shouldn't impact this repro. You can have sse-endpoint serve an HTML file and run page.goto("localhost:3000") before running the evaluate()/fetch(); it shouldn't make a difference. Make sure you're intercepting or consuming your actual endpoint correctly. Likely, the problem lies there and details matter at that stage.

Depending on what you're trying to achieve (I'm guessing something like proxying a GPT chat feed?), there may be a much simpler way to achieve whatever the fundamental goal is--another reason why full context is important.

Consentaneous answered 5/4 at 18:21 Comment(17)
I think I've figured out the problem, but I don't know how to solve it. I edited my original post to show this: I have a variable called "launched" which checks whether the browser and page have been launched yet. It's meant to stay open the entire time. However, when I change the code so that the launch code runs on every request and the browser is closed at the end of each request, it works perfectly. That means the problem has something to do with the browser staying open after the first request. Check my original post to see the updated code.Prosimian
I see, good work. const browser = ... is scoped to the if block, so after the first launch, that block won't run again and there's no browser to read browser.newPage() from. See my example for a better approach than a boolean.Consentaneous
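
The scoping issue described in this comment can be reduced to plain JavaScript (the browser object below is a stand-in for puppeteer.launch(), not real Puppeteer):

```javascript
// A const declared inside an if block only exists inside that block.
// Anything needed across "requests" must live in the outer scope.
let launched = false;
let page; // outer scope: survives across calls

function handleRequest() {
  if (!launched) {
    const browser = {newPage: () => "page-1"}; // stand-in for a real browser
    page = browser.newPage();
    launched = true;
  }
  // On later calls, `browser` no longer exists here, but `page` does,
  // because it was declared in the outer scope.
  return page;
}

console.log(handleRequest()); // first call: "launches" and stores the page
console.log(handleRequest()); // later calls: reuse the outer `page`
```
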
I don't want the page to be closed at the end of the request though. When I remove the line to close the page, it works, but it adds a new page every time a new request is made. How do I prevent that from happening and keep it on the same page?Prosimian
Move the page variable out to the global scope too. If you have await at top level (Node 20+ modules), use that, otherwise you can await the promises in the route as I'm doing with browser. Or put all of the routes in an async IIFE. Keep in mind you'll probably need to handle errors with the page and be prepared to restart it. Launching new pages on the same browser seems to be the best approach in many cases--it's fast enough and keeps state from getting weird. But if it makes sense to keep one page open for your use case, go ahead. LMK if you can't figure it out and I'll update.Consentaneous
I'm confused as to what to put for the page code. If possible, can you edit your answer to show what to put for that?Prosimian
I played around with it, but it's actually pretty annoying to keep the page open because you have to find a way to close the fetch stream if a client disconnects. I suggest closing the page and using a new one for each request. Also, multiple clients manipulating the same page concurrently can cause all kinds of misbehavior. If the page crashes, you'll need to manage restarting it. Can you provide context for why you need to use a single page?Consentaneous
My main worry with adding new pages for each request is high load on RAM if there are many concurrent clients. However, I don't know anything for sure since I haven't tried it. Would the benefits of using a new page for each request outweigh the concern about RAM?Prosimian
Yeah, RAM might be a consideration. I'd start with a new page on each request, which is easier to code, manage and ensure correctness, then if you run into memory problems, look into optimizing it later.Consentaneous
Okay, thank you! Also, a few quick questions: 1. What's the difference between await page?.close(); and await page.close()? 2. How can I handle closing the page when the client connection is closed without it throwing an error?Prosimian
1. Optional chaining. If the property doesn't exist, which might happen if the page can't be created, the expression returns undefined. 2. I'm not totally sure what you mean but I think it might be related to an issue in my demo, which is the evaluate can keep running even if the client disconnects. I think the server can just call res.end() and pass a boolean into the evaluate() to stop the while loop. Is that what you're referring to? My snippets are just POCs, lots of room for tweaks.Consentaneous
No, I want the page to close immediately when the connection is closed to avoid unnecessary RAM usage and slower speeds since the page isn't in use anymore.Prosimian
Yeah, I think we're on the same page then. See my last comment.Consentaneous
I passed a boolean into the page.evaluate() and changed it when the connection closed. I made it so that it wouldn't do res.write() anymore if the value of the boolean changed. However it didn't work, and I think putting a variable into page.evaluate() just makes a copy of it for each execution so it wouldn't know if it's updated or not.Prosimian
It might be time to open a fresh question with your latest code, since we've covered a lot of ground, hopefully reproducible with a complete example similar to the one I cooked up here. Any variables you pass into evaluate that you need to share between calls need to be added to the window, so if the loop is for (window.running = true; window.running;) {} and your evaluate does something like window.running = false, then it should break the loop. fetch also has an abort controller that might be more "elegant".Consentaneous
I'm a little confused; can you please update your code to do that?Prosimian
Also, the function to check if the connection is closed is outside of the page.evaluate(). How would I make it so that the page knows when it's closed?Prosimian
Here's the flow: res.on("close", () => { res.end(); page.evaluate("window.running = false"); }), plus the modification to the reader loop I gave you above. Note that if multiple clients share the page, then running needs to become an object with different client ids managing each running status. Like I said, it's a headache to use the same page for everyone so I wouldn't do it unless I absolutely had to.Consentaneous
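
The cancellation idea discussed in these comments can be sketched on its own (Node 18+, no Puppeteer or network; the function names below are illustrative, not from the posts above):

```javascript
// Loop until an AbortSignal fires, mirroring the in-page reader loop.
// In the real setup, controller.signal would be checked by (or passed to
// fetch() inside) page.evaluate(), and abort() would be called from Node.
async function streamUntilAborted(signal, onChunk) {
  let i = 0;
  while (!signal.aborted && i < 5) { // i < 5 is a safety cap for the demo
    onChunk(`chunk ${i++}`);
    // Yield to the event loop so an abort() queued elsewhere can run.
    await new Promise(resolve => setImmediate(resolve));
  }
}

const controller = new AbortController();
const received = [];

// In the real server: res.on("close", () => controller.abort());
setImmediate(() => controller.abort());

streamUntilAborted(controller.signal, chunk => received.push(chunk))
  .then(() => console.log(`received ${received.length} chunk(s) before abort`));
```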
