API Rate Limiter Intermittent Hanging
Asked Answered
G

1

6

I wrote a simple (I thought...) rate limiter to keep an event driven system under our licensed API hit limits. For some reason it seizes up sometimes after 400-500 requests are sent through.

My best idea is that I have screwed up the wait function so in some circumstances it never returns, but I have been unable to locate the flawed logic. Another idea is that I have botched the Async/Task interop causing issues. It always works initially and then later. A single instance of ApiRateLimiter is being shared across several components in order to honor hit limits system wide.

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

let requestLimit = Math.Max(limitCount,1)

let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox -> 

    let rec waitUntilUnderLimit (recentRequestsTimeSent: seq<DateTimeOffset>) = async{
        let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
        let requestsWithinLimit = 
            recentRequestsTimeSent 
            |> Seq.filter(fun x -> x >= cutoffTime)
            |> Seq.toList

        if requestsWithinLimit.Length >= requestLimit then
            let! _ = Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
            return! waitUntilUnderLimit requestsWithinLimit
        else
            return requestsWithinLimit
    }

    let rec messageLoop (mostRecentRequestsTimeSent: seq<DateTimeOffset>) = async{
        // read a message
        let! keyedRequest,replyChannel = inbox.Receive()
        // wait until we are under our rate limit
        let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent

        let rightNow = DateTimeOffset.UtcNow

        let! response =
            keyedRequest.Request
            |> httpClient.SendAsync
            |> Async.AwaitTask

        replyChannel.Reply { Key = keyedRequest.Key; Response = response }

        return! messageLoop (seq {
            yield rightNow
            yield! remainingRecentRequests
        })
    }

    // start the loop
    messageLoop (Seq.empty<DateTimeOffset>)
)            

member this.QueueApiRequest keyedRequest =
    async {
        return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
    } |> Async.StartAsTask

Some of the requests are large and take a little bit of time, but nothing that could cause the total death of request sending that I am seeing with this thing.

Thanks for taking a second to look!

Graphophone answered 2/4, 2018 at 22:7 Comment(0)
P
3

I notice that you're building up a list of the most recent times a request was sent by using a seq:

seq {
    yield rightNow
    yield! remainingRecentRequests
}

Because F# sequences are lazy, this produces an enumerator that, when asked for its next value, will first yield one value, and then will start iterating through its child seq and yielding a value. Every time you yield a new request, you add a new enumerator -- but when are the old ones disposed? You'd think they would be disposed once they expire, that is, once the Seq.filter in waitUntilUnderLimit returns false. But think about it: how would the F# compiler know that the filter condition will always be false once it has been false once? Without a deep code analysis (which the compiler doesn't do), it can't. So the "old" seqs can never be garbage collected, because they're still being kept around in case they're ever needed. I'm not 100% certain of this because I haven't measured the memory usage of your code, but if you were to measure memory use of your ApiRateLimiter instance I bet you'd see it growing steadily without ever going down.

I also noticed that you are adding the new items on the front of the seq. This is exactly the same semantics that an F# list would use, but with a list, there are no IEnumerable objects to allocate, and once a list item fails the List.filter condition, it will be disposed of. I therefore rewrote your code to use a list of recent times instead of a seq, and I made one other change for efficiency: since the way you create your list guarantees that it will be sorted, with the most recent events first and the oldest last, I replaced List.filter with List.takeWhile. That way the minute the first date is older than the cutoff, it will stop checking older dates.

With this change, you should now have old dates actually expiring, and the memory usage of your ApiRateLimiter class should fluctuate but remain constant-ish. (It will be creating new lists every time waitUntilUnderLimit is called, so it will create some GC pressure, but those should all be in generation 0). I don't know if this will solve your hanging problem or not, but this is the only problem I could see in your code.

BTW, I also replaced your line let! _ = Async.Sleep 100 with do! Async.Sleep 100, which is simpler. No efficiency gains here, but there's no need to use let! _ = to wait for an Async<unit> to return; that's precisely what the do! keyword is for.

type RequestWithReplyChannel = RequestWithKey * AsyncReplyChannel<ResponseWithKey>

type public ApiRateLimiter(httpClient: HttpClient, limitTimePeriod: TimeSpan, limitCount: int) =

    let requestLimit = Math.Max(limitCount,1)

    let agent = MailboxProcessor<RequestWithReplyChannel>.Start(fun inbox -> 

        let rec waitUntilUnderLimit (recentRequestsTimeSent: DateTimeOffset list) = async{
            let cutoffTime = DateTimeOffset.UtcNow.Subtract limitTimePeriod
            let requestsWithinLimit = 
                recentRequestsTimeSent 
                |> List.takeWhile (fun x -> x >= cutoffTime)

            if List.length requestsWithinLimit >= requestLimit then
                do! Async.Sleep 100 //sleep for 100 milliseconds and check request limit again
                return! waitUntilUnderLimit requestsWithinLimit
            else
                return requestsWithinLimit
        }

        let rec messageLoop (mostRecentRequestsTimeSent: DateTimeOffset list) = async{
            // read a message
            let! keyedRequest,replyChannel = inbox.Receive()
            // wait until we are under our rate limit
            let! remainingRecentRequests = waitUntilUnderLimit mostRecentRequestsTimeSent

            let rightNow = DateTimeOffset.UtcNow

            let! response =
                keyedRequest.Request
                |> httpClient.SendAsync
                |> Async.AwaitTask

            replyChannel.Reply { Key = keyedRequest.Key; Response = response }

            return! messageLoop (rightNow :: remainingRecentRequests)
        }

        // start the loop
        messageLoop []
    )            

    member this.QueueApiRequest keyedRequest =
        async {
            return! agent.PostAndAsyncReply(fun replyChannel -> (keyedRequest,replyChannel))
        } |> Async.StartAsTask
Phyllode answered 3/4, 2018 at 4:49 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.