AWS Glue - Don't know how to save NullType to REDSHIFT

I have the below simple script for AWS Glue. I have a text file with empty cells and a table which accepts NULL values. When I run the Glue job it fails with the exception, "Don't know how to save NullType to REDSHIFT".

How do I work around this, or are NULL inserts not supported for Redshift via Glue?

Job script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "poc_edw", table_name = "delta_orderheader", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", "decimal(10,2)", "ordernetamount", "decimal(10,2)"), ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("partitionnumber", "int", "partitionnumber", "int"), ("messagekey", "long", "messagekey", "long"), ("applicationversion", "string", "applicationversion", "string"), ("businessdate", "date", "businessdate", "date"), ("change", "decimal(10,2)", "change", "decimal(10,2)"), ("employeeid", "int", "employeeid", "int"), ("employeename", "string", "employeename", "string"), ("employeeuserid", "string", "employeeuserid", "string"), ("meallocation", "int", "meallocation", "int"), ("messageid", "string", "messageid", "string"), ("ordernumber", "int", "ordernumber", "int"), ("ordersourcetypekey", "short", "ordersourcetypekey", "short"), ("posid", "int", "posid", "int"), ("satellitenumber", "int", "satellitenumber", "int"), ("spmhostordercode", "string", "spmhostordercode", "string"), ("storenumber", "int", "storenumber", "int"), ("taxamount", "decimal(10,2)", "taxamount", "decimal(10,2)"), ("taxexempt", "int", "taxexempt", "int"), ("taxinclusiveamount", "decimal(10,2)", "taxinclusiveamount", "decimal(10,2)"), ("terminalnumber", "string", "terminalnumber", "string"), ("transactiondate", "timestamp", "transactiondate", "timestamp"), ("transactionid", "int", "transactionid", "int"), ("version", "decimal(10,2)", "version", "decimal(10,2)"), ("woddescription", "string", "woddescription", "string"), ("wodpromotionid", "int", "wodpromotionid", "int"), ("wodtype", "short", "wodtype", "short"), ("wodvalue", "decimal(10,2)", "wodvalue", "decimal(10,2)"), ("sqlinsertedprocessid", "int", "sqlinsertedprocessid", "int"), ("insertedprocessid", "int", "insertedprocessid", "int"), ("lastupdatedprocessid", "int", "lastupdatedprocessid", "int"), ("createddatetime", "timestamp", "createddatetime", "timestamp"), ("lastupdateddatetime", "timestamp", "lastupdateddatetime", "timestamp"), ("applyprocessid", "int", "applyprocessid", "int"), ("applydatetime", "timestamp", "applydatetime", "timestamp"), ("ordernetamount", "decimal(10,2)", "ordernetamount", "decimal(10,2)"), ("loyaltysubcardid", "string", "loyaltysubcardid", "string"), ("loyaltymemberid", "string", "loyaltymemberid", "string"), ("basepointegersearned", "int", "basepointegersearned", "int"), ("bonuspointegersearned", "int", "bonuspointegersearned", "int"), ("loyaltynetsales", "decimal(10,2)", "loyaltynetsales", "decimal(10,2)"), ("rewardsredeemedamount", "decimal(10,2)", "rewardsredeemedamount", "decimal(10,2)"), ("rewardsabandonedamount", "decimal(10,2)", "rewardsabandonedamount", "decimal(10,2)"), ("loyaltymemberlookuptypekey", "short", "loyaltymemberlookuptypekey", "short"), ("remoteorderid", "string", "remoteorderid", "string")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["applydatetime", "messagekey", "businessdate", "transactiondate", "sqlinsertedprocessid", "ordernetamount", "applicationversion", "messageid", "storenumber", "satellitenumber", "loyaltynetsales", "spmhostordercode", "bonuspointegersearned", "employeeid", "transactionid", "loyaltysubcardid", "employeeuserid", "taxinclusiveamount", "meallocation", "ordernumber", "loyaltymemberlookuptypekey", "applyprocessid", "ordersourcetypekey", "basepointegersearned", "partitionnumber", "insertedprocessid", "wodtype", "loyaltymemberid", "rewardsredeemedamount", "change", "rewardsabandonedamount", "version", "taxexempt", "remoteorderid", "wodpromotionid", "posid", "woddescription", "wodvalue", "lastupdatedprocessid", "taxamount", "terminalnumber", "lastupdateddatetime", "createddatetime", "employeename"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")
## @type: DataSink
## @args: [database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = TempDir, transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "poc_edw", table_name = "derik_edw_derik_stageorderheader", redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()

Reference: AWS Forum Link

UPDATED:

I have made some progress. I thought the problem was NULL characters (0x00), but that turned out not to be the case. I remade my files without any NULL characters and had the same issue.

I added this line of code.

df = DropNullFields.apply(frame = resolvechoice4, transformation_ctx = "df")

I do not fully understand why, but the best I can gather is that the DynamicFrame inferred some NullType fields which did not exist. After adding this line of code, I had rows inserted, but it appears that none of my string fields were included. Only about half of my fields have values.
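
For anyone debugging the same thing, one way to confirm which fields Glue inferred as NullType is to print the schema just before the Redshift sink. This is only a diagnostic sketch, reusing resolvechoice4 and the frame names from the script above:

from pyspark.sql.types import NullType

# Print the schema Glue inferred for the frame feeding the Redshift sink;
# any field shown with a null type is what triggers "Don't know how to save NullType".
resolvechoice4.printSchema()

# List the offending columns on the underlying Spark DataFrame
null_fields = [f.name for f in resolvechoice4.toDF().schema.fields
               if isinstance(f.dataType, NullType)]
print("NullType fields:", null_fields)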

Mizuki answered 28/11, 2017 at 0:24 Comment(3)
I cannot seem to figure out how to do an inline replacement of values. For example, the link below references the na.fill() and fillna() functions of the DataFrame class. Glue uses a DynamicFrame, which is an abstraction over DataFrame and apparently does not implement .fillna() or its aliases. The DropNullFields() function of the DynamicFrame class appears to drop the entire field if it has a NULL value, rather than just omit the NULL character within the field. spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html – Mizuki
We have abandoned using AWS Glue for the time being. – Mizuki
I am using AWS Glue to manage my ETL load. What I do is: every time I need to do a transformation, add a column, or do some calculation, I convert the dynamic frame to a Spark DataFrame using dataframe.toDF(). – Breastwork
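
To illustrate the toDF() approach from the last comment, here is a rough sketch (the column defaults are hypothetical, the column names are taken from the mapping above) that fills NULL values on the Spark DataFrame and converts back to a DynamicFrame, since fillna() is not available on DynamicFrame itself. Note this only replaces NULLs in columns that already have a concrete type; it does not by itself fix columns Glue inferred as NullType:

from awsglue.dynamicframe import DynamicFrame

# Convert the DynamicFrame to a Spark DataFrame to get access to fillna()
df = resolvechoice4.toDF()

# Hypothetical defaults: empty string for a text column, 0 for a numeric one
df = df.fillna({"employeename": "", "taxexempt": 0})

# Convert back so the Glue Redshift sink can be used as before
fixed_frame = DynamicFrame.fromDF(df, glueContext, "fixed_frame")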

From my experience

Glue has a very strange algorithm for columns and types (at least as of 02-13-2018). It reads column names and types from the Data Catalog (yes, it does) and then tries to figure out the types again (gods tell me why???). And when it deals with empty values it "figures out" a null type.

This is especially painful when you try to "cast" a value from CSV to some numeric type. Also, sometimes Glue simply removes columns without values (for example, if you add a column mapping from type A to type B, but Glue recognizes the column type as C, you will get a null column).

Columns with null types lead to a nice IllegalArgumentException when you try to save data in ORC format:

java.lang.IllegalArgumentException: Error: type expected at the position x of 'int:string:nullstring:int' but 'null' is found.

How to solve

  1. As you mention, you have to call DropNullFields.
  2. But if you have to use that column in a SQL statement, you will get an error that the column cannot be found. So you have to add the just-removed column back with the right type (code in Scala):

    // your Glue dynamic frame
    val glueDynamicFrame: DynamicFrame = ???
    // get a Spark DataFrame (null-type fields dropped)
    val sparkDataFrame = glueDynamicFrame.dropNulls().toDF()
    // this is the final Spark DataFrame with all columns and the right types
    val sparkDataFrameWithColumnAndType =
      if (!sparkDataFrame.columns.toSet.contains("myColumnWithNullType")) {
        // still a null value, but with a type!!!
        import org.apache.spark.sql.types.IntegerType
        import org.apache.spark.sql.functions.lit
        sparkDataFrame.withColumn("myColumnWithNullType", lit(null).cast(IntegerType))
        // or any other type from the org.apache.spark.sql.types package
      } else {
        sparkDataFrame
      }
    // convert the Spark DataFrame back to a Glue dynamic frame
    val newDynamicFrame = DynamicFrame(sparkDataFrameWithColumnAndType, glueContext)
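
Since the question uses the Python API, a rough PySpark equivalent of the same idea might look like the sketch below (the column name myColumnWithNullType and the IntegerType are placeholders; resolvechoice4 and glueContext come from the question's script):

    from awsglue.transforms import DropNullFields
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import lit
    from pyspark.sql.types import IntegerType

    # drop the NullType fields, then work on the underlying Spark DataFrame
    df = DropNullFields.apply(frame = resolvechoice4, transformation_ctx = "df").toDF()

    # re-add the dropped column with an explicit type so downstream SQL/sinks can see it
    if "myColumnWithNullType" not in df.columns:
        df = df.withColumn("myColumnWithNullType", lit(None).cast(IntegerType()))

    # convert back to a DynamicFrame before writing to Redshift
    fixed = DynamicFrame.fromDF(df, glueContext, "fixed")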
    
Inter answered 13/2, 2018 at 14:25 Comment(0)

I had the same error. However, transforming "null" into "NULL" solved my issue. I needed to add the following custom transform, in Python:

from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection
from pyspark.sql.functions import when, col

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    # work on the first frame in the collection as a Spark DataFrame
    df = dfc.select(list(dfc.keys())[0]).toDF()
    # replace the literal string "null" with "NULL" in every column
    for column in df.columns:
        df = df.withColumn(column, when(col(column) == "null", "NULL").otherwise(col(column)))
    dfNoNull = DynamicFrame.fromDF(df, glueContext, "dfNoNull")
    return DynamicFrameCollection({"dfNoNull": dfNoNull}, glueContext)
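
For reference, a minimal way to call this transform outside of the Glue Studio visual editor might look like the following (the collection key "input" and the frame resolvechoice4 are placeholders from the question's script):

from awsglue.dynamicframe import DynamicFrameCollection

# wrap an existing DynamicFrame in a collection, run the transform, and pick out the result
collection = DynamicFrameCollection({"input": resolvechoice4}, glueContext)
result = MyTransform(glueContext, collection).select("dfNoNull")
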
Dulaney answered 18/1 at 10:38 Comment(0)
