How to make the COPY command continue running in Redshift even after the Lambda function which initiated it has timed out?

I am trying to run a COPY command that loads around 100 GB of data from S3 to Redshift. I am using a Lambda function to initiate this COPY command every day. This is my current code:

from datetime import datetime
import psycopg2
from config import *  # dbname, user, password, host, port, table_name, intermediate_path, iam_role

def lambda_handler(event, context):
    con = psycopg2.connect(dbname=dbname, user=user, password=password, host=host, port=port)
    cur = con.cursor()

    try:
        # Load the Parquet files from S3 into Redshift inside a single transaction
        query = """BEGIN TRANSACTION;

                COPY """ + table_name + """ FROM '""" + intermediate_path + """' iam_role '""" + iam_role + """' FORMAT AS parquet;

                END TRANSACTION;"""

        print(query)
        cur.execute(query)

    except Exception as e:
        subject = "Error emr copy: {}".format(str(datetime.now().date()))
        body = "Exception occurred " + str(e)
        print(body)

    con.close()

This function runs fine, but the problem is that after the 15-minute Lambda timeout, the COPY command also stops executing in Redshift. Therefore, I cannot finish loading my data from S3 to Redshift.

I also tried including the statement_timeout statement below, after the BEGIN statement and before the COPY command. It didn't help.

SET statement_timeout to 18000000;

Can someone suggest how I can solve this issue?

Photophilous answered 27/11, 2020 at 14:3 Comment(2)
A good solution depends very much on the data you are loading from S3. Are there multiple files? How are the files structured (one line per table row?!?), etc.? Can you please add some more detail.Rasputin
@Sharad Sharma Please check my answer with the code below. Accepting an answer is helpful for others.Gigantic

The AWS documentation isn't explicit about what happens when a timeout occurs. But I think it's safe to say that the function transitions into the "Shutdown" phase, at which point the runtime container is forcibly terminated by the environment.

What this means is that the socket used by the database connection will be closed, and the Redshift process that is listening to that socket will receive an end-of-file -- a client disconnect. The normal behavior of any database in this situation is to terminate any outstanding queries and roll back their transactions.

The reason that I gave that description is to let you know that you can't extend the life of a query beyond the life of the Lambda that initiates it. If you want to stick with using a database connection library, you will need to use a service that doesn't time out: AWS Batch or ECS are two options.

But, there's a better option: the Redshift Data API, which is supported by Boto3.

This API operates asynchronously: you submit a query to Redshift, and get back a token that can be used to check the query's status. You can also instruct Redshift to send a message to Amazon EventBridge when the query completes/fails (so you can create another Lambda to take appropriate action).
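
A minimal boto3 sketch of that flow; the cluster identifier, database, user, and COPY statement below are placeholders, not values from the question:

import boto3

# Submit the statement asynchronously via the Redshift Data API.
client = boto3.client('redshift-data')

response = client.execute_statement(
    ClusterIdentifier='redshift-cluster-test',
    Database='db',
    DbUser='db_user',
    Sql="COPY schema.test_table FROM 's3://test-bucket/test.csv' "
        "IAM_ROLE 'arn:aws:iam::1234567890:role/TestRole' CSV IGNOREHEADER 1;",
    WithEvent=True,  # ask Redshift to publish an EventBridge event on completion
)
statement_id = response['Id']  # the token for this statement

# The Lambda can return immediately; any later process (another Lambda,
# a polling loop, etc.) can use the token to check the statement's status.
status = client.describe_statement(Id=statement_id)['Status']
print(status)  # SUBMITTED / PICKED / STARTED / FINISHED / FAILED / ABORTED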

Keel answered 27/11, 2020 at 14:56 Comment(0)

I recommend using the Redshift Data API in Lambda to load data into Redshift from S3. You can get rid of the psycopg2 package and use the built-in boto3 package in Lambda.

This will run the COPY query asynchronously, and the Lambda function won't take more than a few seconds to run.

I use sentry_sdk to get notifications of runtime errors from Lambda.

import boto3
import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

sentry_sdk.init(
    "https://[email protected]/aaaaaa",
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=0
)


def execute_redshift_query(sql):
    # Submit the statement via the Redshift Data API; the call returns
    # immediately while the query keeps running on the cluster.
    data_client = boto3.client('redshift-data')
    data_client.execute_statement(
        ClusterIdentifier='redshift-cluster-test',
        Database='db',
        DbUser='db_user',
        Sql=sql,
        StatementName='Test query',
        WithEvent=True,  # emit an EventBridge event when the statement finishes
    )


def handler(event, context):
    query = """
    copy schema.test_table
    from 's3://test-bucket/test.csv'
    IAM_ROLE 'arn:aws:iam::1234567890:role/TestRole'
    region 'us-east-1'
    ignoreheader 1 csv delimiter ','
    """
    execute_redshift_query(query)
    return True

And another Lambda function to send an error notification if the COPY query fails. You can add an EventBridge Lambda trigger using a rule that matches these statement status events (a sketch of such a rule follows).
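
A rough boto3 sketch of such a rule; the rule name, target id, and Lambda ARN are hypothetical, and the detail-type string is an assumption that should be checked against the events your cluster actually emits:

import json
import boto3

events = boto3.client('events')

# Match status-change events from the Redshift Data API (source "aws.redshift-data").
events.put_rule(
    Name='redshift-copy-status',  # hypothetical rule name
    EventPattern=json.dumps({
        'source': ['aws.redshift-data'],
        'detail-type': ['Redshift Data Statement Status Change'],
    }),
    State='ENABLED',
)

# Point the rule at the notification Lambda (hypothetical ARN).
events.put_targets(
    Rule='redshift-copy-status',
    Targets=[{
        'Id': 'copy-error-notifier',
        'Arn': 'arn:aws:lambda:us-east-1:1234567890:function:copy-error-notifier',
    }],
)

If you create the rule this way rather than through the console, you also need to grant EventBridge permission to invoke the Lambda (for example with lambda add-permission).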

Here is the Lambda code to send the error notification.

import sentry_sdk
from sentry_sdk.integrations.aws_lambda import AwsLambdaIntegration

sentry_sdk.init(
    "https://[email protected]/aaaaa",
    integrations=[AwsLambdaIntegration(timeout_warning=True)],
    traces_sample_rate=0
)


def lambda_handler(event, context):
    try:
        # The EventBridge event detail carries the final state of the statement;
        # anything other than FINISHED is treated as a failure and reported to Sentry.
        if event["detail"]["state"] != "FINISHED":
            raise ValueError(str(event))
    except Exception as e:
        sentry_sdk.capture_exception(e)
    return True

You can identify which COPY query failed by using the StatementName defined in the first Lambda function.
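
As a rough illustration, assuming the event detail exposes the name under "statementName" (the exact field names are assumptions; log a real event once to confirm them), the notification Lambda could pull it out like this:

def lambda_handler(event, context):
    detail = event.get("detail", {})
    # "statementName" and "state" are assumed field names in the event detail.
    statement_name = detail.get("statementName")
    state = detail.get("state")
    if state != "FINISHED":
        # Include the statement name so you know which COPY failed.
        raise ValueError("Statement '{}' ended in state {}".format(statement_name, state))
    return True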

Hope it is helpful.

Gigantic answered 22/10, 2021 at 18:30 Comment(0)
