Cannot COPY JSON from DynamoDB Streams to Redshift

Following is the use case I am working on: I enabled Streams when creating the DynamoDB table, with New and Old images. I have created a Kinesis Firehose delivery stream with Redshift as the destination (with an intermediate S3 bucket).

From DynamoDB my stream reaches Firehose, and from there it lands in the bucket as gzipped JSON (S3 bucket, Gzip), given below. My problem is that I cannot COPY this JSON into Redshift.

Things I am not able to get:

    1. Not sure what the CREATE TABLE statement in Redshift should be.
    2. What the COPY syntax in Kinesis Firehose should be.
    3. How I should use JSONPaths here. Kinesis Data Firehose is set to deliver only JSON to my S3 bucket.
    4. How to mention the manifest in the COPY command.

The JSON loaded to S3 is shown below:

{
    "Keys": {
        "vehicle_id": {
            "S": "x011"
        }
    },
    "NewImage": {
        "heart_beat": {
            "N": "0"
        },
        "cdc_id": {
            "N": "456"
        },
        "latitude": {
            "N": "1.30951"
        },
        "not_deployed_counter": {
            "N": "1"
        },
        "reg_ind": {
            "N": "0"
        },
        "operator": {
            "S": "x"
        },
        "d_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "z_id": {
            "N": "1267"
        },
        "last_end_trip_dttm": {
            "S": "11/08/2018 1:43:46 PM"
        },
        "land_ind": {
            "N": "1"
        },
        "s_ind": {
            "N": "1"
        },
        "status_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "case_ind": {
            "N": "1"
        },
        "last_po_change_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "violated_duration": {
            "N": "20"
        },
        "vehicle_id": {
            "S": "x011"
        },
        "longitude": {
            "N": "103.7818"
        },
        "file_status": {
            "S": "Trip_Start"
        },
        "unhired_duration": {
            "N": "10"
        },
        "eo_lat": {
            "N": "1.2345"
        },
        "reply_eo_ind": {
            "N": "1"
        },
        "license_ind": {
            "N": "0"
        },
        "indiscriminately_parked_ind": {
            "N": "0"
        },
        "eo_lng": {
            "N": "102.8978"
        },
        "officer_id": {
            "S": "[email protected]"
        },
        "case_status": {
            "N": "0"
        },
        "color_status_cd": {
            "N": "0"
        },
        "parking_id": {
            "N": "2345"
        },
        "ttr_dttm": {
            "S": "11/08/2018 2:43:46 PM"
        },
        "deployed_ind": {
            "N": "1"
        },
        "status": {
            "S": "PI"
        }
    },
    "SequenceNumber": "1200000000000956615967",
    "SizeBytes": 570,
    "ApproximateCreationDateTime": 1535513040,
    "eventName": "INSERT"
}

My CREATE TABLE statement:

create table vehicle_status(
    heart_beat integer,
    cdc_id integer,
    latitude integer,   
    not_deployed_counter integer,
    reg_ind integer,
    operator varchar(10),
    d_dttm varchar(30),
    z_id integer,
    last_end_trip_dttm varchar(30),
    land_ind integer,
    s_ind integer,
    status_change_dttm varchar(30), 
    case_ind integer,
    last_po_change_dttm varchar(30),    
    violated_duration integer,
    vehicle_id varchar(8),
    longitude integer,  
    file_status varchar(30),
    unhired_duration integer,
    eo_lat integer,                     
    reply_eo_ind integer,
    license_ind integer,    
    indiscriminately_parked_ind integer,
    eo_lng integer,
    officer_id varchar(50),
    case_status integer,
    color_status_cd integer,
    parking_id integer,
    ttr_dttm varchar(30),
    deployed_ind varchar(3),
  status varchar(8));

And my COPY statement (manually trying to resolve this from Redshift):

COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) 
FROM 's3://<my-bucket>/2018/08/29/05/vehicle_status_change-2-2018-08-29-05-24-42-092c330b-e14a-4133-bf4a-5982f2e1f49e.gz' CREDENTIALS 'aws_iam_role=arn:aws:iam::<accountnum>:role/<RedshiftRole>' GZIP json 'auto';

When I try the above procedure, the records are inserted, but all the columns and rows are NULL.

How can I copy this JSON format to Redshift? I have been stuck here for the last 3 days. Any help on this would do.

S3 Bucket:

Amazon S3/<My-bucket>/2018/08/29/05
Amazon S3/<My-bucket>/manifests/2018/08/29/05
Spile answered 29/8, 2018 at 7:30 Comment(0)

I'm not very familiar with Amazon, but let me try to answer most of your questions so that you can move on. Other people are most welcome to edit this answer or add details. Thank you!

Not Sure what should be the Create table Statement in Redshift

Your create statement create table vehicle_status(...) has no problem, though you could add a distribution key, sort key, and column encodings based on your requirements; refer to more here and here.

As per the AWS Kinesis documents, your table must already exist in Redshift, so you can connect to Redshift using the psql command and run the create statement manually.

What should be the COPY syntax in Kinesis Firehose?

The COPY syntax remains the same whether you run it via psql or Firehose. Luckily, the COPY script you came up with runs without any error; I tried it on my instance with the small modification of supplying the AWS access/secret keys directly, and it works fine. Here is the SQL I ran that worked and copied 1 data record to the table vehicle_status.

Actually, your JSON structure is complex, hence json 'auto' will not work. Here is the working command. I have created a sample JSONPaths file for you with 4 example fields, and you can follow the same structure to create a JSONPaths file with all the data points.

 COPY vehicle_status (heart_beat, cdc_id, operator, status) FROM 's3://XXX/development/test_file.json' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://XXX/development/yourjsonpathfile';

And your JSONPaths file should have content similar to the below:

{
  "jsonpaths": [
    "$['NewImage']['heart_beat']['N']",
    "$['NewImage']['cdc_id']['N']",
    "$['NewImage']['operator']['S']",
    "$['NewImage']['status']['S']"
  ]
}

I have tested it and it works.
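Since typing out all 31 jsonpath entries by hand is tedious, here is a small Python sketch of one way to generate the JSONPaths file from a sample record. This is my own helper, not anything built into Firehose or Redshift; the function name and column list are illustrative. It relies on the fact that every DynamoDB attribute is wrapped in a one-key type descriptor ("N" or "S"), which is exactly why the path needs one more level than the attribute name.

```python
import json

# One sample record from the Firehose S3 output (abbreviated here;
# a real record carries all of the NewImage attributes).
sample_record = {
    "NewImage": {
        "heart_beat": {"N": "0"},
        "cdc_id": {"N": "456"},
        "operator": {"S": "x"},
        "status": {"S": "PI"},
    }
}

def build_jsonpaths(record, column_order):
    """Build a Redshift jsonpaths document for the given columns.

    Each DynamoDB attribute is wrapped in a type descriptor ("N" or "S"),
    so the jsonpath must reach one level deeper than the attribute name.
    The column order must match the column list in your COPY statement.
    """
    paths = []
    for col in column_order:
        type_key = next(iter(record["NewImage"][col]))  # "N" or "S"
        paths.append(f"$['NewImage']['{col}']['{type_key}']")
    return {"jsonpaths": paths}

doc = build_jsonpaths(sample_record, ["heart_beat", "cdc_id", "operator", "status"])
print(json.dumps(doc, indent=2))
```

Running this against one full record and uploading the printed document to S3 gives you the file to reference in the COPY command's json option.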

How should I use JSONPaths here? Kinesis Data Firehose is set to deliver only JSON to my S3 bucket.

I used only your example JSON data and it works, so I see no issue here.

How to mention the manifest in the COPY command?

This is a good question; I will try to explain it. Here you are referring to a manifest.

If you look at the COPY command above, it works fine for one file or a couple of files, but if you have a lot of files, that is where the concept of a manifest comes in. Straight from the Amazon docs: "Instead of supplying an object path for the COPY command, you supply the name of a JSON-formatted text file that explicitly lists the files to be loaded."

In short, if you want to load multiple files in a single shot, which is also the way Redshift prefers, you can create a simple JSON manifest and supply it in the COPY command.

{
  "entries": [
    {"url": "s3://mybucket-alpha/2013-10-04-custdata", "mandatory": true},
    {"url": "s3://mybucket-alpha/2013-10-05-custdata", "mandatory": true},
    ....
  ]
}

Upload the manifest to S3 and use it in your COPY command like below.

 COPY vehicle_status (heart_beat, cdc_id, latitude, not_deployed_counter, reg_ind, operator, d_dttm, z_id, last_end_trip_dttm, land_ind, s_ind, status_change_dttm, case_ind, last_po_change_dttm, violated_duration, vehicle_id, longitude, file_status, unhired_duration, eo_lat, reply_eo_ind, license_ind, indiscriminately_parked_ind, eo_lng, officer_id, case_status, color_status_cd, parking_id, ttr_dttm, deployed_ind, status) FROM 's3://XXX/development/test.manifest' CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXXX;aws_secret_access_key=MYXXXXXXXXXXXXXXXXXXXXXX' json 's3://yourbucket/jsonpath' manifest;

Here is a detailed reference for the manifest.
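If you need to regenerate the manifest regularly (for example, once per Firehose delivery hour), a small script can build it from a list of object keys. This is a sketch under my own assumptions; the bucket name and keys below are placeholders, and in practice you would list the keys under your Firehose prefix (for example with boto3) rather than hard-code them.

```python
import json

def build_manifest(bucket, keys, mandatory=True):
    """Build a Redshift COPY manifest listing every file to load."""
    return {
        "entries": [
            {"url": f"s3://{bucket}/{key}", "mandatory": mandatory}
            for key in keys
        ]
    }

# Placeholder keys; list the real ones under your Firehose prefix.
keys = [
    "2018/08/29/05/vehicle_status_change-part-1.gz",
    "2018/08/29/06/vehicle_status_change-part-2.gz",
]
manifest = build_manifest("my-bucket", keys)
print(json.dumps(manifest, indent=2))
```

Upload the printed document to S3 and point the COPY command's FROM clause at it, together with the MANIFEST keyword.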

I hope this gives you some ideas on how to move on, and if there is a specific error you see, I would be happy to refocus the answer.

Logography answered 31/8, 2018 at 12:51 Comment(5)
Still have the same problem. The insert is successful, but NULL is inserted for all columns. – Spile
@RUser Actually, I forgot to take care of the JSONPaths file; I have corrected that now. Please try with the latest information I have added; it will work for sure, I did a quick test. – Logography
How do we generate the JSONPaths file from the JSON? I can do it once manually, creating the JSONPaths file and writing the COPY command directly in Redshift. However, I want this to be automated through Kinesis Data Firehose to S3 to Redshift. – Spile
I think JSONPaths usually don't change very frequently, so I'm not sure automating this will help to a great degree; in fact, it may be less efficient, because you would be executing the JSONPaths generation every time. I'm sorry, I don't have much knowledge about generating JSONPaths automatically. – Logography
@RUser I hope this has resolved your problem; you should be seeing data in Redshift now. Does it? – Logography
