Async consumer for Amazon SQS
E

5

10

I am new to working with queues. I am able to successfully post messages and receive them synchronously. However, I am now trying to receive them asynchronously.

The reference links provided by SQS suggest using the JMS client wrapper. The link also says to use it if you already have code that is integrated with a JMS client.

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/jmsclient.html#samples

But I am starting afresh. I referred to this example to send and receive messages synchronously.

https://github.com/aws/aws-sdk-java/blob/master/src/samples/AmazonSimpleQueueService/SimpleQueueServiceSample.java

Can I use the same code but implement it with a message listener? Any code examples will be appreciated.

Ecstatics answered 26/8, 2015 at 9:27 Comment(0)
H
10

There's a code sample in the section about Using JMS with Amazon SQS of the Amazon SQS Developer Guide that shows how to asynchronously receive messages using JMS.

First you implement the MessageListener interface:

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

And then you set it as the MessageListener for a MessageConsumer:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

// Wait for 1 second. The listener onMessage() method will be invoked when a message is received.
Thread.sleep(1000);
Headpin answered 2/11, 2016 at 23:17 Comment(2)
Where do I get the queue object to pass to the createConsumer method? I use an existing queue; I do not want to create a new one. Inscribe
Does this continuously listen for messages from the queue, or does it shut down after 1 minute? Inscribe
S
6

You can use the @SqsListener annotation from the Spring Cloud framework. If you are developing an application with Spring and AWS and you are not using Spring Cloud, it is a good time to switch.

Here is sample code that asynchronously receives messages from SQS using the @SqsListener annotation. A nice thing is that it needs almost zero configuration:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
import com.example.my.RecoverableException;

@Component
@Slf4j
public class CustomMessageQueue {

    @SqsListener(value = "${build_request_queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void receive(String message) {
        try {
            // write message processing here
        } catch (RecoverableException e) {
            // handle errors here for which message from queue should not be deleted
            // throwing an exception will make receive method fail and hence message does not get deleted
            throw e;

        } catch (Exception e) {
            // suppress exceptions for which message should be deleted.
        }
    }
}

The great thing about the @SqsListener annotation is its deletionPolicy, which lets you decide when a message gets deleted from SQS.

Submersible answered 23/11, 2018 at 18:48 Comment(0)
N
0

Although this is an old question, I will add another solution that I use in one of my Python apps. It might give you an idea of how to do it in any language.

I have a kind of proxy function (listener) that is triggered by SQS. In this function, a for loop goes through the messages and invokes another Lambda function by calling the invoke method of the Lambda client. Each message 'knows' the name of the function that should be invoked.

I use this proxy function for all the other functions in the app that are triggered by SQS.

This way I can utilise the Lambda destinations functionality that was added recently.
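
The proxy idea described above could be sketched roughly as follows. The answer's app is in Python; this sketch uses Java only to match the rest of the thread. `FunctionInvoker` is a hypothetical stand-in for the Lambda client's invoke call, and `RoutedMessage` and `ProxyHandler` are illustrative names rather than AWS types. Collecting the messages that failed to forward is an added assumption, not something the answer describes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Lambda client's invoke call; not an AWS SDK type. */
interface FunctionInvoker {
    void invoke(String functionName, String payload);
}

/** Hypothetical message shape: each message names the function that should handle it. */
class RoutedMessage {
    final String functionName;
    final String payload;

    RoutedMessage(String functionName, String payload) {
        this.functionName = functionName;
        this.payload = payload;
    }
}

/** Proxy that forwards every message in a batch to the function it names. */
class ProxyHandler {
    private final FunctionInvoker invoker;

    ProxyHandler(FunctionInvoker invoker) {
        this.invoker = invoker;
    }

    /** Forwards each message; returns the ones whose invocation threw. */
    List<RoutedMessage> handle(List<RoutedMessage> batch) {
        List<RoutedMessage> failed = new ArrayList<>();
        for (RoutedMessage message : batch) {
            try {
                invoker.invoke(message.functionName, message.payload);
            } catch (RuntimeException e) {
                // Keep going so one bad message does not block the rest of the batch.
                failed.add(message);
            }
        }
        return failed;
    }
}
```

In a real deployment the `FunctionInvoker` implementation would wrap whatever client actually calls the target function; here it is left abstract so the routing logic can be read on its own.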

Newhall answered 10/2, 2020 at 18:11 Comment(0)
H
0

I use the AWS SDK to connect to SQS:

AWSCredentials credential = new BasicAWSCredentials(accessKey, secretKey);
AWSCredentialsProvider awsCredentialsProvider = new AWSStaticCredentialsProvider(credential);
AmazonSQS sqs = AmazonSQSClientBuilder.standard()
        .withRegion(Regions.US_EAST_1)
        .withCredentials(awsCredentialsProvider)
        .build();

// Long-poll for up to 10 seconds and fetch at most 10 messages per call.
ReceiveMessageRequest messageRequest = new ReceiveMessageRequest(queueUrl)
        .withWaitTimeSeconds(10)
        .withMaxNumberOfMessages(10);

// Read the messages from the receive result.
ReceiveMessageResult queueResult = sqs.receiveMessage(messageRequest);
List<Message> messages = queueResult.getMessages();
for (Message message : messages) {
    log.info("Receive message from queue {}", message.getBody());
}
Hovercraft answered 26/5, 2020 at 0:2 Comment(0)
Z
-1

SQS stands for "Simple Queue Service." It is meant to be simple, literally. As a result, it does not support some of the niceties of JMS, in particular asynchronous listeners.

I have written a blog post on this topic: http://thedulinreport.com/2015/05/09/guaranteeing-delivery-of-messages-with-aws-sqs/

Basically, what you need to do is write a poller that runs in an infinite loop, but you want to be smart about it: you don't want to poll too often, because you are charged per request.
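
A minimal Java sketch of such a poller, assuming a hypothetical `MessageSource` interface that stands in for the SDK's receive call. With long polling (a non-zero wait time), a single request can wait for messages instead of returning empty right away. `QueuePoller`, `pollOnce`, and `nextBackoff` are illustrative names, and the doubling backoff after empty polls is just one possible way to keep the request count down, not something SQS prescribes.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an SQS receive call; not part of the AWS SDK. */
interface MessageSource {
    /** Returns up to maxMessages bodies, waiting up to waitSeconds for some to arrive. */
    List<String> receive(int maxMessages, int waitSeconds);
}

/** Polls a MessageSource in a loop and hands each message body to a listener. */
class QueuePoller implements Runnable {
    private final MessageSource source;
    private final Consumer<String> listener;
    private final long maxBackoffMillis;
    private volatile boolean running = true;

    QueuePoller(MessageSource source, Consumer<String> listener, long maxBackoffMillis) {
        this.source = source;
        this.listener = listener;
        this.maxBackoffMillis = maxBackoffMillis;
    }

    /** One polling step; returns the number of messages delivered to the listener. */
    int pollOnce() {
        List<String> batch = source.receive(10, 20);
        for (String body : batch) {
            listener.accept(body);
        }
        return batch.size();
    }

    /** Pause after an empty poll: doubles each time, at least 100 ms, capped at maxMillis. */
    static long nextBackoff(long currentMillis, long maxMillis) {
        return Math.min(maxMillis, Math.max(100, currentMillis * 2));
    }

    @Override
    public void run() {
        long backoff = 0;
        while (running) {
            int delivered = pollOnce();
            // Reset the pause when messages arrive; lengthen it while the queue is empty.
            backoff = delivered > 0 ? 0 : nextBackoff(backoff, maxBackoffMillis);
            if (backoff > 0) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Asks the loop to exit after the current poll finishes. */
    void stop() {
        running = false;
    }
}
```

Running it on its own thread, for example `new Thread(new QueuePoller(source, listener, 5000)).start()`, gives listener-style delivery on top of polling; `stop()` ends the loop after the current poll.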

Zipper answered 26/8, 2015 at 11:47 Comment(2)
Thanks, I'll check your post. Instead of polling ourselves, I am looking at options to leverage message listeners. Any pointers on that would be very useful. Ecstatics
There is no message listener in SQS in the same sense as JMS message listeners. Zipper
