Reactive SQS Poller with backpressue
Asked Answered
D

2

6

Trying to create SQS Poller which:

  • Do exponential polls (to reduce number of request if there is no messages present in the queue)
  • Query SQS more often if there are a lot of messages in queue
  • Have backpressure if certain number of messages are received, it stops polling
  • Not to be throttled by AWS API rate limit

As an example I'm using this JavaRx implementation which is easily transformed to Project Reactor and enrich it with backpressure.

private static final Long DEFAULT_BACKOFF = 500L;
private static final Long MAX_BACKOFF = 8000L;
private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
private static volatile boolean stopRequested;

public Flux<Message> pollMessages(GetQueueUrlResult q)
{
    return Flux.create(sink -> {
        long backoff = DEFAULT_BACKOFF;

        while (!stopRequested)
        {
            if (sink.isCancelled())
            {
                sink.error(new RuntimeException("Stop requested"));
                break;
            }

            Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                    ? amazonSQS.receiveMessageAsync(createRequest(q))
                    : completedFuture(new ReceiveMessageResult());

            try
            {
                ReceiveMessageResult result = future.get();

                if (result != null && !result.getMessages().isEmpty())
                {
                    backoff = DEFAULT_BACKOFF;

                    LOGGER.info("New messages found in queue size={}", result.getMessages().size());

                    result.getMessages().forEach(m -> {
                        if (sink.requestedFromDownstream() > 0L)
                        {
                            sink.next(m);
                        }
                    });
                }
                else
                {
                    if (backoff < MAX_BACKOFF)
                    {
                        backoff = backoff * 2;
                    }

                    LOGGER.debug("No messages found on queue.  Sleeping for {} ms.", backoff);

                    // This is to prevent rate limiting by the AWS api
                    Thread.sleep(backoff);
                }
            }
            catch (InterruptedException e)
            {
                stopRequested = true;
            }
            catch (ExecutionException e)
            {
                sink.error(e);
            }
        }
    });
}

Implementation seems working but there are few questions:

  • Looks like querying Future results in the loop can be done using Reactor Primitives, tried it with Flux.generate but was not able to control number of async call made to SqsClient
  • In case of Flux.interval approach don't understand how to proper implement backoff policy
  • Don't like Thread.sleep call any ideas how to replace it?
  • How to properly stop loop in case cancel signal? Using sink.error is used to cover that case now.
Disulfide answered 16/6, 2018 at 0:1 Comment(2)
There is no need for backoff with SQS, and it seems like you may have misunderstood some SQS behavior. Set your WaitTimeSeconds to the max value (20) and MaxNumberOfMessages to the max value (10). SQS will return immediately with up to 10 messages if there are any messages in the queue, otherwise will wait for 1 message to arrive and return it immediately (possibly > 1 if they arrive very, very close together). If none arrive for 20 seconds, the response returns empty. With this, you get messages immediately, and yet a completely idle queue will be polled only 180 times per hour.Nichani
I understand SQS part and planing to go with 'long polling' for production code but still interested in how to solve such issues using reactive approach.Disulfide
T
3

Previous answers seem to be missing the point:

Interval polling doesn't fit the criteria, so that approach should not be taken, as it will request the same amount of queries to SQS regardless of having or not messages. We can delegate that to SQS long polling mechanism.

Future.get() is blocking the thread, causing starvation and leading to poor performance, you should never use future.get() or .wait() or any blocking operation but rather use async callbacks, or in this case Flux.fromFuture should do the trick.

You can use Flux.generate, which will trigger for every requested item request(N) from the downstream, so what you need to control really is how the downstream requests to the upstream.

Your Flux.generate can just be a Flux<Mono<ReceiveMessageResponse>> so that generate just creates a Mono (async) query to SQS.

Then you can make use of .flatMap async behaviour with concurrency set, so you limit the amount of parallel queries to SQS at any given time, preventing your app from issuing too many requests to SQS.

So in essence, something like this should work:

var receiveRequest = ReceiveMessageRequest.builder()
                .queueUrl(queueUrl)
                .waitTimeSeconds(20) // Long polling when there is no messages (non-blocking io)
                .maxNumberOfMessages(10) // Batching 
                .visibilityTimeout(1) // Sample
                .messageAttributeNames("All") // Sample
                .build();

Flux.generate((SynchronousSink<Mono<ReceiveMessageResponse>> sink) ->  
                sink.next(Mono.fromFuture(this.sqsClient.receiveMessage(receiveRequest))))
                .flatMap(asyncSqsQuery -> asyncSqsQuery, 4) // Defaults to 256
                .flatMapIterable(ReceiveMessageResponse::messages);

Remember that .flatMap is async in nature, so the concurrency set to (4) will allow up to 4 concurrent requests to the upstream (the generator), in essence it's transforming (or executing) the Mono upon subscription.

If you place anything in between it may change this behaviour, effectively requesting more elements to the generator and breaking that limit of concurrency to the sqs queries.

Other notes: Thread.sleep takes the thread and blocks it, instead you can return Mono.delay in order to prevent someone to request more items, this could be handy if for example SQS is down and queries fail, as there is no long-polling of 20s but rather will fail faster. You can make your generator Mono to be either a response from AWS or an empty response upon error, delayed some amount of time, like 1second.

Hope that helps.

Update

I have implemented a java lib to do precisely this, as using a fixed flatMap will make you trade-off amount of requests while there is no messages (billing costs) with max-throughput, and would not back-off when there is 429s, and that's not ideal.

https://github.com/juancarrey/reactor-poller

Main logic being here: https://github.com/juancarrey/reactor-poller/blob/main/reactor-poller-core/src/main/java/com/jcarrey/reactor/poller/core/AdaptativeConcurrencyControl.java

It covers your use case, as it adapts the concurrent polling based on the SQS response.

Tenebrific answered 3/4, 2023 at 14:1 Comment(0)
C
1

What do you think about the following solution:

    private static final Integer batchSize = 1;
    private static final Integer intervalRequest = 3000;
    private static final Integer waitTimeout = 10;
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    private static final SqsAsyncClient sqsAsync =
       SqsAsyncClient
         .builder()
         .endpointOverride(URI.create(queueUrl))
         .build();

    public static Flux<Message> sqsPublisher =
        Flux.create(sink -> {
                if (sink.isCancelled()) {
                    sink.error(new RuntimeException("Stop requested"));
                }

            scheduler.scheduleWithFixedDelay(() -> {
                long numberOfRequests = Math.min(sink.requestedFromDownstream(), batchSize);
                if (numberOfRequests > 0) {
                    ReceiveMessageRequest request = ReceiveMessageRequest
                            .builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages((int) numberOfRequests)
                            .waitTimeSeconds(waitTimeout).build();

                    CompletableFuture<ReceiveMessageResponse> response = sqsAsync.receiveMessage(request);

                    response.thenApply(responseValue -> {
                        if (responseValue != null && responseValue.messages() != null && !responseValue.messages().isEmpty()) {
                            responseValue.messages().stream().limit(numberOfRequests).forEach(sink::next);
                        }
                        return responseValue;
                    });

                }
            }, intervalRequest, intervalRequest, TimeUnit.MILLISECONDS);
        });
Collencollenchyma answered 17/12, 2019 at 16:40 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.