The basic rules for Supply are:
- No introduction of concurrency without it being explicitly asked for
- Back-pressure through a sender-pays model
- A message is processed in full before the next one (so .map({ ...something with state... }) can be trusted not to cause conflicts over the state; see the sketch after this list)
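As an illustration of rule 3 (this example is not part of the rules above, just a sketch), state used inside a .map block is safe because each message is fully processed before the next one is delivered:

# Illustrative sketch only: a running total kept in a closure variable.
# One-at-a-time delivery means the += never races with itself.
my $total = 0;
my $summed = Supply.interval(1).map({ $total += $_; $total });
$summed.tap: { say "running total: $_" };
sleep 3.5;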
Rule 3 doesn't really apply to share, since there are separate downstream operation chains after that point, but rules 1 and 2 do. The purpose of share is to allow publish/subscribe, and also to provide for re-use of a chunk of processing by multiple downstream message processors. Introducing parallel message processing is a separate concern from this.
There are various options. One is to have the messages for parallel processing stuck into a Channel. This explicitly introduces a place for the messages to be buffered (well, until you run out of memory... which is exactly why Supply comes with a sender-pays back-pressure model). Coercing a Channel back into a Supply gets the values pulled from the Channel and emitted on that Supply on a pool thread. That way looks like:
my $supply = Supply.interval(0.2).share;
my $tap = $supply.Channel.Supply.tap: { say "1. $^a"; sleep 5 };
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Note that since whenever automatically coerces whatever it's asked to react to into a Supply, that'd look like whenever $supply.Channel { }, which makes it a pretty short solution - but at the same time nicely explicit, in that it indicates how the normal back-pressure mechanism is being side-stepped. The other property of this solution is that it retains the order of the messages and still gives one-at-a-time processing downstream of the Channel.
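Spelled out with a supply block, a sketch of that Channel-based variant (equivalent in effect to the tap-based code above) might look like:

# Sketch: the .Channel coercion is the point where back-pressure is given up.
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply.Channel { say "1. $_"; sleep 5 } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;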
The alternative is to react to each message by instead starting some asynchronous piece of work to handle it. The start operation on a Supply schedules the block it is passed to run on the thread pool for each message that is received, thus not blocking the arrival of the next message. The result is a Supply of Supply objects. This forces one to tap each inner Supply to actually make anything happen, which seems slightly counter-intuitive at first, but is actually for the good of the programmer: it makes it clear there's an extra bit of async work to keep track of. I very strongly suggest using this in combination with the react/whenever syntax, which does subscription management and error propagation automatically. The most direct transformation of the code in the question is:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply.start({ say "1. $^a"; sleep 5 }) { whenever $_ {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Although it's also possible to instead write it as:
my $supply = Supply.interval(0.2).share;
my $tap = supply { whenever $supply -> $a { whenever start { say "1. $a"; sleep 5 } {} } }.tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
Which points to the possibility of writing a parallelize Supply combinator:
my $supply = Supply.interval(0.2).share;
my $tap = parallelize($supply, { say "1. $^a"; sleep 5 }).tap;
my $tap2 = $supply.tap: { say "2. $^a"; };
sleep 5;
sub parallelize(Supply $messages, &operation) {
    supply {
        whenever $messages -> $value {
            whenever start operation($value) {
                emit $_;
            }
        }
    }
}
The output of this approach is rather different from the Channel one, since the operations are all kicked off as soon as each message comes in. It also doesn't retain message order. There's still an implicit queue (unlike the explicit one with the Channel approach); it's just that now it's the thread pool scheduler's work queue and the OS scheduler that have to keep track of the in-progress work. And again, there's no back-pressure - but notice that it would be entirely possible to implement that by keeping track of outstanding Promises and blocking further incoming messages with an await Promise.anyof(@outstanding).
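As a rough sketch of that back-pressure idea (not code from the question; the name parallelize-bounded and the limit of 4 in-flight operations are made up for illustration):

# Illustrative sketch: caps the number of in-flight operations, blocking the
# supply block (and thus the sender) once the limit is reached.
sub parallelize-bounded(Supply $messages, &operation, :$limit = 4) {
    supply {
        my @outstanding;
        whenever $messages -> $value {
            # Forget operations that have already completed.
            @outstanding .= grep({ .status ~~ Planned });
            # At the limit? Wait until at least one outstanding operation is done.
            await Promise.anyof(@outstanding) if @outstanding >= $limit;
            my $done = start operation($value);
            push @outstanding, $done;
            whenever $done {
                emit $_;
            }
        }
    }
}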
Finally, I'll note that there is some consideration of hyper whenever and race whenever constructs, to provide a language-level mechanism for dealing with parallel processing of Supply messages. However, the semantics of such constructs, and how they play into the supply-block design goals and safety properties, represent significant design challenges.