AWS SQS to delay message by X seconds, then next message by X seconds again
Looking for a way to delay each message being sent to a Lambda by 5 seconds.

So, message 1 received by lambda then 5 seconds later message 2, then 5 seconds later message 3, etc, for say a thousand messages.

Was looking at SQS delay queue and message timers but they're not exactly what I'm looking for.

I also looked at Step Functions with a Wait state, but that would be expensive at the scale I need.

Ideally I need an SQS queue that only releases one message every 5 seconds. Is there any way to do this?

p.s. not fussed about it being SQS, just need a solution

Decortication answered 23/7, 2020 at 12:54 Comment(5)
Can you explain why SQS delay queue or message-level delays are "not exactly what [you are] looking for"? - Solitta
Yeah, sure. With a delay queue I could end up with, say, 10 messages all delayed by 5 seconds before they are visible, but they would then all become visible at the same time, so they could all be consumed at the same time, not 5 seconds apart. I need to look more closely at message timers, but from a brief look they don't seem to fit my use case. - Decortication
You could maybe set the Lambda concurrency to 1 and the batch size to 1, and make sure each Lambda invocation runs for 4-5 seconds. That way messages would be read one by one from the queue at roughly 5-second intervals. EDIT: On second thought, this would not be the most efficient approach. - Solitta
Good point, definitely an option. Ideally I wouldn't want to pay for the Lambda to be on for 5 seconds if my actual execution time was only, say, 1 second. - Decortication
Ideally the source of the messages should submit them at 5-second intervals. Maybe CloudWatch logs with scheduled events would also be useful. - Solitta
I
1

You could call get_queue_attributes() and read 'ApproximateNumberOfMessagesDelayed'. This tells you approximately how many messages in the queue are still delayed (that is, not yet available to consumers), and you can multiply it by the desired delay time to get the delay for the next message, i.e. DelayTime * ApproximateNumberOfMessagesDelayed + DelayTime. For this to work you must delay each message individually rather than the whole queue.
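The formula above can be sketched as follows. This is only an illustration: `next_delay_seconds` and `send_spaced_message` are hypothetical helper names, and `sqs_client` is assumed to be a boto3 SQS client created elsewhere. Two caveats apply: the attribute is approximate, so concurrent producers can compute the same delay, and SQS caps DelaySeconds at 900 seconds (15 minutes), so the backlog cannot grow indefinitely.

```python
SQS_MAX_DELAY_SECONDS = 900  # SQS does not accept a DelaySeconds above 15 minutes


def next_delay_seconds(delayed_count, step_seconds):
    """Apply the answer's formula: step * delayed_count + step."""
    delay = step_seconds * delayed_count + step_seconds
    if delay > SQS_MAX_DELAY_SECONDS:
        raise ValueError(f"delay of {delay}s exceeds the SQS maximum of 900s")
    return delay


def send_spaced_message(sqs_client, queue_url, body, step_seconds=5):
    """Send one message, delayed behind the messages that are still delayed."""
    attrs = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessagesDelayed"],
    )
    # Attribute values come back as strings, so convert before doing arithmetic.
    delayed = int(attrs["Attributes"]["ApproximateNumberOfMessagesDelayed"])
    return sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=body,
        DelaySeconds=next_delay_seconds(delayed, step_seconds),
    )
```

With three messages still delayed and a 5-second step, the next message would be sent with a 20-second delay.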

Insipience answered 28/8, 2020 at 1:8 Comment(0)
F
1

I had a somewhat similar issue. In your case, if you delay each message on its way into the queue, you don't have to worry about delaying its consumption (in your case, by the Lambda).

As @Ryan mentioned, when you set the Delivery delay in the console (let's say to 5 seconds), it delays every message in the queue by the same amount rather than spacing out individual messages. Here is a good read to understand Delivery delay.

But the trick is not to delay the queue but to delay each individual message (in AWS terms, a message timer).

Here is what I did,

I first batch my messages (please read the docs; as of now, a batch can hold at most 10 messages), then set the delay for each message, and then send them as a batch.

Setting up to send batch messages

def setting_to_send_batch_messages(inputDict):
    """Build the list of entries for one SQS send_message_batch call.

    Groups the input into messages of up to 5 stocks each and gives every
    message its own DelaySeconds value.

    Args:
        inputDict (dict): maps a field ID to a ticker symbol.

    Returns:
        list: one dict per message, in the format send_message_batch expects.
    """

    stock_cnter = 1 # Iterating stock counter
    msg_cnter = 1 # Counter to keep track of number of messages

    entryVal_dict = {} # dict. to hold values for each message in the batch
    thisMsgAttribute_perStock_dict = {} # dict. to hold Message Attributes per stock

    msg_lst = [] # List to hold all dicts (i.e. stock info) per message

    # In the batch, per message delay
    delay_this_message = 0
    # NOTEME: Starting at 0 means the very first message is not delayed (it is
    # available in the queue immediately); each subsequent message gets a larger delay (in seconds).

    # looping over dict.
    for key,val in inputDict.items():

        # dict. holding to message attributes
        msgAttributes_dict = {
            'fieldID' + str(stock_cnter): {
                'StringValue': key,
                'DataType': 'String'
            },
            'ticker' + str(stock_cnter): {
                'StringValue': val,
                'DataType': 'String'
            }
        }

        # Merge this stock's attributes into the attributes of the current message
        thisMsgAttribute_perStock_dict.update(msgAttributes_dict)

        # NOTEME: Per AWS SQS, the max number of MessageAttributes per message is 10, so a message can hold only 5 stocks
        if stock_cnter % 5 == 0 or stock_cnter == len(inputDict): # Checking for 5 stocks OR anything left over grouping by 5

            entryVal_dict['Id'] = str(msg_cnter)
            entryVal_dict['MessageBody'] =  f'This is the message body for message ID no. {msg_cnter}.'
            entryVal_dict['DelaySeconds'] = delay_this_message
            entryVal_dict['MessageAttributes'] = thisMsgAttribute_perStock_dict

            # appending list
            msg_lst.append(entryVal_dict)

            # resetting dict.
            entryVal_dict = {}

            delay_this_message += 60 # delaying next message by 1 min. (SQS caps DelaySeconds at 900)

            msg_cnter += 1 # incrementing message counter

            # resetting dict.
            thisMsgAttribute_perStock_dict = {}

        stock_cnter += 1 # Incrementing stock loop counter

    # print (msg_lst)
    return msg_lst

Here is my inputDict,

{'rec1': 'KO', 'rec0': 'HLT', 'rec2': 'HD', 'rec4': 'AFL', 'rec5': 'STOR', 'rec3': 'WMT',...}
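For the 5-second spacing asked about in the question, the same per-message idea can be reduced to a much smaller sketch. `build_spaced_entries` is a hypothetical helper, not part of the code above, and it assumes the message bodies are already plain strings. Because SQS caps DelaySeconds at 900 seconds, message timers alone can space out only about the first 180 messages at 5-second steps; later messages would have to be sent in a later round.

```python
MAX_DELAY_SECONDS = 900  # upper limit SQS accepts for DelaySeconds


def build_spaced_entries(bodies, step_seconds=5):
    """Return send_message_batch entries whose delays are step_seconds apart."""
    entries = []
    for index, body in enumerate(bodies):
        delay = index * step_seconds  # the first message is not delayed
        if delay > MAX_DELAY_SECONDS:
            raise ValueError(
                f"message {index} would need a {delay}s delay; SQS allows at most 900s"
            )
        entries.append(
            {
                "Id": str(index),  # only has to be unique within one batch request
                "MessageBody": body,
                "DelaySeconds": delay,
            }
        )
    return entries
```

The returned list still has to be sent in groups of at most 10 entries per send_message_batch call.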

Sending the batch of messages to SQS

import boto3

def send_sqs_batch_message(entries):
    # NOTEME: See for more info
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html#sending-messages
    sqs_client = boto3.client("sqs", region_name='us-east-2')

    # entries may hold at most 10 messages per call
    response = sqs_client.send_message_batch(
        QueueUrl=YOUR_QUEUE_URL_GOES_HERE,  # placeholder: replace with your queue URL
        Entries=entries
    )

    # print(response)
    return response
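Since send_message_batch accepts at most 10 entries per call, a longer list has to be split before sending. A minimal sketch of that step, assuming `sqs_client` is a boto3 SQS client and using the hypothetical helper names `chunk_entries` and `send_all`:

```python
def chunk_entries(entries, batch_size=10):
    """Yield consecutive slices of at most batch_size entries."""
    for start in range(0, len(entries), batch_size):
        yield entries[start:start + batch_size]


def send_all(sqs_client, queue_url, entries):
    """Send every entry, one send_message_batch call per chunk of 10."""
    failed = []
    for batch in chunk_entries(entries):
        response = sqs_client.send_message_batch(QueueUrl=queue_url, Entries=batch)
        # Entries that SQS rejected are reported under the "Failed" key.
        failed.extend(response.get("Failed", []))
    return failed
```

Returning the failed entries lets the caller decide whether to retry them; a batch call can succeed overall while individual entries fail.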
Fop answered 1/1, 2022 at 17:9 Comment(0)
F
0

You can use FIFO queues. They support message ordering and per-queue delays. It's important to know about the following caveats:

  • The order in which messages are sent and received is strictly preserved and a message is delivered once and remains available until a consumer processes and deletes it.
  • FIFO queues support message groups that allow multiple ordered message groups within a single queue.
  • FIFO queues don't support per-message delays, only per-queue delays. If your application sets the same value of the DelaySeconds parameter on each message, you must modify your application to remove the per-message delay and set DelaySeconds on the entire queue instead.
  • FIFO queues support 300 TPS per API method (SendMessage, ReceiveMessage, or DeleteMessage). With the batch APIs (up to 10 messages per call) you can reach 3,000 messages per second.

Now, if the delay is the same for every message and you don't need to fill or drain the queue faster than 3,000 messages per second, then FIFO queues can work.
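A per-queue delay on a FIFO queue can be set when the queue is created. The sketch below assumes `sqs_client` is a boto3 SQS client; `create_delayed_fifo_queue` is a hypothetical helper name, and enabling content-based deduplication is an assumption made here so that senders need not supply a deduplication ID. Note that a per-queue delay shifts every message by the same amount; on its own it does not space messages 5 seconds apart.

```python
def create_delayed_fifo_queue(sqs_client, name, delay_seconds=5):
    """Create a FIFO queue whose messages are all delayed by delay_seconds."""
    if not name.endswith(".fifo"):
        raise ValueError("FIFO queue names must end with '.fifo'")
    response = sqs_client.create_queue(
        QueueName=name,
        Attributes={
            "FifoQueue": "true",
            "DelaySeconds": str(delay_seconds),  # attribute values are strings
            "ContentBasedDeduplication": "true",
        },
    )
    return response["QueueUrl"]
```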

Faustina answered 4/8, 2020 at 7:22 Comment(0)
