Vertex AI model batch prediction: issues with referencing an existing model and an input file on Cloud Storage
I'm struggling to correctly set up a Vertex AI pipeline which does the following:

  1. read data from an API, store it to GCS, and use it as input for batch prediction
  2. get an existing model (video classification on Vertex AI)
  3. create a batch prediction job with the input from point 1

As will be seen, I don't have much experience with Vertex Pipelines/Kubeflow, so I'm asking for help/advice; I hope it's just some beginner mistake. This is the gist of the code I'm using as the pipeline:
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl

from kfp.v2.dsl import component
from kfp.v2.dsl import (
    Output,
    Artifact,
    Model,
)

PROJECT_ID = 'my-gcp-project'
BUCKET_NAME = "mybucket"
PIPELINE_ROOT = "{}/pipeline_root".format(BUCKET_NAME)


@component
def get_input_data() -> str:
    # getting data from API, save to Cloud Storage
    # return GS URI
    gcs_batch_input_path = 'gs://somebucket/file'
    return gcs_batch_input_path


@component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-aiplatform==1.8.0']
)
def load_ml_model(project_id: str, model: Output[Artifact]):
    """Load existing Vertex model"""
    import google.cloud.aiplatform as aip

    model_id = '1234'
    model = aip.Model(model_name=model_id, project=project_id, location='us-central1')



@dsl.pipeline(
    name="batch-pipeline", pipeline_root=PIPELINE_ROOT,
)
def pipeline(gcp_project: str):
    input_data = get_input_data()
    ml_model = load_ml_model(gcp_project)

    gcc_aip.ModelBatchPredictOp(
        project=PROJECT_ID,
        job_display_name=f'test-prediction',
        model=ml_model.output,
        gcs_source_uris=[input_data.output],  # this doesn't work
        # gcs_source_uris=['gs://mybucket/output/'],  # hardcoded gs uri works
        gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
    )


if __name__ == '__main__':
    from kfp.v2 import compiler
    import google.cloud.aiplatform as aip
    pipeline_export_filepath = 'test-pipeline.json'
    compiler.Compiler().compile(pipeline_func=pipeline,
                                package_path=pipeline_export_filepath)
    # pipeline_params = {
    #     'gcp_project': PROJECT_ID,
    # }
    # job = aip.PipelineJob(
    #     display_name='test-pipeline',
    #     template_path=pipeline_export_filepath,
    #     pipeline_root=f'gs://{PIPELINE_ROOT}',
    #     project=PROJECT_ID,
    #     parameter_values=pipeline_params,
    # )

    # job.run()

When running the pipeline, it throws this exception when running the batch prediction:

details = "List of found errors: 1.Field: batch_prediction_job.model; Message: Invalid Model resource name."

So I'm not sure what could be wrong. I tried to load the model in a notebook (outside of a component) and it returns correctly.
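For reference, this is roughly the notebook check I mean (a minimal sketch; the model ID and region are placeholders):

import google.cloud.aiplatform as aip

# outside a component, this resolves the model without problems
model = aip.Model(model_name='1234', project='my-gcp-project', location='us-central1')
print(model.resource_name)  # projects/<project>/locations/us-central1/models/1234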

The second issue I'm having is referencing a GCS URI, output from one component, as the batch job input.

input_data = get_input_data2()
gcc_aip.ModelBatchPredictOp(
    project=PROJECT_ID,
    job_display_name='test-prediction',
    model=ml_model.output,
    gcs_source_uris=[input_data.output],  # this doesn't work
    # gcs_source_uris=['gs://mybucket/output/'],  # hardcoded GS URI works
    gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
)

During compilation, I get the following exception: TypeError: Object of type PipelineParam is not JSON serializable. I think this could be an issue with the ModelBatchPredictOp component, though.

Again, any help/advice is appreciated. I've been dealing with this since yesterday, so maybe I missed something obvious.

The libraries I'm using:

google-cloud-aiplatform==1.8.0  
google-cloud-pipeline-components==0.2.0  
kfp==1.8.10  
kfp-pipeline-spec==0.1.13  
kfp-server-api==1.7.1

UPDATE: After comments, some research, and tuning, this works for referencing the model:

@component
def load_ml_model(project_id: str, model: Output[Artifact]):
    region = 'us-central1'
    model_id = '1234'
    model_uid = f'projects/{project_id}/locations/{region}/models/{model_id}'
    model.uri = model_uid
    model.metadata['resourceName'] = model_uid

and then I can use it as intended:

batch_predict_op = gcc_aip.ModelBatchPredictOp(
    project=gcp_project,
    job_display_name='batch-prediction-test',
    model=ml_model.outputs['model'],
    gcs_source_uris=[input_batch_gcs_path],
    gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/test'
)

UPDATE 2: Regarding the GCS path, a workaround is to define the path outside of the component and pass it in as an input parameter, for example (abbreviated):

@dsl.pipeline(
    name="my-pipeline",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
        gcp_project: str,
        region: str,
        bucket: str
):
    ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    
    gcs_prediction_input_path = f'gs://{BUCKET_NAME}/prediction_input/video_batch_prediction_input_{ts}.jsonl'
    batch_input_data_op = get_input_data(gcs_prediction_input_path)  # this loads input data to GCS path

    batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=gcp_project,
        model=training_job_run_op.outputs["model"],
        job_display_name='batch-prediction',
        # gcs_source_uris=[batch_input_data_op.output],
        gcs_source_uris=[gcs_prediction_input_path],
        gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/',
    ).after(batch_input_data_op)  # 'after' is needed so this runs once the input data is prepared, since get_input_data doesn't return anything

I'm still not sure why it doesn't work/compile when I return the GCS path from the get_input_data component.
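For completeness, here is a sketch of what get_input_data looks like in this workaround. The API call is elided, and writing via google-cloud-storage and the JSONL payload shown are my assumptions, not the exact code:

@component(
    base_image="python:3.9",
    packages_to_install=['google-cloud-storage']
)
def get_input_data(gcs_path: str):
    """Fetch data from the API and write it as JSONL to the given GCS path."""
    from google.cloud import storage

    # ... call the API and build the JSONL lines (elided); placeholder payload below ...
    payload = ('{"content": "gs://somebucket/video.mp4", "mimeType": "video/mp4", '
               '"timeSegmentStart": "0.0s", "timeSegmentEnd": "Infinity"}')

    # gcs_path has the form gs://<bucket>/<blob-name>
    bucket_name, blob_name = gcs_path[len('gs://'):].split('/', 1)
    storage.Client().bucket(bucket_name).blob(blob_name).upload_from_string(payload)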

Rightwards answered 14/12, 2021 at 23:42

Comments:

- I would like to get more details on your case. About your first issue: does your code fit this description? Checking the ID, upper/lower case, and region is common in these projects. About the second issue: can you post the full stack trace, or the name of the file which throws this error? – Hubby
- Thanks for the comment and the reference; I updated the description with a solution that works for referencing the ML model. – Rightwards
- So, for the second issue, can you detail where it is actually happening? It's only about input_data.output, right? Have you tried printing input_data.output in a previous step before calling the function, and also just input_data without .output? – Hubby
- Or are you referring to gcs_source_uris or gcs_destination_output_uri_prefix? – Hubby
- Any update on this? Can you help by defining your storage issue? – Hubby
- Thanks for the interest (I was referring to gcs_source_uris); I updated the original question with the workaround I found. – Rightwards
- This was so helpful, thanks!! – Kimberlite

I'm glad you solved most of your main issues and found a workaround for the model declaration.

Regarding your input.output observation on gcs_source_uris, the reason behind it is the way the function/class returns the value. If you dig inside the classes/methods of google_cloud_pipeline_components, you will find that they implement a structure that allows you to use .outputs on the value returned by the called function.

If you go to the implementation of one of the pipeline components, you will find that it returns an output array from the convert_method_to_component function. So, in order to have that in your own class/function, your function should return a value which can be accessed as an attribute. Below is a basic implementation of it.

class CustomClass:
    def __init__(self):
        self.return_val = {'path': 'custompath', 'desc': 'a desc'}

    @property
    def output(self):
        return self.return_val


hello = CustomClass()
print(hello.output['path'])
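In KFP terms (my reading of the same pattern, not the component's actual internals): a lightweight component with a typed return value exposes a single .output attribute on the task, while named Output[...] parameters are exposed through .outputs['name']:

from kfp.v2.dsl import component

@component
def get_input_data() -> str:
    # a single typed return value is exposed as task.output
    return 'gs://somebucket/file'

# inside a pipeline function:
# input_data = get_input_data()
# input_data.output          -> placeholder (PipelineParam) for the returned string
# ml_model.outputs['model']  -> a named Output[Artifact] parameter

At compile time, .output is a PipelineParam placeholder rather than a plain string, which appears to be what gcs_source_uris tripped over.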


Hubby answered 21/12, 2021 at 14:35
