Say we have one worker with 4 CPU cores. How is parallelism configured on Dataflow worker machines? Do we parallelize beyond the number of cores?
I also used to think that Dataflow workers used one thread per core. However, I recently found out that this is only true in batch mode. In streaming mode, unless otherwise specified, the worker uses 300 threads, as seen here. This is in contrast to the batch worker code. To restrict the number of worker threads, use --numberOfWorkerHarnessThreads=N.
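For anyone who prefers to set the thread cap in code rather than on the command line, here is a minimal sketch. It assumes the Beam Java SDK with the Dataflow runner dependency on the classpath, where the option is exposed through the `DataflowPipelineDebugOptions` interface. The class name `ThreadCapExample` and the value 10 are only illustrative.

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical example class; only the options handling matters here.
public class ThreadCapExample {
  public static void main(String[] args) {
    // Parse the usual command-line flags, then view them as debug options.
    DataflowPipelineDebugOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineDebugOptions.class);

    // Same effect as passing --numberOfWorkerHarnessThreads=10.
    // Note: this overrides any value given on the command line.
    options.setNumberOfWorkerHarnessThreads(10);

    Pipeline pipeline = Pipeline.create(options);
    // Apply transforms and call pipeline.run() as usual from here.
  }
}
```

Setting the value in code pins it for every launch, so passing the flag on the command line is probably the better choice if you expect to tune it per job.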
My story of how I found this out: I had a streaming job with a ParDo that would read XML files and parse them. The workers ran out of memory because they tried to chew on too many files at once. I used a static AtomicInteger to count the number of concurrent executions of my ParDo. Also, I logged the threadIds used to execute my ParDo. I could see as many as 300 concurrent executions on one worker, and the number of different threadIds logged also indicated that the worker was using a large number of threads.
I fixed my memory issues by restricting the number of threads with --numberOfWorkerHarnessThreads=10. I also tried setting that number to 1 thread, but that seemed to result in only one pipeline step being executed at any given time. Not surprising, but I wanted a higher level of parallelism, so 10 seemed like a good number for my use case.
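The counting trick described above can be sketched without Beam at all. In the snippet below, a fixed thread pool stands in for the worker harness threads and `process()` stands in for the body of the ParDo. Both are assumptions made for illustration, not Dataflow code, and the names `ConcurrencyProbe` and `measurePeak` are made up. The static `AtomicInteger` pair records how many executions overlap at most.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyProbe {
  // Number of executions currently inside process().
  private static final AtomicInteger active = new AtomicInteger();
  // Highest value of `active` observed since the last reset.
  private static final AtomicInteger peak = new AtomicInteger();

  // Stand-in for the body of a ParDo's processing method (assumption).
  public static void process() throws InterruptedException {
    int now = active.incrementAndGet();
    peak.accumulateAndGet(now, Math::max);
    try {
      Thread.sleep(20); // simulate parsing one file
    } finally {
      active.decrementAndGet();
    }
  }

  // Runs `tasks` executions on a pool of `threads` threads and
  // returns the peak number of overlapping executions.
  public static int measurePeak(int threads, int tasks) throws InterruptedException {
    active.set(0);
    peak.set(0);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        process();
        return null; // returning a value makes this a Callable, so checked exceptions are allowed
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    return peak.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("peak with 10 threads: " + measurePeak(10, 100));
    System.out.println("peak with 1 thread:   " + measurePeak(1, 5));
  }
}
```

The peak can never exceed the pool size, which is the same effect the thread cap had on the worker: fewer files in memory at once, at the cost of less parallelism. In a real pipeline the two counters would live as static fields on the DoFn class and the peak would be logged rather than returned.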
Edit: Added some extra information. Thanks to @safurudin-mahic for finding the relevant Beam source code.
For batch jobs, one worker thread is used per core, and each worker thread independently processes a chunk of the input space.
For streaming jobs, there can be many more worker threads per core to wait on input.
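To see why extra threads help when work is mostly waiting, here is a small stand-alone sketch in plain Java, not Dataflow code. Each task just sleeps, as a stand-in for blocking on input, and the class name `BlockingWaitDemo` and the task counts are arbitrary assumptions. Because sleeping threads use almost no CPU, a pool much larger than the core count should finish the same batch of waits sooner.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingWaitDemo {
  // Runs `tasks` jobs that each block for `waitMillis` on a pool of
  // `threads` threads, and returns the total elapsed time in milliseconds.
  public static long elapsedMillis(int threads, int tasks, long waitMillis)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    long start = System.nanoTime();
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        Thread.sleep(waitMillis); // stands in for waiting on input
        return null;
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
  }

  public static void main(String[] args) throws InterruptedException {
    int cores = Runtime.getRuntime().availableProcessors();
    System.out.println("threads = cores (" + cores + "): "
        + elapsedMillis(cores, 64, 50) + " ms");
    System.out.println("threads = 64: " + elapsedMillis(64, 64, 50) + " ms");
  }
}
```

For CPU-bound work, as in a typical batch step, the picture is different: extra threads only compete for the same cores, which fits the one-thread-per-core behaviour described for batch jobs.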
--number_of_worker_harness_threads – Lanny