Google Cloud Dataflow Worker Threading
Say we have one worker with 4 CPU cores. How is parallelism configured on Dataflow worker machines? Do we parallelize beyond the number of cores?

Chere answered 12/12, 2017 at 16:48 Comment(0)
I also previously thought that Dataflow workers used one thread per core. However, I recently found out that this is only true for batch mode. In streaming mode, unless otherwise specified, a worker uses 300 threads, as seen in the Beam streaming worker source; this is in contrast to the batch worker code. To restrict the number of worker threads, use --numberOfWorkerHarnessThreads=N.

My story of how I found this out: I had a streaming job with a ParDo that would read XML files and parse them. The workers ran out of memory because they tried to chew on too many files at once. I used a static AtomicInteger to count the number of concurrent executions of my ParDo. Also, I logged the threadIds used to execute my ParDo. I could see as many as 300 concurrent executions on one worker, and the number of different threadIds logged also indicated that the worker was using a large number of threads.
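The counting trick described above can be sketched in plain Java. This is a hypothetical, Beam-free illustration (the class, method names, and thread pool are stand-ins for a real ParDo's processing method): a static AtomicInteger tracks how many "elements" are in flight at once, and a second one records the high-water mark.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the technique from the answer: count concurrent
// executions with a static AtomicInteger. In a real pipeline, process()
// would be the body of the ParDo's @ProcessElement method.
public class ConcurrencyCounter {
    private static final AtomicInteger active = new AtomicInteger();
    private static final AtomicInteger peak = new AtomicInteger();

    static void process(String element) {
        int now = active.incrementAndGet();     // one more execution in flight
        peak.accumulateAndGet(now, Math::max);  // remember the high-water mark
        try {
            Thread.sleep(50);                   // stand-in for parsing an XML file
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            active.decrementAndGet();
        }
    }

    // Run `tasks` elements on a pool of `threads` threads, return peak concurrency.
    public static int measurePeak(int threads, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> process("element"));
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak concurrency: " + measurePeak(10, 50));
    }
}
```

Logging `Thread.currentThread().getId()` alongside the counter, as the answer describes, makes it easy to confirm how many distinct harness threads are actually executing your DoFn.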

I fixed my memory issues by restricting the number of threads used with --numberOfWorkerHarnessThreads=10. I also tried setting that number to 1 thread, but that seemed to result in only one pipeline step being executed at any given time. Not surprising, but I wanted a higher level of parallelism, so 10 seemed like a good number for my use-case.
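For reference, the flag is just a regular pipeline option passed at launch time. A hedged sketch of a launch command, assuming a Maven project; `com.example.MyPipeline` and the project/region values are placeholders:

```shell
# Hypothetical launch command; main class and GCP project are placeholders.
mvn compile exec:java \
  -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DataflowRunner \
               --project=my-gcp-project \
               --region=us-central1 \
               --numberOfWorkerHarnessThreads=10"
```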

Edit: Added some extra information. Thanks to @safurudin-mahic for finding the relevant Beam source code.

Great answered 6/8, 2020 at 11:8 Comment(1)
The Python variant of this flag is --number_of_worker_harness_threads. – Lanny
For batch jobs, one worker thread is used per core, and each worker thread independently processes a chunk of the input space.

For streaming jobs, there can be many more worker threads per core, since those threads spend much of their time waiting on input.
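The point above can be demonstrated with a small, Dataflow-free sketch: when each task mostly blocks on I/O (simulated here with a sleep), running many more threads than cores raises throughput, because blocked threads cost almost no CPU. This is a hypothetical illustration, not Dataflow worker code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: why a streaming worker benefits from oversubscribing threads.
// Each task "waits on input" for 100 ms; with 4 threads, 16 tasks take
// roughly four rounds of waiting, while 16 threads overlap all the waits.
public class OversubscribeDemo {
    static long runMillis(int threads, int tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                try {
                    Thread.sleep(100); // simulated blocking read of input
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("4 threads:  " + runMillis(4, 16) + " ms");
        System.out.println("16 threads: " + runMillis(16, 16) + " ms");
    }
}
```

For CPU-bound batch work the same oversubscription would only add context-switching overhead, which is consistent with batch mode defaulting to one thread per core.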

Purcell answered 12/12, 2017 at 22:22 Comment(6)
So with 1 machine with 4 cores, parallelism is 4? Thanks – Chere
Also, can we find this information somewhere in the documentation? That would be super helpful. – Chere
I came across this while looking for an answer to my own slightly different question. Can you please clarify if there is a difference between running a Dataflow job locally on, say, a Compute Engine instance with 8 CPUs, vs. running it remotely on 8 n1-standard-1 Dataflow workers? Obviously auto-scaling is one difference, but are there any advantages or disadvantages for local vs. remote otherwise? Especially as relates to parallel processing. Thanks! – Valdovinos
@VS_FF: The local runner is mostly intended for experimenting and debugging, and has not been as heavily optimized. Besides not being able to use auto-scaling, it performs more operations in memory (like shuffle) and would not scale to nearly as large datasets. – Purcell
This answer is not correct for streaming jobs, and it is also missing a reference link. – Cribb
The reference link is: cloud.google.com/dataflow/docs/resources/… – Interdiction
