In Deno, as in a web browser, you should be able to use Web Workers to utilize 100% of a multi-core CPU.
In a cluster you need a "manager" node (which can itself act as a worker where appropriate). Similarly, the Web Worker API lets you create as many dedicated workers as you want. The main thread then never needs to block, because it can delegate every potentially blocking task to its workers. Tasks that won't block (e.g. simple database or other I/O-bound calls) can run directly on the main thread as usual.
Deno also supports navigator.hardwareConcurrency, so you can query the available hardware and choose the number of workers accordingly. You might not need to define any limits, though. Spawning a new dedicated worker from the same source as a previously spawned one may be fast enough to do on demand. Even so, there may be value in reusing dedicated workers rather than spawning a new one for every request.
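As a rough sketch of the sizing and reuse idea above: the names workerCountFor and RoundRobinPool are made up for illustration (they are not part of Deno or any library), and the choice to leave one core for the main thread is just an assumption.

```typescript
// Hypothetical helper: pick a pool size from the reported core count,
// leaving `reserved` cores for the main thread (never fewer than 1 worker).
function workerCountFor(cores: number, reserved = 1): number {
  return Math.max(1, Math.floor(cores) - reserved);
}

// Hypothetical pool: spawns `size` workers once and hands them out in
// round-robin order, so later requests reuse existing workers.
class RoundRobinPool<W> {
  private readonly workers: W[] = [];
  private next = 0;

  constructor(size: number, spawn: (index: number) => W) {
    if (size < 1) throw new RangeError("pool needs at least one worker");
    for (let i = 0; i < size; i++) this.workers.push(spawn(i));
  }

  get size(): number {
    return this.workers.length;
  }

  // Returns the next worker in rotation.
  acquire(): W {
    const worker = this.workers[this.next];
    this.next = (this.next + 1) % this.workers.length;
    return worker;
  }
}

// In Deno this might be wired up as:
// const pool = new RoundRobinPool(
//   workerCountFor(navigator.hardwareConcurrency),
//   () => new Worker(new URL("./worker.ts", import.meta.url).href, { type: "module" }),
// );
```

Round-robin is the simplest possible policy; it does not track which workers are busy, so a real pool would probably want a queue or an idle list instead.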
With Transferable Objects, large data sets can be made available to and from workers without copying the data. Combined with messaging, this makes it pretty straightforward to delegate tasks while avoiding the performance bottlenecks of copying large data sets.
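To see what "without copying" means in practice, here is a small sketch that uses structuredClone with a transfer list as an in-process stand-in for worker.postMessage(data, [data]). Both go through the same structured-clone mechanism, but the stand-in is my own choice so the snippet runs without a second file.

```typescript
// A 1 MiB buffer filled with the byte 7.
const buffer = new ArrayBuffer(1024 * 1024);
new Uint8Array(buffer).fill(7);

// Listing the buffer in `transfer` moves ownership instead of copying.
// worker.postMessage(buffer, [buffer]) behaves the same way on the sender side.
const received = structuredClone(buffer, { transfer: [buffer] });

console.log(buffer.byteLength); // 0: the sender's buffer is now detached
console.log(received.byteLength); // 1048576
console.log(new Uint8Array(received)[0]); // 7
```

The catch is that the sender can no longer read the data after the transfer, so this fits hand-off style delegation rather than shared access.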
Depending on your use cases you might also use a library like Comlink "that removes the mental barrier of thinking about postMessage and hides the fact that you are working with workers."
e.g.
main.ts
import { serve } from "https://deno.land/[email protected]/http/server.ts";
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
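serve(async function handleRequest(request) {
  // One dedicated worker is spawned per request here; a pool could reuse them instead.
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });
  // Named differently from the enclosing function to avoid shadowing it.
  const handler = ComlinkRequestHandler.wrap(worker);
  return await handler(request);
});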
worker.ts
/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
ComlinkRequestHandler.expose(async (request) => {
  const body = await request.text();
  return new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`);
});
ComlinkRequestHandler.ts
import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";
interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
  url: string;
  headers: Record<string, string>;
  hasBody: boolean;
}

interface ResponseMessage extends ResponseInit {
  headers: Record<string, string>;
  hasBody: boolean;
}

export default class ComlinkRequestHandler {
  #handler: (request: Request) => Promise<Response>;
  #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;

  static expose(handler: (request: Request) => Promise<Response>) {
    Comlink.expose(new ComlinkRequestHandler(handler));
  }

  static wrap(worker: Worker) {
    const { handleRequest, nextResponseBodyChunk } =
      Comlink.wrap<ComlinkRequestHandler>(worker);

    return async (request: Request): Promise<Response> => {
      const requestBodyReader = request.body?.getReader();

      const requestMessage: RequestMessage = {
        url: request.url,
        hasBody: requestBodyReader !== undefined,
        cache: request.cache,
        credentials: request.credentials,
        headers: Object.fromEntries(request.headers.entries()),
        integrity: request.integrity,
        keepalive: request.keepalive,
        method: request.method,
        mode: request.mode,
        redirect: request.redirect,
        referrer: request.referrer,
        referrerPolicy: request.referrerPolicy,
      };

      const nextRequestBodyChunk = Comlink.proxy(async () => {
        if (requestBodyReader === undefined) return undefined;
        const { value } = await requestBodyReader.read();
        return value;
      });

      const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
        requestMessage,
        nextRequestBodyChunk
      );

      const responseBodyInit: BodyInit | null = responseHasBody
        ? new ReadableStream({
            start(controller) {
              async function push() {
                const value = await nextResponseBodyChunk();
                if (value === undefined) {
                  controller.close();
                  return;
                }
                controller.enqueue(value);
                push();
              }
              push();
            },
          })
        : null;

      return new Response(responseBodyInit, responseInit);
    };
  }

  constructor(handler: (request: Request) => Promise<Response>) {
    this.#handler = handler;
  }

  async handleRequest(
    { url, hasBody, ...init }: RequestMessage,
    nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
  ): Promise<ResponseMessage> {
    const request = new Request(
      url,
      hasBody
        ? {
            ...init,
            body: new ReadableStream({
              start(controller) {
                async function push() {
                  const value = await nextRequestBodyChunk();
                  if (value === undefined) {
                    controller.close();
                    return;
                  }
                  controller.enqueue(value);
                  push();
                }
                push();
              },
            }),
          }
        : init
    );

    const response = await this.#handler(request);
    this.#responseBodyReader = response.body?.getReader();

    return {
      hasBody: this.#responseBodyReader !== undefined,
      headers: Object.fromEntries(response.headers.entries()),
      status: response.status,
      statusText: response.statusText,
    };
  }

  async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
    if (this.#responseBodyReader === undefined) return undefined;
    const { value } = await this.#responseBodyReader.read();
    return value;
  }
}
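The "ask for the next chunk until it comes back undefined" loop appears twice in the class above. As a possible simplification, it could be pulled out into a helper built on the stream's pull callback, which is only invoked when the consumer wants more data. The name streamFromChunks is hypothetical; this is a sketch, not a drop-in replacement tested against Comlink.

```typescript
// Hypothetical helper: turn a "give me the next chunk" function into a
// ReadableStream. `nextChunk` resolves to undefined when the body is done.
function streamFromChunks(
  nextChunk: () => Promise<Uint8Array | undefined>,
): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    // pull() runs each time the stream's internal queue needs data, so
    // chunks are requested on demand rather than in an eager loop.
    async pull(controller) {
      const chunk = await nextChunk();
      if (chunk === undefined) {
        controller.close();
      } else {
        controller.enqueue(chunk);
      }
    },
  });
}
```

A side benefit is that if nextChunk rejects, the rejection from pull puts the stream into an errored state instead of being lost in an un-awaited push() call.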
Example usage:
% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar

Received:

{"answer":42}
There's probably a better way to do this (e.g. via Comlink.transferHandlers and registering transfer handlers for Request, Response, and/or ReadableStream), but the idea is the same, and it will handle even large request or response payloads because the bodies are streamed via messaging.
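To give a feel for what a transfer handler looks like, here is a minimal sketch for Headers, which is a much smaller case than Request or Response. The TransferHandlerLike interface is a local mirror of the shape I believe Comlink 4 expects (canHandle, serialize, deserialize), so treat it as an assumption and check it against the Comlink docs. The handler name and the "headers" key are made up.

```typescript
// Local mirror of Comlink's transfer handler shape (assumed, see above).
interface TransferHandlerLike<T, S> {
  canHandle(value: unknown): value is T;
  serialize(value: T): [S, unknown[]]; // [cloneable value, transfer list]
  deserialize(value: S): T;
}

type HeaderPairs = [string, string][];

// Hypothetical handler: sends Headers across as plain [name, value] pairs.
const headersTransferHandler: TransferHandlerLike<Headers, HeaderPairs> = {
  canHandle: (value: unknown): value is Headers => value instanceof Headers,
  serialize(headers) {
    const pairs: HeaderPairs = [];
    headers.forEach((value, key) => pairs.push([key, value]));
    return [pairs, []]; // nothing to transfer, the pairs are simply cloned
  },
  deserialize: (pairs) => new Headers(pairs),
};

// With Comlink imported, registration would presumably look like this,
// on both the main thread and the worker:
// Comlink.transferHandlers.set("headers", headersTransferHandler);
```

Handlers for Request and Response would follow the same pattern, with the body being the hard part, since it has to be streamed or transferred rather than cloned.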