Changing the target of a `whenever` block from the inside

The following code attempts to react to one Supply and then, based on the content of some message, change its mind and react to messages from a different Supply. It's an attempt to provide similar behavior to Supply.migrate but with a bit more control.

my $c1 = Supplier.new;
my $c2 = Supplier.new;

my $s = supply {
    my $currently-listening-to = $c1.Supply;
    my $other-var = 'foo';
    whenever $currently-listening-to {
        say "got: $_";
        if .starts-with('3') {
            say "listening to something new";
            $currently-listening-to = $c2.Supply;
            $other-var = 'bar';
            say $other-var;
        }
    }
}

$s.tap;

for ^7 {
    $c1.emit: "$_ from \$c1";
    $c2.emit: "$_ from \$c2";
}
sleep 10;

If I understand the semantics of supply blocks correctly (highly doubtful!), this block should have exclusive and mutable access to any variables declared inside the supply block. Thus, I expected this to get the first 4 values from $c1 and then switch to $c2. However, it doesn't. Here's the output:

got: 0 from $c1
got: 1 from $c1
got: 2 from $c1
got: 3 from $c1
listening to something new
bar
got: 4 from $c1
got: 5 from $c1
got: 6 from $c1

As that output shows, changing $other-var worked just as I expected it to, but the attempt to change $currently-listening-to failed (silently).

Is this behavior correct? If so, what am I missing about the semantics of supply blocks/other constructs that explains this behavior? I got the same results with react blocks and when using a Channel instead of a Supply, so the behavior is consistent across several concurrency constructs.

(In the interest of avoiding an X-Y problem, the use case that triggered this question was an attempt to implement Erlang-style error handling. To do so, I wanted to have a supervising supply block that listened to its children and could kill/re-launch any children that got into a bad state. But that means listening to the new children – which led directly to the issue described above.)

Dunite answered 7/10, 2021 at 3:48 Comment(0)

I tend to consider whenever as the reactive equivalent of for. (It even supports the LAST loop phaser for doing something when the tapped Supply is done, as well as supporting next, last, and redo like an ordinary for loop!) Consider this:

my $x = (1,2,3);
for $x<> {
    .say;
    $x = (4,5,6);
}

The output is:

1
2
3

That's because at the setup stage of a for loop we obtain an iterator and then work through it, without reading $x again on each iteration. It's the same with whenever: it taps the Supply once, and then the body is invoked per emit event.
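
To make the `for` half of that analogy concrete, here is a rough sketch that pulls values from an iterator by hand. It only approximates what `for` does at setup (it is not how Rakudo actually compiles the loop), but it shows why reassigning $x afterwards has no effect:

```raku
my $x = (1, 2, 3);

# Obtain the iterator once, as `for` does at its setup stage.
my $iter = $x.iterator;

# Reassigning $x now has no effect on the iterator we already hold.
$x = (4, 5, 6);

loop {
    # Bind rather than assign, so the IterationEnd sentinel can be
    # compared by identity.
    my $value := $iter.pull-one;
    last if $value =:= IterationEnd;
    say $value;    # prints 1, then 2, then 3
}
```

A whenever holds on to the Supply it tapped in much the same way.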

Thus another whenever is needed to achieve a tap of the next Supply, while simultaneously closing the tap on the current one. When there are just two Supplys under consideration, the easy way to write it is like this:

my $c1 = Supplier.new;
my $c2 = Supplier.new;

my $s = supply {
    whenever $c1 {
        say "got: $_";
        if .starts-with('3') {
            say "listening to something new";
            # Tap the next Supply...
            whenever $c2 {
                say "got: $_";
            }
            # ...and close the tap on the current one.
            last;
        }
    }
}

$s.tap;

for ^7 {
    $c1.emit: "$_ from \$c1";
    $c2.emit: "$_ from \$c2";
}

Which will produce:

got: 0 from $c1
got: 1 from $c1
got: 2 from $c1
got: 3 from $c1
listening to something new
got: 3 from $c2
got: 4 from $c2
got: 5 from $c2
got: 6 from $c2

(Note that I removed the sleep 10 because there's no need for it; we aren't introducing any concurrency in this example, so everything runs synchronously.)

Clearly, if there were a dozen Supplys to move between, this approach wouldn't scale so well. So how does migrate work? The key missing piece is that we can obtain the Tap handle when working with whenever, and thus we are able to close it from outside of the body of that whenever. This is exactly how migrate works (copied from the standard library, with comments added):

method migrate(Supply:D:) {
    supply {
        # The Tap of the Supply we are currently emitting values from
        my $current;
        # Tap the Supply of Supply that we'll migrate between
        whenever self -> \inner {
            # Make sure we produce a sensible error
            X::Supply::Migrate::Needs.new.throw
                unless inner ~~ Supply;
            # Close the tap on whatever we are currently tapping
            $current.close if $current;
            # Tap the new thing and store the Tap handle
            $current = do whenever inner -> \value {
                emit(value);
            }
        }
    }
}

In short: you don't change the target of the whenever, but rather start a new whenever and terminate the previous one.
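
For more than two Supplys, the same Tap-handle idea could be wrapped in a small helper. The following is only a sketch under assumptions: the `@sources` array, the `listen-to` helper, and the rule "move on when a message starts with 3" are made up for illustration and are not part of any library. It also assumes live Supplys (obtained from a Supplier), so that nothing is emitted while a whenever is still being set up.

```raku
# Hypothetical: three sources to move between.
my @sources = Supplier.new xx 3;

my $s = supply {
    # The Tap of the source we are currently listening to
    my $current;

    # Hypothetical helper: close the current tap (if any), then tap
    # source number $i and remember its Tap handle.
    my sub listen-to(Int $i) {
        $current.close if $current;
        $current = do whenever @sources[$i].Supply -> $msg {
            emit $msg;
            # Illustrative switching rule: move to the next source when
            # a message starts with '3', unless this is the last source.
            listen-to($i + 1)
                if $msg.starts-with('3') && $i < @sources.end;
        }
    }

    listen-to(0);
}

$s.tap: { say "got: $_" };

for ^5 -> $n {
    for @sources.kv -> $i, $source {
        $source.emit: "$n from source $i";
    }
}
```

As in migrate, each switch starts a fresh whenever and closes the Tap of the previous one. The only difference is that here the decision to switch is made from inside the whenever body, based on the message content.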

Vandalize answered 7/10, 2021 at 10:20 Comment(2)
"I tend to consider whenever as the reactive equivalent of for". Thanks, that's very helpful – I was thinking of supply/whenever as the reactive equivalent of for/when, so I was expecting to be able to change the "condition" of the whenever the way I can change the one for the when. But I see now that whenever is actually doing the (reactive) "iteration", not just matching on the iteration's result the way when does. – Dunite
Indeed. The equivalent to supply in iterable space is gather. For example, one could do gather for @outers -> @inners { for @inners -> $value { take $value; } }, producing a result iterable from multiple others. – Vandalize