How to "multicast" an async iterable?

Can an async generator be somehow broadcast or multicast, so that all its iterators ("consumers"? subscribers?) receive all values?

Consider this example:

const fetchMock = () => "Example. Imagine real fetch";
async function* gen() {
  for (let i = 1; i <= 6; i++) {
    const res = await fetchMock();
    yield res.slice(0, 2) + i;
  }
}
const ait = gen();

(async() => {
  // first "consumer"
  for await (const e of ait) console.log('e', e);
})();
(async() => {
  // second...
  for await (const é of ait) console.log('é', é);
})();

Iterations "consume" a value, so only one or the other gets it. I would like for both of them (and any later ones) to get every yielded value, if such a generator is possible to create somehow. (Similar to an Observable.)

Cousin answered 23/8, 2020 at 4:40 Comment(2)
Is it possible for you to change the design so that gen accepts callback functions that are applied to each yielded value? Is there a specific reason you want or need to use generators? – Kinetics
@Kinetics Yes, gen could accept a callback. I'm using async generators because they are a built-in feature of JS. Otherwise I would use an Observable for this, but I thought they do almost the same thing (except for the ability to "multicast"). – Cousin

This is not easily possible. You will need to explicitly tee it. This is similar to the situation for synchronous iterators, just a bit more complicated:

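// %AsyncIteratorPrototype%: provides [Symbol.asyncIterator], so the branches work with `for await`
const AsyncIteratorProto = Object.getPrototypeOf(Object.getPrototypeOf(async function*(){}.prototype));
function teeAsync(iterable) {
    const iterator = iterable[Symbol.asyncIterator]();
    // one buffer of pending result promises per branch; null marks a closed branch
    const buffers = [[], []];
    function makeIterator(buffer, i) {
        return Object.assign(Object.create(AsyncIteratorProto), {
            next() {
                // this branch was already closed via return()
                if (!buffer) return Promise.resolve({done: true, value: undefined});
                // replay a result that the other branch has already pulled
                if (buffer.length) return buffer.shift();
                // otherwise pull from the source and queue the same promise
                // for the other branch (i^1 flips 0 <-> 1) if it is still open
                const res = iterator.next();
                if (buffers[i^1]) buffers[i^1].push(res);
                return res;
            },
            async return() {
                if (buffer) {
                    // close this branch; close the source once both branches are closed
                    buffer = buffers[i] = null;
                    if (!buffers[i^1]) await iterator.return();
                }
                return {done: true, value: undefined};
            },
        });
    }
    return buffers.map(makeIterator);
}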

You should ensure that both iterators are consumed at about the same rate so that the buffer doesn't grow too large.
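
For more than two consumers, the same buffering idea can be generalized. The following is only a minimal sketch, not part of the answer's original code: `teeAsyncN` is a hypothetical helper name, and it assumes that the number of consumers is fixed up front and that every branch is actually iterated (a branch that is never started keeps accumulating buffered results).

```javascript
// Hypothetical helper (illustration only): split one async iterable into n branches.
function teeAsyncN(iterable, n = 2) {
  const iterator = iterable[Symbol.asyncIterator]();
  // One queue of pending result promises per branch; null marks a closed branch.
  const queues = Array.from({ length: n }, () => []);

  async function* branch(i) {
    try {
      while (true) {
        if (queues[i].length === 0) {
          // Pull one result from the source and hand the same promise to every open branch.
          const result = iterator.next();
          for (const q of queues) if (q) q.push(result);
        }
        const { done, value } = await queues[i].shift();
        if (done) return value;
        yield value;
      }
    } finally {
      // Runs when the branch finishes or its consumer breaks out of the loop.
      queues[i] = null;
      // Close the source once no open branch is left.
      if (queues.every((q) => q === null) && iterator.return) await iterator.return();
    }
  }

  return queues.map((_, i) => branch(i));
}

// Usage with a generator shaped like the one in the question.
async function* gen() {
  for (let i = 1; i <= 3; i++) yield 'Ex' + i;
}

async function collect(it) {
  const out = [];
  for await (const v of it) out.push(v);
  return out;
}

const [a, b, c] = teeAsyncN(gen(), 3);
Promise.all([a, b, c].map(collect)).then((results) => console.log(results));
// each of the three arrays is ['Ex1', 'Ex2', 'Ex3']
```

As with the two-way version, a slow branch makes its queue grow, because the fastest branch decides when the source is pulled.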

Gleich answered 23/8, 2020 at 18:35 Comment(2)
Wow, thank you, indeed it's not trivial. Hoping to find a good library that contains this and other abstractions for async iterators. – Cousin
@ᆼᆺᆼ I guess you'd find something like this in a CSP library with channels. – Gleich

Here's a solution using Highland as an intermediary. From the docs:

A stream forked to multiple consumers will pull values, one at a time, from its source only as fast as the slowest consumer can handle them.

import _ from 'lodash'
import H from 'highland'

export function fork<T>(generator: AsyncGenerator<T>): [
    AsyncGenerator<T>,
    AsyncGenerator<T>
] {
    const source = asyncGeneratorToHighlandStream(generator).map(x => _.cloneDeep(x));
    return [
        highlandStreamToAsyncGenerator<T>(source.fork()),
        highlandStreamToAsyncGenerator<T>(source.fork()),
    ];
}

async function* highlandStreamToAsyncGenerator<T>(
    stream: Highland.Stream<T>
): AsyncGenerator<T> {
    for await (const row of stream.toNodeStream({ objectMode: true })) {
        yield row as unknown as T;
    }
}

function asyncGeneratorToHighlandStream<T>(
    generator: AsyncGenerator<T>
): Highland.Stream<T> {
    return H(async (push, next) => {
        try {
            const result = await generator.next();
            if (result.done) return push(null, H.nil);
            push(null, result.value);
            next();
        } catch (error) {
            return push(error);
        }
    });
}

Usage:

const [copy1, copy2] = fork(source);

Works in Node, browser untested.
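
The back-pressure behaviour in the quoted docs (the source is pulled only as fast as the slowest consumer) can also be approximated without Highland or Node streams. This is a rough sketch under stated assumptions, not Highland's implementation: `forkLockstep` is a hypothetical name, and it assumes every branch is consumed to the end, since a branch that stops early stalls the others.

```javascript
// Hypothetical helper (illustration only): n branches that advance in lockstep.
// The source is pulled once per round, after every branch has asked for a value.
function forkLockstep(iterable, n = 2) {
  const iterator = iterable[Symbol.asyncIterator]();
  let waiting = []; // resolvers of the branches waiting for the next value

  function request() {
    return new Promise((resolve, reject) => {
      waiting.push({ resolve, reject });
      if (waiting.length === n) {
        // Every branch is ready: pull exactly one result and share it.
        const batch = waiting;
        waiting = [];
        iterator.next().then(
          (result) => batch.forEach((w) => w.resolve(result)),
          (error) => batch.forEach((w) => w.reject(error)),
        );
      }
    });
  }

  async function* branch() {
    while (true) {
      const { done, value } = await request();
      if (done) return;
      yield value;
    }
  }

  return Array.from({ length: n }, () => branch());
}

// Usage: one fast and one slow consumer both see every value.
async function* numbers() {
  for (let i = 1; i <= 3; i++) yield i;
}

async function consume(it, delayMs) {
  const seen = [];
  for await (const v of it) {
    seen.push(v);
    await new Promise((r) => setTimeout(r, delayMs)); // simulate work
  }
  return seen;
}

const [fast, slow] = forkLockstep(numbers());
Promise.all([consume(fast, 0), consume(slow, 20)]).then((r) => console.log(r));
// logs [ [ 1, 2, 3 ], [ 1, 2, 3 ] ]
```

Nothing is buffered here, so memory stays flat, at the cost of the fast consumer waiting for the slow one on every value.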

Corena answered 18/9, 2021 at 23:44 Comment(2)
Does this work in browsers or only on Node? – Cousin
I assume Node only, because of how highlandStreamToAsyncGenerator is implemented. – Corena

I built a library to do this here: https://github.com/tjenkinson/forkable-iterator

It means you can do something like:

import { buildForkableIterator, fork } from 'forkable-iterator';

function* Source() {
  yield 1;
  yield 2;
  return 'return';
}

const forkableIterator = buildForkableIterator(Source());

console.log(forkableIterator.next()); // { value: 1, done: false }

const child1 = fork(forkableIterator);
// { value: 2, done: false }
console.log(child1.next());
// { value: 2, done: false }
console.log(forkableIterator.next());

// { value: 'return', done: true }
console.log(child1.next());
// { value: 'return', done: true }
console.log(forkableIterator.next());

If you no longer need to keep consuming from a fork, there also shouldn't be a memory leak, provided you lose all references to it.

Rayborn answered 20/8, 2022 at 11:49 Comment(0)
