Spring Integration AWS: Extend Visibility Timeout
Is it possible to extend the visibility timeout of a message that is in flight?

See:

http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/AboutVT.html.

Section: Changing a Message's Visibility Timeout.

http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/sqs/AmazonSQSClient.html#changeMessageVisibility-com.amazonaws.services.sqs.model.ChangeMessageVisibilityRequest-

In summary, I want to be able to extend the initially set visibility timeout for a given message that is in flight.

For example, if 15 seconds have passed, I then want to extend the timeout by another 20 seconds. There is a better example in the Javadoc linked above.

From my understanding of the links above, this can be done on the Amazon side.

Below are my current settings:

  SqsMessageDrivenChannelAdapter adapter =
  new SqsMessageDrivenChannelAdapter(queue);
  adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);
  adapter.setMaxNumberOfMessages(1);
  adapter.setSendTimeout(2000);
  adapter.setVisibilityTimeout(200);
  adapter.setWaitTimeOut(20);

Is it possible to extend this timeout?

Armbruster answered 13/7, 2016 at 15:11 Comment(2)
Sorry, your question isn't clear. We expose only those attributes that are present in the target ReceiveMessageRequest. I just don't understand what else you would like to see. - Khorma
@ArtemBilan I hope this is clear now. - Armbruster
K
5

OK. Looks like I see your point.

We can change the visibility of a particular message using this API:

AmazonSQS.changeMessageVisibility(String queueUrl, String receiptHandle, Integer visibilityTimeout)

For this purpose, the downstream flow has to get access to (inject) the AmazonSQS bean and extract the special headers from the Message:

@Autowired
AmazonSQS amazonSqs;

@Autowired
ResourceIdResolver resourceIdResolver;
...


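MessageHeaders headers = message.getHeaders();

// Resolves a logical queue name (or a queue URL) to the queue URL
DestinationResolver<String> destinationResolver = new DynamicQueueUrlDestinationResolver(this.amazonSqs, this.resourceIdResolver);

// MessageHeaders.get(key) returns Object, so request a String explicitly
String queueUrl = destinationResolver.resolveDestination(headers.get(AwsHeaders.QUEUE, String.class));

String receiptHandle = headers.get(AwsHeaders.RECEIPT_HANDLE, String.class);

// The new timeout (in seconds) counts from the time of this call
amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, YOUR_DESIRED_VISIBILITY_TIMEOUT);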
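For the case in the question (extend again once part of the timeout has elapsed), the call above could be repeated on a schedule while processing is still running. A minimal sketch, assuming a hypothetical VisibilityChanger interface that stands in for amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, seconds); the class and method names below are illustrative only and are not part of Spring Cloud AWS or the AWS SDK:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class VisibilityHeartbeat {

    /** Hypothetical stand-in for amazonSqs.changeMessageVisibility(queueUrl, receiptHandle, seconds). */
    public interface VisibilityChanger {
        void change(int visibilityTimeoutSeconds);
    }

    /**
     * Runs the task while re-applying the visibility timeout at a fixed period.
     * Returns how many times the timeout was re-applied.
     */
    public static int runWithHeartbeat(Runnable task, VisibilityChanger changer,
                                       int timeoutSeconds, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger extensions = new AtomicInteger();
        // Note: if changer.change throws, the scheduler stops further heartbeats.
        ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {
            changer.change(timeoutSeconds);
            extensions.incrementAndGet();
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            task.run();
        } finally {
            // Stop extending as soon as processing ends, whether it succeeded or failed
            heartbeat.cancel(false);
            scheduler.shutdownNow();
        }
        return extensions.get();
    }

    /** Sleeps without forcing callers to handle InterruptedException. */
    public static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Milliseconds are used here only to keep the demo short
        int count = runWithHeartbeat(
                () -> sleepQuietly(350),
                seconds -> System.out.println("visibility timeout set to " + seconds + "s"),
                20, 100);
        System.out.println("extensions sent: " + count);
    }
}
```

In real use the period would be chosen comfortably shorter than the timeout (for instance every 15 seconds for a 20-second timeout), so the message does not become visible between two heartbeats.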

But yes, I agree that we should provide something for this as an out-of-the-box feature. It could be something similar to QueueMessageAcknowledgment as a new header, or even just one more changeMessageVisibility method added to this one.

Please raise a GH issue for the Spring Cloud AWS project on the matter, with a link to this SO topic.

Khorma answered 14/7, 2016 at 19:51 Comment(1)
Looks like we have one already: github.com/spring-cloud/spring-cloud-aws/issues/92 - Khorma
P
6

Spring Cloud AWS supports this starting with version 2.0. Injecting a Visibility parameter into your SQS listener method does the trick:

  @SqsListener(value = "my-sqs-queue")
  void onMessageReceived(@Payload String payload, Visibility visibility) {
    ...
    var extension = visibility.extend(20);
    ...
  }

Note that extend works asynchronously and returns a Future. So if you want to be sure, further down in the processing, that the visibility of the message has really been extended on the AWS side, either block on the Future using extension.get() or query it with extension.isDone().
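The two options can be sketched with plain java.util.concurrent. Here a CompletableFuture is a stand-in for the Future returned by visibility.extend(20); fakeExtend and awaitExtension are hypothetical helpers, not Spring Cloud AWS APIs. Waiting with an upper bound is an assumption of this sketch, so that a slow SQS call cannot stall the listener indefinitely:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ExtensionFutureDemo {

    /** Hypothetical stand-in for visibility.extend(...): completes after a simulated delay. */
    public static Future<?> fakeExtend(long delayMillis) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(delayMillis); // simulates the round trip to SQS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Blocks until the extension is confirmed or the wait times out; returns true if confirmed. */
    public static boolean awaitExtension(Future<?> extension, long maxWaitMillis) {
        try {
            extension.get(maxWaitMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // not confirmed in time; the old timeout may still apply
        } catch (ExecutionException e) {
            return false; // the request itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Future<?> extension = fakeExtend(50);
        // Non-blocking check: may well be false this early
        System.out.println("done right after the call? " + extension.isDone());
        // Blocking check with an upper bound on the wait
        System.out.println("confirmed: " + awaitExtension(extension, 2000));
    }
}
```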

Powwow answered 26/11, 2021 at 9:51 Comment(4)
This looks great on paper, but if I have to do a blocking call on the Future as you proposed, then what's the difference between this and a plain Thread::sleep? - Hornblende
I'm confused. Where do you want your thread to sleep? After extending the visibility? Why would you want to do that? - Powwow
Ideally, I'd like to return the message to the queue with the desired timeout and have the queue take care of not showing it for that amount of time. If I wait inside my thread, then why do I even bother modifying the visibility timeout parameter? - Hornblende
That's not how SQS works. You can't return a message to the queue explicitly. Read the docs. - Powwow
P
0

TL;DR: Yes, you can change the visibility timeout of an SQS message in Spring Cloud AWS by using the Visibility parameter in your SQS listener method. The Visibility extend method does not add to your default visibility timeout; the new timeout counts from the moment the request is sent to SQS. Internally, the method calls sqs.changeMessageVisibilityAsync, which changes the visibility timeout of a specified message in a queue to a new value.

Example: If your default timeout is 300 seconds, your message fails with a transient error after 100 seconds, and you want to retry it, you can call extend(10). Your listener will then receive the message again about 10 seconds later, rather than only after the full 300 seconds have elapsed. The visibility timeout becomes 10 seconds, counted from the point of invocation within your listener.
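The arithmetic of this example can be written out as a small sketch. VisibilityMath and its method names are hypothetical; the only assumption is the behaviour described above, namely that a changed timeout counts from the moment of the change request:

```java
public class VisibilityMath {

    /** Seconds after receipt at which the message becomes visible again if nothing is changed. */
    public static int visibleAgainByDefault(int defaultTimeoutSeconds) {
        return defaultTimeoutSeconds;
    }

    /** Seconds after receipt at which the message becomes visible when the timeout is changed at changedAtSeconds. */
    public static int visibleAgainAfterChange(int changedAtSeconds, int newTimeoutSeconds) {
        // The new timeout replaces the old one and starts at the time of the change
        return changedAtSeconds + newTimeoutSeconds;
    }

    public static void main(String[] args) {
        int defaultTimeout = 300; // default visibility timeout, in seconds
        int failedAt = 100;       // transient failure 100 s after receipt
        int extendBy = 10;        // argument passed to visibility.extend(10)

        System.out.println("without change: " + visibleAgainByDefault(defaultTimeout) + " s");
        System.out.println("with extend(10): " + visibleAgainAfterChange(failedAt, extendBy) + " s");
    }
}
```

With these numbers the message becomes eligible for redelivery roughly 110 seconds after it was first received, instead of 300.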

import org.springframework.cloud.aws.messaging.listener.Acknowledgment;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.Visibility;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class SqsMessageListener {

    // NEVER: the message is deleted only when acknowledge() is called, so a normal
    // return after the transient-error branch does not delete it
    @SqsListener(value = "my-sqs-queue", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
    public void onMessageReceived(@Payload String payload, Acknowledgment acknowledgment, Visibility visibility) {
        try {
            // processMessage is your own processing logic (not shown)
            processMessage(payload);
            // Processing succeeded: delete the message from the queue
            acknowledgment.acknowledge();
        } catch (Exception e) {
            if (e instanceof CustomTransientException) {
                // Transient failure (CustomTransientException is your own type):
                // set the visibility timeout to 20 seconds from now, so the message
                // is re-consumed from the queue after about 20 seconds
                visibility.extend(20);
            } else {
                // Non-retryable failure: delete the message so it is not redelivered
                acknowledgment.acknowledge();
            }
        }
    }
}
  • Acknowledgment can be used to delete the message from the queue once it has been processed successfully.

  • The Visibility extend method does not add to your default visibility timeout; the new timeout counts from the moment the request is sent to SQS.

  • If an exception occurs, you don't need to wait for the original visibility timeout to expire; you can set a shorter interval so the message is retrieved again sooner.

Pines answered 21/3 at 8:8 Comment(0)
