How to run arbitrary / DDL SQL statements or stored procedures using AWS Glue

Is it possible to execute arbitrary SQL commands like ALTER TABLE from an AWS Glue Python job? I know I can use Glue to read data from tables, but is there a way to execute other database-specific commands?

I need to ingest data into a target database and then run some ALTER commands right after.

Keyway answered 10/11, 2020 at 19:46 Comment(0)

So after doing extensive research, and also opening a case with AWS support, I was told it is not possible from a Python shell or Glue PySpark job at this moment. But I tried something creative and it worked! The idea is to use py4j, which Spark already relies on, and the standard java.sql package.

Two huge benefits of this approach:

  1. You can define your database connection as a Glue data connection and keep the JDBC details and credentials there instead of hardcoding them in the Glue code. My example below does this by calling glueContext.extract_jdbc_conf('your_glue_data_connection_name') to get the JDBC URL and credentials defined in Glue.

  2. If you need to run SQL commands against a database Glue supports out of the box, you don't even need to pass a JDBC driver for it - just make sure you set up a Glue connection for that database and add that connection to your Glue job, and Glue will upload the proper database driver JARs.

Remember that the code below runs on the driver process and cannot be executed by Spark workers/executors.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

logger = glueContext.get_logger()

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# dw-poc-dev spark test
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))

print(conn.getMetaData().getDatabaseProductName())

# call stored procedure, in this case I call sp_start_job
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}")
cstmt.setString("job_name", "testjob")
results = cstmt.execute()
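
# the same connection can also run arbitrary DDL (the original question asked
# about ALTER TABLE); a minimal sketch - the table and column names below are
# placeholders, not part of the original post
ddl_stmt = conn.createStatement()
ddl_stmt.executeUpdate("ALTER TABLE dbo.my_table ADD my_new_column VARCHAR(50)")
ddl_stmt.close()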

conn.close()
Keyway answered 13/11, 2020 at 18:45 Comment(6)
This is great. It's exactly what we needed - to execute a stored procedure after inserting data with Glue. And it works exactly as described. Only two points from me. First, a clarification that 'glue_database_connection_name' needs to be replaced with your actual connection name. And second, to call a specific stored procedure (not a job) I had to prefix it with the database: DBName.dbo.my_stored_procedure. – Chug
@JamesFrank thanks, glad you found it useful. I made a few changes in the code to make it more clear. – Keyway
@Keyway - thank you for this! What do you mean by "this code below is executed by a driver process and cannot be executed by Spark workers/executors"? Can you also explain what happens if the job fails at some point - will the stored procedure still get executed? – Presbyter
@Keyway I'm in the same scenario. I used your code and ran the job. It is giving a connection error. I also hardcoded the connection and no luck. I have other Glue ETL jobs running against this DB, so I know the DB is good. Do I need to run this code in a different way? – Tse
@Keyway any idea why I may be getting "An error occurred while calling z:java.sql.DriverManager.getConnection. The url cannot be null" from your code? My redshift url is spot on.. – Steadman
Thank you, @Keyway! I'd been researching and troubleshooting this for about a week. This finally resolved it! I love how it allows us to leverage an existing Glue Connection rather than having to regenerate/reconnect with the JDBC URL, etc. from scratch like other methods. THANK YOU! – Actinomorphic

I finally got this working after a couple of hours so hopefully the following will be helpful. My script is heavily influenced by the earlier responses, thank you.

Prerequisites:

  • You will want the Glue connection configured and tested before attempting any scripts.
  • When setting up your AWS Glue job, use Spark, Glue version 2.0 or later, and Python version 3.
  • I recommend configuring this job with just 2 workers to save on cost; the bulk of the work is going to be done by the database, not by Glue.
  • The following is tested with an AWS RDS PostgreSQL instance, but is hopefully flexible enough to work for other databases.
  • The script needs 3 parameters updated near the top (glue_connection_name, database_name, and stored_proc).
  • The JOB_NAME, connection string, and credentials are retrieved by the script and do not need to be supplied.
  • If your stored proc will return a dataset then replace executeUpdate with executeQuery (see the sketch after the script).
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
    
glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
    
#Below this point no changes should be necessary.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']
    
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
job.commit()
    
logger = glueContext.get_logger()
    
logger.info('Getting details for connection ' + glue_connection_name)
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
    
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
    
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
    
stmt = conn.createStatement()
rs = stmt.executeUpdate('call ' + stored_proc)
    
logger.info("Finished")
Cleric answered 27/1, 2022 at 10:17 Comment(5)
Adding the database name was the key that makes this the best answer. It works perfectly for me. Thank you. – Fogbound
You can also use 'fullurl' instead of 'url'. – Literalism
What is the value of your rs variable (the result of your stored proc call) after this runs? – Vlf
@JacobBayer I was using this for calling stored procedures that did not return a recordset. If the job failed, then there was no RS to check, and if it succeeded, I don't recall there being an ability to "select" something from within the stored proc and have that be usable in conjunction with this approach. I recommend using the AWS Glue tools for moving data rather than this annoying workaround for AWS being unable to call standalone procs. – Cleric
Is there a way to get this to work with an Oracle database? I have tried this and have also tried using cx_Oracle to run statements against the database. I cannot get it to connect to the database, unfortunately, and run any statements. – Hardecanute

I modified the code shared by mishkin, but it did not work for me. After troubleshooting a bit I realized that, in my case, the connection from the catalog did not work, so I had to hardcode it and tweak the code a little. It works now, but it throws an exception at the end because it cannot convert the Java result to a Python result. I added a workaround, so use with caution.

Below is my code.


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#source_jdbc_conf = glueContext.extract_jdbc_conf('redshift_publicschema')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

print('Trying to connect to DB')
conn = sc._gateway.jvm.DriverManager.getConnection('jdbc:redshift://redshift-cluster-2-url:4000/databasename', 'myusername', 'mypassword')

print('Trying to connect to DB success!')

print(conn.getMetaData().getDatabaseProductName())

# call stored procedure, in this case I call sp_start_job
stmt = conn.createStatement();
#cstmt = conn.prepareCall("call dbname.schemaname.my_storedproc();");
print('Call to proc trying ')

#cstmt.setString("job_name", "testjob");

try:
    # workaround: executeQuery raises an exception when the proc returns no
    # result set, but by that point the procedure has already run on the database
    rs = stmt.executeQuery('call mySchemaName.my_storedproc()')
except:
    print("An exception occurred but proc has run")

#results = cstmt.execute()
conn.close()
Paulettepauley answered 16/6, 2021 at 22:13 Comment(0)

If you attach a connection object to the Glue job, you can easily get the connection settings:

import boto3  # args and glueContext come from the usual Glue job boilerplate shown above

glue_client = boto3.client('glue')
getjob = glue_client.get_job(JobName=args["JOB_NAME"])
connection_settings = glue_client.get_connection(Name=getjob['Job']['Connections']['Connections'][0])
conn_name = connection_settings['Connection']['Name']
df = glueContext.extract_jdbc_conf(conn_name)
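
The dictionary returned by extract_jdbc_conf can then be used exactly as in the earlier answers, for example (a sketch, assuming the same py4j imports and SparkContext sc set up as above):

conn = sc._gateway.jvm.DriverManager.getConnection(df.get('url'), df.get('user'), df.get('password'))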
Peacoat answered 24/2, 2022 at 14:24 Comment(0)

It depends. If you are using Redshift as a target, you have the option of specifying preactions and postactions as part of the connection options, and you can put your ALTER statements there. For other target types you may need to use a pure-Python module such as pg8000 (in the case of Postgres) or similar.
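
For illustration, a minimal sketch of a Redshift write with pre/post actions - the dynamic frame, connection name, table, and SQL statements here are placeholder assumptions, not from the original answer:

# assumes my_dynamic_frame was built earlier in the job and that the job
# was started with a TempDir parameter
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=my_dynamic_frame,
    catalog_connection="my-redshift-connection",   # placeholder Glue connection name
    connection_options={
        "dbtable": "public.my_table",
        "database": "mydb",
        "preactions": "truncate table public.my_table;",
        "postactions": "alter table public.my_table owner to etl_user;"
    },
    redshift_tmp_dir=args["TempDir"]
)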

Iloilo answered 10/11, 2020 at 22:58 Comment(3)
Unfortunately, my target is RDS SQL Server and it does not support pre- and post-actions. I also tried the boto3 rds-data service (but it only supports Aurora), and I cannot use pyodbc since Glue does not support non-pure packages.. – Keyway
There are others out there for ODBC that are pure Python. – Iloilo
There are only a few, neglected pure-Python ODBC packages, and they require you to install actual ODBC drivers on the Linux machine, which you cannot really do with Glue. – Keyway

Thanks mishkin for sharing the script. I got the error below when I followed the script for Redshift:

An error occurred while calling z:java.sql.DriverManager.getConnection. [Amazon]JDSI Required setting ConnSchema is not present in connection settings

It looks like source_jdbc_conf.get('url') does not include the database name in the JDBC URL, so I ended up appending the database name to the JDBC URL.
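
A minimal sketch of that workaround, reusing the first answer's variable names (the database name is a placeholder):

database_name = 'your_database_name'   # placeholder; use your actual database
jdbc_url = source_jdbc_conf.get('url') + '/' + database_name
conn = sc._gateway.jvm.DriverManager.getConnection(jdbc_url, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))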

Lusaka answered 5/10, 2022 at 22:13 Comment(0)

Yes, the original post also worked for me, but you do need to include the database name when you call the stored proc, and I also had to hardcode the URL for the connection, as extracting it was erroring out for me...

Eleni answered 20/7, 2023 at 15:24 Comment(1)
Your answer could be improved with additional supporting information. Please edit to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center. – Good
