I had a somewhat similar issue. In your case, if you delay the message getting to the queue, then you don't have to worry about delaying the consumption of the message (in your case, by the Lambda).
As @Ryan mentioned, when you set the Delivery delay in the console (let's say to 5 seconds), it delays the entire queue, not individual messages in the queue.
Here is a good read to understand Delivery delay.
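For comparison, the same queue-wide setting can be applied from code instead of the console. This is only a minimal sketch: it assumes `sqs_client` is a boto3 SQS client (e.g. from `boto3.client("sqs")`) and `queue_url` is your queue's URL, and the helper name `set_queue_delay` is made up for illustration.

```python
def set_queue_delay(sqs_client, queue_url, seconds):
    """Set the queue-level delivery delay, which applies to every new message."""
    # SQS accepts a delivery delay between 0 and 900 seconds (15 minutes).
    if not 0 <= seconds <= 900:
        raise ValueError("DelaySeconds must be between 0 and 900")
    # Queue attribute values are passed as strings.
    return sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"DelaySeconds": str(seconds)},
    )
```

Every message sent afterwards gets the same delay, which is exactly why this alone does not help when you want to stagger messages.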
But the trick is not to delay the queue but to delay individual messages (in AWS terms, this is a message timer).
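For a single message, the per-message delay is just the `DelaySeconds` argument on `send_message`. A rough sketch, assuming `sqs_client` is a boto3 SQS client and `queue_url` is your queue's URL (the function name `send_with_delay` is hypothetical):

```python
def send_with_delay(sqs_client, queue_url, body, delay_seconds):
    """Send one message that stays hidden from consumers for delay_seconds."""
    # Per-message delays share the queue-level limit of 0 to 900 seconds.
    if not 0 <= delay_seconds <= 900:
        raise ValueError("delay_seconds must be between 0 and 900")
    return sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=body,
        DelaySeconds=delay_seconds,  # overrides the queue's default delay
    )
```

As far as I know this only works on standard queues; FIFO queues do not support per-message timers, so there you are limited to the queue-level delay.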
Here is what I did: I first batch my messages (please read the docs; as of now, a single batch can hold at most 10 messages), then set the delay for each message, and then send them as a batch.
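If you end up with more than 10 entries, one way to respect the batch limit is to slice the list into chunks and send each chunk separately. A small sketch; the helper name `chunk_entries` is my own and not part of boto3:

```python
def chunk_entries(entries, batch_size=10):
    """Split a list of batch entries into lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [entries[i:i + batch_size] for i in range(0, len(entries), batch_size)]


# 23 entries become three batches of 10, 10 and 3.
entries = [{"Id": str(n), "MessageBody": f"message {n}"} for n in range(23)]
print([len(batch) for batch in chunk_entries(entries)])  # [10, 10, 3]
```

Each chunk can then be passed to `send_message_batch` on its own. Keep in mind that the `Id` values only need to be unique within one batch.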
Setting up to send batch messages
def setting_to_send_batch_messages(inputDict):
    """Set up a dict as a batch of messages (up to 10) so that it can be sent at once (i.e. as a batch).

    Args:
        inputDict (dict): dict that needs to be batched

    Returns:
        list: list of dicts, one per message
    """
    stock_cnter = 1  # Iterating stock counter
    msg_cnter = 1  # Counter to keep track of number of messages
    entryVal_dict = {}  # dict to hold values for each message in the batch
    thisMsgAttribute_perStock_dict = {}  # dict to hold Message Attributes per stock
    msg_lst = []  # List to hold all dicts (i.e. stock info) per message

    # In the batch, per message delay
    delay_this_message = 0
    # NOTEME: Starting at 0 means the very first message has no delay (i.e. it is sent to the queue immediately); a delay (in seconds) is added to each subsequent message

    # looping over the dict
    for key, val in inputDict.items():
        # dict holding the message attributes for this stock
        msgAttributes_dict = {
            'fieldID' + str(stock_cnter): {
                'StringValue': key,
                'DataType': 'String'
            },
            'ticker' + str(stock_cnter): {
                'StringValue': val,
                'DataType': 'String'
            }
        }
        # Adding this stock's attributes to the dict for the current message
        thisMsgAttribute_perStock_dict.update(msgAttributes_dict)

        # NOTEME: Per AWS SQS, the max number of MessageAttributes per message is 10, so a message can hold only 5 stocks
        if stock_cnter % 5 == 0 or stock_cnter == len(inputDict):  # Checking for 5 stocks OR anything left over after grouping by 5
            entryVal_dict['Id'] = str(msg_cnter)
            entryVal_dict['MessageBody'] = f'This is the message body for message ID no. {msg_cnter}.'
            entryVal_dict['DelaySeconds'] = delay_this_message
            entryVal_dict['MessageAttributes'] = thisMsgAttribute_perStock_dict
            # appending to the list
            msg_lst.append(entryVal_dict)
            # resetting dict
            entryVal_dict = {}
            delay_this_message += 60  # delaying next message by 1 min.
            msg_cnter += 1  # incrementing message counter
            # resetting dict
            thisMsgAttribute_perStock_dict = {}

        stock_cnter += 1  # Incrementing stock loop counter

    # print(msg_lst)
    return msg_lst
Here is my inputDict:
{'rec1': 'KO', 'rec0': 'HLT', 'rec2': 'HD', 'rec4': 'AFL', 'rec5': 'STOR', 'rec3': 'WMT',...}
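To see what delays a dict of a given size ends up with, the arithmetic can be sketched separately: every 5 stocks become one message, and each message is delayed 60 seconds more than the previous one. The function below is only an illustration (the name `delay_schedule` and its defaults are my assumptions, mirroring the numbers used above), and it also guards against going past the 900-second maximum that SQS allows for `DelaySeconds`.

```python
import math

MAX_DELAY_SECONDS = 900  # SQS upper limit for DelaySeconds (15 minutes)


def delay_schedule(num_stocks, stocks_per_message=5, step_seconds=60):
    """Return the DelaySeconds value each message would get, in order."""
    num_messages = math.ceil(num_stocks / stocks_per_message)
    delays = [i * step_seconds for i in range(num_messages)]
    if delays and delays[-1] > MAX_DELAY_SECONDS:
        raise ValueError(
            f"last message would be delayed {delays[-1]}s, above the {MAX_DELAY_SECONDS}s limit"
        )
    return delays


# The six records visible above would be split into two messages.
print(delay_schedule(6))  # [0, 60]
```

With these numbers the limit is reached at 16 messages (80 stocks), so for bigger inputs you would need a smaller step or a different approach.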
Sending the batch of messages to SQS accordingly
import boto3


def send_sqs_batch_message(entries):
    # NOTEME: See for more info
    # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/sqs.html#sending-messages
    sqs_client = boto3.client("sqs", region_name='us-east-2')
    # Replace YOUR_QUEUE_URL_GOES_HERE with your queue's URL (a string)
    response = sqs_client.send_message_batch(
        QueueUrl=YOUR_QUEUE_URL_GOES_HERE,
        Entries=entries
    )
    # print(response)
    return response
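One thing worth doing with the returned response: a batch call can succeed as a whole while individual messages are rejected. The response lists accepted entries under `Successful` and rejected ones under `Failed`, so it is worth checking. A minimal sketch; the helper name `failed_entry_ids` is mine, and the sample response below is made up to show the shape:

```python
def failed_entry_ids(response):
    """Return the Ids of entries that SQS reported as failed in a batch response."""
    # 'Failed' is absent or empty when every entry was accepted.
    return [failure["Id"] for failure in response.get("Failed", [])]


# Made-up response for illustration only.
sample_response = {
    "Successful": [{"Id": "1", "MessageId": "example-message-id"}],
    "Failed": [
        {"Id": "2", "SenderFault": True, "Code": "ExampleCode", "Message": "example"}
    ],
}
print(failed_entry_ids(sample_response))  # ['2']
```

You could then re-send only the entries whose `Id` shows up in that list.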