AWS Sagemaker using parquet file for batch transform job?

3

5

I am trying to run a batch transform inference job on a Parquet data file, but I could not find any documentation for it. Everything I have read says batch transform only accepts text/csv or JSON as the content type. As a test, I tried invoking the job on the Parquet data from a Lambda function inside my AWS account, but the batch transform job never succeeded. It fails with ClientError: 400, Error parsing data.

request = \
        {
            "TransformJobName": batch_job_name,
            "ModelName": model_name,
            "BatchStrategy": "MultiRecord",
            "TransformOutput": {
                "S3OutputPath": batch_output
            },
            "TransformInput": {
                "DataSource": {
                    "S3DataSource": {
                        "S3DataType": "S3Prefix",
                        "S3Uri": batch_input

                    }
                },
                "ContentType": "application/x-parquet", 
                "SplitType": "Line",
                "CompressionType": "None"
            },
            "TransformResources": {
                "InstanceType": "ml.m4.xlarge",
                "InstanceCount": 1
            }
        }
    client.create_transform_job(**request)
    return "Done"

Currently I am trying to run the SageMaker batch transform job locally using a Parquet data file. I have a Docker image that I can run with 'serve' in my local terminal, and I can send the data to "localhost:8080/invocations" from Postman, using the "Binary" body option to upload the Parquet file. That works fine and I can see the data populating in the Postman body. However, I am not able to use Parquet data for batch transform.

Has anyone successfully used a Parquet file to make predictions with SageMaker batch transform?

Simas answered 16/6, 2020 at 18:25 Comment(0)
9

A bit late, but I hope this helps someone else. Just to add to what @Setu Shah mentioned, this is what worked for me for serializing and deserializing Parquet files in SageMaker:

from io import BytesIO
from typing import Union

import pandas as pd
from botocore.response import StreamingBody

def input_fn(
  serialized_input_data: Union[bytes, StreamingBody],
  content_type: str = "application/x-parquet",
) -> pd.DataFrame:
  """Deserialize a Parquet request body into a DataFrame."""
  if content_type == "application/x-parquet":
    # BytesIO needs raw bytes, so read the body first if it arrives as a stream.
    if isinstance(serialized_input_data, StreamingBody):
      serialized_input_data = serialized_input_data.read()
    return pd.read_parquet(BytesIO(serialized_input_data))
  raise ValueError(f"Expected `application/x-parquet`, got `{content_type}`.")

def output_fn(output: pd.DataFrame, accept: str = "application/x-parquet") -> bytes:
  """Serialize the model output DataFrame to Parquet bytes."""
  if accept == "application/x-parquet":
    buffer = BytesIO()
    output.to_parquet(buffer)
    # getvalue() returns the whole buffer as bytes, hence the `bytes` return type.
    return buffer.getvalue()
  raise Exception("Requested unsupported ContentType in Accept: " + accept)
Elegize answered 21/9, 2021 at 7:9 Comment(4)
For my understanding, would you use this as an entrypoint script for a custom container? – Tarantula
@Tarantula iirc not necessarily from a custom container. You can use the SageMaker containers and specify your own entry point. This would be the inference script for Parquet data. – Elegize
I've done this before for live endpoints. Is this possible from, say, a boto3 call to create_transform_job? I don't see in the docs where you could pass an entry point script for a batch transform job. – Tarantula
I'm a bit foggy on the details (it's been a while), but I think Batch Transform jobs basically create an endpoint, throw your data at it and then destroy the endpoint, so they work similarly. When you make a boto3 create_transform_job call you just specify the SageMaker model (in ModelName). These functions are used when you create the model (I think via the ModelDataUrl parameter in create_model, which points to the script in S3). – Elegize
2

For SageMaker Batch Transform (or any serving job) you can supply a custom input_fn in the entry-point script. It can accept any other type of input as long as it contains the logic to handle that input. I have had success using this for Avro inputs.

Something like below should work for parquet files:

import logging

logger = logging.getLogger(__name__)

def input_fn(serialized_input_data, content_type="application/parquet"):
    logger.info("Deserializing input data.")
    ...
    # Process the serialized input.
    logger.info("Input deserialized.")
    return input_data

More information about custom functions here. This links out to the scikit-learn Estimator page but I think it is supported by all types of SageMaker Estimator objects.

Nabob answered 21/7, 2020 at 21:10 Comment(1)
What's the logic for chunking the serialized_input_data? Let's say you have a parquet file that's larger than 6MB; how does SageMaker split the file? If it splits in the middle of a parquet row group, do you have to just parse as much as you can and keep the rest in a byte buffer until the next chunk is received? – Bartonbartosch
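On the chunking question: as far as I can tell from the CreateTransformJob API, splitting is controlled by SplitType. With "SplitType": "None" the input objects are not split at all, and each S3 object is sent to the container whole in one request, so input_fn never sees a partial row group. The catch is that every file then has to fit within MaxPayloadInMB, which defaults to 6 and, to my knowledge, can be raised to at most 100. Below is a minimal sketch of such a request. The helper name build_parquet_transform_request and the bucket and job names are hypothetical, and the instance type is simply copied from the question.

```python
def build_parquet_transform_request(
    job_name: str,
    model_name: str,
    input_uri: str,
    output_uri: str,
    max_payload_mb: int = 6,
) -> dict:
    """Build CreateTransformJob arguments that send each Parquet file whole.

    Hypothetical helper for illustration; pass the result to
    boto3.client("sagemaker").create_transform_job(**request).
    """
    return {
        "TransformJobName": job_name,
        "ModelName": model_name,
        # Every input file must be no larger than this many megabytes.
        "MaxPayloadInMB": max_payload_mb,
        "TransformInput": {
            "DataSource": {
                "S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": input_uri}
            },
            "ContentType": "application/x-parquet",
            # "None" means no splitting: one S3 object becomes one request body.
            # "Line" would cut the binary file at newline bytes and corrupt it.
            "SplitType": "None",
            "CompressionType": "None",
        },
        "TransformOutput": {
            "S3OutputPath": output_uri,
            # Passed to output_fn as the `accept` argument.
            "Accept": "application/x-parquet",
        },
        "TransformResources": {"InstanceType": "ml.m4.xlarge", "InstanceCount": 1},
    }


# Hypothetical names, for illustration only.
request = build_parquet_transform_request(
    "parquet-batch-job", "my-model", "s3://my-bucket/input/", "s3://my-bucket/output/"
)
```

BatchStrategy is left out on purpose, since there are no individual records to batch when the files are not split. Files larger than the payload limit would need to be rewritten into smaller Parquet files before the job runs.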
1

SageMaker Batch Transform does not seem to support the Parquet format, so you will need your own workaround to use a Parquet dataset. You can convert the Parquet dataset into a format your inference endpoint supports (e.g. text/csv or application/json) and use the converted dataset in batch transform. In a Spark cluster this is as simple as:

sqlContext.read.parquet("input/parquet/location/").write.json("output/json/location")
Virtuosic answered 25/6, 2020 at 19:44 Comment(0)
