What is a good pattern for aggregating the results from Kubeflow Pipelines kfp.ParallelFor?
Aggregate results when using Kubeflow Pipelines kfp.ParallelFor
Not exactly what you asked for, but our workaround was to have each ParallelFor task write its result to S3 and then simply collect the results afterwards in a single postprocessing task:
with dsl.ParallelFor(preprocessing_task.output) as plant_item:
    predict_plant = '{}'.format(plant_item)
    forecasting_task = forecasting_op(predict_plant, ....).after(preprocessing_task)
postprocessing_task = postprocessing_op(...).after(forecasting_task)
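The collect-afterwards idea can be sketched in plain Python, using a local directory as a stand-in for the shared S3 prefix (bucket layout, file names, and the `aggregate` helper are all hypothetical; in the real pipeline each forecasting task would upload its result object to S3 and the postprocessing task would list and download that prefix instead):

```python
import json
import tempfile
from pathlib import Path

# Stand-in for the shared S3 prefix the ParallelFor tasks write into,
# e.g. s3://<bucket>/forecasts/<plant_item>.json (names hypothetical).
results_dir = Path(tempfile.mkdtemp())

# Each loop iteration (the forecasting_op equivalent) writes its own result file.
for plant_item in ["plant_a", "plant_b", "plant_c"]:
    (results_dir / f"{plant_item}.json").write_text(
        json.dumps({"plant": plant_item, "forecast": [1.0, 2.0]})
    )

# The single postprocessing_op equivalent runs after the whole loop and
# aggregates everything it finds under the shared prefix.
def aggregate(results_dir):
    return [json.loads(p.read_text()) for p in sorted(results_dir.glob("*.json"))]

aggregated = aggregate(results_dir)
print(len(aggregated))  # one entry per loop iteration, here 3
```

The key point is that the postprocessing step takes no per-item input from the loop; it depends only on the shared storage location, which is why a single node suffices.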
(After multiple suggested edits: no, the postprocessing step is not inside the loop; it comes afterwards. That is exactly what collects the results.) – Culch
Are you aware of any documentation supporting that? When I try to recreate this approach, one postprocessing_task node appears in the graph for each of my forecasting_task equivalents. – Spahi

Did you put plant_item or predict_plant as an input to it? If so, that would explain it. But no, I couldn't find any documentation, just trial and error. – Culch
At the moment this might not be supported: