Deno on multi-core machines
Asked Answered
A

3

9

In Node.js there is the cluster module to utilize all available cores on the machine which is pretty great, especially when used with the node module pm2. But I am pretty stoked about some features of Deno but I have wondered about how to best run it on a multi-core machine.

I understand that there is workers which works great for a specific task but for normal web requests it seems like performance of multi-core machines is wasted somewhat? What is the best strategy to get maximum availability and utilization of my hardware in Deno?

I am a bit worried that if you only have a single process going on and there is some CPU intensive task for whatever reason it will "block" all other requests coming in. In node.js the cluster module would solve this, since another process would handle the request but I am unsure on how to handle this in Deno?

I think you could run several instances in Deno on different ports and then have some kind of load balancer in front of it but that seems like quite a complex setup in comparison. I also get that you could use some kind of service like Deno Deploy or whatever, but I already have hardware that I want to run it on.

What are the alternatives for me? Thanks in advance for you sage advice and better wisdom.

Anglonorman answered 4/4, 2022 at 11:20 Comment(1)
Workers and the subprocess API are the only multi-thread abstractions in Deno. It sounds like what you want is a pool abstraction on top of the Worker API. Nothing like this exists natively yet, but it seems likely that implementations have already been written. Have you already searched for something like that?Champlain
S
11

In Deno, like in a web browser, you should be able to use Web Workers to utilize 100% of a multi-core CPU.

In a cluster you need a "manager" node (which can be a worker itself too as needed/appropriate). In a similar fashion the Web Worker API can be used to create however many dedicated workers as desired. This means the main thread should never block as it can delegate all tasks that will potentially block to its workers. Tasks that won't block (e.g. simple database or other I/O bound calls) can be done directly on the main thread like normal.

Deno also supports navigator.hardwareConcurrency so you can query about available hardware and determine the number of desired workers accordingly. You might not need to define any limits though. Spawning a new dedicated worker from the same source as a previously spawned dedicated worker may be fast enough to do so on demand. Even so there may be value in reusing dedicated workers rather than spawning a new one for every request.

With Transferable Objects large data sets can be made available to/from workers without copying the data. This along with messaging makes it pretty straight forward to delegate tasks while avoiding performance bottlenecks from copying large data sets.

Depending on your use cases you might also use a library like Comlink "that removes the mental barrier of thinking about postMessage and hides the fact that you are working with workers."

e.g.

main.ts

import { serve } from "https://deno.land/[email protected]/http/server.ts";

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

serve(async function handler(request) {
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });

  const handler = ComlinkRequestHandler.wrap(worker);

  return await handler(request);
});

worker.ts

/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

