Throttling Azure Storage Queue processing in Azure Function App

I have created an Azure Function app with an Azure Storage Queue trigger that processes a queue in which each queue item is a URL. The Function just downloads the content of the URL. I have another function that loads and parses a site's XML Sitemap and adds all the page URLs to the queue. The problem is that the Function app runs too quickly and hammers the website, which starts returning server errors. Is there a way to limit/throttle the speed at which the Function app processes the queue?

I could, of course, write a simple web job that processed them serially (or with some async but limit the number of concurrent requests), but I really like the simplicity of Azure Functions and wanted to try out "serverless" computing.

Prophase answered 17/10, 2016 at 19:18 Comment(0)

There are a few options you can consider.

First, there are some knobs you can configure in host.json that control queue processing (documented here). The queues.batchSize knob controls how many queue messages are fetched at a time. If set to 1, the runtime fetches one message and waits until its processing completes before fetching the next. This could give you some level of serialization on a single instance.
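As a sketch, a host.json along these lines (assuming the v1 schema, where queue settings live under a top-level queues section) would limit each poll to one message; newBatchThreshold set to 0 makes the runtime wait for the current message to finish before fetching another:

```json
{
  "queues": {
    "batchSize": 1,
    "newBatchThreshold": 0
  }
}
```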

Another option might be for you to set the NextVisibleTime on the messages you enqueue in such a way that they are spaced out - by default messages that are enqueued become visible and ready for processing immediately.
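A minimal sketch of that second approach, using the WindowsAzure.Storage SDK of the era (the overload of AddMessageAsync that takes an initialVisibilityDelay); queue, urls, and the 2-second spacing are assumptions for illustration:

```csharp
// Space the enqueued messages out so each becomes visible
// (and therefore processable) a fixed interval after the previous one.
var spacing = TimeSpan.FromSeconds(2); // hypothetical: one URL every 2 seconds
for (var i = 0; i < urls.Count; i++)
{
    await queue.AddMessageAsync(
        new CloudQueueMessage(urls[i]),
        timeToLive: null,
        initialVisibilityDelay: TimeSpan.FromTicks(spacing.Ticks * i),
        options: null,
        operationContext: null);
}
```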

A final option would be to enqueue a single message containing the collection of all URLs for a site, rather than one message per URL; when that message is processed, you can fetch the URLs serially inside your function and limit the parallelism that way.
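A rough sketch of that last option, assuming a hypothetical SiteUrls payload type and a "sites" queue name (both made up for illustration), with the runtime deserializing the message body from JSON:

```csharp
// Hypothetical payload: one message carries every URL for a site.
public class SiteUrls
{
    public List<string> Urls { get; set; }
}

[FunctionName("WarmSite")]
public static async Task Run(
    [QueueTrigger("sites")] SiteUrls site, ILogger log)
{
    using (var http = new HttpClient())
    {
        foreach (var url in site.Urls)
        {
            // One request at a time; pacing between requests is an assumption.
            await http.GetAsync(url);
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
    }
}
```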

Meanly answered 17/10, 2016 at 21:52 Comment(6)
Thanks @mathewc. I went with your second suggestion (set NextVisibleTime). It took me a bit to figure out how to make that happen but it's working well. My code is at github.com/alindgren/SiteWarmer in case someone wants to take a peek.Prophase
Is queues.batchSize the total number of queue messages fetched at once? Or just per instance of that function?Alligator
BatchSize is the number of messages fetched on each iteration of our queue polling loop. I.e. it's the number we pass to the Azure Queues GetMessagesAsync call. This polling loop is specific to a function - each queue function has its own.Meanly
host.json documentation has changed to here: learn.microsoft.com/en-us/azure/azure-functions/…Irritative
Hi @mathewc, what would you recommend with Event Hub bindings? Using IoT->eHub->Func. I have historic data feed killing my connection count. I could IoT->sBus->Func with max batch=1, but wouldn't this trigger new instances? So max of 200 func instances?Chondro
Please note that batch size applies to each instance of an Azure Function and not to the Function App as a whole. This means the app could scale out to n instances and you'd have n messages pulled off the queue at once, which doesn't give you serial processing. See @jondow's answer below for more control with out-of-the-box settings. Another option would be to use sessions to throttle.Guardado

NextVisibleTime can get messy if several parallel functions are adding to the queue. Another simple option for anyone having this problem: create a second queue, "throttled-items", and point your queue-triggered function at that one instead. Then add a simple timer function that, every minute, moves messages from the original queue to the throttled queue, spacing out their NextVisibleTime accordingly.

    [FunctionName("ThrottleQueueItems")]
    public static async Task Run([TimerTrigger("0 * * * * *")] TimerInfo timer, ILogger logger)
    {
        var originalQueue = // get original queue here;
        var throttledQueue = // get throttled queue here;
        var itemsPerMinute = 60; // get from app settings
        var individualDelay = 60.0 / itemsPerMinute; // seconds between messages
        var totalRetrieved = 0;
        var maxItemsInBatch = 32; // Azure Storage Queues return at most 32 messages per fetch
        do
        {
            var pending = (await originalQueue.GetMessagesAsync(Math.Min(maxItemsInBatch, itemsPerMinute - totalRetrieved))).ToArray();
            if (!pending.Any())
                break;
            foreach (var message in pending)
            {
                // Re-enqueue with a staggered visibility delay, then remove the original.
                await throttledQueue.AddMessageAsync(
                    new CloudQueueMessage(message.AsString),
                    null,
                    TimeSpan.FromSeconds(individualDelay * ++totalRetrieved),
                    null,
                    null);
                await originalQueue.DeleteMessageAsync(message);
            }
        } while (itemsPerMinute > totalRetrieved);
    }
Nims answered 21/4, 2019 at 1:46 Comment(0)

I found this post when trying to solve a similar problem, so this might be useful to anyone who arrives here. You can now limit the number of concurrent instances of the function app using the WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT app setting. Setting this to 1, combined with a batch size of 1, allows you to process a queue serially.

WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT

The maximum number of instances that the function app can scale out to. Default is no limit.

https://learn.microsoft.com/en-gb/azure/azure-functions/functions-app-settings#website_max_dynamic_application_scale_out
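For example, the setting can be applied with the Azure CLI; the app and resource group names below are placeholders:

```shell
# Hypothetical names; caps the function app at a single instance
az functionapp config appsettings set \
  --name my-func-app \
  --resource-group my-rg \
  --settings WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT=1
```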

Whittier answered 6/2, 2020 at 9:12 Comment(1)
That's useful. Note: per documentation, at the moment the setting is in preview and the recommended way is to update the functionAppScaleLimit app property: learn.microsoft.com/en-gb/azure/azure-functions/…Manyplies

With ThrottlingTroll this can be achieved as follows.

  1. Configure your HttpClient like this:
static HttpClient myThrottledHttpClient = new HttpClient
(
    new ThrottlingTrollHandler
    (
        // Consider using RedisCounterStore instead, so that rate counters are stored in a distributed cache 
        counterStore: new MemoryCacheCounterStore(),

        config: new ThrottlingTrollEgressConfig
        {
            Rules = new[]
            {
                new ThrottlingTrollRule
                {
                    // No more than 10 requests per second
                    LimitMethod = new FixedWindowRateLimitMethod
                    {
                        PermitLimit = 10,
                        IntervalInSeconds = 1,
                    }
                },
            }
        }
    )
);
  2. Then use it like this:
[Function("QueueTrigger")]
[QueueOutput(QueueName)] 
public async Task<string> Run([QueueTrigger(QueueName)] string msg)
{
    // Making the call via an HttpClient scaffolded with ThrottlingTroll
    var response = await myThrottledHttpClient.GetAsync("https://my-http-endpoint");

    // If request rate limit is exceeded, myThrottledHttpClient will return this status by itself, _without_ making the actual call.
    if (response.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
    {
        // Just to reduce the load to the queue
        await Task.Delay(1000);

        // Re-queueing the same message
        return msg;
    }

    // Do whatever else needed with message and response

    return null;
}

That ThrottlingTroll-equipped HttpClient will limit itself. If the limit is exceeded, it returns 429 TooManyRequests without making the actual call. When that happens, we just put the message back on the same queue.

Assuming your Function has multiple instances, consider using RedisCounterStore to maintain rate limit across all of them.

Antimacassar answered 15/6, 2023 at 13:26 Comment(0)
