using amazon sqs in a @MessageDriven bean - pooling / parallel processing

4

5

We need to use queues in our Java EE application, and since it is a cloud-based application (deployed on OpenShift Online), we would like to use Amazon SQS.

If I understand the theory of the receiving part of JMS / Java EE correctly, a @MessageDriven bean is managed by the Java EE container, so that many bean instances are created in parallel (up to the max pool size) if the number of incoming messages is high. This is of course a big benefit for processing high loads.

However, I do not see how we can integrate AWS SQS this way in a Java EE application. I know the asynchronous receiver examples from http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:

class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

and then:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

This is the official asynchronous receiver example - which is not a @MessageDriven bean. It is obvious that we need the credentials somewhere to authenticate (by creating an SQSConnectionFactory, then a connection, then a session - all of which is well described in the example).
But I strongly suspect that this example will not process the messages in parallel - i.e. only one listener instance is processing the queue, and that is not a good solution for scalable, highly loaded applications.

a) How can we go the real Java EE way with Amazon SQS? I only find plenty of Spring examples, but it must be Java EE 7.

b) We use Wildfly (currently 8.2.1). Would it also be possible to let Wildfly manage the connection to AWS, so that inside the application we could use the queue as if it were an application-server-managed queue (the same approach as data sources for DB access)?

Conclusion after getting an answer from stdunbar:
It seems that what I would like to do is not possible in a 'proper' way. So what should I do? Implement a ManagedExecutorService as stdunbar described to 'wrap' the queue? However, this implies having a local queue as well, and that is not a good situation for an application that should be scalable. What about alternatives? We are running the application on OpenShift Online. It would probably be better to instantiate our own gear with e.g. an ApacheMQ cartridge... but there are of course a lot of disadvantages, like the cost and being responsible for the 'infrastructure' ourselves.

To be honest, I am really disappointed by AWS in this case...

Mountaineer answered 15/12, 2016 at 19:28 Comment(0)
4

I don't think that my solution is proper Java EE, but in my case it works.

Configuration:

@Singleton
public class SqsMessageManager
{
    private Integer numberOfReceivers = 3;

    public static SQSConnection connection = null;
    public static Queue queue = null;

    @Inject
    SqsMessageReceiver sqsMessageReceiver;

    @PostConstruct // ensure init runs once when the singleton is created
    public void init()
    {
        try
        {
            SQSConnectionFactory connectionFactory =
                    SQSConnectionFactory.builder()
                            .withRegion(Region.getRegion(Regions.EU_WEST_1))
                            .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
                            .build();

            connection = connectionFactory.createConnection();

            queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

            for (int i = 0; i < numberOfReceivers; i++)
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);

            connection.start();
        }
        catch (JMSException e)
        {
            e.printStackTrace(); // getStackTrace() alone silently discards the error
        }
    }
}
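One caveat with this configuration: all three sessions share the single injected sqsMessageReceiver instance, and each session delivers messages on its own thread, so onMessage must only touch thread-safe state. A plain-Java sketch of the situation (CountingListener is a hypothetical stand-in for the JMS listener, not part of the SQS API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a javax.jms.MessageListener: one shared instance that
// several sessions (threads) call concurrently, as in the singleton above.
class CountingListener {
    private final AtomicInteger processed = new AtomicInteger();

    public void onMessage(String body) {
        // Only touch thread-safe state here; this instance is shared.
        processed.incrementAndGet();
    }

    public int processedCount() {
        return processed.get();
    }
}

public class SharedListenerDemo {
    public static void main(String[] args) throws InterruptedException {
        CountingListener listener = new CountingListener();
        // Three threads stand in for numberOfReceivers = 3 sessions.
        ExecutorService sessions = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 300; i++) {
            sessions.submit(() -> listener.onMessage("msg"));
        }
        sessions.shutdown();
        sessions.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(listener.processedCount()); // 300: AtomicInteger keeps the count correct
    }
}
```

With a plain int counter instead of the AtomicInteger, the count could come out below 300 under contention.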

Then the sender:

@Dependent
public class SqsMessageSender
{
    MessageProducer producer = null;
    Session senderSession = null;

    @PostConstruct
    public void createProducer(){
        try
        {
            // open new session and message producer
            senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = senderSession.createProducer(SqsMessageManager.queue);
        }catch(JMSException | NullPointerException e){
            // connection may not be initialized yet; log instead of silently swallowing
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void destroy(){
        try
        {
            // close session
            producer.close();
            senderSession.close();
        }catch(JMSException e){
            e.printStackTrace(); // log close failures instead of swallowing them
        }
    }

    // sends a message to aws sqs queue
    public void sendMessage(String txt)
    {
        try
        {
            TextMessage textMessage = senderSession.createTextMessage(txt);
            producer.send(textMessage);
        }
        catch (JMSException e)
        {
            e.printStackTrace(); // getStackTrace() alone silently discards the error
        }
    }
}

And the receiver:

@Dependent
public class SqsMessageReceiver implements MessageListener
{
    public void onMessage(Message inMessage) {
        ...
    }
}
Caltrop answered 20/12, 2016 at 13:30 Comment(2)
I do not exactly see how your numberOfReceivers works. You create multiple listeners on the same object (you inject sqsMessageReceiver, which is in fact one instance)?Mountaineer
I have done almost the same example but have a problem with the EntityManager class if it is declared in SqsMessageReceiver. When I want to persist something I get: javax.persistence.TransactionRequiredException: JBAS011469: Transaction is required to perform this operation (either use a transaction or extended persistence context). Does anybody know how to solve this problem?Scriptural
3

According to some older docs I found

A container allows many instances of a message-driven bean class to be running concurrently, thus allowing for the concurrent processing of a stream of messages.

By using the Amazon JMS integration, coupled with a declarative MDB, you should be good to go. I would not use the setMessageListener interface. You can use the declarative version of JMS since you're on Wildfly 8.x / EE7:

@MessageDriven(activationConfig = { /* your config - i.e. queue name, etc */ })
public class MyListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
    }
}

This allows the container to create as many instances as needed. Note that some tuning of the JMS parameters may be needed in Wildfly.
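For example, the pool size can be made explicit in the activation config. Treat this as a sketch: property names vary by resource adapter, and destinationLookup / maxSession are the ones WildFly's built-in messaging understands, while the queue name here is made up:

```java
@MessageDriven(activationConfig = {
    @ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/queue/MyQueue"),
    @ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
    // upper bound on concurrently delivering sessions, i.e. parallel MDB instances
    @ActivationConfigProperty(propertyName = "maxSession", propertyValue = "15")
})
public class MyListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // process the message
    }
}
```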

As a side note, let the Amazon libraries take care of reading the SQS queue. I started out rolling my own reader, thinking that I could thread it. But what I discovered is that you can't have multiple threads reading from the queue with the AWS Java libraries, as you will get duplicate reads almost every time. I had 4 threads reading the SQS queue and would get 4 copies of the same message. I finally changed to a single reader putting the messages into a LinkedBlockingDeque to be consumed by some number of other threads.
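The single-reader / multi-worker setup can be sketched in plain Java; here pump() stands in for the one thread that actually receives from SQS, so no two threads can ever fetch (and duplicate) the same message:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;

public class SingleReaderFanOut {
    private final BlockingDeque<String> buffer = new LinkedBlockingDeque<>();
    private final ExecutorService workers;
    // Visible for the demo: records which messages were processed.
    final Set<String> processed = ConcurrentHashMap.newKeySet();

    SingleReaderFanOut(int workerCount) {
        workers = Executors.newFixedThreadPool(workerCount);
        for (int i = 0; i < workerCount; i++) {
            workers.submit(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        process(buffer.take()); // blocks until the single reader supplies a message
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // clean shutdown
                }
            });
        }
    }

    // The one and only "reader": in real code this is the single thread
    // polling SQS; everything downstream only sees the in-memory deque.
    void pump(List<String> batch) {
        buffer.addAll(batch);
    }

    void process(String msg) {
        processed.add(msg); // business logic would go here
    }

    void shutdown() {
        workers.shutdownNow();
    }
}
```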

Everything I've shown is pure Java/EE.

EDIT
After playing around with the Amazon SQS/JMS integration some more, I feel that you're wasting your time if you use it. It supports only JMS 1.1, so it uses the old JMS syntax with JNDI thrown in too. Additionally, it only works for Queues, not Topics.

I'd strongly recommend creating your own implementation: a ManagedExecutorService that runs a queue-reader thread with a short SQS read timeout. Each loop iteration reads from the SQS queue and puts the messages into the JMS queue or topic.
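A plain-Java sketch of such a reader loop (in the container you would submit it to a ManagedExecutorService; here the BlockingQueue source and the forward callback are stand-ins for the SQS receive call and the JMS send):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class QueueBridge implements Runnable {
    private final BlockingQueue<String> source; // stands in for the SQS receive call
    private final Consumer<String> forward;     // stands in for "send to the JMS queue/topic"
    private volatile boolean running = true;

    QueueBridge(BlockingQueue<String> source, Consumer<String> forward) {
        this.source = source;
        this.forward = forward;
    }

    @Override
    public void run() {
        try {
            while (running) {
                // Short read timeout so the loop notices shutdown promptly,
                // mirroring a short SQS wait time on each receive.
                String msg = source.poll(1, TimeUnit.SECONDS);
                if (msg != null) {
                    forward.accept(msg);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void stop() {
        running = false; // the loop exits after at most one poll timeout
    }
}
```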

Sorry to have gotten your hopes up on this - Amazon has just not maintained it enough for it to be worthwhile.

Subscription answered 15/12, 2016 at 21:50 Comment(3)
Thank you, stdunbar. I am happy to hear that this should work - however, what I do not know is how to bring the needed parameters (credentials) into play. Where do I put the AWS credentials? And how do I achieve your note As a side note, let the Amazon libraries take care of the reading of the SQS queue? - I think annotating with @MessageDriven will make the application server read the queue?! - I would appreciate it if you could be a little more specific about how to integrate AWS SQS in your code.Mountaineer
I suspect that you are right. This is not good news. If I understand you correctly, it is just the SDK that is "not up to date", and not the service per se? - I updated the question with a conclusion and a further question.Mountaineer
The question marked with b) is still unanswered. In the hope of getting some more ideas about what to do, I am starting a bounty... thanks anyway so far [+1]!Mountaineer
1

Payara Cloud Connectors seems to be pretty new but looks promising. I don't know if it works with other containers as well. It is based on JCA adapters, as far as I understand.

Spenser answered 1/3, 2018 at 17:55 Comment(0)
0

Generally, to make an MDB "connect" to a remote JMS queue you need a Resource Adapter (RA). In theory, an RA implemented against the JMS spec should work with any spec-compliant JMS provider, and thus you should be able to reuse, for example, this implementation.

However, as even the README of the above-mentioned project says, you should rather use an RA provided by the particular JMS provider than a generic one. Unfortunately, Amazon does not provide one :(

Quite recently, however, some awesome guy created an unofficial open-source one. I'm just evaluating it and will update this answer based on my experience later. (Comments from other users of this RA are highly welcome.)

Aristophanes answered 30/5, 2018 at 10:43 Comment(3)
Hey @Aristophanes, any updates on how you used the generic-jms-ra resource adapter for Java EE MDBs with AWS SQS?Tom
I don't intend to use the generic one, but the unofficial open-source one mentioned in my answer. It's my task for next week, so stay tuned ;)Aristophanes
Has anyone solved the problem with message-driven bean and AWS SQS? Any example?Scriptural
