Finding certain messages in SQS
Asked Answered
J

11

38

I know SQS ain't built for that, but I'm curious is it possible to find messages in a queue that meet some criteria?

I can pull messages in a loop, search the message bodies for some pattern (without even deserializing them), and filter the messages I needed. But then it is possible to end up with an infinite loop - the first messages I read will be back to the queue by the time when I reach the end of the queue...

Extending visibility of the messages possible, but how do I know exactly how long it will take to scan the entire queue, and for how long should I extend the visibility? What if I have literally ten thousand messages in there?

Is there any workaround here? I need to scan the queue for some messages, and delete those...

Jephum answered 27/9, 2012 at 22:9 Comment(1)
Maybe SNS and topics could help you here, as I think what you are trying to have is a topic-based consumer setup.Bunghole
E
36

Short answer: no.

Queues are designed for things like tasks. A machine grabs a new task (i.e., message) from the queue, executes the task, then deletes the task.

If you're trying to search the messages to filter them, I can't help but wonder if you're using the wrong tool for the job…

Enswathe answered 5/10, 2012 at 8:33 Comment(2)
What like maybe a cloudy-rabbit cloudamqp.com/plans.html (with the cute looking plans) and queueviewer.com ??? ... oh maybe its not that simple... #9874734Integration
In your lookout what tool is good for publishing and subscribing while also giving the functionality of filtering out messages?Blubber
E
8

I do not think the short or long answers are "No".

Here are two counterpoint solutions that are "Yes".

  1. Traversing the Queue, maintaining a visited list
  2. Using Enterprise Integration Patterns (message routing) to split your messages into downstreams based on criteria

1. Traversing the Queue, maintaining a visited list

Consider the case of a queue with N messages, with no messages being added or deleted. Without additional information (e.g. if you knew how many messages should match your criteria), you need to traverse all N messages.

The key point here is knowing when you've traversed all N messages. There are some issues here.

  1. To know exactly, you'd have to track messages as they are added to the queue
  2. To know approximately, you can get the ApproximateNumberOfMessages attribute of the queue
  3. Or you could receive messages in a loop, maintaining a visited list, and assume that you will eventually sample and exhaust messages from each server your queue is sharded over

To maintain the visited list, as you receive messages and evaluate your match criteria, you could store the message_id of all visited messages.

Message ID's are nearly unique. See this thread

https://forums.aws.amazon.com/message.jspa?messageID=76119

If you went with (3), you wouldn't be certain how many iterations would be required to exhaust the queue. However, if you performed this indefinitely, you'd be guaranteed to exhuast the queue so long as the weighted random distribution over the SQS shard servers gives them all non-zero probability.

2. Using Enterprise Integration Patterns (message routing) to split your messages into downstreams based on criteria

If you have control over your messaging architecture you could use a Message Router as a "front-end" message processor that dispatches messages to various recipients based on criteria.

And specifically you'd use a Content-Based Router:

http://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html

Echolocation answered 4/4, 2017 at 6:39 Comment(3)
Bingo. For an example of #2, look no further than Google Task Queues (on their App Engine platform). You can tag messages on creation, and poll for specific tags when pulling from the queue.Strode
What #2 is basically saying is: if you want SQS to behave like that, you need to put something in front of it, which I think it's exactly what the question was trying to avoid. #1 is just bad: if you have different consumer roles, you'd never be able to control who receives what, and would have to rely on sheer luck that the message gets picked up by the appropriate consumer. It's like going to your post office and requesting the mailman to give them all the letters for your postal code, and you'll give them back the ones that are not yours...Bunghole
Looking at this again in 2021, the answer is still the same. SQS — by itself — is fundamentally not designed for this task. You can build custom software around it, sure. Or you can pre-filter before pushing it into SQS. But SQS alone is not designed for this.Enswathe
L
3

We had a similar requirement and ended up with an architecture described in this "Hands On" tutorial: Filter Messages Published to Topics.

Essentially, instead of publishing events/messages to an SQS queue, you publish them to an SNS topic and each consumer will have their own SQS queue that is subscribed to the topic. You can then use SNS Subscription Filters to ensure only the relevant messages are enqueued to each consumer's queue.

This creates additional infrastructure overhead but it worked well as a solution for us.

Lariat answered 6/4, 2020 at 17:0 Comment(0)
O
2

Tested with different cases. it does not work. The answer is NO

TestData

public void fillQueueWithMessages(){

  MessageAttributeValue value1 = new MessageAttributeValue();
  value1.setDataType("String");
  value1.setStringValue("1");

  SendMessageRequest send_msg_request = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test1").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request);


  MessageAttributeValue value2 = new MessageAttributeValue();
  value2.setDataType("String");
  value2.setStringValue("2");


  SendMessageRequest send_msg_request2 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test2").addMessageAttributesEntry(value2.getStringValue(), value2);
  amazonSqs.sendMessage(send_msg_request2);

  SendMessageRequest send_msg_request3 = new SendMessageRequest()
      .withQueueUrl(env.getProperty("cloud.aws.sqs.readyForTranslation.url"))
      .withMessageBody("test3").addMessageAttributesEntry(value1.getStringValue(), value1);
  amazonSqs.sendMessage(send_msg_request3);

}

Test

public void shouldPollMessagesBasedOnMessageAttribute() throws InterruptedException {

ReceiveMessageRequest request =
    new ReceiveMessageRequest(env.getProperty("cloud.aws.sqs.readyForTranslation.url"));
request.setMaxNumberOfMessages(3);
request.setWaitTimeSeconds(20);
request.withMessageAttributeNames("1");

List<Message> messages = new ArrayList<Message>();

messages = amazonSqs.receiveMessage(request).getMessages();

assertEquals(2, messages.size());
}
Orson answered 3/11, 2017 at 14:40 Comment(0)
I
1

