AWS glueContext read doesn't allow an SQL query

I want to read filtered data from a MySQL instance using an AWS Glue job. Since a Glue JDBC connection doesn't allow me to push down a predicate, I am trying to create an explicit JDBC connection in my code.

I want to run a SELECT query with a WHERE clause against a MySQL database over a JDBC connection, as shown below:

import com.amazonaws.services.glue.GlueContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession


object TryMe {

  def main(args: Array[String]): Unit = {
    val sc: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sc)
    val spark: SparkSession = glueContext.getSparkSession

    // Read data over an explicit JDBC connection (returns a DataFrame)
    val t = glueContext.read.format("jdbc")
      .option("url", "jdbc:mysql://serverIP:port/database")
      .option("user", "username")
      .option("password", "password")
      .option("dbtable", "select * from table1 where 1=1")
      .option("driver", "com.mysql.jdbc.Driver")
      .load()

  }
}

It fails with the error:

com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'select * from table1 where 1=1 WHERE 1=0' at line 1

Shouldn't this work? How do I retrieve filtered data over a JDBC connection without reading the whole table into a DataFrame?

Sidestroke answered 8/1, 2019 at 14:55

I think the problem occurred because you didn't wrap the query in parentheses and provide an alias. In my opinion it should look like the following example:

 val t = glueContext.read.format("jdbc")
   .option("url", "jdbc:mysql://serverIP:port/database")
   .option("user", "username")
   .option("password", "password")
   .option("dbtable", "(select * from table1 where 1=1) as t1")
   .option("driver", "com.mysql.jdbc.Driver")
   .load()

More information about the parameters of Spark's JDBC data source:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
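
For context on the original error: when Spark infers the schema, it wraps the dbtable value in a probe query of the form SELECT * FROM <dbtable> WHERE 1=0, which is where the trailing WHERE 1=0 in the error message comes from; a bare SELECT statement cannot be substituted there, while a parenthesized, aliased subquery can. On Spark 2.4 and later there is also a query option that accepts the SQL statement directly (query and dbtable are mutually exclusive). A minimal sketch in Python, reusing the placeholder connection details from the question:

# Sketch: the Spark 2.4+ "query" option takes a bare SELECT, no alias needed.
# url/user/password are the question's placeholders, not real values.
t = glueContext.read.format("jdbc") \
    .option("url", "jdbc:mysql://serverIP:port/database") \
    .option("user", "username") \
    .option("password", "password") \
    .option("query", "select * from table1 where 1=1") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .load()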

As for the framework that Glue itself provides, there is also the push_down_predicate option, but I have only used it with data sources based on S3. I don't think it works on sources other than S3, or on non-partitioned data.

https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
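
A minimal sketch of that S3 case in Python, assuming a hypothetical catalog table partitioned by year and month:

# Sketch: push_down_predicate prunes S3 partitions before any data is read.
# Database, table and partition-key names here are hypothetical.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database = "my_db",
    table_name = "my_partitioned_table",
    push_down_predicate = "year == '2019' and month == '01'"
)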

Agapanthus answered 15/1, 2019 at 6:57

For anyone who is still searching for further answers/examples, I can confirm that pushing the predicate down to a JDBC data source works via a subquery in dbtable. Here's how I read from SQL Server (in Python):

df = glueContext.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://server-ip:port;databaseName=db;") \
    .option("user", "username") \
    .option("password", "password") \
    .option("dbtable", "(select t1.*, t2.name from dbo.table1 t1 join dbo.table2 t2 on t1.id = t2.id) as users") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

The following also works, but NOT as I expected: the predicate is not pushed down to the data source, because push_down_predicate only prunes partitions of catalog tables backed by S3.

df = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "db_dbo_table1", push_down_predicate = "(id >= 2850700 AND statusCode = 'ACT')")

The documentation on pushDownPredicate states: "The option to enable or disable predicate push-down into the JDBC data source. The default value is true, in which case Spark will push down filters to the JDBC data source as much as possible."
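
Note that pushDownPredicate is a Spark JDBC option, distinct from Glue's push_down_predicate argument above. With it left at its default of true, a filter applied to the resulting DataFrame is translated into a WHERE clause on the source. A minimal sketch, reusing the placeholder SQL Server connection from above:

# Sketch: with Spark's pushDownPredicate at its default (true),
# the .filter() below is compiled into a WHERE clause sent to SQL Server.
df = glueContext.read.format("jdbc") \
    .option("url", "jdbc:sqlserver://server-ip:port;databaseName=db;") \
    .option("user", "username") \
    .option("password", "password") \
    .option("dbtable", "dbo.table1") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load() \
    .filter("id >= 2850700 AND statusCode = 'ACT'")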

Errol answered 20/5, 2020 at 4:37

These are 5 different code snippets that I tried for a performance comparison. Watching with SQL Server Profiler, only 2 of them actually filtered the data at the server level. It seems that, at the moment, without creating a custom connector or buying one from the marketplace, the only way to get this to work is to use glueContext.read.

You can convert DynamicFrames to and from DataFrames (see the example below):

rds_datasink_temp = DynamicFrame.fromDF(rds_dataframe, glueContext, "nested")
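
The reverse direction is a method call on the DynamicFrame itself; a minimal round-trip sketch, with hypothetical variable names:

# Sketch: round-trip between a Spark DataFrame and a Glue DynamicFrame.
from awsglue.dynamicframe import DynamicFrame

dyf = DynamicFrame.fromDF(rds_dataframe, glueContext, "nested")  # DataFrame -> DynamicFrame
df = dyf.toDF()                                                  # DynamicFrame -> DataFrame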

You should also check this yourself while running SQL Server Profiler with all the events from OLEDB, Stored Procedures, TSQL and Transactions:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

# job parameters are passed with 2 leading hyphens, e.g. --param_server_url
args = getResolvedOptions(sys.argv,['JOB_NAME'])
print("JOB_NAME: ", args['JOB_NAME'])

job_server_url="SERVER URL"
job_db_name="DB NAME"
job_db_user="DB USER"
job_db_password="DB PASSWORD"
job_table_name="TABLE NAME"

job_glue_db_name="GLUE DATA CATALOG DATABASE NAME"
job_glue_conn_name="GLUE DATA CATALOG CONNECTION NAME"
job_glue_table_name="GLUE DATA CATALOG TABLE NAME"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
region = "us-east-1"

#### aws glue data catalog table info ####
# Name  job_glue_table_name
# Database  job_glue_db_name
# Classification    sqlserver
# Location  job_db_name.dbo.job_table_name
# Connection    job_glue_conn_name

#### GlueContext Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html

#### DynamicFrame Class ####
# https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

#### Connection Api ####
# https://docs.aws.amazon.com/glue/latest/webapi/API_Connection.html

#### Using connectors and connections with AWS Glue Studio ####
# Link : https://docs.aws.amazon.com/glue/latest/ug/connectors-chapter.html
# Use AWS Secrets Manager for storing credentials
# Filtering the source data with row predicates and column projections 

#### Connection options for type custom.jdbc or marketplace.jdbc ####
# Link : https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html#aws-glue-programming-etl-connect-jdbc
# className – String, required, driver class name.
# connectionName – String, required, name of the connection that is associated with the connector.
# secretId or user/password – String, required, used to retrieve credentials for the URL.
# dbTable or query – String, required, the table or SQL query to get the data from. You can specify either dbTable or query, but not both.
# filterPredicate – String, optional, extra condition clause to filter data from source. For example:

# using \ to continue a statement across multiple lines
# query="recordid<=5", -- filtering !
print("0001 - df_read_query")
df_read_query = glueContext.read \
    .format("jdbc") \
    .option("url","jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";") \
    .option("query","select recordid from "+job_table_name+" where recordid <= 5") \
    .option("user",job_db_user) \
    .option("password",job_db_password) \
    .load()
print("df_read_query count: ", df_read_query.count())
df_read_query.show(10)
df_read_query.printSchema()

# query="recordid<=5", -- not filtering
print("0002 - df_from_catalog_query")
df_from_catalog_query = glueContext.create_dynamic_frame.from_catalog(
    database = job_glue_db_name, 
    table_name = job_glue_table_name, 
    additional_options={
        "query":"select recordid from "+job_table_name+" where recordid <= 5;",
    },
    transformation_ctx = "df_from_catalog_query", 
)
print("df_from_catalog_query count: ", df_from_catalog_query.count())
df_from_catalog_query.show(10)

# push_down_predicate="recordid<=5", -- not filtering
print("0003 - df_from_catalog_push_down_predicate")
df_from_catalog_push_down_predicate = glueContext.create_dynamic_frame.from_catalog(
    database = job_glue_db_name, 
    table_name = job_db_name+'_dbo_'+job_table_name, 
    push_down_predicate = "recordid<=5",
    transformation_ctx = "df_from_catalog_push_down_predicate",
)
print("df_from_catalog_push_down_predicate count: ", df_from_catalog_push_down_predicate.count())
df_from_catalog_push_down_predicate.show(10)

# filterPredicate="recordid<=5", -- not filtering
print("0004 - df_from_options_sqlserver")
df_from_options_sqlserver = glueContext.create_dynamic_frame.from_options(
    connection_type = "sqlserver", 
    connection_options = {
        "url":"jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";",
        "username":job_db_user,
        "password":job_db_password,
        "location":job_db_name+".dbo."+job_table_name,
        "filterPredicate":"recordid<=5",
    }, 
    transformation_ctx = "df_from_options_sqlserver",
)
print("df_from_options_sqlserver count: ", df_from_options_sqlserver.count())
df_from_options_sqlserver.show(10)

# dbtable="recordid<=5", -- filtering !
print("0005 - df_read_dbtable")
df_read_dbtable = glueContext.read \
    .format("jdbc") \
    .option("url","jdbc:sqlserver://"+job_server_url+":1433;databaseName="+job_db_name+";") \
    .option("user",job_db_user) \
    .option("password",job_db_password) \
    .option("dbtable","(select recordid from "+job_table_name+" where recordid<=5) as t1") \
    .option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
print("df_read_dbtable count: ", df_read_dbtable.count())
df_read_dbtable.show(10)

job.commit()
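
To recap the profiler results: only snippets 0001 (the query option on glueContext.read) and 0005 (a parenthesized subquery in dbtable) produced a filtered SELECT on the server; the DynamicFrame reads in 0002, 0003 and 0004 read the whole table from the server.
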
Goodrow answered 12/4, 2022 at 13:41
