Kubeflow Pipeline Termination Notification

I tried to add logic that sends a Slack notification when the pipeline terminates because of an error. I tried to implement this with ExitHandler, but it seems the ExitHandler can't depend on any op. Is there a good way to do this?

Deflocculate answered 15/8, 2019 at 10:36 Comment(0)

I found a solution that uses ExitHandler. I'm posting my code below in the hope that it helps someone else.


# Written against the KFP v1 SDK (dsl.ContainerOp).
from kfp import dsl
from kubernetes.client import V1ConfigMapKeySelector, V1EnvVar, V1EnvVarSource

def slack_notification(slack_channel: str, status: str, name: str, is_exit_handler: bool = False):
    """Return a ContainerOp that sends `status` to `slack_channel` on Slack."""
    send_slack_op = dsl.ContainerOp(
        name=name,
        image='wenmin.wu/slack-cli:latest',
        is_exit_handler=is_exit_handler,
        command=['sh', '-c'],
        arguments=["/send-message.sh -d {} '{}'".format(slack_channel, status)]
    )
    # Read the Slack token from the 'workspace-config' ConfigMap.
    token_ref = V1ConfigMapKeySelector(name='workspace-config', key='SLACK_CLI_TOKEN')
    send_slack_op.add_env_variable(
        V1EnvVar(name='SLACK_CLI_TOKEN', value_from=V1EnvVarSource(config_map_key_ref=token_ref))
    )
    return send_slack_op

@dsl.pipeline(
    name='forecasting-supply',
    description='forecasting supply ...'
)
def ml_pipeline(
    param1,
    param2,
    param3,
    slack_channel,  # channel to notify; used by the exit handler below
):
    exit_task = slack_notification(
        slack_channel = slack_channel,
        name = "supply-forecasting",
        status = "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
        is_exit_handler = True
    )

    with dsl.ExitHandler(exit_task):
        # put other tasks here
        pass  # placeholder so the block is valid Python; replace with real tasks
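
A note on the status string: the {{workflow.name}} and {{workflow.status}} placeholders have to reach Argo unchanged, even though the command is built with str.format. The sketch below, which is only an illustration and not part of the original pipeline, suggests why that works: format only interprets braces in the template string, not in the values substituted into it. The helper name build_slack_command and the channel name are made up for this example.

```python
# Sketch: str.format() only interprets braces in the template string,
# not in the values that are substituted into it.
def build_slack_command(slack_channel: str, status: str) -> str:
    """Build the shell command the same way slack_notification does."""
    return "/send-message.sh -d {} '{}'".format(slack_channel, status)


# "ml-alerts" is a made-up channel name used only for illustration.
command = build_slack_command(
    "ml-alerts",
    "Kubeflow pipeline: {{workflow.name}} has {{workflow.status}}!",
)
print(command)
```

The printed command still contains the double-brace placeholders, so they are left for Argo to substitute when the workflow runs.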

Deflocculate answered 21/8, 2019 at 6:53 Comment(6)
Hi @wenmin-wu, how do the {{workflow.xxx}} parameters get resolved? Is that a feature of KFP? I'm not seeing it documented anywhere. Any chance you know how that works? - Shay
Hi @AlexLatchford, it's a feature of Argo, and since Kubeflow is based on Argo, all Argo macros can be used. Refer to github.com/argoproj/argo/blob/master/docs/variables.md to check all the Argo macros. - Deflocculate
Hey Wenmin, thanks so much for the link! I definitely didn't realize this was a feature, thanks for sharing! - Shay
@WenminWu, how do we get the output from any previous container passed to the exit handler? I went through the Argo doc, and nothing seems to work. - Rotate
How can the workflow variable be accessed inside the KFPL Python code? - Stencil
Hi @RoyS, just get the variable in the ml_pipeline function and pass it to the Python code as a parameter. - Deflocculate
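
To make the last suggestion concrete, here is a minimal sketch of the receiving side, assuming the step's container runs a Python script and the pipeline passes the Argo placeholder as a command-line argument. The flag name --workflow-name and the sample value are hypothetical and chosen only for this example.

```python
import argparse


def parse_args(argv=None):
    """Parse arguments that the pipeline step passes to the container."""
    parser = argparse.ArgumentParser(description="Example step entry point")
    # Hypothetical flag: the pipeline would pass '{{workflow.name}}' here,
    # and Argo would substitute the real value before the container starts.
    parser.add_argument("--workflow-name", required=True)
    return parser.parse_args(argv)


# Simulate what the container would receive after substitution.
# "forecasting-supply-abc12" is a made-up workflow name.
args = parse_args(["--workflow-name", "forecasting-supply-abc12"])
print("Running inside workflow:", args.workflow_name)
```

On the pipeline side, the op for this step would then include something like arguments=['--workflow-name', '{{workflow.name}}'], in the same way the status string is passed to slack_notification above.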
