Optimising GCP costs for a memory-intensive Dataflow Pipeline

We want to reduce the cost of running a specific Apache Beam pipeline (Python SDK) on GCP Dataflow.

We have built a memory-intensive Apache Beam pipeline, which requires approximately 8.5 GB of RAM on each executor. A large machine learning model is currently loaded in a transformation's DoFn.setup method so that we can precompute recommendations for a few million users.
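
For context, the shape of the transform is roughly the sketch below (the RecommendUsersFn and load_recommendation_model names are illustrative placeholders, not our actual code):

    import apache_beam as beam

    def load_recommendation_model():
        """Placeholder for the real loader that pulls the ~8.5 GB model into RAM."""
        class DummyModel(object):
            def recommend(self, user_id):
                return []  # stand-in for real recommendations
        return DummyModel()

    class RecommendUsersFn(beam.DoFn):
        def setup(self):
            # setup() runs once per DoFn instance, i.e. once per executor
            # (SDK process/thread), so every executor on a VM ends up holding
            # its own copy of the model.
            self.model = load_recommendation_model()

        def process(self, user_id):
            yield user_id, self.model.recommend(user_id)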

The existing GCP Compute Engine machine types either have a lower memory-to-vCPU ratio than we require (up to 8 GB RAM per vCPU) or a much higher one (24 GB RAM per vCPU): https://cloud.google.com/compute/docs/machine-types#machine_type_comparison

We have successfully run this pipeline using the GCP m1-ultramem-40 machine type. However, the hardware usage - and therefore the cost - was sub-optimal. This machine type has a ratio of 24 GB RAM per vCPU. When using it to run this pipeline, the VMs used less than 36% of the available memory - but, as expected, we paid for all of it.

When attempting to run the same pipeline using a custom-2-13312 machine type (2 vCPU and 13 GB RAM), Dataflow crashed, with the error:

   Root cause: The worker lost contact with the service.

While monitoring the Compute Engine instances running the Dataflow job, it was clear that they were running out of memory. Dataflow tried to load the model into memory twice - once per vCPU - but the available memory was only enough for one copy.

If we were able to inform Apache Beam/Dataflow that a particular transformation requires a specific amount of memory, the problem would be solved. But we didn't manage to find a way of achieving this.

The other solution we could think of was to change the ratio of Dataflow executors per Compute Engine VM. This would allow us to find a ratio that wastes as little vCPU as possible while respecting the pipeline's memory requirements. While using the previously mentioned custom-2-13312 machine type, we attempted to run the pipeline using the following configurations (a sketch of how these flags are passed when launching the job appears after the list):

  1. --number_of_worker_harness_threads=1 --experiments=use_runner_v2
  2. --experiments=no_use_multiple_sdk_containers --experiments=beam_fn_api
  3. --sdk_worker_parallelism=1
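
For reference, this is roughly how we pass these flags when launching the job. The project, region, and bucket values below are placeholders; the last three flags correspond to the machine type and configuration (1) above:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        "--runner=DataflowRunner",
        "--project=my-project",                # placeholder
        "--region=us-central1",                # placeholder
        "--temp_location=gs://my-bucket/tmp",  # placeholder
        "--worker_machine_type=custom-2-13312",
        "--number_of_worker_harness_threads=1",
        "--experiments=use_runner_v2",
    ])

    with beam.Pipeline(options=options) as pipeline:
        # Stand-in for the real pipeline graph.
        _ = pipeline | beam.Create([1, 2, 3]) | beam.Map(print)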

When using (1), we managed to have a single thread, but Dataflow spawned two Python executor processes per VM. The pipeline crashed because it attempted to load the model into memory twice when there was only enough space for one copy.

When using (2), a single Python process was spawned per VM, but it ran two threads. Each of those threads tried to load the model, and the VM ran out of memory. Approach (3) had a very similar outcome to (1) and (2).

It was not possible to combine multiple of these configurations.

Is there a configuration (or set of configurations) that would allow us to control the number of Dataflow executors per VM?

Are there any other alternatives for reducing the costs which we might not have thought of?

Chuff answered 2/9, 2020 at 12:35 Comment(3)
I have the same problem (I think). How did you check the memory usage of the job? The job metrics tab only shows CPU usage.Julius
I profiled the memory in the compute engine instances which were running the pipeline. A simple way of doing this is by SSHing into the VMs & using top. By doing this, you should be able to see the memory available decreasing until the VM no longer has any memory available and it gets killed.Chuff
Could you please elaborate on why it was not possible to combine these configurations? I think the configuration --number_of_worker_harness_threads=1 --experiments=use_runner_v2 --experiments=no_use_multiple_sdk_containers should result in 1 containerized SDK process per VM running 1 thread that is processing work, regardless of the number of CPU cores. However, a custom VM with 1 CPU core may be more cost-effective, as mentioned in another answer, unless your code breaks out of the GIL (e.g. via a C++ library like TensorFlow) and actually leverages all available cores.Rigi

We are working on long-term solutions to these problems, but here is a tactical fix that should prevent the model duplication that you saw in approaches 1 and 2:

Share the model in a VM across workers, to avoid it being duplicated in each worker. Use the following utility (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/utils/shared.py), which is available out of the box in Beam 2.24. If you are using an earlier version of Beam, copy just shared.py into your project and use it as user code.
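
A minimal sketch of how the Shared handle might be wired into a DoFn like the one in the question; the RecommendationModel class and the loader are placeholders, and the only Beam-specific pieces assumed here are shared.Shared and its acquire method:

    import apache_beam as beam
    from apache_beam.utils import shared

    class RecommendationModel(object):
        """Placeholder for the real ~8.5 GB model. The object handed back to
        Shared.acquire must support weak references, which plain dicts and
        lists do not; wrapping the model in a class avoids that issue."""
        def recommend(self, user_id):
            return []  # stand-in for real recommendations

    class RecommendUsersFn(beam.DoFn):
        def __init__(self, shared_handle):
            self._shared_handle = shared_handle

        def setup(self):
            def load_model():
                # Invoked lazily; executors that share this handle reuse the
                # already-loaded instance instead of each loading its own copy.
                return RecommendationModel()

            self._model = self._shared_handle.acquire(load_model)

        def process(self, user_id):
            yield user_id, self._model.recommend(user_id)

    def run(options):
        shared_handle = shared.Shared()  # create once and pass into the DoFn
        with beam.Pipeline(options=options) as pipeline:
            _ = (pipeline
                 | beam.Create(["user-1", "user-2"])  # stand-in input
                 | beam.ParDo(RecommendUsersFn(shared_handle)))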

Satiny answered 9/9, 2020 at 0:21 Comment(0)

I don't think there is currently an option to control the number of executors per VM; it seems the closest you can get is by using option (1) and assuming one Python executor per core.

Option (1)

--number_of_worker_harness_threads=1 --experiments=use_runner_v2

To compensate for the CPU-to-memory ratio you need, I'd suggest using custom machine types with extended memory. This approach should be more cost-effective.

For example, running a single executor and a single thread on an n1-standard-4 machine (4 vCPUs, 15 GB) will be roughly 30% more expensive than running the same workload on a custom-1-15360-ext (1 vCPU, 15 GB) custom machine.
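
For a Python pipeline, the custom extended machine type is requested purely through the worker machine type option; no VM has to be created beforehand. A minimal sketch (the other required Dataflow flags are omitted):

    from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

    # custom-[vCPUs]-[memory in MB], with the "-ext" suffix allowing memory
    # beyond the standard per-vCPU limit; 15360 MB = 15 GB here.
    options = PipelineOptions(["--runner=DataflowRunner"])
    options.view_as(WorkerOptions).machine_type = "custom-1-15360-ext"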

Doleful answered 3/9, 2020 at 21:0 Comment(5)
How can people create a custom machine? The page you linked explains how to do it during instance creation or after the instance is created (requires a reboot), but for Dataflow you have to specify the instance type when you launch the job, and Dataflow takes care of the instance lifecycle.Julius
You can specify a custom machine type when launching the pipeline, for example --workerMachineType=custom-1-15360-extDoleful
How do you create a custom machine?Julius
As you mentioned, for Dataflow you do not create the machines beforehand; rather, you specify which machineType you want to use. The machineType for custom machine types based on the n1 family is built as follows: custom-[NUMBER_OF_CPUS]-[NUMBER_OF_MB]. The ext flag for extended memory, as well as how to create a custom machine based on other families, is explained here; specifically, look at the gcloud section of the instructions.Doleful
I think NUMBER_OF_MB needs to be a multiple of 256.Rigi
