Can a shared supply run multiple tap blocks simultaneously?

Consider this code where a tap takes a while to complete. All the blocks run simultaneously (each prints immediately) and then sleep. Most don't finish because the program ends sooner than they do:

my $supply = Supply.interval(0.2);
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;

The output (elided) has 25 lines (one for each tick of 0.2 in 5 seconds):

1. 0
1. 1
...
1. 24

Then I change that supply to .share:

my $supply = Supply.interval(0.2).share;
my $tap = $supply.tap: { say "1. $^a"; sleep 5 };
sleep 5;

I only see one line of output, but I expected the same output as before:

1. 1

The .share makes it possible for multiple taps to get the same values.

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Still, only the first tap produces output, and only one line of it. I expected 25 lines for each:

1. 1
Broker answered 28/3, 2018 at 16:18 Comment(0)

The basic rules for Supply are:

  1. No introduction of concurrency without it being explicitly asked for
  2. Back-pressure through a sender-pays model
  3. A message is processed in full before the next one (so .map({ ...something with state... }) can be trusted not to cause conflicts over the state)

Rule 3 doesn't really apply to share since there's separate downstream operation chains after that point, but rules 1 and 2 do. The purpose of share is to allow publish/subscribe, and also to provide for re-use of a chunk of processing by multiple downstream message processors. Introducing parallel message processing is a separate concern from this.
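Rules 1 and 2 can be seen in a small sketch. It uses a Supplier with hand-emitted values instead of the Supply.interval from the question (an assumption, made so the run is deterministic), and the @log array is purely illustrative. The tap block runs on the thread that calls emit, so each emit returns only after the block has finished:

```raku
my $supplier = Supplier.new;
my @log;   # illustrative only: records the order in which things happen

# No concurrency is introduced: this block runs on the emitting thread.
$supplier.Supply.tap: -> $v {
    sleep 0.1;                     # stand-in for slow processing
    @log.push: "processed $v";
};

for 1..3 -> $v {
    $supplier.emit($v);            # returns only once the tap block is done
    @log.push: "emitted $v";
}

.say for @log;
```

The log alternates between "processed N" and "emitted N", which is the sender paying for the slow tap.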

There are various options. One is to have the messages for parallel processing stuck into a Channel. This explicitly introduces a place for the messages to be buffered (well, until you run out of memory...which is exactly why Supply comes with a sender-pays back-pressure model). Coercing a Channel back into a Supply gets the values pulled from the Channel and emitted on that Supply on a pool thread. That way looks like:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Note that since whenever automatically coerces whatever it is asked to react to into a Supply, this can be written as whenever $supply.Channel { }, which makes for a pretty short solution that is still nicely explicit, in that it indicates how the normal back-pressure mechanism is being side-stepped. The other property of this solution is that it retains the order of the messages and still gives one-at-a-time processing downstream of the Channel.
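As a rough illustration of the whenever form, here is a sketch that again swaps the interval for a Supplier with five hand-emitted values (an assumption, so that the program terminates on its own); @seen is a hypothetical name used only to record what arrived. The Channel is created up front so that it is already subscribed when the values are sent:

```raku
my $supplier = Supplier.new;
my $channel  = $supplier.Supply.Channel;   # taps now; values get buffered here

$supplier.emit($_) for 1..5;   # returns quickly: the slow code is not tapped directly
$supplier.done;                # closes the Channel once the values are sent

my @seen;                      # hypothetical: records the arrival order
react {
    whenever $channel -> $v {  # the Channel is coerced to a Supply
        sleep 0.05;            # stand-in for slow processing
        @seen.push: $v;
    }
}

say @seen;
```

The react block ends when the closed Channel has been drained, and the values should arrive in the order they were sent.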

The alternative is to react to each message by instead starting some asynchronous piece of work to handle it. The start operation on a Supply schedules the block it is passed to run on the thread pool for each message that is received, thus not blocking the arrival of the next message. The result is a Supply of Supply. This forces one to tap each inner Supply to actually make anything happen, which seems slightly counter-intuitive at first, but actually is for the good of the programmer: it makes it clear there's an extra bit of async work to keep track of. I very strongly suggest using this in combination with the react/whenever syntax, which does subscription management and error propagation automatically. The most direct transformation of the code in the question is:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Although it's also possible to instead write it as:

my $supply = Supply.interval(0.2).share;
my $tap  = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

Which points to the possibility of writing a parallelize Supply combinator:

my $supply = Supply.interval(0.2).share;
my $tap  = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a";  };
sleep 5;

sub parallelize(Supply $messages, &operation) {
    supply {
        whenever $messages -> $value {
            whenever start operation($value) {
                emit $_;
            }
        }
    }
}

The output of this approach is rather different from the Channel one, since the operations are all kicked off as soon as the message comes in. Also it doesn't retain message order. There's still an implicit queue (unlike the explicit one with the Channel approach), it's just that now it's the thread pool scheduler's work queue and the OS scheduler that has to keep track of the in-progress work. And again, there's no back-pressure, but notice that it would be entirely possible to implement that by keeping track of outstanding Promises and blocking further incoming messages with an await Promise.anyof(@outstanding).
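The back-pressure idea mentioned above could be sketched roughly as follows. This is not the parallelize combinator itself: it uses a plain tap on a Supplier so the blocking is easy to follow, and $limit, @outstanding, the lock-guarded counters and the 0.1 second sleep are all assumptions made for illustration. Once $limit pieces of work are in flight, the tap block waits for one of them to finish, so the emit call (the sender) is held up:

```raku
my $limit = 2;            # hypothetical cap on in-flight work
my @outstanding;          # Promises for work that may still be running
my $lock = Lock.new;      # guards the bookkeeping below
my ($running, $peak) = 0, 0;
my @results;

my $supplier = Supplier.new;
$supplier.Supply.tap: -> $value {
    # Forget work that has already finished (kept or broken).
    @outstanding .= grep({ not .so });
    # Sender pays: emit does not return until a slot frees up.
    await Promise.anyof(@outstanding) if @outstanding >= $limit;
    @outstanding.push: start {
        $lock.protect: { $running++; $peak max= $running };
        sleep 0.1;        # stand-in for slow processing
        $lock.protect: { $running--; @results.push($value) };
    };
};

$supplier.emit($_) for 1..6;
await @outstanding;       # wait for the remaining work before exiting
say "processed {+@results} values, peak concurrency $peak";
```

Since the work runs in parallel, the order of @results is not guaranteed, which matches the loss of message ordering described above.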

Finally, I'll note that there is some consideration of hyper whenever and race whenever constructs to provide some language-level mechanism for dealing with parallel processing of Supply messages. However, the semantics of such, and how they play into the supply-block design goals and safety properties, represent significant design challenges.

Broil answered 28/3, 2018 at 21:8 Comment(5)
Do those rules only describe the live taps? They don't seem to hold for the on demand ones.Broker
Yes, they hold there too. Example that you think shows otherwise?Broil
My first example shows that. There are 25 tap blocks run even though the first one sleeps for 5 seconds while the program only runs for 5 seconds.Broker
Seems to be an (undesirable) property specific to Supply.interval, though it is at least honest about it. Running perl6 -e 'say Supply.interval(0.2).serial' gives False. Every other on-demand Supply I've checked gives True. Note that react whenever Supply.interval(0.2) { .say; sleep 5 } does indeed behave as I described, however, since whenever calls serialize on its target before tapping (yet another reason not to use .tap directly!). I'll tweak interval to not let a non-serial Supply escape, anyway.Broil
Supply.interval made consistent with other Supply factories in github.com/rakudo/rakudo/commit/7572983a41. Thanks for pointing it out.Broil

The taps of a Supply are run sequentially within a single thread. So the code of the second tap will only be run after the first tap (which sleeps for 5 seconds). This shows in the following code:

my $supply = Supply.interval(0.2).share;
my $tap  = $supply.tap: { say "1. $^a in #{+$*THREAD}" };
my $tap2 = $supply.tap: { say "2. $^a in #{+$*THREAD}" };
sleep 0.5;
===================
1. 1 in #4
2. 1 in #4
1. 2 in #4
2. 2 in #4

So the answer is currently: no.

Parturifacient answered 28/3, 2018 at 20:25 Comment(2)
This is for live supplies only, right? I don't see same behavior in on-demand taps.Broker
Looks like it, but it seems there's some strange stuff going on with a shared on-demand tap.Parturifacient
