How to subscribe a SQS queue to a SNS topic in Java
Asked Answered
D

3

7

When I create a new queue and subscribe it to a topic in Java, no message comes. The same via the AWS web console works fine.

I guess I have to confirm the subscription somehow, but the sns.confirmSubscription method needs a token - where shall I get it?

This is my Java code:

String queueURL = sqs.createQueue("my-queue").getQueueUrl();

sns.subscribe(myTopicARN, "sqs", queueURL);

sns.publish(myTopicARN, "{\"payload\":\"test\"}");

sqs.receiveMessage(queueURL).getMessages()
        .forEach(System.out::println);  // nothing

What am I doing wrong?

Diplosis answered 8/2, 2019 at 11:16 Comment(6)
Your IAM user may have required permission when doing it from console. You may need to check credentials used by the SDK have correct permissions.Dia
@Dia I actually created an admin user and using its credentials by setting AWS_PROFILE=user-from-credentialsDiplosis
@Dia anyway, in this case I would expect an exception...Diplosis
true. have you enabled long polling in the queue?Dia
No, I wrote just the actual code you can see above. Do I need this when the message is sent and received immediately?Diplosis
by default SQS uses short polling and message published to SNS topic may not be available in the queue immediately when you call receiveMessage api. Try enabling long-pollingDia
M
6

Check this out: https://aws.amazon.com/blogs/developer/subscribing-queues-to-topics/

You should subscribe like this:

Topics.subscribeQueue(sns, sqs, myTopicARN, queueURL);

This convinient method creates a policy for the subscription to allow the topic to send messages to the queue.

Madoc answered 8/2, 2019 at 11:44 Comment(1)
In what artifact can Topics be found?Ferree
C
2

For completeness, you can subscribe an SQS queue to a SNS topic with the v2 java sdk like this:

        CreateTopicResponse topic = snsClient.createTopic(CreateTopicRequest.builder()
                .name("topic-name")
                .build());

        String topicArn = topic.topicArn();

        CreateQueueResponse
                queue
                = sqsClient.createQueue(CreateQueueRequest.builder()
                .queueName("queue-name")
                .build());

        String queueUrl = queue.queueUrl();

        GetQueueAttributesResponse
                queueAttributes
                = sqsClient.getQueueAttributes(GetQueueAttributesRequest.builder()
                .attributeNames(QueueAttributeName.QUEUE_ARN)
                .queueUrl(queueUrl)
                .build());

        String queueArn = queueAttributes.attributes().get(QueueAttributeName.QUEUE_ARN);

        snsClient.subscribe(SubscribeRequest.builder()
                .topicArn(topicArn)
                .protocol("sqs")
                .endpoint(queueArn)
                .build());

I have not tried this against actual AWS but it runs fine against localstack for testing.

Note that you must subscribe the SQS queue via its ARN not via the queueUrl.

Cumulus answered 18/1 at 14:49 Comment(1)
Using the queueUrl works for me.Ferree
M
0

subscribing the queue to sns does not automatically create a policy to allow sns to send messages to the queue (based on my experience with sns/sqs) so you need to create the policy yourself and give permission to sns to send messages to your queue this is an example on how to do it using queue url , queue arn and topic arn

import static com.amazonaws.auth.policy.Principal.All;
import static com.amazonaws.auth.policy.Statement.Effect.Allow;
import static com.amazonaws.auth.policy.actions.SQSActions.SendMessage;
import static com.amazonaws.auth.policy.conditions.ArnCondition.ArnComparisonType.ArnEquals;

final Statement mainQueueStatements = new Statement(Allow) //imported above
        .withActions(SendMessage) //imported above
            .withPrincipals(All) //imported above
            .withResources(new Resource(queueArn)) // your queue arn
            .withConditions(
                    new Condition()
                            .withType(ArnEquals.name()) //imported above
                            .withConditionKey(SOURCE_ARN_CONDITION_KEY) //imported above
                            .withValues(topicArn) // your topic arn
            );
    final Policy mainQueuePolicy = ()
            .withId("MainQueuePolicy")
            .withStatements(mainQueueStatements);
    final HashMap<QueueAttributeName, String> attributes = new HashMap<>();
     attributes.put(QueueAttributeName.Policy.toString(), mainQueuePolicy.toJson());
    amazonSQS.setQueueAttributes(new SetQueueAttributesRequest().withAttributes(attributes).withQueueUrl(queueUrl)); // your queue url
Mississippian answered 1/4, 2021 at 14:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.