Simultaneous mutable access to arbitrary indices of a large vector that are guaranteed to be disjoint

Context

I have a case where multiple threads must update objects stored in a shared vector. However, the vector is very large, and the number of elements to update is relatively small.

Problem

In a minimal example, the set of elements to update can be identified by a (hash-)set containing the indices of elements to update. The code could hence look as follows:

let mut big_vector_of_elements = generate_data_vector();

while has_things_to_do() {
    let indices_to_update = compute_indices();
    indices_to_update.par_iter() // Rayon parallel iteration
       .map(|index| big_vector_of_elements[index].mutate())
       .collect()?;
}

This is obviously disallowed in Rust: big_vector_of_elements cannot be borrowed mutably in multiple threads at the same time. However, wrapping each element in, e.g., a Mutex lock seems unnecessary: this specific case would be safe without explicit synchronization. Since the indices come from a set, they are guaranteed to be distinct. No two iterations in the par_iter touch the same element of the vector.

Restating my question

What would be the best way of writing a program that mutates elements in a vector in parallel, where the synchronization is already taken care of by the selection of indices, but where the compiler does not understand the latter?

A near-optimal solution would be to wrap all elements in big_vector_of_elements in some hypothetical UncontendedMutex lock, which would be a variant of Mutex which is ridiculously fast in the uncontended case, and which may take arbitrarily long when contention occurs (or even panics). Ideally, an UncontendedMutex<T> should also be of the same size and alignment as T, for any T.
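To make the idea concrete, here is a minimal sketch of what such a lock could look like. UncontendedMutex is the hypothetical name from the paragraph above, and the `with` method is an assumption of this sketch, not an existing API. It panics on contention instead of waiting. Note that it does not meet the size wish: the flag adds a byte plus padding to every element.

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical lock that panics instead of waiting when it is contended.
pub struct UncontendedMutex<T> {
    locked: AtomicBool,
    value: UnsafeCell<T>,
}

// SAFETY: `with` hands out at most one `&mut T` at a time, so sharing the
// lock between threads is sound as long as `T` may move between threads.
unsafe impl<T: Send> Sync for UncontendedMutex<T> {}

impl<T> UncontendedMutex<T> {
    pub fn new(value: T) -> Self {
        Self { locked: AtomicBool::new(false), value: UnsafeCell::new(value) }
    }

    /// Runs `f` with exclusive access; panics if the lock is already held.
    pub fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        if self.locked.swap(true, Ordering::Acquire) {
            panic!("UncontendedMutex was contended");
        }
        // Clears the flag again, even if `f` panics.
        struct Unlock<'a>(&'a AtomicBool);
        impl Drop for Unlock<'_> {
            fn drop(&mut self) {
                self.0.store(false, Ordering::Release);
            }
        }
        let _unlock = Unlock(&self.locked);
        // SAFETY: the flag was clear, so no other `&mut T` exists right now.
        f(unsafe { &mut *self.value.get() })
    }
}
```

A Vec<UncontendedMutex<T>> can then be indexed from several threads through a shared reference. The uncontended path costs one atomic swap and one atomic store per access.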

Related, but different questions:

Multiple questions can be answered with "use Rayon's parallel iterator", "use chunks_mut", or "use split_at_mut":

These answers do not seem relevant here, since those solutions imply iterating over the entire big_vector_of_elements, and then for each element figuring out whether anything needs to be changed. Essentially, this means that such a solution would look as follows:

let mut big_vector_of_elements = generate_data_vector();

while has_things_to_do() {
    let indices_to_update = compute_indices();
    big_vector_of_elements
        .par_iter_mut()
        .enumerate()
        .try_for_each(|(index, element)| {
            if indices_to_update.contains(&index) {
                element.mutate()?;
            }
            Ok(())
        })?;
}

This solution takes time proportional to the size of big_vector_of_elements, whereas the first solution loops only over a number of elements proportional to the size of indices_to_update.

Danidania answered 1/5, 2019 at 16:40 Comment(5)
IMHO it is not possible in safe Rust; you should use unsafe Rust.Zerk
If the compiler is not able to verify that an operation is safe, but you can prove that it is, using unsafe code can be a good choice. One option would be to wrap the objects in the vector in an UnsafeCell.Protecting
@SvenMarnach As far as I see, the UnsafeCell would still not be usable in a parallel iterator, since it's not Sync. I could make a custom type and unsafe impl Sync like it shows in the standard library documentation of struct UnsafeCell, but I'm not sure what responsibilities then fall onto my shoulders. If you feel comfortable with it, feel free to expand your comment to an answer.Danidania
Maybe this approach is useful for you: https://mcmap.net/q/583609/-how-do-i-write-to-a-mutable-slice-from-multiple-threads-at-arbitrary-indexes-without-using-mutexesSedgewick
Thanks @cibercitizen1, when the slice contains values of atomic types (or of types for which atomic versions exist), that solution is indeed very applicable.Danidania
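A minimal sketch of the UnsafeCell idea discussed in these comments, under stated assumptions: SharedSlice, get_mut and add_one_at are hypothetical names, and std::thread::scope stands in for Rayon to keep the example dependency-free. The responsibility taken on by the unsafe impl Sync is that no two threads ever touch the same index at the same time. Here the safe wrapper enforces that by checking the indices for duplicates first, which costs time proportional to the number of indices, not to the vector length.

```rust
use std::cell::UnsafeCell;
use std::collections::HashSet;
use std::thread;

/// Shared view of a slice whose elements can be mutated through `&self`.
pub struct SharedSlice<'a, T> {
    cells: &'a [UnsafeCell<T>],
}

// SAFETY: callers of `get_mut` promise that no index is used concurrently.
unsafe impl<'a, T: Send> Sync for SharedSlice<'a, T> {}

impl<'a, T> SharedSlice<'a, T> {
    pub fn new(slice: &'a mut [T]) -> Self {
        let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
        // SAFETY: `UnsafeCell<T>` has the same layout as `T`, and the
        // exclusive borrow of `slice` is held for the whole lifetime 'a.
        Self { cells: unsafe { &*ptr } }
    }

    /// # Safety
    /// No other reference to element `index` may exist while the returned
    /// reference is alive. Out-of-bounds indices panic.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn get_mut(&self, index: usize) -> &mut T {
        unsafe { &mut *self.cells[index].get() }
    }
}

/// Adds 1 to every selected element, one thread per group of indices.
/// Panics if an index occurs more than once across all groups.
pub fn add_one_at(data: &mut [u64], groups: &[Vec<usize>]) {
    let mut seen = HashSet::new();
    for &i in groups.iter().flatten() {
        assert!(seen.insert(i), "index {i} is not unique");
    }
    let shared = SharedSlice::new(data);
    thread::scope(|s| {
        for group in groups {
            let shared = &shared;
            s.spawn(move || {
                for &i in group {
                    // SAFETY: all indices were checked to be distinct above.
                    unsafe { *shared.get_mut(i) += 1 };
                }
            });
        }
    });
}
```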
C
8

When the compiler can't verify that mutable references to a slice's elements are exclusive, Cell is pretty nice.

You can transform a &mut [T] into a &Cell<[T]> using Cell::from_mut, and then a &Cell<[T]> into a &[Cell<T>] using Cell::as_slice_of_cells. All of this is zero-cost: It's just there to guide the type-system.

A &[Cell<T>] is like a &[mut T], if that were possible to write: A shared reference to a slice of mutable elements. What you can do with Cells is limited to read or replace — you can't get a reference, mutable or not, to the wrapped elements themselves. Rust also knows that Cell isn't thread-safe (it does not implement Sync). This guarantees that everything is safe, at no dynamic cost.

fn main() {
    use std::cell::Cell;

    let slice: &mut [i32] = &mut [1, 2, 3];
    let cell_slice: &Cell<[i32]> = Cell::from_mut(slice);
    let slice_cell: &[Cell<i32>] = cell_slice.as_slice_of_cells();
    
    let two = &slice_cell[1];
    let another_two = &slice_cell[1];

    println!("This is 2: {:?}", two);
    println!("This is also 2: {:?}", another_two);
    
    two.set(42);
    println!("This is now 42!: {:?}", another_two);
}
Contiguous answered 24/6, 2020 at 22:16 Comment(2)
Great answer, thanks. It looks like some features used here weren't available back when I asked the question (as_slice_of_cells was stabilized in August 2019), but at this point, this is probably the best answer.Danidania
So how would this be used in a multi-threaded context? Because it does not implement Sync, the compiler complains that std::cell::Cell<i32> cannot be shared between threads safely.Amber
G
8

You can sort indices_to_update and extract mutable references by calling split_*_mut.

let len = big_vector_of_elements.len();

while has_things_to_do() {
    let mut tail = big_vector_of_elements.as_mut_slice();

    let mut indices_to_update = compute_indices();
    // I assumed compute_indices() returns unsorted vector
    // to highlight the importance of sorted order
    indices_to_update.sort();

    let mut elems = Vec::new();

    for idx in indices_to_update {
        // cut prefix, so big_vector[idx] will be tail[0]
        tail = tail.split_at_mut(idx - (len - tail.len())).1;

        // extract tail[0]
        let (elem, new_tail) = tail.split_first_mut().unwrap();
        elems.push(elem);

        tail = new_tail;
    }
}

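Double-check everything in this code; I didn't test it. Then you can call elems.into_par_iter() (or elems.par_iter_mut()) or whatever to mutate the collected elements in parallel; plain par_iter() would only hand out shared references.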
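For reference, a self-contained sketch of the same splitting technique in safe Rust. The names disjoint_muts and double_selected are made up for this example, and std::thread::scope is used in place of Rayon so that it needs no external crate.

```rust
use std::thread;

/// Returns one `&mut` per index. `sorted_indices` must be strictly
/// increasing and in bounds; otherwise this panics.
pub fn disjoint_muts<'a, T>(mut tail: &'a mut [T], sorted_indices: &[usize]) -> Vec<&'a mut T> {
    let mut refs = Vec::with_capacity(sorted_indices.len());
    let mut consumed = 0; // number of elements already cut off the front
    for &idx in sorted_indices {
        assert!(idx >= consumed, "indices must be strictly increasing");
        // Take the remaining slice out of `tail` so it keeps its full lifetime.
        let rest = std::mem::take(&mut tail);
        // Cut the prefix, so that the element at `idx` becomes the first one.
        let (_, rest) = rest.split_at_mut(idx - consumed);
        let (elem, rest) = rest.split_first_mut().expect("index out of bounds");
        refs.push(elem);
        consumed = idx + 1;
        tail = rest;
    }
    refs
}

/// Doubles the selected elements, spreading them over up to `workers` threads.
pub fn double_selected(data: &mut [i64], sorted_indices: &[usize], workers: usize) {
    let mut refs = disjoint_muts(data, sorted_indices);
    let workers = workers.max(1);
    let chunk_len = ((refs.len() + workers - 1) / workers).max(1);
    thread::scope(|s| {
        for chunk in refs.chunks_mut(chunk_len) {
            s.spawn(move || {
                for elem in chunk {
                    **elem *= 2;
                }
            });
        }
    });
}
```

Collecting the references costs time proportional to the number of indices, plus the sort if the indices do not arrive in order.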

Goodsized answered 2/5, 2019 at 8:40 Comment(2)
Note: iterating the indices_to_update in reverse order would avoid the index juggling. Apart from that, this seems to be the best way to do this in safe Rust.Morbidezza
Thanks, that looks like the basis for a great solution. If I were to swap the HashSet for something that's easy to recursively split in two (e.g., binary tree, or perhaps hibitset), I could recursively split the vector in two using split_at_mut. This fits nicely onto Rayon's fork/join model. It'll probably be somewhat slower than a 'dirty' unsafe solution, but it does show that the safe abstraction offered by Vec gets us very far. I'm considering your submission as the answer, but I'll wait a bit longer to see if anyone else has neat insights like yours, Sven's or E_net4's.Danidania
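The recursive variant mentioned in the last comment could look roughly like the sketch below. It assumes the indices are available as a sorted slice rather than a tree, uses the made-up name update_split, and substitutes std::thread::scope for Rayon's fork/join. Spawning an OS thread per split is only reasonable for illustration, which is why the hypothetical cutoff parameter switches to a sequential loop for small index sets.

```rust
use std::thread;

/// Applies `f` to the element at every global position in `indices`.
/// `indices` must be strictly increasing; `offset` is the global position
/// of `data[0]`. Invalid input panics, it never aliases.
pub fn update_split<T, F>(data: &mut [T], offset: usize, indices: &[usize], cutoff: usize, f: &F)
where
    T: Send,
    F: Fn(&mut T) + Sync,
{
    // Small index sets are handled sequentially.
    if indices.len() <= cutoff.max(1) {
        for &i in indices {
            f(&mut data[i - offset]);
        }
        return;
    }
    // Split the indices in half, then split the data at the first index of
    // the right half, so that each half only sees its own elements.
    let (left_idx, right_idx) = indices.split_at(indices.len() / 2);
    let pivot = right_idx[0];
    let (left, right) = data.split_at_mut(pivot - offset);
    thread::scope(|s| {
        s.spawn(move || update_split(left, offset, left_idx, cutoff, f));
        update_split(right, pivot, right_idx, cutoff, f);
    });
}
```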
L
3

I think this is a reasonable place to use unsafe code. The logic itself is safe but cannot be checked by the compiler because it relies on knowledge outside of the type system (the contract of BTreeSet, which itself relies on the implementation of Ord and friends for usize).

In this sample, we preemptively bounds check all the indices via range, so each call to add is safe to use. Since we take in a set, we know that all the indices are disjoint, so we aren't introducing mutable aliasing. It's important to get the raw pointer from the slice to avoid aliasing between the slice itself and the returned values.

use std::collections::BTreeSet;

fn uniq_refs<'i, 'd: 'i, T>(
    data: &'d mut [T],
    indices: &'i BTreeSet<usize>,
) -> impl Iterator<Item = &'d mut T> + 'i {
    let start = data.as_mut_ptr();
    let in_bounds_indices = indices.range(0..data.len());

    // I copied this from a Stack Overflow answer
    // without reading the text that explains why this is safe
    in_bounds_indices.map(move |&i| unsafe { &mut *start.add(i) })
}

use std::iter::FromIterator;

fn main() {
    let mut scores = vec![1, 2, 3];

    let selected_scores: Vec<_> = {
        // The set can go out of scope after we have used it.
        let idx = BTreeSet::from_iter(vec![0, 2]);
        uniq_refs(&mut scores, &idx).collect()
    };

    for score in selected_scores {
        *score += 1;
    }

    println!("{:?}", scores);
}

Once you have used this function to find all the separate mutable references, you can use Rayon to modify them in parallel:

use rayon::prelude::*; // 1.0.3

fn example(scores: &mut [i32], indices: &BTreeSet<usize>) {
    let selected_scores: Vec<_> = uniq_refs(scores, indices).collect();
    selected_scores.into_par_iter().for_each(|s| *s *= 2);

    // Or

    uniq_refs(scores, indices).par_bridge().for_each(|s| *s *= 2);
}

You may wish to consider using a bitset instead of a BTreeSet to be more efficient, but this answer uses only the standard library.
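As an illustration of the bitset idea, here is a small standard-library sketch. IndexBitSet is a made-up type, not an existing crate. Like a BTreeSet, it yields each selected index once and in ascending order.

```rust
/// Hypothetical fixed-capacity bit set over the indices `0..capacity`.
pub struct IndexBitSet {
    words: Vec<u64>,
}

impl IndexBitSet {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { words: vec![0; (capacity + 63) / 64] }
    }

    /// Marks `index` as selected; panics if it lies beyond the allocated words.
    pub fn insert(&mut self, index: usize) {
        self.words[index / 64] |= 1u64 << (index % 64);
    }

    pub fn contains(&self, index: usize) -> bool {
        self.words
            .get(index / 64)
            .map_or(false, |word| word & (1u64 << (index % 64)) != 0)
    }

    /// Yields the selected indices in ascending order, each exactly once.
    pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
        self.words.iter().enumerate().flat_map(|(w, &word)| {
            (0..64usize)
                .filter(move |bit| word & (1u64 << *bit) != 0)
                .map(move |bit| w * 64 + bit)
        })
    }
}
```

Iteration scans every word, so it costs time proportional to capacity / 64 rather than to the number of selected indices.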

Leopard answered 6/5, 2019 at 16:39 Comment(4)
Hi, thanks for your contribution. I have two questions about your answer. First: my question was about parallel iteration over the results. Your solution does not provide support for that as-is, and I don't see how to easily add that due to your use of btree_set::range. Can you clarify how you would suggest to make your solution (rayon-)parallelizable? Second — and perhaps also because of the need for parallelism — I wonder why this solution would be correct without the use of e.g. a std::cell::Cell to inform the compiler about unexpected mutability.Danidania
@Danidania yes, connecting the concepts would be useful, wouldn't it? I've updated with an example. Does that answer both pieces?Leopard
Thanks again. Storing the indices in a vector seems suboptimal, and I'm not sure what par_bridge does internally, but it looks like it too is not as good as an optimal solution could be; if it is safe to index as you propose. Inspired by your solution, I think manually checking the lower and upper bound of the indices, and then using the par_iter directly on the BTreeSet would be better. But that's because I envision the BTreeSet to play nice with rayon's fork/join semantics, but I honestly get lost in the code implementing the parallel iterator. Regardless, thanks, +1Danidania
Is there a way to do this without having to pre-collect all the mutable pointers?, for example by implementing some sort of method that gives access to a mutable pointer of the array?Alula
L
1

I had a related issue. I needed to assign to arbitrary columns of a 2D array in parallel. I used ndarray myarray.axis_chunks_iter_mut(nd::Axis(1), 1) to iterate over every column.

Logical answered 14/1, 2021 at 22:7 Comment(0)
A
0

Since I've been dealing with a similar problem, here's my solution which I don't recommend using unless absolutely necessary:

struct EvilPtr<T> {
    ptr: *mut T,
}
impl<T> EvilPtr<T> {
    fn new(inp: &mut T) -> Self {
        EvilPtr { ptr: inp as *mut T }
    }
    unsafe fn deref(&self) -> *mut T {
        return self.ptr;
    }
}

unsafe impl<T> Sync for EvilPtr<T> {}
unsafe impl<T> Send for EvilPtr<T> {}

Now you can do:

let indices: [usize; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let mut arr: [i32; 10] = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
let e = EvilPtr::new(&mut arr[0]);
unsafe {
    indices.par_iter().for_each(|x: &usize| {
        *e.deref().add(*x) += *x as i32;
    });
}
println!("{:?}", arr);

If you absolutely need to do this, I recommend you bury it under some user friendly interface, where you can be sure no error can occur.

Alula answered 23/10, 2020 at 15:19 Comment(5)
This code produces errors when run in Miri: error: Undefined Behavior: no item granting read access to tag <untagged> found in borrow stack / this indicates a potential bug in the program: it performed an invalid operation, but the rules it violated are still experimental.Leopard
Only tested it on my windows machine using rust 1.47. Yields the expected results.Alula
Yes, that’s one of the possible outcomes of undefined behavior. Another equally valid outcome is that your disk is erased.Leopard
Why is this undefined behavior? All I've done is create a mutable pointer and convince the compiler it's safe to share across threads. As far as I know, undefined behavior can only arise in this context if you point the pointer at something out of bounds.Alula
@Alula No it is not. Rust has strict (and complicated) aliasing rules, that you must always follow.Forelock
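As far as I understand the Miri report, the pointer in the answer is derived from &mut arr[0], so it is only allowed to access that single element, and offsetting it to other elements breaks the aliasing rules. A sketch of a variant that derives the pointer from the whole slice instead follows. SlicePtr and demo are hypothetical names, std::thread::scope replaces Rayon, and the caller still has to guarantee that the indices are distinct.

```rust
use std::marker::PhantomData;
use std::thread;

/// Raw pointer to a whole slice that may be shared between threads.
pub struct SlicePtr<'a, T> {
    ptr: *mut T,
    len: usize,
    _borrow: PhantomData<&'a mut [T]>,
}

// SAFETY: callers of `get` promise that no index is used concurrently.
unsafe impl<T: Send> Sync for SlicePtr<'_, T> {}

impl<'a, T> SlicePtr<'a, T> {
    pub fn new(slice: &'a mut [T]) -> Self {
        // `as_mut_ptr` yields a pointer that is valid for the whole slice,
        // unlike a pointer created from a reference to its first element.
        Self { ptr: slice.as_mut_ptr(), len: slice.len(), _borrow: PhantomData }
    }

    /// # Safety
    /// No two threads may access the same `index` at the same time.
    pub unsafe fn get(&self, index: usize) -> *mut T {
        assert!(index < self.len, "index out of bounds");
        unsafe { self.ptr.add(index) }
    }
}

/// Mirrors the example from the answer: adds each index to its own element.
pub fn demo() -> [i32; 10] {
    let indices: [usize; 10] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
    let mut arr = [0i32; 10];
    let p = SlicePtr::new(&mut arr);
    thread::scope(|s| {
        for half in indices.chunks(5) {
            let p = &p;
            s.spawn(move || {
                for &i in half {
                    // SAFETY: the indices are distinct and in bounds.
                    unsafe { *p.get(i) += i as i32 };
                }
            });
        }
    });
    arr
}
```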
T
0

I have a naive approach to threading on disjoint subsets of vectors. It consists of the following high level steps:

  • create non-overlapping subsets
  • create threads that each wait on a channel for as many elements as one thread operates on
  • use iter_mut and send each mutable element into the right channel/thread
  • wait for the concurrent threads to complete before starting a new wave of threads
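Stripped down to the channel mechanics, the steps above can be sketched as follows. update_groups and its parameters are made-up names for this illustration. Because iter_mut yields every element exactly once, each element is sent to at most one worker, so this stays within safe Rust; the price is a pass over the whole vector.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Adds `delta` to the selected elements; `groups[w]` lists the indices
/// that worker `w` is responsible for.
pub fn update_groups(data: &mut [f64], groups: &[Vec<usize>], delta: f64) {
    // Map each selected index to the worker that owns it.
    let mut owner: HashMap<usize, usize> = HashMap::new();
    for (worker, group) in groups.iter().enumerate() {
        for &index in group {
            owner.insert(index, worker);
        }
    }

    thread::scope(|s| {
        // One channel and one thread per group.
        let mut senders = Vec::new();
        for _ in groups {
            let (tx, rx) = mpsc::channel::<&mut f64>();
            senders.push(tx);
            s.spawn(move || {
                // Runs until the sending side of the channel is dropped.
                for element in rx {
                    *element += delta;
                }
            });
        }

        // Send each selected element to the worker that owns it.
        for (index, element) in data.iter_mut().enumerate() {
            if let Some(&worker) = owner.get(&index) {
                senders[worker].send(element).unwrap();
            }
        }

        // Close the channels so the workers can finish; the scope then
        // waits for all of them before returning.
        drop(senders);
    });
}
```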

Social golfing, implemented in this manner:

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

pub fn social_golf(myvec: &mut Vec<f64>) {
    let len_vec = myvec.len();
    let len_vec_p2 = len_vec / 2;
    let mut symmetry = false;
    if len_vec % 2 == 0 {
        symmetry = true;
    }

    // interact each element "x" with element "x+i (mod vec_len)"
    // i ~ distance of elements
    // note that we only have to iterate through distances 1.. len/2 if the interaction is commutative
    // results in all disjoint pairs interacting

    for i in 1..=len_vec_p2 {
        let mut exit_level = false;
        let mut done: HashSet<usize> = HashSet::new();

        // since we cant do (x+i, x+i+i) while x is started and x+i is being used,
        // we will need 2 or 3 waves of starting threads for each distance level

        while !exit_level {
            let mut skip: HashSet<usize> = HashSet::new();
            let mut pull: HashMap<usize, usize> = HashMap::new();

            let mut instructions_chan: Vec<mpsc::Sender<&mut f64>> = vec![];

            let mut first_open_channel: usize = 0;

            // find disjoint non-overlapping pairs
            // save done interactions in "done"
            // save currently used elements in "skip" to be avoided
            // if i == len/2 pairs are symmetric so save pair as "done" as well
            // pull contains the registry of thread number for each element so each pair goes to a matching thread

            for k in 0..len_vec {
                let pair = modular_offset_in_range(k as u32, i as u32, 0, (len_vec - 1) as u32);
                if !done.contains(&k) && !skip.contains(&k) && !skip.contains(&(pair as usize)) {
                    pull.insert(k, first_open_channel);
                    pull.insert(pair as usize, first_open_channel);
                    first_open_channel += 1;
                    done.insert(k);
                    skip.insert(k);
                    skip.insert(pair as usize);
                    if symmetry && i == len_vec_p2 {
                        done.insert(pair as usize);
                    }
                }
            }

            thread::scope(|s| {
                let mut handles: Vec<thread::ScopedJoinHandle<()>> = vec![];

                // spawn one thread per operation that could safely be selected for the current wave

                for _ in 0..pull.len() / 2 {
                    let (tx, rx) = mpsc::channel();
                    instructions_chan.push(tx);
                    let handle = s.spawn(move || {
                        println!("started thread");
                        let mut a = rx.recv().unwrap();
                        let mut b = rx.recv().unwrap();
                        play_golf(&mut a, &mut b);

                        let tsn3 = Duration::from_secs(2);
                        thread::sleep(tsn3);

                        println!("finished thread");
                    });
                    handles.push(handle);
                }

                // send each element to the thread that uses it

                for (k, a) in myvec.iter_mut().enumerate() {
                    if pull.contains_key(&k) {
                        instructions_chan[pull[&k]].send(a).unwrap();
                    }
                }

                // wait for current bunch of threads to complete before starting next wave on the current distance level

                for h in handles {
                    h.join().unwrap();
                }
            });

            // since we only inserted vector indexes into "done", if done.len == vec.len,
            // all elements have interacted with their pairs for the current distance

            if done.len() == len_vec {
                exit_level = true;
            }
        }
    }
}

pub fn modular_offset(begin: u32, offset: u32, modulo: u32) -> u32 {
    if begin + offset >= modulo {
        return begin + offset - modulo;
    } else {
        return begin + offset;
    }
}

pub fn modular_offset_in_range(begin: u32, offset: u32, modulo_begin: u32, modulo_end: u32) -> u32 {
    if begin + offset > modulo_end {
        return modulo_begin
            + modular_offset(begin - modulo_begin, offset, modulo_end - modulo_begin + 1);
    } else {
        return begin + offset;
    }
}

pub fn play_golf(a: &mut f64, b: &mut f64) {
    *a = *a + 101.0;
    *b = *b + 101.0;
}

fn main() {
    let mut myvec1 = vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0];

    let mut myvec2 = vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0];

    social_golf(&mut myvec1);

    println!("{:#?}", myvec1);

    social_golf(&mut myvec2);

    println!("{:#?}", myvec2);
}

Playground link. Note that the 2-second sleep is only there to show that the threads are started and finished concurrently, not sequentially.

Tasse answered 13/4 at 11:1 Comment(0)
