I'm trying to create an SQS poller which:
- Polls with exponential backoff (to reduce the number of requests when there are no messages in the queue)
- Queries SQS more often when there are a lot of messages in the queue
- Applies backpressure: once a certain number of messages has been received, it stops polling
- Is not throttled by the AWS API rate limit
As an example I'm using this RxJava implementation, which is easily transformed to Project Reactor, and enriching it with backpressure.
    private static final Long DEFAULT_BACKOFF = 500L;
    private static final Long MAX_BACKOFF = 8000L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsPollerService.class);
    private static volatile boolean stopRequested;

    public Flux<Message> pollMessages(GetQueueUrlResult q)
    {
        return Flux.create(sink -> {
            long backoff = DEFAULT_BACKOFF;
            while (!stopRequested)
            {
                if (sink.isCancelled())
                {
                    sink.error(new RuntimeException("Stop requested"));
                    break;
                }
                Future<ReceiveMessageResult> future = sink.requestedFromDownstream() > 0
                        ? amazonSQS.receiveMessageAsync(createRequest(q))
                        : completedFuture(new ReceiveMessageResult());
                try
                {
                    ReceiveMessageResult result = future.get();
                    if (result != null && !result.getMessages().isEmpty())
                    {
                        backoff = DEFAULT_BACKOFF;
                        LOGGER.info("New messages found in queue size={}", result.getMessages().size());
                        result.getMessages().forEach(m -> {
                            if (sink.requestedFromDownstream() > 0L)
                            {
                                sink.next(m);
                            }
                        });
                    }
                    else
                    {
                        if (backoff < MAX_BACKOFF)
                        {
                            backoff = backoff * 2;
                        }
                        LOGGER.debug("No messages found on queue. Sleeping for {} ms.", backoff);
                        // This is to prevent rate limiting by the AWS api
                        Thread.sleep(backoff);
                    }
                }
                catch (InterruptedException e)
                {
                    stopRequested = true;
                }
                catch (ExecutionException e)
                {
                    sink.error(e);
                }
            }
        });
    }
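The backoff rule inside the loop above (double the delay after an empty poll until a cap is reached, reset it when messages arrive) can be looked at in isolation. The following is only a minimal sketch of that rule in plain Java: the class name `BackoffPolicy` and its method names are hypothetical and do not come from the original code. Unlike the loop above, it also clamps the delay to the cap, which only matters when the cap is not the initial delay times a power of two.

```java
// Hypothetical helper (not part of the original post): isolates the
// "double on empty poll, reset on messages" rule used in the loop above.
final class BackoffPolicy {
    private final long initialMs;
    private final long maxMs;
    private long currentMs;

    BackoffPolicy(long initialMs, long maxMs) {
        if (initialMs <= 0 || maxMs < initialMs) {
            throw new IllegalArgumentException("need 0 < initialMs <= maxMs");
        }
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.currentMs = initialMs;
    }

    // Call after an empty poll: doubles the delay while it is below the cap
    // (never exceeding the cap) and returns the delay to wait before the next poll.
    long nextDelayAfterEmptyPoll() {
        if (currentMs < maxMs) {
            currentMs = Math.min(currentMs * 2, maxMs);
        }
        return currentMs;
    }

    // Call after a poll that returned messages: go back to the initial delay.
    void reset() {
        currentMs = initialMs;
    }
}
```

With the values from the question (500 and 8000), successive empty polls would wait 1000, 2000, 4000 and then 8000 ms, and the sequence starts over after `reset()`.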
The implementation seems to work, but I have a few questions:
- It looks like querying the Future results in the loop could be done using Reactor primitives. I tried it with Flux.generate but was not able to control the number of async calls made to SqsClient.
- With the Flux.interval approach, I don't understand how to properly implement the backoff policy.
- I don't like the Thread.sleep call. Any ideas how to replace it?
- How do I properly stop the loop on a cancel signal? sink.error is currently used to cover that case.
Set WaitTimeSeconds to the max value (20) and MaxNumberOfMessages to the max value (10). SQS will return immediately with up to 10 messages if there are any messages in the queue; otherwise it will wait for 1 message to arrive and return it immediately (possibly more than 1 if they arrive very, very close together). If none arrive for 20 seconds, the response returns empty. With this, you get messages immediately, and yet a completely idle queue will be polled only 180 times per hour. – Nichani
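The 180 figure in the comment is simple arithmetic: an idle queue answers each long poll after the full 20 seconds, so at most 3600 / 20 requests fit into an hour. A small sketch of that calculation follows; the class and method names are hypothetical, and request latency is ignored. For comparison, the 8000 ms sleep cap from the question's loop allows up to 450 empty polls per hour under the same assumption.

```java
// Hypothetical helper (not from the original post): upper bound on the
// number of requests per hour against a completely idle queue.
final class PollRate {
    private static final long MILLIS_PER_HOUR = 3_600_000L;

    // cycleMs is the minimum time one idle poll cycle takes, for example the
    // long-poll wait time or the sleep between two short polls.
    static long maxIdleRequestsPerHour(long cycleMs) {
        if (cycleMs <= 0) {
            throw new IllegalArgumentException("cycleMs must be positive");
        }
        return MILLIS_PER_HOUR / cycleMs;
    }
}
```

`PollRate.maxIdleRequestsPerHour(20_000L)` gives 180 for long polling with a 20-second wait, and `PollRate.maxIdleRequestsPerHour(8_000L)` gives 450 for the capped backoff.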