Pattern to continuously listen to AWS SQS messages
T

6

30

I have a simple class named QueueService with some methods that wrap the methods from the AWS SQS SDK for Java. For example:

public ArrayList<Hashtable<String, String>> receiveMessages(String queueURL) {
        List<Message> messages = this.sqsClient.receiveMessage(queueURL).getMessages();

        ArrayList<Hashtable<String, String>> resultList = new ArrayList<Hashtable<String, String>>();
        for(Message message : messages) {
            Hashtable<String, String> resultItem = new Hashtable<String, String>();
            resultItem.put("MessageId", message.getMessageId());
            resultItem.put("ReceiptHandle", message.getReceiptHandle());
            resultItem.put("Body", message.getBody());
            resultList.add(resultItem);
        }
        return resultList;
    }

I have another class named App that has a main method and creates an instance of the QueueService.

I'm looking for a "pattern" to make the main in App listen for new messages in the queue. Right now I have a while(true) loop where I call the receiveMessages method:

while(true) {
            messages = queueService.receiveMessages(queueURL); 
            for(Hashtable<String, String> message: messages) {
                String receiptHandle = message.get("ReceiptHandle");
                String messageBody = message.get("Body"); // key must match the one set in receiveMessages
                System.out.println(messageBody);
                queueService.deleteMessage(queueURL, receiptHandle);
            }
        }

Is this the correct way? Should I use the async message receive method in SQS SDK?

Tortoni answered 20/12, 2017 at 18:1 Comment(0)
C
36

To my knowledge, there is no way in Amazon SQS to support an active listener model where Amazon SQS would "push" messages to your listener, or would invoke your message listener when there are messages.

So, you always have to poll for messages. Two polling mechanisms are supported: Short Polling and Long Polling. Each has its own pros and cons, but Long Polling is the one you would typically end up using in most cases, although Short Polling is the default. Long Polling is more efficient in terms of network traffic, is more cost-efficient (because Amazon charges you by the number of requests made), and is also the preferred mechanism when you want your messages to be processed in a time-sensitive manner (that is, as soon as possible).

There are more intricacies around Long Polling and Short Polling that are worth knowing, and it's somewhat difficult to paraphrase all of that here, but if you like, you can read a lot more about this in the following blog post. It has a few code examples as well that should be helpful.

http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/

As for the while(true) loop, I would say it depends. If you are using Long Polling, you can set the wait time to the maximum of 20 seconds; that way, when there are no messages, you poll SQS no more than once every 20 seconds. If there are messages, you can decide whether to poll frequently (to process messages as soon as they arrive) or whether to always process them at fixed time intervals (say every n seconds).
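To put a rough number on that, here is a small arithmetic sketch that counts the receive requests issued against an empty queue in one hour. The class and method names are made up for illustration, and the 1-second pause used for the short-polling comparison is only an assumption; the 20-second figure is the long-polling maximum mentioned above.

```java
// Rough arithmetic only: receive requests issued against an EMPTY queue per hour.
// Hypothetical helper, not part of the AWS SDK.
final class PollingRequestEstimate {

    // Each request occupies `secondsPerRequest` seconds: the long-poll wait time,
    // or the pause between two short polls. Network latency is ignored.
    static long requestsPerHour(long secondsPerRequest) {
        if (secondsPerRequest <= 0) {
            throw new IllegalArgumentException("secondsPerRequest must be positive");
        }
        return 3600L / secondsPerRequest;
    }
}
```

With these assumptions, `requestsPerHour(20)` gives 180 requests for long polling, against 3600 for a short-polling loop that pauses one second between calls.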

Another point to note is that you can read up to 10 messages in a single receive request, which also reduces the number of calls you make to SQS, thereby reducing costs. And as the above blog explains in detail, you may request 10 messages, but SQS may return fewer than 10 even if there are that many messages in the queue.

In general though, if you are using a while(true) kind of structure, I would say you need to build in appropriate hooks and exception handling so that you can turn off the polling at runtime if you wish.
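A minimal sketch of such a hook, assuming the receive call is passed in as a `Supplier` that returns message bodies. `StoppablePoller` and its parameters are hypothetical names for illustration, not part of the AWS SDK; in a real application the supplier would wrap your long-polling receive call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: a polling loop that can be switched off at runtime.
final class StoppablePoller implements Runnable {

    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Supplier<List<String>> receiver; // stands in for a long-polling receive call
    private final Consumer<String> handler;        // processes one message body

    StoppablePoller(Supplier<List<String>> receiver, Consumer<String> handler) {
        this.receiver = receiver;
        this.handler = handler;
    }

    // Call from another thread (or a JVM shutdown hook) to end the loop.
    void stop() {
        running.set(false);
    }

    @Override
    public void run() {
        // The flag replaces while(true): it is re-checked after every receive cycle.
        while (running.get()) {
            try {
                for (String body : receiver.get()) {
                    handler.accept(body);
                }
            } catch (RuntimeException e) {
                // Log and keep polling; one failed cycle should not kill the loop.
                System.err.println("Polling failed: " + e);
            }
        }
    }
}
```

One way to wire it up would be `Runtime.getRuntime().addShutdownHook(new Thread(poller::stop))`. Note that the loop only notices the stop request after the current receive call returns, so with a 20-second wait time it can take up to that long to exit. If the receive call can fail immediately and repeatedly, you would also want a short sleep in the catch block.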

Another aspect to consider is whether you would like to poll SQS in your main application thread or spawn another thread. So another option could be to create a ScheduledThreadPoolExecutor with a single thread in the main method and schedule a task that polls SQS periodically (every few seconds); then you may not need a while(true) structure at all.
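A sketch of that option, under the same assumption that the receive call is handed in as a `Supplier` of message bodies. `ScheduledPoller` is a hypothetical name; the executor calls are from `java.util.concurrent`.

```java
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper: polls on a single background thread instead of a while(true) loop.
final class ScheduledPoller {

    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);

    // Runs one receive-and-handle cycle immediately, then waits `delay` after each
    // cycle finishes before starting the next one.
    ScheduledFuture<?> start(Supplier<List<String>> receiver,
                             Consumer<String> handler,
                             long delay,
                             TimeUnit unit) {
        return scheduler.scheduleWithFixedDelay(() -> {
            try {
                receiver.get().forEach(handler);
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress all further runs.
                System.err.println("Poll failed: " + e);
            }
        }, 0, delay, unit);
    }

    // Stops scheduling new cycles; a cycle already in progress is allowed to finish.
    void stop() {
        scheduler.shutdown();
    }
}
```

I used `scheduleWithFixedDelay` rather than `scheduleAtFixedRate` here so that a slow long-poll cycle is never followed immediately by a backlog of overdue runs; that is a design choice, not a requirement.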

Crumley answered 26/12, 2017 at 5:56 Comment(3)
This was true in 2017, though on 28 June 2018 it became possible to trigger a Lambda from SQS: cabbagetech.blog/2018/08/09/… - Warmblooded
@Warmblooded it is also worth noting that you cannot trigger a Lambda from SQS if you're using SQS FIFO queues. - Islas
Since Nov 2019, there is also support for triggering a Lambda from SQS FIFO queues; however, they don't guarantee only-once delivery: aws.amazon.com/blogs/compute/… - Acropolis
A
2

There are a few things that you're missing:

  • Use the receiveMessage(ReceiveMessageRequest) variant and set a wait time on the request to enable long polling.
  • Wrap your AWS calls in try/catch blocks. In particular, pay attention to OverLimitException, which can be thrown from receiveMessage() if you would have too many in-flight messages.
  • Wrap the entire body of the while loop in its own try/catch block, logging any exceptions that are caught (there shouldn't be -- this is here to ensure that your application doesn't crash because AWS changed their API or you neglected to handle an expected exception).

See doc for more information about long polling and possible exceptions.
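The two layers of try/catch described above could look roughly like this for a single loop iteration. To keep the sketch compilable without the AWS SDK, `TooManyInFlightException` is a made-up stand-in for OverLimitException, and `GuardedPollStep`, `Outcome` and the `Supplier` parameter are likewise hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Stand-in for the SDK's OverLimitException (assumption, for illustration only).
class TooManyInFlightException extends RuntimeException {
    TooManyInFlightException(String message) {
        super(message);
    }
}

final class GuardedPollStep {

    enum Outcome { PROCESSED, BACK_OFF, UNEXPECTED_ERROR }

    // One iteration of the while loop body; the caller decides how long to
    // sleep before the next iteration when BACK_OFF is returned.
    static Outcome pollOnce(Supplier<List<String>> receiver, Consumer<String> handler) {
        try {
            List<String> bodies;
            try {
                bodies = receiver.get();
            } catch (TooManyInFlightException e) {
                // Expected failure: handle it specifically instead of crashing.
                return Outcome.BACK_OFF;
            }
            bodies.forEach(handler);
            return Outcome.PROCESSED;
        } catch (RuntimeException e) {
            // Catch-all around the whole body: log and let the loop continue.
            System.err.println("Unexpected error in polling loop: " + e);
            return Outcome.UNEXPECTED_ERROR;
        }
    }
}
```

The loop itself then reduces to calling `pollOnce` repeatedly and sleeping whenever it reports `BACK_OFF`.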

As for using the async client: do you have any particular reason to use it? If not, then don't: a single receiver thread is much easier to manage.

Anzovin answered 20/12, 2017 at 19:13 Comment(3)
The reason to use the async client is that, in the future, an app (.jar) hosted in AWS Lambda will use the SQS service wrapper to check if there are new messages in the queue to process. Since Lambda charges by event firing or function call, I think it will be cheaper to have a listener (using sqs + jws) that executes only when there are new messages. Please, could you correct me if I am wrong on this last point? - Tortoni
The only thing that the async client does is wrap each non-async request in a Callable and then run it on an internal thread pool. I'm not sure what the "SQS service wrapper" is, but SQS isn't currently a supported event source. Most people use SNS instead, some use Kinesis. - Need
By "SQS service wrapper" I mean the class I've created that wraps some methods of the SQS SDK. I've found a way of setting a listener that triggers when a new message is in the queue. I should have dug deeper into the documentation. - Tortoni
A
0

If you want to use SQS and then Lambda to process the request, you can follow the steps given in the link, or you can always use Lambda instead of SQS and invoke Lambda for every request.

Articular answered 21/12, 2017 at 4:40 Comment(3)
Thank you. I've read that post and it uses a time schedule with CloudWatch that triggers a Lambda function that polls the queue. I was trying to avoid the time scheduler. - Tortoni
This year SQS can trigger Lambdas. - Bor
@Bor If you want to add an answer I could accept it, because SQS triggering a Lambda is what I needed at the moment :) - Tortoni
B
0

As of 2019 SQS can trigger lambdas: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html

Bor answered 2/2, 2022 at 15:27 Comment(0)
C
0

I found one solution for actively listening to the queue, for Node. I used the following package and it resolved my issue.

sqs-consumer

Link https://www.npmjs.com/package/sqs-consumer

Chidester answered 21/6, 2022 at 9:54 Comment(0)
P
0

It is several years later, but just in case someone searches for this topic, I'm posting my solution. I'm not sure if this is the best one, but it works for us. Note that this uses Project Reactor. Also note that this solution is used for messages that are not very time-critical: delays of several minutes are OK for us here.

package myapp.amazonsqs;

import static org.apache.logging.log4j.LogManager.getLogger;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.Map;
import java.util.stream.Collectors;

public class MyAmazonSqsMessagingGateway {

    private static final Logger LOGGER = getLogger(MyAmazonSqsMessagingGateway.class);
    private static final long POLLING_PERIOD_SECONDS = 30L;
    // max 20
    private static final int POLL_WAIT_TIME_SECONDS = 20;
    private static final long MINIMUM_RETRY_PERIOD_SECONDS = 30L;
    private final String amazonAwsRegion;
    private final String amazonAwsAccessKeyId;
    private final String amazonAwsAccessKeySecret;
    private final String queueName;
    private AmazonSQS amazonSqsClient;

    public MyAmazonSqsMessagingGateway(
            final String amazonAwsRegion,
            final String amazonAwsAccessKeyId,
            final String amazonAwsAccessKeySecret,
            final String queueName
    ) {
        this.amazonAwsRegion = amazonAwsRegion;
        this.amazonAwsAccessKeyId = amazonAwsAccessKeyId;
        this.amazonAwsAccessKeySecret = amazonAwsAccessKeySecret;
        this.queueName = queueName;
    }

    public void init() {
        this.amazonSqsClient = createClient();
        start();
    }

    private AmazonSQS createClient() {
        return AmazonSQSClientBuilder
                .standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        this.amazonAwsAccessKeyId,
                        this.amazonAwsAccessKeySecret
                )))
                .withRegion(Regions.fromName(this.amazonAwsRegion))
                .build();
    }

    private void start() {
        LOGGER.debug("Starting..");
        final String queueUrl = getAndCheckMessageQueueUrl();
        final Duration initialDelay = Duration.ofSeconds(1L);
        final Duration pollingPeriod = Duration.ofSeconds(POLLING_PERIOD_SECONDS);
        final Duration minimumRetryPeriod = Duration.ofSeconds(MINIMUM_RETRY_PERIOD_SECONDS);
        // retry indefinitely with backoff, until this application is stopped
        final long maxNumberOfRetryAttempts = Long.MAX_VALUE;
        Flux.interval(initialDelay, pollingPeriod)
                .map(ignoredParameter -> receiveMessages(this.amazonSqsClient, queueUrl))
                .retryWhen(Retry
                        .backoff(maxNumberOfRetryAttempts, minimumRetryPeriod)
                        .doBeforeRetry(retrySignal -> LOGGER.warn(
                                "Exception when receiving messages, retrying.. ",
                                retrySignal.failure()
                        ))
                        .doAfterRetry(retrySignal -> LOGGER.debug("Retry complete."))
                )
                .subscribe(
                        receiveMessageResult -> receiveMessageResult
                                .getMessages()
                                .forEach(this::handleMessage),
                        throwable -> LOGGER.error(
                                "Non-recoverable exception when receiving messages from Amazon SQS: ",
                                throwable
                        )
                );
        LOGGER.debug("Start completed.");
    }

    private ReceiveMessageResult receiveMessages(final AmazonSQS amazonSqsClient, final String queueUrl) {
        LOGGER.debug("Receiving messages...");
        return amazonSqsClient.receiveMessage(new ReceiveMessageRequest(
                queueUrl).withWaitTimeSeconds(POLL_WAIT_TIME_SECONDS)
                .withMaxNumberOfMessages(10));
    }

    private String getAndCheckMessageQueueUrl() {
        final String queueUrl = amazonSqsClient
                .getQueueUrl(this.queueName)
                .getQueueUrl();
        if (queueUrl == null) {
            throw new IllegalStateException("queueUrl is null, cannot run!");
        } else {
            LOGGER.info(() -> String.format("Listening in queue %s", queueUrl));
        }
        return queueUrl;
    }

    private void handleMessage(final Message message) {
        logMessage(message);
        // do something useful with the message here.
    }

    private static void logMessage(final Message message) {
        if (LOGGER.isDebugEnabled()) {
            final Map<String, String> attributes = message.getAttributes();
            final String attributesAsSingleString = attributes
                    .keySet()
                    .stream()
                    .map(key -> "Attribute " + key + " value = " + attributes.get(key))
                    .collect(Collectors.joining("\n"));
            LOGGER.debug("Message received! id = "
                    + message.getMessageId()
                    + "\nreceipt handle = "
                    + message.getReceiptHandle()
                    + "\n"
                    + attributesAsSingleString
                    + "\nbody:\n" // leading newline keeps "body:" off the last attribute line
                    + message.getBody());
        }
    }

}

Pufahl answered 17/7, 2023 at 13:48 Comment(0)
