Using crossbeam scoped threads with vector chunks

Posted 2019-09-10 17:31

I have a vector of tuples that I split into chunks. I hand each chunk to its own thread for processing and recombine the results at the end. I've been using the following code, which uses std::thread. It works well, but it requires a lot of cloning and reconstituting, which I'd like to eliminate.

use std::thread::{self, JoinHandle};

// convert_bng is just an expensive function
let orig: Vec<(&f32, &f32)>;
// orig gets populated here
let mut guards: Vec<JoinHandle<Vec<(i32, i32)>>> = vec![];
// split into slices
let mut size = orig.len() / NUMTHREADS;
if orig.len() % NUMTHREADS > 0 {
    size += 1;
}
// if orig.len() == 0, size would be 0, and chunks(0) panics
size = std::cmp::max(1, size);
for chunk in orig.chunks(size) {
    let chunk = chunk.to_owned();
    let g = thread::spawn(move || {
        chunk.into_iter()
             .map(|elem| convert_bng(elem.0, elem.1))
             .collect()
    });
    guards.push(g);
}
let mut result: Vec<IntTuple> = Vec::with_capacity(orig.len());
for g in guards {
    result.extend(g.join()
                   .unwrap()
                   .into_iter()
                   .map(|ints| {
                       IntTuple {
                           a: ints.0 as u32,
                           b: ints.1 as u32,
                       }
                   }));
}
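
The chunk-size calculation above (divide, round up, then clamp to at least 1) can be pulled into a small helper so the same logic isn't repeated in every version of the code. This is a minimal sketch; the name chunk_size is hypothetical, and num_threads stands in for NUMTHREADS.

```rust
/// Hypothetical helper: the size of each chunk when splitting `len` items
/// across `num_threads` threads, rounding up so that at most `num_threads`
/// chunks are produced. Assumes `num_threads > 0`.
fn chunk_size(len: usize, num_threads: usize) -> usize {
    assert!(num_threads > 0, "num_threads must be non-zero");
    // Ceiling division: `len / num_threads`, plus 1 if there is a remainder.
    let size = (len + num_threads - 1) / num_threads;
    // `slice::chunks` panics on a chunk size of 0, which happens when `len == 0`.
    std::cmp::max(1, size)
}
```

For example, 10 items across 4 threads gives chunks of 3, i.e. four chunks of sizes 3, 3, 3 and 1. On Rust 1.73 or later, len.div_ceil(num_threads) is an equivalent way to write the rounding step.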

Is it possible to simplify this with crossbeam or scoped_threadpool? Something like:

let mut size = orig.len() / NUMTHREADS;
if orig.len() % NUMTHREADS > 0 {
    size += 1;
}
size = std::cmp::max(1, size);
crossbeam::scope(|scope| {
    for chunk in orig.chunks_mut(size) {
        scope.spawn(move || chunk.iter().map(|elem| convert_bng(elem.0, elem.1)).collect());
    }
});
let mut result = orig.into_iter()
                     .map(|ints| {
                         IntTuple {
                             a: ints.0 as u32,
                             b: ints.1 as u32,
                         }
                     })
                     .collect();

(edited with code from the question to which shepmaster linked)

However, this gives me an error at let result …: casting &f32 as u32 is invalid, which indicates that the map call inside scope.spawn() isn't returning its result into the chunk, or that the result is being discarded due to a type mismatch (the function returns (i32, i32), but orig holds (&f32, &f32)). If I substitute a valid dummy vector for result, I get a completely different error, related to spawn:

error: unable to infer enough type information about `_`; type annotations or generic parameter binding required [E0282]
scope.spawn(move || chunk.iter().map(|elem| convert_bng(elem.0, elem.1)).collect());

2 answers

Answer by 手持菜刀,她持情操 · 2019-09-10 18:08

which indicates that the map call inside scope.spawn() isn't returning its result into the chunk

That's true, because that's not how that method works. The general idea is you establish a new scope and then you can spawn threads that are guaranteed to end before that scope does.

crossbeam::scope establishes the scope, and is defined as:

pub fn scope<'a, F, R>(f: F) -> R 
    where F: FnOnce(&Scope<'a>) -> R

That is, you give a closure to scope. The closure will be given a reference to a Scope. Whatever the closure returns will be returned from scope.

Inside that closure, you can spawn threads with Scope::spawn:

fn spawn<F, T>(&self, f: F) -> ScopedJoinHandle<T> 
    where F: FnOnce() -> T + Send + 'a,
          T: Send + 'a

spawn takes a closure that takes no arguments and returns some type. That type becomes the thread's result, which you get back by joining the ScopedJoinHandle that spawn returns (modulo some error handling).
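
To make the scope/spawn relationship concrete, here is a minimal sketch. It uses std::thread::scope from the standard library (stable since Rust 1.63) rather than crossbeam: the idea is the same, but the API is not identical. Current crossbeam releases also differ from the signatures quoted above: crossbeam::scope now returns a Result, and the closure passed to spawn receives a scope argument. The function min_and_max is hypothetical.

```rust
use std::thread;

/// Hypothetical example: two scoped threads read a borrowed slice directly.
/// No cloning and no `'static` bound are needed, because both threads are
/// guaranteed to finish before `thread::scope` returns.
fn min_and_max(data: &[i32]) -> (Option<i32>, Option<i32>) {
    // Whatever this closure returns is what `thread::scope` returns.
    thread::scope(|s| {
        // Each `spawn` returns a handle whose `join` yields the closure's value.
        let min = s.spawn(|| data.iter().copied().min());
        let max = s.spawn(|| data.iter().copied().max());
        // `join` returns a `Result`, which is `Err` only if the thread panicked.
        (min.join().unwrap(), max.join().unwrap())
    })
}
```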

Nothing in crossbeam inherently modifies any data; that's up to your code. So let's look at what you are doing in the thread:

chunk.iter().map(|elem| convert_bng(elem.0, elem.1)).collect()

You take your &mut [T] and iterate over it, converting each element by passing it to convert_bng. You haven't shown the definition of convert_bng, so let's say it returns a bool. You then collect this iterator of bools. However, there are many possible collections you could be targeting, so the compiler needs to know which concrete collection is desired. In most cases, this is done with something like let foo: ConcreteCollection = iter.collect().

Since this collect is the last expression, it is also the return value of the closure passed to spawn, so we can look at how that return value is used. It turns out it isn't used at all. That is equivalent to writing iterator.collect(); on its own, which doesn't give the compiler enough information to pick a collection type.
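
Giving collect a concrete target type, and actually using the value the thread returns, resolves this kind of inference error. This sketch uses std::thread::scope in place of crossbeam, and the function doubled is a hypothetical example, not the question's code.

```rust
use std::thread;

/// Hypothetical example: the spawned closure ends in `collect`, so its return
/// type has to be pinned down. The turbofish `::<Vec<u32>>` names the concrete
/// collection explicitly.
fn doubled(values: &[u32]) -> Vec<u32> {
    thread::scope(|s| {
        let handle = s.spawn(|| values.iter().map(|v| v * 2).collect::<Vec<u32>>());
        // Returning the joined value from the function would also tell the
        // compiler the type; with neither hint, `collect()` is ambiguous.
        handle.join().unwrap()
    })
}
```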

Beyond that, you need to decide whether you are trying to modify a collection in place or not. Using chunks_mut suggests that you want each thread to work on the vector without any extra allocation. However, you then ignore that the chunk is mutable and return newly allocated collections (of an indeterminate type). Since I don't know what convert_bng returns, it's hard to say whether in-place modification is even possible. Additionally, map produces a new value for each element of an iterator; it is not a way to mutate values in place. And you can't mutate a slice in place to store a different type in it, as it would no longer be a slice of the original type!
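
When the output type matches the input type, each thread can update its own chunks_mut slice in place with iter_mut, with no map and no new allocation. This is a minimal sketch assuming a hypothetical same-type transformation (scaling (f32, f32) pairs by a factor), and it uses std::thread::scope rather than crossbeam.

```rust
use std::thread;

/// Hypothetical example: scale every coordinate pair in place, splitting the
/// work across scoped threads. This only works because input and output
/// share the type `(f32, f32)`.
fn scale_in_place(points: &mut [(f32, f32)], factor: f32, size: usize) {
    thread::scope(|s| {
        // `chunks_mut` hands out non-overlapping `&mut` slices, so each
        // thread has exclusive access to its own part of `points`.
        for chunk in points.chunks_mut(size) {
            s.spawn(move || {
                for p in chunk.iter_mut() {
                    p.0 *= factor;
                    p.1 *= factor;
                }
            });
        }
        // Any threads not joined explicitly are joined when the scope ends.
    });
}
```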

Answer by 看我几分像从前 · 2019-09-10 18:11

My current approach, which works well, is to rewrite convert_bng with the following signature:
fn convert_bng(longitude: &f32, latitude: &f32) -> (i32, i32) { … }

Which then allows me to use crossbeam as follows:

pub extern "C" fn convert_to_bng(longitudes: Array, latitudes: Array) -> Array {
    let orig: Vec<(&f32, &f32)>;
    // orig is populated here
    let mut result = vec![(1, 1); orig.len()];
    let mut size = orig.len() / NUMTHREADS;
    if orig.len() % NUMTHREADS > 0 {
        size += 1;
    }
    size = std::cmp::max(1, size);
    crossbeam::scope(|scope| {
        for (res_chunk, orig_chunk) in result.chunks_mut(size).zip(orig.chunks(size)) {
            scope.spawn(move || {
                for (res_elem, orig_elem) in res_chunk.iter_mut().zip(orig_chunk.iter()) {
                    *res_elem = convert_bng(orig_elem.0, orig_elem.1);
                }
            });
        }
    });
    Array::from_vec(result)
}
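
On a current Rust toolchain, the same zip-of-chunks pattern can be written with std::thread::scope and no external crate. This is a sketch, not the author's code: convert_bng is replaced by a hypothetical placeholder, orig holds owned (f32, f32) values instead of references, and num_threads stands in for NUMTHREADS.

```rust
use std::thread;

/// Hypothetical placeholder for the real (expensive) `convert_bng`.
fn convert_bng(longitude: &f32, latitude: &f32) -> (i32, i32) {
    ((*longitude * 100.0) as i32, (*latitude * 100.0) as i32)
}

fn convert_all(orig: &[(f32, f32)], num_threads: usize) -> Vec<(i32, i32)> {
    // Round the chunk size up, and never let it be 0 (chunks(0) panics).
    let size = std::cmp::max(1, (orig.len() + num_threads - 1) / num_threads);
    // Pre-filled output buffer; each thread overwrites its own chunk.
    let mut result = vec![(0, 0); orig.len()];
    thread::scope(|s| {
        for (res_chunk, orig_chunk) in result.chunks_mut(size).zip(orig.chunks(size)) {
            s.spawn(move || {
                for (res_elem, orig_elem) in res_chunk.iter_mut().zip(orig_chunk) {
                    *res_elem = convert_bng(&orig_elem.0, &orig_elem.1);
                }
            });
        }
    });
    // The mutable borrow of `result` ended with the scope, so it can be returned.
    result
}
```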

This code is considerably clearer and simpler, and it runs at essentially the same speed as the original (it's less than 1% faster, on average).

Using a pre-filled result vector doesn't seem ideal to me, but trying to mutate orig in place doesn't seem fruitful: convert_bng's return type is unavoidably different from orig's element type, and working around this is beyond me at the moment.
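
One alternative that avoids the dummy-filled buffer is to let each thread return its own Vec and concatenate the results in spawn order. This is a hedged sketch under the same assumptions as before: a hypothetical convert_bng placeholder, owned (f32, f32) input, and std::thread::scope instead of crossbeam.

```rust
use std::thread;

/// Hypothetical placeholder for the real (expensive) `convert_bng`.
fn convert_bng(longitude: &f32, latitude: &f32) -> (i32, i32) {
    ((*longitude * 100.0) as i32, (*latitude * 100.0) as i32)
}

fn convert_all_collect(orig: &[(f32, f32)], num_threads: usize) -> Vec<(i32, i32)> {
    let size = std::cmp::max(1, (orig.len() + num_threads - 1) / num_threads);
    thread::scope(|s| {
        // Collect the handles first so every thread is spawned before any join.
        let handles: Vec<_> = orig
            .chunks(size)
            .map(|chunk| {
                // Each thread converts one borrowed chunk into its own Vec.
                s.spawn(move || {
                    chunk
                        .iter()
                        .map(|(lon, lat)| convert_bng(lon, lat))
                        .collect::<Vec<_>>()
                })
            })
            .collect();
        // Concatenate in spawn order so the output lines up with `orig`.
        let mut result = Vec::with_capacity(orig.len());
        for handle in handles {
            result.extend(handle.join().unwrap());
        }
        result
    })
}
```

This trades the dummy initialization for one extra allocation and copy per chunk, so whether it is faster than the pre-filled version would need measuring.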
