How do I fail a specific SQS message in a batch from a Lambda?
I have a Lambda with an SQS trigger. When it gets hit, a batch of records from SQS comes in (usually about 10 at a time, I think). If I return a failed status code from the handler, all 10 messages will be retried. If I return a success code, they'll all be removed from the queue. What if 1 out of those 10 messages failed and I want to retry just that one?

exports.handler = async (event) => {

    for (const record of event.Records) {
        try {
            let body = JSON.parse(record.body);
            // do things
        }
        catch (err) {
            // one message failed, I want it to be retried
        }
    }

    // returning this causes ALL messages in
    // this batch to be removed from the queue
    return {
        statusCode: 200,
        body: 'Finished.'
    };
};

Do I have to manually re-add that one message back to the queue? Or can I return a status from my handler that indicates that one message failed and should be retried?

Julide answered 3/4, 2019 at 14:42 Comment(1)
it's sad that there is still no simple way to handle such a case.Alleviator
Yes, you have to manually re-add the failed messages to the queue.

What I suggest is keeping a fail count. If all messages failed, you can simply return a failed status so the whole batch is retried; otherwise, if the fail count is less than the batch size, individually send the failed messages back to the queue.
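A minimal Node.js sketch of this approach, assuming the AWS SDK for JavaScript v2 and a hypothetical QUEUE_URL environment variable holding the queue URL:

const AWS = require('aws-sdk');
const sqs = new AWS.SQS();

exports.handler = async (event) => {
    const failed = [];

    for (const record of event.Records) {
        try {
            const body = JSON.parse(record.body);
            // do things
        } catch (err) {
            failed.push(record);
        }
    }

    // If every message failed, throw so Lambda retries the whole batch.
    if (failed.length === event.Records.length && failed.length > 0) {
        throw new Error('All messages in the batch failed');
    }

    // Otherwise re-queue only the failed messages; the originals are
    // deleted from the queue when the handler returns successfully.
    for (const record of failed) {
        await sqs.sendMessage({
            QueueUrl: process.env.QUEUE_URL, // hypothetical env variable
            MessageBody: record.body,
        }).promise();
    }

    return { statusCode: 200, body: 'Finished.' };
};

Note that a re-sent message is a brand-new message, so per-message state such as the approximate receive count is reset.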

Outhouse answered 3/4, 2019 at 14:44 Comment(4)
Thanks. I found this article which I've adapted to suit my use case.Julide
What if we do it the other way around: remove the ones that succeeded from the queue and fail the whole batch (the intent being that the messages that were in the batch but not removed from the queue get rerun)? Would that work?Todtoday
Yes, you could do it this way as well, using the deleteMessage call. However, it's better to do it the other way: in most cases you won't have any failures, so it's more optimal to manually send back only the failed ones; otherwise you're always making deleteMessage I/O calls for the successful ones, which is costly and inefficient.Outhouse
Bit late to the party, but you could do the individual deletes only when the failed count is greater than zero and does not equal the message count. That way, when everything is fine, you avoid the additional I/O.Middleman
As per the AWS documentation, SQS event source mappings now support handling partial failures out of the box. The gist of the linked article is as follows:

  1. Include ReportBatchItemFailures in your EventSourceMapping configuration
  2. In case of failures, the response has to be modified to include:
{
  "batchItemFailures": [
    { "itemIdentifier": "id2" },
    { "itemIdentifier": "id4" }
  ]
}

Here id2 and id4 are the messageIds of the failed messages in the batch; a handler sketch that returns this shape follows the quoted list below.

  3. Quoting the documentation as is:

Lambda treats a batch as a complete success if your function returns any of the following:

  • An empty batchItemFailures list
  • A null batchItemFailures list
  • An empty EventResponse
  • A null EventResponse

Lambda treats a batch as a complete failure if your function returns any of the following:

  • An invalid JSON response
  • An empty string itemIdentifier
  • A null itemIdentifier
  • An itemIdentifier with a bad key name
  • An itemIdentifier value with a message ID that doesn't exist
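A minimal Node.js sketch of a handler that returns this partial batch response (it assumes ReportBatchItemFailures is already enabled on the event source mapping):

exports.handler = async (event) => {
    const batchItemFailures = [];

    for (const record of event.Records) {
        try {
            const body = JSON.parse(record.body);
            // do things
        } catch (err) {
            // report only this message as failed; the rest are
            // treated as successful and deleted from the queue
            batchItemFailures.push({ itemIdentifier: record.messageId });
        }
    }

    return { batchItemFailures };
};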
Gardel answered 15/1, 2022 at 21:50 Comment(1)
Step #1 is important. Be sure to add "FunctionResponseTypes": ["ReportBatchItemFailures"] to your Lambda's EventSourceMapping resource in the CloudFormation template.Pachydermatous
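For illustration, such a mapping might look like this in a CloudFormation template (resource and reference names here are hypothetical):

{
  "MySqsEventSourceMapping": {
    "Type": "AWS::Lambda::EventSourceMapping",
    "Properties": {
      "EventSourceArn": { "Fn::GetAtt": ["MyQueue", "Arn"] },
      "FunctionName": { "Ref": "MyFunction" },
      "FunctionResponseTypes": ["ReportBatchItemFailures"]
    }
  }
}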
You have to programmatically delete each message from the queue after processing it successfully.

You can set a flag to true if any of the messages failed, and raise an error after processing the whole batch if the flag is set. The successfully processed messages will already have been deleted, while the other messages will be reprocessed based on the retry policy.

As per the logic below, only failed and unprocessed messages will get retried:

import os

import boto3

sqs = boto3.client("sqs")

# queue URL; recommended to set it as an environment variable
QUEUE_URL = os.environ["QUEUE_URL"]

def handler(event, context):
    any_failed = False
    for message in event["Records"]:
        try:
            message_body = message["body"]
            print("do some processing :)")
            # delete the message only once it has been processed successfully
            sqs.delete_message(
                QueueUrl=QUEUE_URL,
                ReceiptHandle=message["receiptHandle"]
            )
        except Exception:
            any_failed = True

    # raising here makes SQS redeliver only the messages
    # that were not deleted above
    if any_failed:
        raise RuntimeError("One or more messages failed")

There is also another way: save the successfully processed message IDs into a variable and perform a batch delete operation based on them:

# assuming `succeeded` holds the successfully processed records
response = sqs.delete_message_batch(
    QueueUrl=QUEUE_URL,
    Entries=[
        {
            'Id': message['messageId'],
            'ReceiptHandle': message['receiptHandle']
        }
        for message in succeeded
    ]
)
Andreasandree answered 23/7, 2020 at 1:0 Comment(3)
If you raise an error after all messages are processed, it will still retry all messages (no matter which one failed). Any error during Lambda execution will cause a retry of every message in the batch.Alleviator
The point is to delete the successfully processed messages programmatically, so if you raise an exception only the unprocessed and failed messages will be retried.Andreasandree
Sorry, I think I misunderstood your answer, but this seems to be quite a decent solution to me. If that really works, it would be really helpful if you added an example. I actually think that's the best "simple" solution, since you don't have to alter the message body of the original message and you don't need to include retry logic in your Lambda function, but can simply use the built-in retry mechanism of AWS.Alleviator
As per the AWS documentation, you can now implement partial batch responses.

Here is a code sample in Java. Please note that, as per the documentation, you have to activate ReportBatchItemFailures for your Lambda function first:

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;

import java.util.ArrayList;
import java.util.List;

public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {

        List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
            try {
                // process your message
            } catch (Exception e) {
                // add the failed message's identifier to the batchItemFailures list
                batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
            }
        }
        return new SQSBatchResponse(batchItemFailures);
    }
}
Invite answered 14/6, 2023 at 10:30 Comment(0)
You need to design your app in a different way. Here are a few ideas; they are not the best, but they will solve your problem.

Solution 1:

Note: When an SQS event source mapping is initially created and enabled, or when it first appears after a period with no traffic, the Lambda service will begin polling the SQS queue using five parallel long-polling connections. As per AWS documentation, the default duration for a long poll from AWS Lambda to SQS is 20 seconds.

Solution 2:

Use AWS StepFunction

Step Functions will call the Lambda and handle the retry logic on failure, with configurable exponential back-off if needed.
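For illustration, a retry policy on the Lambda Task state in the state machine definition might look like this (the values are illustrative, not recommendations):

"Retry": [
  {
    "ErrorEquals": ["States.TaskFailed"],
    "IntervalSeconds": 5,
    "MaxAttempts": 3,
    "BackoffRate": 2.0
  }
]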

Solution 3:

Use a CloudWatch scheduled event to trigger a Lambda function that polls for failed messages.

Error handling for a given event source depends on how Lambda is invoked. Amazon CloudWatch Events invokes your Lambda function asynchronously.

Sawdust answered 3/4, 2019 at 15:6 Comment(0)
AWS supports partial batch responses. Here is an example in TypeScript:

type Result = {
  itemIdentifier: string
  status: 'failed' | 'success'
}

const isFulfilled = <T>(
  result: PromiseFulfilledResult<T> | PromiseRejectedResult
): result is PromiseFulfilledResult<T> => result.status === 'fulfilled'

const isFailed = (
  result: PromiseFulfilledResult<Result>
): result is PromiseFulfilledResult<
  Omit<Result, 'status'> & { status: 'failed' }
> => result.value.status === 'failed'

const results = await Promise.allSettled(
  sqsEvent.Records.map(async (record): Promise<Result> => {
    try {
      // process the record here
      return { status: 'success', itemIdentifier: record.messageId }
    } catch (e) {
      console.error(e)
      return { status: 'failed', itemIdentifier: record.messageId }
    }
  })
)

// return the partial batch response shape that Lambda expects
return {
  batchItemFailures: results
    .filter(isFulfilled)
    .filter(isFailed)
    .map((result) => ({
      itemIdentifier: result.value.itemIdentifier,
    })),
}

Constancy answered 9/4, 2022 at 7:29 Comment(0)
