I'm using AWS Glue to move multiple files from S3 to an RDS instance. Each day I get a new file in S3 which may contain new data, but can also contain a record I have already saved, with some updated values. If I run the job multiple times I will of course get duplicate records in the database. Instead of inserting multiple records, I want Glue to update the existing record if it notices a field has changed. Each record has a unique id. Is this possible?
I followed an approach similar to the second option suggested by Yuriy: get the existing data as well as the new data, do some processing to merge the two, and write the result with overwrite mode. The following code should give you an idea of how to solve this problem.
from awsglue.context import GlueContext
from pyspark.context import SparkContext

sc = SparkContext()
glueContext = GlueContext(sc)
# Get your source data
src_data = glueContext.create_dynamic_frame.from_catalog(database=src_db, table_name=src_tbl)
src_df = src_data.toDF()
# Get your destination data
dst_data = glueContext.create_dynamic_frame.from_catalog(database=dst_db, table_name=dst_tbl)
dst_df = dst_data.toDF()
# Merge the two data frames without duplicates: drop destination rows that also
# appear in the source (assuming the unique key column is named "id"), then add the source rows
merged_df = dst_df.join(src_df, on="id", how="left_anti").unionByName(src_df)
# Finally, save the data to the destination with OVERWRITE mode
merged_df.write.format('jdbc').options(url=dest_jdbc_url,
    user=dest_user_name,
    password=dest_password,
    dbtable=dest_tbl).mode("overwrite").save()
Unfortunately there is no elegant way to do it with Glue. If you were writing to Redshift you could use postactions to implement a Redshift merge operation. However, it's not possible for other JDBC sinks (afaik).
Alternatively in your ETL script you can load existing data from a database to filter out existing records before saving. However if your DB table is big then the job may take a while to process it.
Another approach is to write into a staging table with mode 'overwrite' first (replace existing staging data) and then make a call to a DB via API to copy new records only into a final table.
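The staging-table approach above can be sketched roughly as follows. This is only an illustration, not Glue code: it uses Python's built-in sqlite3 module as a stand-in for the RDS database so that it runs anywhere, and the table names `staging` and `final` and the columns `id` and `value` are hypothetical. On MySQL the merge statement would use INSERT ... ON DUPLICATE KEY UPDATE rather than SQLite's ON CONFLICT clause.

```python
import sqlite3


def merge_staging_into_final(conn):
    """Copy rows from staging into final, updating rows whose id already exists."""
    conn.execute(
        """
        INSERT INTO final (id, value)
        SELECT id, value FROM staging WHERE true
        ON CONFLICT(id) DO UPDATE SET value = excluded.value
        """
    )
    conn.commit()


# In-memory database standing in for the real target (assumption for the demo).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE final (id INTEGER PRIMARY KEY, value TEXT)")
conn.execute("CREATE TABLE staging (id INTEGER PRIMARY KEY, value TEXT)")

# Rows already saved by earlier job runs.
conn.executemany("INSERT INTO final VALUES (?, ?)", [(1, "old"), (2, "unchanged")])
# Today's file, written to staging: one updated record and one new record.
conn.executemany("INSERT INTO staging VALUES (?, ?)", [(1, "new"), (3, "added")])

merge_staging_into_final(conn)
rows = conn.execute("SELECT id, value FROM final ORDER BY id").fetchall()
print(rows)
```

The Glue job would only be responsible for overwriting the staging table; the merge itself is a single statement executed by the database, so rerunning the job does not create duplicates.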
I have used INSERT INTO table ... ON DUPLICATE KEY ... for upserts into Aurora RDS running the MySQL engine. Maybe this can serve as a reference for your use case. We cannot use the JDBC writer for this, since only the APPEND, OVERWRITE, and ERROR modes are currently supported.
I am not sure which RDS database engine you are using; the following is an example of MySQL upserts.
Please see this reference, where I have posted a solution using INSERT INTO TABLE ... ON DUPLICATE KEY for MySQL:
Error while using INSERT INTO table ON DUPLICATE KEY, using a for loop array
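For readers without access to that reference, here is a minimal sketch of what a MySQL upsert statement might look like when built from Python. It is not the code from the linked post. The helper names `build_upsert_sql` and `upsert_rows`, and the `label` column, are hypothetical; the `%s` placeholder style is the one used by common MySQL DB-API drivers. ON DUPLICATE KEY UPDATE only triggers if the key column has a PRIMARY KEY or UNIQUE index.

```python
def build_upsert_sql(table, columns, key_column):
    """Build a MySQL INSERT ... ON DUPLICATE KEY UPDATE statement with %s placeholders."""
    col_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    # Every non-key column is overwritten with the incoming value on a key clash.
    updates = ", ".join(f"{c} = VALUES({c})" for c in columns if c != key_column)
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )


def upsert_rows(cursor, table, columns, key_column, rows):
    """Run the upsert for a batch of row tuples on an open DB-API cursor."""
    cursor.executemany(build_upsert_sql(table, columns, key_column), list(rows))


# Hypothetical table and columns, for illustration only.
sql = build_upsert_sql("target_years", ["year_id", "label"], "year_id")
print(sql)
```

In a Glue job, something like `upsert_rows` could be called from inside `DataFrame.foreachPartition`, opening the database connection within the partition function. That assumes a MySQL driver package is made available to the job, which is not shown here.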
This may very well be a bit slower, but Tharsan's solution was raising other issues with the write operation due to MySQL constraints. I decided to filter out the existing data before writing it back to the Data Catalog, which in turn updated the underlying data store, which was MySQL in my case:
source_years = glueContext.create_dynamic_frame.from_catalog(
database=database,
table_name="source_years",
transformation_ctx="source_years",
)
source_years = ApplyMapping.apply(
frame=source_years,
mappings=[
("YearID", "int", "year_id", "int"),
],
transformation_ctx="source_years_transform",
)
target_years = glueContext.create_dynamic_frame.from_catalog(
database=database,
table_name="target_years",
transformation_ctx="target_years",
)
target_years_list = target_years.toDF().select('year_id').rdd.map(lambda x : x[0]).collect()
source_years = source_years.filter(
f=lambda x: x['year_id'] not in target_years_list
)
glueContext.write_dynamic_frame.from_catalog(
frame=source_years,
database=database,
table_name="target_years",
transformation_ctx="target_years",
)
Update: The issue with the above DynamicFrame filter method is that if a field with a null value exists in the data set, it errors out with a message about a "void type". I consider this a BUG in the DynamicFrame API. To resolve this, I converted the DynamicFrame into a Spark DataFrame, filtered out any records whose id already exists in the target data set, and then converted the result back to a DynamicFrame. I read the Glue documentation inside and out, with a couple of read-throughs, and there is really no other way to do it than converting to a Spark DataFrame first. Here is my updated filtering solution:
target_year_ids = target_years.toDF().select('id').rdd.map(lambda x : x[0]).collect()
years_frame = source_years.toDF()
years_frame_filtered = years_frame.filter(~years_frame["id"].isin(target_year_ids))
years_frame_filtered.show()
years_filtered_dyf = DynamicFrame.fromDF(years_frame_filtered, glueContext, "years_filtered_dyf")
print('resulting size')
print(years_filtered_dyf.count())
glueContext.write_dynamic_frame.from_catalog(
frame=years_filtered_dyf,
database=database,
table_name="target_years",
transformation_ctx="target_years",
)
Note that I used Execution Class Standard and worker type G.2X, and with 9 million records to ETL, it took 4 hours. There may be an even quicker way to do it with Spark SQL. I am reading through that documentation now. I'll update if I find something faster.
connection_options_rds = { "url": url, "dbtable": dbtable, "user": uname, "password": pwd, "customJdbcDriverS3Path": "jar path", "customJdbcDriverClassName": "com.mysql.cj.jdbc", "overwrite": "ignore" } # or "overwrite" or "error"
glueContext.write_from_options(frame_or_dfc=df_to_write, connection_type="mysql", connection_options=connection_options_rds)
This "overwrite": "ignore" setting is failing when duplicates flow in.
– Tallinn