Rate limiting core.async channels in Clojure

I'm using Clojure with core.async, and have a situation where I want to put a rate limit on the number of messages processed through a channel.

In particular I would like to:

  • Define a rate limit e.g. 1,000 messages per second
  • Handle messages normally (and promptly) as long as the number of messages is less than the rate limit
  • Have some kind of sensible alternative handling of events if the rate limit is exceeded (e.g. telling a client to try again later)
  • Have reasonably low overhead

What's the best way to achieve this?

Photoneutron answered 20/2, 2014 at 7:30

Problem breakdown:

  1. Define a rate limit e.g. 1,000 messages per second
  2. Handle messages normally (and promptly) as long as the number of messages is less than the rate limit
  3. Have some kind of sensible alternative handling of events if the rate limit is exceeded (e.g. telling a client to try again later)
  4. Have reasonably low overhead

I'm approaching the problem with a solution that simply composes channels in loops.

A common rate-limiting algorithm is the token bucket. You have a fixed-size bucket of tokens, and tokens are added at a fixed rate. Sending a message consumes one token, so you can send as long as the bucket is not empty.

The size of the bucket determines the "burstiness" (how fast you can catch up to the maximum rate), and the fill rate determines the maximum average rate. These will be parameters to our code.
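To make the bucket arithmetic concrete before bringing in channels, here is a minimal pure-Clojure sketch of a token bucket. It is not part of the channel-based solution; the names `make-bucket`, `refill`, `try-take` and the map keys are hypothetical, and the clock is passed in explicitly (in milliseconds) so the behaviour is easy to follow.

```clojure
;; Hypothetical token-bucket model: a plain map plus pure functions.
(defn make-bucket [capacity rate-per-sec now-ms]
  {:capacity capacity :rate rate-per-sec :tokens capacity :last-ms now-ms})

(defn refill
  "Adds the tokens earned since :last-ms, capped at :capacity."
  [{:keys [capacity rate tokens last-ms] :as bucket} now-ms]
  (let [earned (* (- now-ms last-ms) (/ rate 1000.0))]
    (assoc bucket
           :tokens (min (double capacity) (+ tokens earned))
           :last-ms now-ms)))

(defn try-take
  "Refills, then spends one token if available. Returns [allowed? bucket]."
  [bucket now-ms]
  (let [b (refill bucket now-ms)]
    (if (>= (:tokens b) 1)
      [true (update b :tokens dec)]
      [false b])))

;; Usage: capacity 2, rate 1000 tokens per second.
(let [b0 (make-bucket 2 1000 0)
      [ok1 b1] (try-take b0 0)
      [ok2 b2] (try-take b1 0)
      [ok3 b3] (try-take b2 0)   ; bucket is empty and no time has passed
      [ok4 _]  (try-take b3 1)]  ; 1 ms later, one token has been earned
  [ok1 ok2 ok3 ok4])
;; => [true true false true]
```

The capacity of 2 allows a burst of two messages at once; after that, messages are admitted only as fast as tokens are earned.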

Let's make a channel that sends a message (doesn't matter what) at a given rate. (# 1)

(require '[clojure.core.async :refer [chan go timeout alts! <! >!]])

(defn rate-chan [burstiness rate]
  (let [c (chan burstiness) ;; bucket size is buffer size
        delta (/ 1000 rate)] ;; milliseconds between tokens
    (go
      (while true
        (>! c :go) ;; send a token, will park if bucket is full
        (<! (timeout delta)))) ;; wait a little
    c))

Now we want a channel that limits another channel by rate. (# 2)

(defn limit-chan [in rc]
  (let [c (chan)]
    (go 
      (while true
        (<! rc) ;; wait for token
        (>! c (<! in)))) ;; pass message along
    c))

Now we can use these channels with a default if there's no message waiting:

(defn chan-with-default [in]
  (let [c (chan)]
    (go
      (while true
        ;; take from in, or if not available, pass useful message
        ;; alts! returns [value port], so keep only the value
        (>! c (first (alts! [in] :default :rate-exceeded)))))
    c))

Now we have all of the pieces to solve the problem.

(def in (chan)) ;; producers put messages here

(def rchan (-> in
               (limit-chan (rate-chan 100 1000))
               (chan-with-default)))

As far as #4 goes, this is not the absolute fastest solution. But it's one that uses composable parts and will probably be fast enough. If you want it faster, you could make one loop to do all of this (instead of decomposing it into smaller functions). The fastest would be to implement the interfaces yourself.
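As a rough illustration of the "one loop" idea, here is a simplified sketch that folds the timer and the forwarding into a single go-loop. It is an assumption-laden variant, not the composed solution above: `throttle-chan` is a hypothetical name, there is no burst bucket and no `:rate-exceeded` default, and the delay is rounded to whole milliseconds (minimum 1), so rates above 1,000 per second cannot be expressed.

```clojure
(require '[clojure.core.async :as async
           :refer [chan go-loop timeout close! <! >! >!! <!!]])

(defn throttle-chan
  "Hypothetical single-loop variant: forwards messages from in to the
  returned channel, pausing after each one. Closes when in closes."
  [in rate]
  (let [out (chan)
        delta (max 1 (long (/ 1000 rate)))] ; ms between messages, at least 1
    (go-loop []
      (if-some [msg (<! in)]
        (do (>! out msg)
            (<! (timeout delta)) ; pause before accepting the next message
            (recur))
        (close! out)))           ; input closed: close the output too
    out))

;; Usage: three messages pass through in order.
(def src (chan 3))
(doseq [m [1 2 3]] (>!! src m))
(close! src)
(<!! (async/into [] (throttle-chan src 1000)))
;; => [1 2 3]
```

Unlike the `while true` loops above, this version stops when its input is closed.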

Mazard answered 29/4, 2014 at 14:1
Comment from Goa: If there's nothing going into rchan, isn't it going to just keep emitting :rate-exceeded? Also, isn't requirement #3 about backpressure, letting the producer know about limiting, rather than letting the channel consumer know about it?
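One possible way to address both points in that comment is to check for a token on the producer side, so the caller learns immediately whether the message was accepted. This is a minimal sketch, assuming a core.async version that provides the non-blocking `poll!` and `offer!`; `try-submit!` is a hypothetical name, and the token channel is filled by hand here (instead of by `rate-chan`) so the example is deterministic.

```clojure
(require '[clojure.core.async :refer [chan poll! offer!]])

(defn try-submit!
  "Puts msg on out if a token is immediately available on tokens and out
  has room. Never blocks. Returns :accepted or :rate-exceeded.
  Note: in this sketch a token is still spent when out is full."
  [tokens out msg]
  (if (and (poll! tokens) (offer! out msg))
    :accepted
    :rate-exceeded))

;; Usage: a bucket holding two tokens admits two of three messages.
(def token-chan (chan 2)) ; stands in for (rate-chan 2 1000)
(def out (chan 10))
(offer! token-chan :go)
(offer! token-chan :go)
(mapv #(try-submit! token-chan out %) [:a :b :c])
;; => [:accepted :accepted :rate-exceeded]
```

With this shape nothing is emitted while the producer is idle, and the "try again later" decision is made where the client can be told about it.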

I wrote a little library to solve just this problem. Its implementation is eerily similar to Eric Normand's, but with some measures for high throughput channels (timeout is not precise for near-millisecond sleep times).

It also supports throttling a group of channels globally, as well as throttling functions.

Check it out here.

Microgamete answered 29/4, 2014 at 15:18

Here's one way, using an atom to count how many messages are being sent and resetting it to zero periodically:

(require '[clojure.core.async :refer [chan go timeout put! <!]])

(def counter (atom 0))

(def time-period 1000) ;milliseconds

(def max-rate 1000) ;max number of messages per time-period

(def ch (chan))

(defn alert-client []
  (println "That's enough!"))

(go (while true (<! (timeout time-period)) (reset! counter 0))) ; reset counter periodically 

(defn process [msg]
  (if (> (swap! counter inc) max-rate) (alert-client) (put! ch msg)))

(doseq [x (range 1001)] (process x)) ; throw some messages at the channel

You'll need to have some more code to consume messages from the channel. If you're not sure that you'll be able to consistently consume messages at the rate you're throttling them at, you will probably want to specify the channel buffer size or channel type (dropping/sliding).
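To illustrate the difference between the two buffer types mentioned above, here is a small sketch using core.async's `dropping-buffer` and `sliding-buffer`. The channel names and the buffer size of 2 are arbitrary choices for the example; `offer!` and `poll!` are used so that nothing blocks.

```clojure
(require '[clojure.core.async
           :refer [chan dropping-buffer sliding-buffer offer! poll!]])

(def dropping (chan (dropping-buffer 2))) ; when full, new puts are discarded
(def sliding  (chan (sliding-buffer 2)))  ; when full, the oldest item is evicted

;; Put three messages into buffers that hold only two.
(doseq [x [1 2 3]]
  (offer! dropping x)
  (offer! sliding x))

[(poll! dropping) (poll! dropping) (poll! dropping)]
;; => [1 2 nil]   the oldest messages survive
[(poll! sliding) (poll! sliding) (poll! sliding)]
;; => [2 3 nil]   the newest messages survive
```

Either buffer keeps `put!` from piling up pending puts when the consumer falls behind; which one fits depends on whether stale or fresh messages matter more.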

Downhill answered 20/2, 2014 at 10:36

What you're looking for is known as a Circuit Breaker. I think the Wikipedia page is a rather poor description:

http://en.wikipedia.org/wiki/Circuit_breaker_design_pattern

Our Scala friends, though, have done a fantastic job of documenting it:

http://doc.akka.io/docs/akka/2.2.3/common/circuitbreaker.html

There are also Clojure libraries, but you'll have to do the integration with core.async yourself:

https://github.com/krukow/clojure-circuit-breaker

https://github.com/josephwilk/circuit-breaker

A blog post about circuit breakers and scaling with Clojure:

http://blog.josephwilk.net/clojure/building-clojure-services-at-scale.html

You may also want to consider something like Netflix's Hystrix, which provides Clojure bindings:

https://github.com/Netflix/Hystrix/tree/master/hystrix-contrib/hystrix-clj

HTH

Clipboard answered 20/2, 2014 at 14:33