Even though when requesting specific attributes, the value will just be set to null for the messages that do not contain the attribute, you could still use to filter in a way. Those that do not have the attribute set the way you want can have their visibility set to 1 and then released, so they will remain on the queue. Would give a crude way of doing priority queuing, though you could just as easily do the same based on the message content.

Impress answered 15/8, 2016 at 17:22 Comment(0)
I
1

For the humble devops engineer with total control of everything:

(1) Quickly turn off the consumers, so the message is captured in the queue.

(2) Turn off the source.

(3) Read all the SQS queue looking for your message, while also copying to a 'temp' queue.

(4) Copy all the 'temp' queue back into the SQS queue. Google it there are many tools.

(5) Restart source and consumers.

Another way if you had thought of it beforehand would be to use SNS or something to copy to a auxilary 'devops' queue and read through that when you need to find a message. You could set the retention period of the 'devops' queue shortish to keep it reasonable in size.

Integration answered 12/12, 2019 at 15:58 Comment(0)
O
1

If you have a reasonably small list of message filters you could create a separate queue for each pattern.

For example, if the thing your filtering on is "color" and you know there will only be red, blue and green for values, you could create one queue for each color. That way you don't need to filter--just pull from the appropriate queue. Obviously, if there can be 1000 colors then this approach isn't practical.

Orangewood answered 13/10, 2021 at 15:42 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.Calorie
B
0

Old topic, but may be helpful You can use FIFO with GroupId for small list of messages thread which helped me

Brookhouse answered 4/6, 2020 at 7:33 Comment(1)
To save others' time: this doesn't help. The group ID is not something that a consumer can leverage upon.Emory
A
0

You can filter the messages while subscribing to the sns topic. SNS topic has a subscription filter policy. You can add a filter criteria as below to move the messages to two different queues based on a value in the message:

{ "EventType": [ "ce.cm.test-employee" ] }
Adolfoadolph answered 8/5, 2023 at 19:39 Comment(0)
K
-1

Lets understand this via some examples so create 10 message and send it

// Send a message
for (int i = 0; i < 10; i++) {
    System.out.println("Sending a message to MyQueue.\n");
    Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
    // extra code

    String sdate;
    Format formatter;
    Date date = new Date();

    // 2012-12-01
    formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ");
    sdate = formatter.format(date);
    System.out.println(sdate);

    messageAttributes.put("Datestamp"+i, new MessageAttributeValue().withDataType("String").withStringValue(sdate));

    Map<String, MessageAttributeValue> messageAttributes1 = new HashMap<>();
    messageAttributes1.put("attributeName", new MessageAttributeValue().withDataType("String").withStringValue(sdate));
    SendMessageRequest request = new SendMessageRequest();
    request.withMessageBody("A test message body."+sdate);
    request.withQueueUrl(myQueueUrl);
    request.withMessageAttributes(messageAttributes);
    sqs.sendMessage(request);
}

Now even you have 10 message with datetimestamp1 to datetimestamp10
filtering with attribute will not work

lets try filter with some myTag attribute

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

//ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveMessageRequest.withMaxNumberOfMessages(10);
receiveMessageRequest.withMessageAttributeNames("myTag");
List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();

It gives 10 message and myTag value is null

message.getMessageAttributes().get("Datestamp") is null message.getMessageAttributes().get("myTag") is null

So we cannot filter with message attribute as if that key is not found. no message attribute or with All message attribute is same .

So long answer is NOOOOO

Kinsfolk answered 24/5, 2016 at 3:45 Comment(1)
What I want to explain here is you can filter with any attribute cuz data will still show up and will show attribute value is null.Kinsfolk
C
-8

this is actually not all true,

in fact you can 'kinda' filter messages in a queue using messages attributes trick.

each message can contain attributes that you can add while creating the message (you will need to provide 3 things for each attribute: name, type, value).

later on, when you make new ReceiveMessageRequest object, you can use "withMessageAttributeNames" to specify an attribute, and what is actually happen is that your queue get filtered for the messaged containing that specific attribute.

for example:

String queueUrl = sqs.getQueueUrl("myQueue").getQueueUrl();

ReceiveMessageRequest receiveRequest = new ReceiveMessageRequest(queueUrl);

receiveRequest.withMaxNumberOfMessages(10);
receiveRequest.withMessageAttributeNames("myTag");

if your Queue was holding 5 messages but only 1 had the "myTag" attribute then only that specific one will be returned.

this was overwhelming for me as this is not mentioned in the ReceiveMessageRequest API

so basically all you have to do is give each message a unique attribute (please pay attention the attributes limits: The message attribute name can contain the following characters: A-Z, a-z, 0-9, underscore (_), hyphen (-), and period (.). The name must not start or end with a period, and it should not have successive periods. The name is case sensitive and must be unique among all attribute names for the message. The name can be up to 256 characters long. The name cannot start with "AWS." or "Amazon." (or any variations in casing)

Cleo answered 2/4, 2016 at 21:24 Comment(5)
Can someone confirm this? I've tried something similar with Node.js sdk(which option is MessageAttributeNames) and it didn't work, returned all messages as always. Maybe I did it wrongFiliano
@Filiano not working for me neither...tried with nodejs with no luck.Sankey
sqs does not have feature of filtering with message attribute.Kinsfolk
@Filiano I'm pretty sure MessageAttributeNames is for defining the message attributes you wish to receive for each message, not for filtering the actual messages received.Offensive
@MichaelMason yah, I thought that much until someone talked about it as a miraculous undocumented feature: this was overwhelming for me as this is not mentioned in the ReceiveMessageRequest APIFiliano

© 2022 - 2024 — McMap. All rights reserved.