ComlinkRequestHandler.expose(async (request) => {
  const body = await request.text();
  return new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`);
});

ComlinkRequestHandler.ts

import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";

interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
  url: string;
  headers: Record<string, string>;
  hasBody: boolean;
}

interface ResponseMessage extends ResponseInit {
  headers: Record<string, string>;
  hasBody: boolean;
}

export default class ComlinkRequestHandler {
  #handler: (request: Request) => Promise<Response>;
  #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;

  static expose(handler: (request: Request) => Promise<Response>) {
    Comlink.expose(new ComlinkRequestHandler(handler));
  }

  static wrap(worker: Worker) {
    const { handleRequest, nextResponseBodyChunk } =
      Comlink.wrap<ComlinkRequestHandler>(worker);

    return async (request: Request): Promise<Response> => {
      const requestBodyReader = request.body?.getReader();

      const requestMessage: RequestMessage = {
        url: request.url,
        hasBody: requestBodyReader !== undefined,
        cache: request.cache,
        credentials: request.credentials,
        headers: Object.fromEntries(request.headers.entries()),
        integrity: request.integrity,
        keepalive: request.keepalive,
        method: request.method,
        mode: request.mode,
        redirect: request.redirect,
        referrer: request.referrer,
        referrerPolicy: request.referrerPolicy,
      };

      const nextRequestBodyChunk = Comlink.proxy(async () => {
        if (requestBodyReader === undefined) return undefined;
        const { value } = await requestBodyReader.read();
        return value;
      });

      const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
        requestMessage,
        nextRequestBodyChunk
      );

      const responseBodyInit: BodyInit | null = responseHasBody
        ? new ReadableStream({
            start(controller) {
              async function push() {
                const value = await nextResponseBodyChunk();
                if (value === undefined) {
                  controller.close();
                  return;
                }
                controller.enqueue(value);
                push();
              }

              push();
            },
          })
        : null;

      return new Response(responseBodyInit, responseInit);
    };
  }

  constructor(handler: (request: Request) => Promise<Response>) {
    this.#handler = handler;
  }

  async handleRequest(
    { url, hasBody, ...init }: RequestMessage,
    nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
  ): Promise<ResponseMessage> {
    const request = new Request(
      url,
      hasBody
        ? {
            ...init,
            body: new ReadableStream({
              start(controller) {
                async function push() {
                  const value = await nextRequestBodyChunk();
                  if (value === undefined) {
                    controller.close();
                    return;
                  }
                  controller.enqueue(value);
                  push();
                }

                push();
              },
            }),
          }
        : init
    );
    const response = await this.#handler(request);
    this.#responseBodyReader = response.body?.getReader();
    return {
      hasBody: this.#responseBodyReader !== undefined,
      headers: Object.fromEntries(response.headers.entries()),
      status: response.status,
      statusText: response.statusText,
    };
  }

  async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
    if (this.#responseBodyReader === undefined) return undefined;
    const { value } = await this.#responseBodyReader.read();
    return value;
  }
}

Example usage:

% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar

Received:

{"answer":42}

There's probably a better way to do this (e.g. via Comlink.transferHandlers and registering transfer handlers for Request, Response, and/or ReadableStream) but the idea is the same and will handle even large request or response payloads as the bodies are streamed via messaging.

Synecdoche answered 4/4, 2022 at 23:56 Comment(9)
Yes I know this and it kind of is an explanation but it still only is for specific use cases. I guess one could put every single path / operation on a web api (as an example) on it's own web worker but it will create a lot of setup to make it work. If not, you will still have the issue with that something can be slow before or after the work in the web worker. The web worker api is a bit like worker threads in node, but one usually don't use those instead of the cluster module/api.Anglonorman
I'm interested to learn what specific use case multiple workers won't work well in. A web api (for example) often doesn't even need any web workers as database calls don't block, etc. The only scenarios I can think of where web workers really come in handy is where a server is computing something in memory and takes time. Such can be delegated to a worker and the main thread is still then fully available for non-blocking requests or even other blocking ones to delegate to its workers pool.Synecdoche
What I am talking about is unexpected computing in memory for example. Maybe I write an endpoint today and doesn't take into account that this endpoint can grow. With more users and when the data grows, suddenly the endpoint gets slow because there just a lot more data to process for example. This has happened to me with node and basically crashed the app until I could spare time to fix it. I know it's pretty much solvable but at least the cluster module in node for example would protect against this somewhat.Anglonorman
Oh, I think I'm starting to understand better and you explained it earlier but it didn't click for me. 🤔 Yeah, I could see delegating every call, even trivial ones, to workers might be an annoying setup but maybe not. Every work can be spawned from the same source so I think it's really just a matter of forwarding the entire request and response to and from a worker. I've not used node cluster but I imagine that's basically what it is doing.Synecdoche
I guess you could do that and I even tried it with Oak the the entire context object. The error I got then was that it couldn't copy/clone the entire object to the worker, probably because it has a lot of functions and stuff on it. Not sure why it didn't really work but then I would have to copy just the parameters it would need to do it's work. Seems like a very complex way to work and that's why I started this question since I wonder how other people do it or if they just use the single thread hoping for the best?Anglonorman
I've updated my answer with some additional thoughts around managing workers as well as a call-out about a library called Comlink. I don't currently know of better ways to do what you're talking about and I think with minimal effort you should be able to delegate all calls to homogenous workers which I think will help keep a solution simple enough.Synecdoche
Let us continue this discussion in chat.Synecdoche
Thanks @Synecdoche I will check out comlink as I've used it before with success. Forgot about that library though. If there is no other reasonably way to do it I will award you the rep ;)Anglonorman
If I'm able I will update the Comlink example using proxies & transfer handlers to simplify things (see github.com/GoogleChromeLabs/comlink/issues/585).Synecdoche
U
2

It all depends on what workload you would like to push to the threads. If you are happy with the performance of the built in Deno HTTP server running on the main thread but you need to leverage multithreading to create the responses more efficiently then it's simple as of Deno v1.29.4.

The HTTP server will give you an async iterator server like

import { serve } from "https://deno.land/std/http/server.ts";

const server = serve({ port: 8000 });

Then you may use the built in functionality pooledMap like

import { pooledMap } from "https://deno.land/[email protected]/async/pool.ts";

const ress = pooledMap( window.navigator.hardwareConcurrency - 1
                      , server
                      , req => new Promise(v => v(respondWith(req))
                      );

for await (const res of ress) {
  // respond with res
}

Where respondWith is just a function which handles the recieved request and generates the respond object. If respondWith is already an async function then you don't even need to wrap it into a promise.

However, in case you would like to run multiple Deno HTTP servers on separate therads then that's also possible but you need a load balancer like GoBetween at the head. In this case you should instantiate multiple Deno HTTP servers at separate threads and receive their requsets at the main thread as separate async iterators. To achieve this, per thread you can do like;

At the worker side i.e. ./servers/server_800X.ts;

import { serve } from "https://deno.land/std/http/server.ts";

const server = serve({ port: 800X });
console.log("Listening on http://localhost:800X/");

for await (const req of server) {
  postMessage({ type: "request", req });
}

and at the main thread you can easily convert the correspodning worker http server into an async iterator like

async function* server_800X() {
  worker_800X.onmessage = event => {
    if (event.data.type === "request") {
      yield event.data.req;
    }
  };
}

for await (const req of server_800X()) {
  // Handle the request here in the main thread
}

You should also be able to multiplex either the HTTP (req) or the res async iterators by using the MuxAsyncIterators functionality in to a single stream and then spawn by pooledMap. So if you have 2 http servers working on server_8000.ts and server_8001.ts then you can multiplex them into a single async iterator like

const muxedServer = new MuxAsyncIterator<Request>();
muxedServer.add(server_8000);
muxedServer.add(server_8001);
for await (const req of muxedServer) {
  // repond accordingly(*)
}

Obviously you should also be able to spawn new threads to process requests received from the muxedServer by utilizing pooledMap as shown above.

(*) In case you choose to use a load balancer and multiple Deno http servers then you should assign special headers to the requests at the load balancer, designating the server ID that it's been diverted to. This way, by inspecting this speical header you can decide from which server to respond for any particular request.

Unconsidered answered 22/1, 2023 at 20:19 Comment(4)
Why is window.navigator.hardwareConcurrency-1 being used as the concurrency for pooledMap in your example? The pooledMap function isn't creating workers - it's just event-loop concurrency (i.e. for async stuff), rather than threaded/hardware concurrency. It's still using a single thread. Or am I misunderstanding something here?Nuclide
@Nuclide You are right pooledMap indeed does not involve threads but the promisified mapping function could benefit from workers as mentioned in the first paragraph of my answer.Unconsidered
My point is, there's no use of workers in that code at all, right? So it doesn't make sense to use navigator.hardwareConcurrency. It makes it look like poolMap is parallelizing something, when in fact that code is all running on the same (main) thread.Nuclide
@Nuclide There is use of workers in that code. It's just hidden behind the respondWith function which spawns n-1 promisified workers. pooledMap relieves you from the burden of responding to the received requests in each turn of for await req of server loop. In other words you can spawn n-1 responder web workers then a pooledMap setup with n-1 concurrency can feed all responders with requests at once and return an async iterable of responses. This answers OP's question by showing a way to utilize web workers in Deno HTTP Server APIs.Unconsidered
S
1

window.navigator.hardwareConcurrency - 1 Blockquote

Deno use up 8 threads for IO operations, garbage collecting and other utility operations, which giving to you a more efficient main thread processing. In a result, calculation of most effective core configuration can be non-trivial process and require a big number of practical tests. On intuitive level I will be going to create like 11–12 workers for 16-core server, but additional validation is needed.

Schuyler answered 15/11, 2023 at 19:19 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.