How can I wait for an unknown number of Rust threads to complete without using tracking handles?

What are some good ways to adapt this Barrier example to handle two differences:

  1. the number of items is not known in advance (for example, when splitting a large file into lines)

  2. without tracking thread handles (e.g. without using the handles vector in the example below). The motivation is that keeping the handles adds overhead.

Example code:

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let mut handles = Vec::with_capacity(10);
    let barrier = Arc::new(Barrier::new(10));
    for _ in 0..10 {
        let c = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            // do some work
            c.wait();
        }));
    }
    // Wait for other threads to finish.
    for handle in handles {
        handle.join().unwrap();
    }
}

Code snippet is adapted slightly from the Barrier docs.

The first thing that crossed my mind was to mutate the inner value of the Barrier (if possible); however, the API does not provide mutable access to the num_threads field of the Barrier struct.

Another idea would be to not use the Barrier and instead write custom logic with AtomicUsize.

I'm open to learning the most ergonomic / idiomatic ways to do this in Rust.

Heiskell answered 5/7, 2017 at 3:7 Comment(1)
Are you asking how to synchronize threads without keeping track of thread synchronization mechanisms? You have to keep those handles somewhere if you want to join them later. Since vectors are resizable, what's wrong with making and keeping that variable number of handles and barrier instances? Cop

You can poll an atomic counter in a loop until all threads have exited. Of course, instead of using a static atomic, you can pass an Arc<AtomicUsize> into each thread.

Ordering::SeqCst is probably stronger than necessary, but concurrent programming is hard, and I'm not sure how far this ordering can safely be relaxed.

While it can be done this way, the cost of creating threads will probably dwarf a micro-optimization like this. It's also worth considering that busy-waiting can decrease the performance of a program.

use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

// Number of worker threads that have been started but have not yet exited.
static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);

fn main() {
    for i in 0..10 {
        // mark that the thread is about to run
        // we need to do it in the main thread so the wait loop below
        // cannot see a zero count before a spawned thread has registered
        GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::SeqCst);
        thread::spawn(move || {
            // We need to catch panics to reliably signal exit of a thread
            let result = panic::catch_unwind(move || {
                // do some work
                println!("{}-th thread reporting", i + 1);
            });
            // process errors (here we only report them)
            if result.is_err() {
                eprintln!("{}-th thread panicked", i + 1);
            }
            // signal thread exit
            GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::SeqCst);
        });
    }
    // Wait for other threads to finish.
    while GLOBAL_THREAD_COUNT.load(Ordering::SeqCst) != 0 {
        thread::sleep(Duration::from_millis(1)); 
    }
}

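For the Arc<AtomicUsize> variant mentioned above, here is a minimal sketch rather than a definitive implementation. ExitGuard and run_workers are hypothetical names introduced only for illustration. The guard decrements the counter in its Drop impl, which also runs while a panicking thread unwinds, so it stands in for catch_unwind. The Release/Acquire pairing is one plausible relaxation of SeqCst under the assumption that the counter is the only synchronization point; treat it as an assumption to verify for your own code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: decrements the shared counter when dropped,
/// which also happens while a panicking thread unwinds.
struct ExitGuard(Arc<AtomicUsize>);

impl Drop for ExitGuard {
    fn drop(&mut self) {
        // Release publishes this thread's earlier writes to whoever
        // later observes the decrement with an Acquire load.
        self.0.fetch_sub(1, Ordering::Release);
    }
}

/// Spawns `n` detached workers, polls until all of them have signalled
/// exit, and returns how many work items were completed.
fn run_workers(n: usize) -> usize {
    let running = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));
    for _ in 0..n {
        // Register the thread before spawning it, as in the code above.
        running.fetch_add(1, Ordering::Relaxed);
        let guard = ExitGuard(Arc::clone(&running));
        let done = Arc::clone(&done);
        thread::spawn(move || {
            // Bind the guard to a local so it is dropped when the closure ends.
            let _guard = guard;
            // do some work (stand-in: count one finished item)
            done.fetch_add(1, Ordering::Relaxed);
        });
    }
    // Sleep between polls so the waiting thread does not burn a core.
    while running.load(Ordering::Acquire) != 0 {
        thread::sleep(Duration::from_millis(1));
    }
    done.load(Ordering::Relaxed)
}

fn main() {
    println!("{} workers finished", run_workers(10));
}
```

Because the counter lives in an Arc instead of a static, several independent groups of threads can be waited on separately.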

Deedeeann answered 5/7, 2017 at 16:15 Comment(0)

Have each thread send its result (or just the value of thread::current()) over a multiple-producer, single-consumer channel that the waiting parent thread consumes. The standard library provides an mpsc channel for this purpose (std::sync::mpsc). It's a lot less CPU-intensive than a spinlock.
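A minimal sketch of the channel approach, assuming one detached thread per line of input. process_lines is a hypothetical name, and measuring the line length is only a stand-in for real work. Each worker owns a clone of the Sender; once the parent drops its own Sender, iterating over the Receiver ends when the last worker's clone has been dropped, so no thread count and no handles are needed.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns one detached thread per line and collects the results
/// over a channel, without keeping any JoinHandle.
fn process_lines(text: &str) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for line in text.lines() {
        let tx = tx.clone();
        let line = line.to_owned();
        thread::spawn(move || {
            // do some work (stand-in: measure the line)
            let _ = tx.send(line.len());
            // `tx` is dropped when the closure ends, including on panic.
        });
    }
    // Drop the parent's sender so the channel closes once every
    // worker's clone is gone.
    drop(tx);
    // The iterator blocks for messages and ends when all senders are dropped.
    let mut results: Vec<usize> = rx.iter().collect();
    // Arrival order is nondeterministic, so sort for a stable result.
    results.sort_unstable();
    results
}

fn main() {
    println!("{:?}", process_lines("a\nbb\nccc"));
}
```

The parent blocks inside rx.iter() instead of polling. A worker that panics before sending simply contributes no result, since its Sender is still dropped during unwinding.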

Vaisya answered 31/10, 2021 at 1:19 Comment(0)
