How can you write a supply with a dynamic throttle?

I'm refactoring a chat bot so that it no longer needs locks to manage most of its state. The website it connects to via websocket limits the messages it accepts from regular users to one every 0.6s and from voiced users to one every 0.3s, while administrators have no throttle. Whether or not a user is voiced or an administrator isn't known until some point well after the connection gets made; before then, everyone is considered a regular user.

Currently, I handle throttled messages by putting the react block that listens for messages in a loop that exits once the connection has been forcibly closed. When the throttle gets updated, I call done to enter the next iteration, which rebuilds the whenever block for the supply of outgoing messages with the updated throttle. This is terrible, racy code!

What can I do to (a) ensure the connection starts out with a 0.6s throttle (the regular-user rate) that can be used immediately after a websocket connection gets made, (b) make it possible to call a method that updates this throttle when needed, and (c) not keep any state related to this throttle (which can be inferred by other means)?

Edit: I forgot to mention this earlier, but there are unthrottled messages for all types of users, not just administrators.

Aryanize asked 22/1, 2020 at 19:14

I found a way to do this using a combination of Supply.migrate, Supply.map, and Supply.merge. I create three suppliers: one for throttled messages, one for unthrottled messages, and one for throttle updates. Mapping over the throttle updates supply turns each emitted throttle into a copy of the throttled messages supply with that throttle applied. Calling Supply.migrate on the result switches to the most recently produced supply, and merging that with the unthrottled messages supply gives one supply that I can use to handle sending all types of messages:

react {
    my Supplier:D $unthrottled .= new;
    my Supplier:D $throttled   .= new;
    my Supplier:D $throttles   .= new;

    whenever Supply.merge(
        $unthrottled.Supply,
        $throttles.Supply.map({ $throttled.Supply.throttle: 1, $_ }).migrate,
    ) -> Str:D $message {
        say sprintf '%s @ %f', $message, now;
        done if ++$ == 12;
    }

    $throttles.emit: 1.2;
    $throttled.emit: "$_" for 1..3;
    whenever Promise.in(1) {
        $unthrottled.emit: "$_" for 7..9;
    }
    whenever Promise.in(5) {
        $throttles.emit: 0.6;
        $throttled.emit: "$_" for 4..6;
        whenever Promise.in(1) {
            $unthrottled.emit: "$_" for 10..12;
        }
    }
}

# OUTPUT:
# 1 @ 1579731916.713831
# 7 @ 1579731917.764047
# 8 @ 1579731917.769012
# 9 @ 1579731917.774584
# 2 @ 1579731917.913512
# 3 @ 1579731919.123057
# 4 @ 1579731921.749377
# 5 @ 1579731922.353073
# 10 @ 1579731922.768212
# 11 @ 1579731922.773777
# 12 @ 1579731922.780446
# 6 @ 1579731922.963087
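
To cover (b) from the question, the same pipeline could be wrapped in a small class so that updating the throttle is just a method call. This is only a sketch under assumptions: the class name ThrottledSender and the methods messages, set-throttle, send, and send-now are made up for illustration and are not part of any library. Note that the suppliers are live, so a throttle has to be set once before any throttled message is sent; until then the throttled supply is not tapped and those messages are dropped.

```raku
# Hypothetical wrapper around the three suppliers used above.
# All names in this class are illustrative assumptions.
class ThrottledSender {
    has Supplier:D $!unthrottled = Supplier.new;
    has Supplier:D $!throttled   = Supplier.new;
    has Supplier:D $!throttles   = Supplier.new;

    # Build the merged supply of outgoing messages.
    # Intended to be tapped once, in a whenever block.
    method messages() {
        Supply.merge(
            $!unthrottled.Supply,
            $!throttles.Supply.map({ $!throttled.Supply.throttle: 1, $_ }).migrate,
        )
    }

    # Switch to a new interval (in seconds) between throttled messages.
    method set-throttle(Real:D $seconds) { $!throttles.emit: $seconds }

    # Queue a message that is subject to the current throttle.
    method send(Str:D $message)          { $!throttled.emit: $message }

    # Send a message that bypasses the throttle.
    method send-now(Str:D $message)      { $!unthrottled.emit: $message }
}

my $sender = ThrottledSender.new;
react {
    whenever $sender.messages -> Str:D $message {
        say sprintf '%s @ %f', $message, now;
        done if ++$ == 3;
    }

    $sender.set-throttle: 0.6;    # everyone starts out as a regular user
    $sender.send: 'first';
    $sender.send: 'second';
    $sender.send-now: 'urgent';
}
```

The only thing the class holds is the suppliers themselves, so nothing about the current throttle needs to be stored; calling set-throttle again (say, with 0.3 once the user turns out to be voiced) replaces the throttled branch via migrate.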
Aryanize answered 22/1, 2020 at 22:29

Interesting. I gave it a go and from what I read in the documentation, which is sparse, something like the following should work:

my $s = Supply.from-list(^Inf);
my $control = Supplier.new.Supply;
my $throttle = $s.throttle( 10, 1, :$control );

react 
{
  whenever $throttle -> $n 
  { 
    $n.say 
  };

  # change speed every 5 seconds
  whenever Supply.interval(5) -> $x
  {
    "limit: { (1..10).pick }".say;
    $control.emit( "limit: { (1..10).roll }" );
  }
}

But it doesn't. The program freezes when it hits $control.emit( ... ). If you comment that out, it runs as expected. Relevant doc parts of the throttle method:

Produces a Supply from a given Supply, but makes sure the number of messages passed through, is limited.

[ ... ]

If the second positional parameter is a numeric value, it is interpreted as the time-unit (in seconds). If you specify .1 as the value, then it makes sure you don't exceed the limit for every tenth of a second.

[ ... ]

The :control named parameter optionally specifies a Supply that you can use to control the throttle while it is in operation. Messages that can be sent, are strings in the form of "key:value".

[ ... ]

These messages can be sent to the :control Supply. A control message consists of a string of the form "key: value", e.g. "limit: 4".

Hoffer answered 22/1, 2020 at 20:11 Comment(1)
Looking over the code for Supply.throttle, emitting "limit:..." to the control supply changes how many messages can be passed through at a time, not the interval. – Aryanize
