We want to reduce the cost of running a specific Apache Beam pipeline (Python SDK) on GCP Dataflow.
We have built a memory-intensive Apache Beam pipeline that requires approximately 8.5 GB of RAM on each executor. A large machine learning model is currently loaded in a transformation's DoFn.setup method so that we can precompute recommendations for a few million users.
The existing GCP Compute Engine machine types have either a lower memory/vCPU ratio than we require (up to 8 GB RAM per vCPU) or a much higher one (24 GB RAM per vCPU): https://cloud.google.com/compute/docs/machine-types#machine_type_comparison
We have successfully run this pipeline using the GCP m1-ultramem-40 machine type. However, the hardware usage, and therefore the cost, was sub-optimal. This machine type has a ratio of 24 GB RAM per vCPU. When running the pipeline on it, the VMs used less than 36% of the available memory, but, as expected, we paid for all of it.
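The under-utilization follows from simple arithmetic. The sketch below assumes, as the question describes, that every vCPU ends up running its own copy of the 8.5 GB model, and it ignores OS and worker-harness overhead, so real figures will be somewhat higher. The 8 GB and 24 GB per-vCPU ratios are the ones quoted above; the helper name per_vcpu_fit is purely illustrative.

```python
# Sketch: memory utilization when each vCPU holds one copy of the model.
# Assumption: one model copy per vCPU, no OS/harness overhead counted.
MODEL_GB = 8.5  # approximate per-executor requirement stated in the question


def per_vcpu_fit(gb_per_vcpu: float, model_gb: float = MODEL_GB) -> tuple[bool, float]:
    """Return (fits, utilization) for one model copy per vCPU."""
    utilization = model_gb / gb_per_vcpu
    return utilization <= 1.0, utilization


# Memory-per-vCPU ratios quoted in the question (GB per vCPU).
ratios = {"up to 8 GB/vCPU": 8.0, "m1-ultramem-40 (24 GB/vCPU)": 24.0}

for name, ratio in ratios.items():
    fits, util = per_vcpu_fit(ratio)
    print(f"{name}: fits={fits}, utilization={util:.1%}")
```

With 24 GB per vCPU, one 8.5 GB copy uses about 35.4% of the memory, which matches the "less than 36%" observation, while the 8 GB-per-vCPU families cannot hold even one copy per vCPU.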
When we attempted to run the same pipeline using a custom-2-13312 machine type (2 vCPUs and 13 GB RAM), Dataflow crashed with the error:
Root cause: The worker lost contact with the service.
While monitoring the Compute Engine instances running the Dataflow job, it was clear that they were running out of memory. Dataflow tried to load the model into memory twice, once per vCPU, but the available memory was only enough for one copy.
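The same arithmetic explains the crash on custom-2-13312. Compute Engine custom machine type names follow the custom-<vCPUs>-<memory in MB> pattern, so 13312 MB is 13 GB. The sketch below only handles that three-part form, ignores overhead, and uses hypothetical helper names.

```python
# Sketch: does one model copy per vCPU fit on a custom machine type?
# Assumption: name format "custom-<vCPUs>-<memory MB>", no overhead counted.
MODEL_GB = 8.5


def parse_custom_machine_type(name: str) -> tuple[int, float]:
    """Parse 'custom-<vCPUs>-<memory MB>' into (vcpus, memory_gb)."""
    prefix, vcpus, memory_mb = name.split("-")
    if prefix != "custom":
        raise ValueError(f"not a custom machine type: {name}")
    return int(vcpus), int(memory_mb) / 1024


def copies_that_fit(memory_gb: float, model_gb: float = MODEL_GB) -> int:
    """How many whole model copies fit into the given memory."""
    return int(memory_gb // model_gb)


vcpus, memory_gb = parse_custom_machine_type("custom-2-13312")
needed_gb = vcpus * MODEL_GB  # one copy per vCPU, as observed
print(vcpus, memory_gb, needed_gb, copies_that_fit(memory_gb))  # 2 13.0 17.0 1
```

Two copies need 17 GB but only 13 GB are available, so only one copy fits, which is consistent with the out-of-memory behavior observed above.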
If we could tell Apache Beam/Dataflow that a particular transformation requires a specific amount of memory, the problem would be solved, but we didn't find a way of doing this.
The other solution we could think of was to change the ratio of Dataflow executors per Compute Engine VM. This would let us find a ratio that wastes as little vCPU as possible while respecting the pipeline's memory requirements. Using the previously mentioned custom-2-13312 machine type, we attempted to run the pipeline with the following configurations:
(1) --number_of_worker_harness_threads=1 --experiments=use_runner_v2
(2) --experiments=no_use_multiple_sdk_containers --experiments=beam_fn_api
(3) --sdk_worker_parallelism=1
When using (1), we managed to have a single thread, but Dataflow spawned two Python executor processes per VM. The pipeline crashed because it attempted to load the model into memory twice when there was only enough space for one copy.
When using (2), a single Python process was spawned per VM, but it ran two threads. Each of those threads tried to load the model, and the VM ran out of memory. Approach (3) had a very similar outcome to (1) and (2).
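For the single-process, two-thread case in (2), one possible alternative (not something tried in the question) is to share a single model copy across all threads of the process instead of loading it per DoFn instance. The stdlib sketch below shows the idea with double-checked locking; _load_model is a hypothetical stand-in for the real model load. Beam also ships apache_beam.utils.shared.Shared for sharing objects across DoFn instances in one process, which may be worth checking. Note this does not help when Dataflow starts several processes per VM, as in (1).

```python
import threading
import time

_model = None
_model_lock = threading.Lock()
load_count = 0  # how many times the (fake) model was actually loaded


def _load_model():
    """Hypothetical stand-in for the expensive model load done in DoFn.setup."""
    global load_count
    load_count += 1
    time.sleep(0.1)  # simulate a slow load so that the threads overlap
    return object()


def get_model():
    """Return one process-wide model, loading it at most once (double-checked locking)."""
    global _model
    if _model is None:
        with _model_lock:
            if _model is None:
                _model = _load_model()
    return _model


# Simulate two worker threads in one SDK process, each "setting up" its DoFn.
results = []
threads = [threading.Thread(target=lambda: results.append(get_model())) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(load_count)  # 1
```

Both threads receive the same object, so memory holds one copy of the model regardless of the thread count.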
We were not able to combine several of these configurations.
Is there a configuration (or set of configurations) that would give us control over the number of Dataflow executors per VM?
Are there any other alternatives for reducing costs that we might not have thought of?
top. By doing this, you should be able to see the available memory decreasing until the VM has no memory left and gets killed. – Chuff
--number_of_worker_harness_threads=1 --experiments=use_runner_v2 --experiments=no_use_multiple_sdk_containers should result in one containerized SDK process per VM running one thread that processes work, regardless of the number of CPU cores. However, as mentioned in another answer, a custom VM with 1 CPU core may be more cost-effective, unless your code breaks out of the GIL (e.g. via a C++ library like TensorFlow) and actually leverages all available cores. – Rigi