Using Web Streams, create a TransformStream from several TransformStreams

Is it possible to create a single TransformStream out of several other TransformStreams using whatwg streams (the web Streams API)?

For example, if I have two TransformStreams which run in sequence, like transformer1 and transformer2:

readable.pipeThrough(transformer1).pipeThrough(transformer2).pipeTo(writable)

Ultimately, I'd like to be able to convert that to

readable.pipeThrough(allTransformers).pipeTo(writable)

where allTransformers is the TransformStream that combines transformer1 and transformer2.

The following is not functional code, but I'd expect there to be a way to do something like this:

const allTransformers = transformer1.pipeThrough(transformer2)

This is clearly a simplified example, but you can imagine there being many transform streams, and I'd like to refactor them into a single, reusable transform pipeline.

Yama answered 14/6, 2022 at 5:49 Comment(0)

I had the same issue; here are my solutions.

In the example below, UpperCaseTransformStream pipes the stream through TextDecoderStream, UpperCaseTextStream and TextEncoderStream.

class UpperCaseTextStream extends TransformStream {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });
  }
}

class UpperCaseTransformStream {
  constructor(...strategies) {
    const { writable, readable } = new TransformStream({}, ...strategies);
    this.writable = writable;
    this.readable = readable
      .pipeThrough(new TextDecoderStream())
      .pipeThrough(new UpperCaseTextStream())
      .pipeThrough(new TextEncoderStream());
  }
}

const TEXT_CHUNK =
 "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
 "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOutput = readableInput.pipeThrough(new UpperCaseTransformStream());
(async () => {
  const text = await new Response(readableOutput).text();
  console.log("Test OK?", text === TEXT_CONTENT.toUpperCase());
})().catch(console.error);

Here is a more generic implementation using a PipelineStream class, which extends TransformStream and accepts an array of TransformStream instances as its first parameter.

class UpperCaseTextStream extends TransformStream {
  constructor() {
    super({
      transform(chunk, controller) {
        controller.enqueue(chunk.toUpperCase());
      }
    });
  }
}

class PipelineStream extends TransformStream {
  constructor(transformStreams, ...strategies) {
    super({}, ...strategies);

    const readable = [super.readable, ...transformStreams]
      .reduce((readable, transform) => readable.pipeThrough(transform));

    Object.defineProperty(this, "readable", {
      get() {
        return readable;
      }
    });
  }
}

const TEXT_CHUNK =
 "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
 "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOutput = readableInput.pipeThrough(new PipelineStream([
  new TextDecoderStream(),
  new UpperCaseTextStream(),
  new TextEncoderStream()]));
(async () => {
  const text = await new Response(readableOutput).text();
  console.log("Test OK?", text === TEXT_CONTENT.toUpperCase());
})().catch(console.error);
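As a further illustration (not part of the original answer), PipelineStream also composes built-in transform streams such as CompressionStream and DecompressionStream. The gzip round trip below is a sketch assuming a runtime where these are globals (modern browsers, Node.js 18+); the class is repeated so the snippet is self-contained:

```javascript
// PipelineStream repeated from the answer above so the example is self-contained.
class PipelineStream extends TransformStream {
  constructor(transformStreams, ...strategies) {
    super({}, ...strategies);
    // Chain the identity stream's readable side through each inner transform.
    const readable = [super.readable, ...transformStreams]
      .reduce((readable, transform) => readable.pipeThrough(transform));
    Object.defineProperty(this, "readable", { get: () => readable });
  }
}

// A reusable pipeline: compress with gzip, then immediately decompress.
const gzipRoundTrip = new PipelineStream([
  new CompressionStream("gzip"),
  new DecompressionStream("gzip"),
]);

const output = new Blob(["hello streams"]).stream().pipeThrough(gzipRoundTrip);
new Response(output).text().then((text) => console.log(text)); // prints "hello streams"
```

Because pipeThrough only requires an object with a readable and a writable property, the composed PipelineStream can be passed anywhere a plain TransformStream is expected.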
Oraleeoralia answered 7/7, 2022 at 22:59 Comment(6)
Do you know if this PipelineStream can accept the next chunk before the last internal TransformStream has enqueued the current one? I was playing with a slightly different approach and found that the PipelineStream waits for each internal TransformStream to finish before accepting the next chunk and calling transform() again, which made it a bit less optimized than explicitly chaining multiple .pipeThrough() calls. – Yama
I don't think this is possible. You can still try to ping the spec authors; that's what I did before implementing this solution, and it helped me validate it (I found the solution before posting the answer). See twitter.com/check_ca/status/1544817673560784896 and twitter.com/jaffathecake/status/1545387422414180353. – Oraleeoralia
BTW, you might mark this question as answered ;) – Oraleeoralia
Thanks for clarifying, and yes, this is the best possible answer. Appreciate it! – Yama
I would be curious to understand why you chose Object.defineProperty() to set the readable property instead of a plain assignment (this.readable =). – Zhdanov
It's just to make sure the user cannot override it. – Oraleeoralia

Based on check_ca’s response, I have created an even more generic approach that works a bit better with TypeScript and also allows using the other methods of ReadableStream, not just pipeThrough (for example tee):

export class PipeableTransformStream<I, O> extends TransformStream<I, O> {
    constructor(
        transformReadable: (readable: ReadableStream<I>) => ReadableStream<O>,
        writableStrategy?: QueuingStrategy<I>,
        readableStrategy?: QueuingStrategy<O>
    ) {
        super({}, writableStrategy);
        const readable = transformReadable(super.readable as any).pipeThrough(new TransformStream({}, undefined, readableStrategy));
        Object.defineProperty(this, "readable", {
            get() {
                return readable;
            }
        });
    }
}

For your example, it would be used like this:

const allTransformers = new PipeableTransformStream((readable) => (
    readable.pipeThrough(transformer1).pipeThrough(transformer2)
));
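For readers not using TypeScript, a plain JavaScript equivalent might look like the sketch below. The trailing identity TransformStream only exists to apply the readable-side queuing strategy, mirroring the TypeScript version; the upper and exclaim transformers are hypothetical examples, not from the original answer:

```javascript
class PipeableTransformStream extends TransformStream {
  constructor(transformReadable, writableStrategy, readableStrategy) {
    super({}, writableStrategy);
    // Let the caller build the pipeline, then apply the readable-side strategy
    // via an identity TransformStream.
    const readable = transformReadable(super.readable)
      .pipeThrough(new TransformStream({}, undefined, readableStrategy));
    Object.defineProperty(this, "readable", { get: () => readable });
  }
}

// Hypothetical transformers for illustration: uppercase, then append "!".
const upper = new TransformStream({
  transform(chunk, controller) { controller.enqueue(chunk.toUpperCase()); }
});
const exclaim = new TransformStream({
  transform(chunk, controller) { controller.enqueue(chunk + "!"); }
});

const allTransformers = new PipeableTransformStream((readable) =>
  readable.pipeThrough(upper).pipeThrough(exclaim)
);
```

Since the exposed readable is a genuine ReadableStream, methods such as tee() or async iteration work on it directly.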
Zhdanov answered 29/4 at 18:39 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.