Limit number of events per time period

I need to cap the number of events n permitted during a time period deltaT. Every approach I can think of uses O(m) space, where m is the maximum number of event requests sent per deltaT, or O(deltaT/r), where r is an acceptable resolution.

Edit: deltaT is a sliding time window relative to the timestamp.

For instance: keep a circular buffer of event timestamps. On each event, drop all timestamps earlier than t - deltaT. Deny the event if the number of timestamps exceeds n. Add the timestamp to the buffer.

Or, initialize a circular bucket buffer of integers of size deltaT/r, indexed by time relative to the current time with resolution r, and maintain a pointer i. On each event, advance i by the time since the last event divided by r, zero the buffer between the original i and the new one, and increment the bucket at i. Deny the event if the sum of the buffer exceeds n.

What's a better way?


I just implemented my second suggestion above in C# with a fixed deltaT of 1 s and a fixed resolution of 10 ms.

using System;
using System.Linq;

public class EventCap
{
    private const int RES = 10; //resolution in ms

    private int _max;
    private readonly int[] _tsBuffer;
    private int p = 0;
    private DateTime? _lastEventTime;
    private int _length = 1000 / RES;

    public EventCap(int max)
    {
        _max = max;

        _tsBuffer = new int[_length];
    }

    public EventCap()
    {
    }

    public bool Request(DateTime timeStamp)
    {
        if (_max <= 0)
            return true;

        if (!_lastEventTime.HasValue)
        {
            _lastEventTime = timeStamp;
            _tsBuffer[0] = 1;
            return true;
        }

        //A
        //Mutually redundant with B
        if (timeStamp - _lastEventTime >= TimeSpan.FromSeconds(1))
        {
            _lastEventTime = timeStamp;
            Array.Clear(_tsBuffer, 0, _length);
            _tsBuffer[0] = 1;
            p = 0;
            return true;
        }

        var newP = (int)(timeStamp - _lastEventTime.Value).TotalMilliseconds / RES + p; //the delta is < 1 s here, longer gaps were handled above

        if (newP < _length)
            Array.Clear(_tsBuffer, p + 1, newP - p);

        else if (newP > p + _length)
        {
            //B
            //Mutually redundant with A
            Array.Clear(_tsBuffer, 0, _length);
        }
        else
        {
            Array.Clear(_tsBuffer, p + 1, _length - p - 1);
            Array.Clear(_tsBuffer, 0, newP % _length + 1); //also clear the new bucket itself, its old contents are stale
        }

        p = newP % _length;
        _tsBuffer[p]++;
        _lastEventTime = timeStamp;

        var sum = _tsBuffer.Sum();

        return sum <= _max;
    }
}
Chino answered 21/9, 2012 at 12:3 Comment(3)
I want that too.. thanks for asking :) – Gernhard
Is deltaT a sliding window? Meaning, for every chosen deltaT you want a max of n events, or do the deltaT windows come one after the other? – Bes
Yes, sliding. @FUD, till we get an answer, check below, might work for you. – Chino

What about having these variables: num_events_allowed, time_before, time_now, time_passed

At init time you will do: time_before = system.timer(), num_events_allowed = n

When an event is received you do the following:

  time_now = system.timer()
  time_passed = time_now - time_before
  time_before = time_now

  num_events_allowed += time_passed * (n / deltaT);

  if num_events_allowed > n 
      num_events_allowed = n

  if num_events_allowed >= 1
      let event through, num_events_allowed -= 1
  else
      ignore event

What's nice about this algorithm is that num_events_allowed is incremented by the time that has passed since the last event multiplied by the rate at which events may be received, so you gain allowance for that time_passed while staying within the limit of n. If an event arrives too soon, the allowance grows by less than 1; if it arrives after a long gap, it grows by more than 1. Of course, if the event goes through you decrement the allowance by 1, as you just consumed an event. If the allowance exceeds the maximum n, you clamp it back to n, since you can't allow more than n in any time window. If the allowance is less than 1, you can't send a whole event, so don't let it through!

This is the leaky bucket algorithm: https://en.wikipedia.org/wiki/Leaky_bucket
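A minimal C# sketch of this allowance-counter approach (the class and member names, the use of DateTime.UtcNow, and the constructor signature are my own assumptions; whether to start the allowance at n or at 0 is debated in the comments below):

using System;

public class AllowanceLimiter
{
    private readonly double _n;          // max events per window
    private readonly TimeSpan _deltaT;   // window length
    private double _allowance;           // fractional number of events currently allowed
    private DateTime _timeBefore;

    public AllowanceLimiter(int n, TimeSpan deltaT)
    {
        _n = n;
        _deltaT = deltaT;
        _allowance = n;                  // see the comments below about starting at 0 instead
        _timeBefore = DateTime.UtcNow;
    }

    public bool Request()
    {
        var timeNow = DateTime.UtcNow;
        var timePassed = (timeNow - _timeBefore).TotalSeconds;
        _timeBefore = timeNow;

        // refill proportionally to the elapsed time: n events per deltaT
        _allowance += timePassed * (_n / _deltaT.TotalSeconds);
        if (_allowance > _n)
            _allowance = _n;             // never accumulate more than one window's worth

        if (_allowance >= 1)
        {
            _allowance -= 1;             // spend one whole event
            return true;
        }
        return false;                    // not enough allowance, ignore the event
    }
}

For a cap of 10 events per second you would construct it as new AllowanceLimiter(10, TimeSpan.FromSeconds(1)) and call Request() for every incoming event.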

Bes answered 21/9, 2012 at 14:2 Comment(7)
In practice it'll work great for most cases. It will be a bit too strict on bursty events, won't it? – Chino
Hey @Martin, well, if you want exactly a max of n events then it is a good algorithm; if you want approximately n then it would be too strict, and you could probably find something more loose, like having a function check periodically and not for every incoming event. But overall it's good because it's done in O(1) time and O(1) space. It is only important to keep the function synchronised, because if 2 events accidentally enter at the same time the time_passed and num_events_allowed could be used by both events and cause problems. – Bes
Found that I had solved this in almost exactly this way, two years ago. I was smarter then. :( – Chino
Hey @Martin, no worries, I forget how to do stuff all the time, so I go and look at stuff I did a few years back for reference, happens to all of us :) – Bes
I know this is old but I just came across it and it met my needs, but with one exception. The var num_events_allowed should be initialized to 0, or you will exceed your rate by one time period. That is, if you run for 10 sec at a rate of 10 per sec you will run 110 unless it is initialized to 0. The value must be allowed to accumulate. Also, the num_events_allowed > n check needs to follow num_events_allowed >= 1 to support fractional values of n. – Jacky
Even if you initialize it with 0, doing nothing for a period will cause it to go to 100. Then in a short burst you use all 100, and moments later it allows another few. There you go: over the limit in one period. This answer is bad. – Heighttopaper
@Heighttopaper brilliant point. The only way I see to fix this is to use n/2 instead of n. Then it's a good approximate algorithm. It only lets you burst n/2 at a time at most, but you guarantee the window will only ever allow n events in the worst case. – Caustic

One way to keep the sliding window and still handle each incoming request in O(1) time, with only a small fixed-size buffer, is to make a suitably sized array of ints, keep it as a circular buffer, discretize the incoming requests (the requests are integrated into sampled levels, as in an A/D converter, or into a histogram if you are a statistician), and keep track of the sum of the circular buffer, like this:

assumptions: 
"there can be no more than 1000 request per minute" and 
"we discretize on every second"

int[] buffer = new zeroed int-array with 60 zeroes
int buffer-index = 0
int request-integrator = 0 (transactional)
int discretizer-integrator = 0 (transactional)

for each request:
    check if request-integrator < 1000 then
         // the following incs could be placed outside 
         // the if statement for saturation on too many
         // requests (punishment)
         request-integrator++                     // important
         discretizer-integrator++
         proceed with request

once every second:                    // in a transactional memory transaction, for God's sake
    buffer-index++
    if (buffer-index = 60) then buffer-index=0    // for that circular buffer feeling!
    request-integrator -= buffer[buffer-index]    // clean for things happening one minute ago
    buffer[buffer-index] = discretizer-integrator // save this times value
    discretizer-integrator = 0                    // resetting for next sampling period

Note that the increase of the request-integrator "could" be done just once every second, but that leaves a hole open for saturating it with 1000 requests or worse in one second about once every minute depending on punishment behaviour.
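A rough C# sketch of the same idea (class and member names are mine; it assumes the 1000-requests-per-minute, one-second-resolution example above and uses a System.Threading.Timer in place of a transactional once-per-second task):

using System;
using System.Threading;

public class DiscretizedLimiter : IDisposable
{
    private readonly int[] _buffer = new int[60]; // one bucket per second, 60 s window
    private readonly int _maxPerWindow;           // e.g. 1000 requests per minute
    private int _bufferIndex;
    private int _requestIntegrator;               // requests counted over the last minute
    private int _discretizerIntegrator;           // requests counted in the current second
    private readonly object _lock = new object();
    private readonly Timer _timer;

    public DiscretizedLimiter(int maxPerWindow)
    {
        _maxPerWindow = maxPerWindow;
        _timer = new Timer(_ => Tick(), null, 1000, 1000); // fires once every second
    }

    public bool Request()
    {
        lock (_lock)
        {
            if (_requestIntegrator >= _maxPerWindow)
                return false;             // over the limit, reject
            _requestIntegrator++;         // important
            _discretizerIntegrator++;
            return true;                  // proceed with request
        }
    }

    private void Tick()
    {
        lock (_lock)
        {
            _bufferIndex = (_bufferIndex + 1) % _buffer.Length;  // that circular buffer feeling
            _requestIntegrator -= _buffer[_bufferIndex];         // clean what happened one minute ago
            _buffer[_bufferIndex] = _discretizerIntegrator;      // save this second's count
            _discretizerIntegrator = 0;                          // reset for the next sampling period
        }
    }

    public void Dispose() => _timer.Dispose();
}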

Peirce answered 26/9, 2012 at 22:37 Comment(3)
And! You could of course have a second-based limit as well (i.e. 50 req/s); just add an or-clause to the if-statement for the request. – Peirce
Interesting, it's a combination of my code above (discretization) and the token bucket suggested by FUD, isn't it? – Chino
I guess this one is inspired by the implementation of a digital low-pass filter. I also guess you could make a 1/dT summation as Yameo did above, which is my solution but for events happening at specific times - that one is actually the best, since it's smaller and just as exact. Head for Yameo's solution, mine is less elegant. – Peirce

While reading about the various possible solutions to this problem, I came across the token bucket algorithm ( http://en.wikipedia.org/wiki/Token_bucket ). If I understand your question correctly, you can implement a token bucket algorithm without actually having a bucket with N tokens by instead keeping a counter which is incremented and decremented accordingly, like:

synchronized def get_token =
    if count>0 
       { --count, return true }
    else return false

synchronized def add_token =
    if count==N
       return;
    else ++count

Now the remaining part is to call add_token repeatedly, once every deltaT/n, so that n tokens become available per deltaT.

To make it completely thread-safe we would need an atomic reference for count, but the above code shows the basic idea of doing it in O(1) memory.
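A small C# sketch of this counter-based token bucket (the names, the Interlocked-based synchronization, and the timer-driven refill interval are my assumptions; the original leaves the scheduling of add_token open):

using System;
using System.Threading;

public class CountingTokenBucket : IDisposable
{
    private readonly int _capacity;      // N, the maximum number of stored tokens
    private int _count;                  // current token count
    private readonly Timer _refillTimer;

    // refillInterval should be deltaT / n so that n tokens are added per deltaT
    public CountingTokenBucket(int capacity, TimeSpan refillInterval)
    {
        _capacity = capacity;
        _count = capacity;
        _refillTimer = new Timer(_ => AddToken(), null, refillInterval, refillInterval);
    }

    // get_token: consume a token if one is available
    public bool GetToken()
    {
        while (true)
        {
            int current = Volatile.Read(ref _count);
            if (current <= 0)
                return false;            // no tokens left, reject the event
            if (Interlocked.CompareExchange(ref _count, current - 1, current) == current)
                return true;             // token consumed
        }
    }

    // add_token: top the counter up, never beyond capacity
    private void AddToken()
    {
        while (true)
        {
            int current = Volatile.Read(ref _count);
            if (current >= _capacity)
                return;                  // bucket already full
            if (Interlocked.CompareExchange(ref _count, current + 1, current) == current)
                return;
        }
    }

    public void Dispose() => _refillTimer.Dispose();
}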

Gernhard answered 21/9, 2012 at 14:19 Comment(1)
From what I can tell this is the same idea as Yarneo's, except his implementation adds the appropriate number of tokens on each event. – Chino

I wrote the class below (ActionQueue) to limit the frequency of function calls. One of the nice things is that it uses a timer to pop things off the queue... so the CPU is used minimally (or even not at all, if the queue is empty)... as opposed to any polling type of technique.

Example...

    // limit to two call every five seconds
    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(5), 2);
    public void Test()
    {
        for (var i = 0; i < 10; i++)
        {
            _actionQueue.Enqueue((i2) =>
            {
                Console.WriteLine("Action " + i2 + ": " + DateTime.UtcNow);
            }, i);
        }
    }

Real world example...

    ActionQueue _actionQueue = new ActionQueue(TimeSpan.FromSeconds(1), 10);

    public override void SendOrderCancelRequest(Order order, SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var parms = (Tuple<Order, SessionID>)state;
            base.SendOrderCancelRequest(parms.Item1, parms.Item2);
        }, new Tuple<Order, SessionID>(order, sessionID));
    }
    public override void SendOrderMassStatusRequest(SessionID sessionID)
    {
        _actionQueue.Enqueue((state) =>
        {
            var sessionID2 = (SessionID)state;
            base.SendOrderMassStatusRequest(sessionID2);
        }, sessionID);
    }

The actual class...

using System;
using System.Collections.Generic;

public class ActionQueue
{
    private class ActionState
    {
        public Action<object> Action;
        public object State;
        public ActionState(Action<object> action, object state)
        {
            Action = action;
            State = state;
        }
    }
    Queue<ActionState> _actions = new Queue<ActionState>();
    Queue<DateTime> _times = new Queue<DateTime>();

    TimeSpan _timeSpan;
    int _maxActions;
    public ActionQueue(TimeSpan timeSpan, int maxActions)
    {
        _timeSpan = timeSpan;
        _maxActions = maxActions;           
    }
    public void Enqueue(Action<object> action, object state)
    {
        lock (_times)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);

            if (_times.Count <= _maxActions)
                action(state);
            else
                _actions.Enqueue(new ActionState(action, state));

            CreateDequeueTimerIfNeeded();
        }
    }

    System.Threading.Timer _dequeueTimer;
    protected void CreateDequeueTimerIfNeeded()
    {
        // if we have no timer and we do have times, create a timer
        if (_dequeueTimer == null && _times.Count > 0) 
        {
            var timeSpan = _times.Peek() - DateTime.UtcNow;
            if (timeSpan.TotalSeconds <= 0)
            {
                HandleTimesQueueChange();
            }
            else
            {
                _dequeueTimer = new System.Threading.Timer((obj) =>
                {
                    lock (_times)
                    {
                        _dequeueTimer = null;
                        HandleTimesQueueChange();
                    }
                }, null, timeSpan, System.Threading.Timeout.InfiniteTimeSpan);
            }
        }
    }

    private void HandleTimesQueueChange()
    {
        _times.Dequeue();
        while (_times.Count > 0 && _times.Peek() < DateTime.UtcNow)
            _times.Dequeue();

        while (_actions.Count > 0 && _times.Count < _maxActions)
        {
            _times.Enqueue(DateTime.UtcNow + _timeSpan);
            var actionState = _actions.Dequeue();
            actionState.Action(actionState.State);
        }

        CreateDequeueTimerIfNeeded();
    }
}
Samarasamarang answered 28/12, 2016 at 22:50 Comment(0)

As the output rate must be capped at n events per deltaT, the appropriate solution is to use the sliding-window rate limiter algorithm.

The memory usage requirement is O(n), not O(m) as you assumed, where m is the maximum number of input events in the deltaT time interval. This is because we need to cap the output rate, so we only need to store the time stamps of accepted (output) events for rate measuring, not of all input events. Note that n <= m, so this is better than your first proposal and more convenient than having to guess a value for r.

The algorithm is trivial and works just as you described:

Use a circular FIFO queue with a capacity of at most n time stamps.

For each incoming event, get the current time (now) and compute the corresponding lower time boundary (now - deltaT). Remove from the old end of the queue all stamps older than the lower time boundary, if any. If the queue is still full, discard the event. Otherwise append the current time (now) to the queue and accept the event.

The processing time is O(1) per event on average, but the memory usage is O(n).
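A minimal C# sketch of this queue-based limiter (class and member names are my own):

using System;
using System.Collections.Generic;

public class SlidingWindowLimiter
{
    private readonly int _n;                 // max accepted events per window
    private readonly TimeSpan _deltaT;       // window length
    private readonly Queue<DateTime> _stamps = new Queue<DateTime>(); // stamps of accepted events only

    public SlidingWindowLimiter(int n, TimeSpan deltaT)
    {
        _n = n;
        _deltaT = deltaT;
    }

    public bool Request(DateTime now)
    {
        var lowerBound = now - _deltaT;

        // drop stamps that have slid out of the window
        while (_stamps.Count > 0 && _stamps.Peek() < lowerBound)
            _stamps.Dequeue();

        if (_stamps.Count >= _n)
            return false;                    // queue still full: discard the event

        _stamps.Enqueue(now);                // accept and record the event
        return true;
    }
}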

The leaky bucket and token bucket algorithms don't have the O(n) memory usage penalty, but they have the property that the rate may occasionally be higher than n/deltaT. Note that there is an upper limit determined by the bucket capacity: based on my tests, it never exceeded n/deltaT + bucket_size. This might be OK for some applications, but it doesn't satisfy the requirement of the question.

This algorithm is also better than the queued leaky bucket which introduces latency that might be undesirable in some applications, although it satisfies the requirement given in the question.

EDIT: after more extensive testing of the algorithm, it is confirmed that the rate requirement is satisfied. Unfortunately, it also creates bursts of accepted events, which may be undesirable in some applications. These bursts appear as jitter.

These bursts appear because the algorithm initially fills the buffer at the event arrival rate, then waits until the rate constraint is met by discarding all successive events. Once deltaT has elapsed, it again replaces entries at the event arrival rate, and waits again until the next deltaT is reached.

Hackle answered 14/12, 2023 at 13:2 Comment(0)
