How to do data-parallel image processing?

Posted 2019-09-20 03:28

I'm trying to write a simple data-parallel image processing filter in Rust. I've got it working in a single thread with the following code.

/// an example of a very simple filter
fn modular_filter_chunk(input: &[u16], slice_width: usize, slice_height: usize, mod_value: u16, output: &mut [u16]) {
    // The last chunk can be shorter than slice_width * slice_height when height is not
    // divisible by slice_num, so never index past the end of either slice.
    let size = (slice_width * slice_height).min(input.len()).min(output.len());
    for i in 0..size {
        output[i] = input[i] % mod_value;
    }
}

fn modular_filter_multi(input: &[u16], width: usize, height: usize, slice_num: usize, mod_value: u16, output: &mut [u16]) {
    // divide the image vertically into slices of height_per_slice rows each
    let height_per_slice = height / slice_num;
    let size_per_chunk = height_per_slice * width;
    let in_itr = input.chunks(size_per_chunk);
    let out_itr = output.chunks_mut(size_per_chunk);
    for (input, output) in in_itr.zip(out_itr) {
        modular_filter_chunk(input, width, height_per_slice, mod_value, output);
    }
}

fn main() {
    let width: usize = 1024;
    let height: usize = 1024;
    let input = vec![1234; width*height];
    let mut output = vec![0; width*height];
    modular_filter_multi(&input, width, height, 4, 73, &mut output);
}

Now I want to process the for loop in parallel, but I can't figure out a simple way to do this. I tried changing the for loop as follows, but I can't get past the compile error.

let mut handles = Vec::new();
for (input, output) in in_itr.zip(out_itr) {
    let h = std::thread::spawn(move || {
        modular_filter_chunk(input, width, height_per_slice, mod_value, output);
    });
    handles.push(h);
}
for handle in handles {
    handle.join().unwrap();
}

Compile error message

src\main.rs:25:21: 25:43 error: cannot infer an appropriate lifetime for lifetime parameter 'a in function call due to conflicting requirements
src\main.rs:25  let in_itr = input.chunks(size_per_chunk);
                                   ^~~~~~~~~~~~~~~~~~~~~~
src\main.rs:27:25: 27:44 note: first, the lifetime cannot outlive the method call at 27:24...
src\main.rs:27  for (input, output) in in_itr.zip(out_itr) {
                                       ^~~~~~~~~~~~~~~~~~~
note: in expansion of for loop expansion
src\main.rs:27:2: 29:3 note: expansion site
src\main.rs:27:25: 27:31 note: ...so that method receiver is valid for the method call
src\main.rs:27  for (input, output) in in_itr.zip(out_itr) {
                                       ^~~~~~
note: in expansion of for loop expansion
src\main.rs:27:2: 29:3 note: expansion site
src\main.rs:25:15: 25:20 note: but, the lifetime must be valid for the expression at 25:14...
src\main.rs:25  let in_itr = input.chunks(size_per_chunk);
                             ^~~~~
src\main.rs:25:15: 25:20 note: ...so that pointer is not dereferenced outside its lifetime
src\main.rs:25  let in_itr = input.chunks(size_per_chunk);
                             ^~~~~
error: aborting due to previous error
Could not compile `rust_multithread`.

How should I change my code to get the filter working in parallel?

Tags: rust
2 answers
我命由我不由天
Post #2 · 2019-09-20 04:02

Let's look at the signature for thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where F: FnOnce() -> T,
          F: Send + 'static,
          T: Send + 'static

This states that spawn takes a value of type F that implements FnOnce (it will be called exactly once) and, when called, returns something of type T. Both F and T must implement Send and must be 'static.

The Send bound restricts the possible types to "types able to be transferred across thread boundaries" and 'static means that any references in the type must live for the entire lifetime of the program, from before main starts to after main exits.

This explains the error message you are getting: none of your references are guaranteed to live for the entire life of the program. In fact, they are guaranteed to not live that long.

When you spawn a thread, the new thread no longer has any concrete connection to the thread that spawned it. It is entirely possible that the new thread will outlive the parent! This would cause Very Bad Things to happen if you tried to use the reference after the parent thread died.
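One way to satisfy `F: Send + 'static` in stable Rust without sharing anything is to hand each thread data it owns. The sketch below is a hypothetical `modular_filter_owned` (not taken from either answer): it copies each input chunk with `to_vec()`, moves the copy into the closure, and gets the filtered chunk back as the thread's return value through `JoinHandle::join`. It assumes `output` has the same length as `input` and that `slice_num <= height`.

```rust
use std::thread;

/// The per-chunk filter from the question.
fn modular_filter_chunk(input: &[u16], slice_width: usize, slice_height: usize, mod_value: u16, output: &mut [u16]) {
    let size = slice_width * slice_height;
    for i in 0..size {
        output[i] = input[i] % mod_value;
    }
}

/// Hypothetical helper: every thread gets an owned copy of its chunk, so the closure
/// passed to thread::spawn borrows nothing and is 'static.
/// Assumes output.len() == input.len() and slice_num <= height.
fn modular_filter_owned(input: &[u16], width: usize, height: usize, slice_num: usize, mod_value: u16, output: &mut [u16]) {
    let size_per_chunk = (height / slice_num) * width;
    // Collect all handles first so the threads run concurrently before any join.
    let handles: Vec<thread::JoinHandle<Vec<u16>>> = input
        .chunks(size_per_chunk)
        .map(move |chunk| {
            let owned = chunk.to_vec(); // owned copy: no borrow of `input` escapes
            thread::spawn(move || {
                let rows = owned.len() / width; // the last chunk may have fewer rows
                let mut out = vec![0u16; owned.len()];
                modular_filter_chunk(&owned, width, rows, mod_value, &mut out);
                out // handed back to the parent through JoinHandle::join
            })
        })
        .collect();
    // Handles are in chunk order, so they line up with the output chunks.
    for (handle, out_chunk) in handles.into_iter().zip(output.chunks_mut(size_per_chunk)) {
        out_chunk.copy_from_slice(&handle.join().unwrap());
    }
}

fn main() {
    let (width, height) = (1024, 1024);
    let input = vec![1234u16; width * height];
    let mut output = vec![0u16; width * height];
    modular_filter_owned(&input, width, height, 4, 73, &mut output);
    assert!(output.iter().all(|&x| x == 1234 % 73));
}
```

The price is an extra copy of every chunk in each direction; for a per-pixel operation as cheap as `%`, that copying may well cost more than the filtering itself.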

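As you found, thread::scoped provides a solution to this. Scoped threads are required to be joined before any of the references they contain expire. However, scoped threads are unstable for a reason: they are unsound in the presence of reference cycles. The join happens in the destructor of the guard that thread::scoped returns, and if that guard is leaked (for example by placing it in an Rc cycle), the destructor never runs and the thread can outlive the data it borrows. There was an RFC to bring it back, but there are some deep details around it, so it has been deferred.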
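To make the reference-cycle problem concrete: the pre-1.0 `thread::scoped` returned a `JoinGuard` that joined the thread when it was dropped, but safe Rust does not guarantee that destructors run. The sketch below uses a hypothetical `FakeJoinGuard` that only records whether its destructor ran; no threads are involved. Both a self-referencing `Rc` and the safe function `std::mem::forget` leave it undropped, which for the real guard would mean the borrowing thread is never joined.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

/// Hypothetical stand-in for the old JoinGuard: dropping it is where the join would happen.
struct FakeJoinGuard {
    joined: Rc<Cell<bool>>,
}

impl Drop for FakeJoinGuard {
    fn drop(&mut self) {
        self.joined.set(true); // a real JoinGuard would block on the thread here
    }
}

/// A node that can hold a strong reference to itself, forming an Rc cycle.
struct Node {
    _guard: FakeJoinGuard,
    next: RefCell<Option<Rc<Node>>>,
}

/// Returns whether the guard's destructor ran after its owner went out of scope.
fn guard_dropped_in_cycle() -> bool {
    let joined = Rc::new(Cell::new(false));
    {
        let node = Rc::new(Node {
            _guard: FakeJoinGuard { joined: Rc::clone(&joined) },
            next: RefCell::new(None),
        });
        // The node now points to itself, so its strong count never reaches zero.
        *node.next.borrow_mut() = Some(Rc::clone(&node));
    } // `node` goes out of scope here, but the cycle keeps it (and the guard) alive
    joined.get()
}

/// Same effect without a cycle: mem::forget is safe and skips the destructor.
fn guard_dropped_after_forget() -> bool {
    let joined = Rc::new(Cell::new(false));
    std::mem::forget(FakeJoinGuard { joined: Rc::clone(&joined) });
    joined.get()
}

fn main() {
    println!("cycle: destructor ran = {}", guard_dropped_in_cycle());
    println!("forget: destructor ran = {}", guard_dropped_after_forget());
}
```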

As hamstergene points out, you can use Arc to safely share immutable data in stable Rust. You would, however, need a Mutex to share your mutable output buffer. You can see why people are excited to have thread::scoped reinstated!
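A minimal sketch of the `Arc` plus `Mutex` approach, assuming the caller can hand over ownership of the input `Vec` (the function name `modular_filter_arc` is hypothetical). The input is shared read-only through `Arc<Vec<u16>>`; each thread filters its range into a local buffer and locks the shared `Arc<Mutex<Vec<u16>>>` only long enough to copy the result in, so the lock does not serialize the filtering itself.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stable-Rust variant: input shared via Arc, output via Arc<Mutex<..>>.
/// Assumes slice_num <= height so that each chunk is non-empty.
fn modular_filter_arc(input: Vec<u16>, width: usize, height: usize, slice_num: usize, mod_value: u16) -> Vec<u16> {
    let len = input.len();
    let input = Arc::new(input);
    let output = Arc::new(Mutex::new(vec![0u16; len]));
    let size_per_chunk = (height / slice_num) * width;

    let mut handles = Vec::new();
    for start in (0..len).step_by(size_per_chunk) {
        let end = (start + size_per_chunk).min(len);
        // Each thread gets its own Arc clones; both are Send + 'static.
        let input = Arc::clone(&input);
        let output = Arc::clone(&output);
        handles.push(thread::spawn(move || {
            // Do the work without holding the lock...
            let result: Vec<u16> = input[start..end].iter().map(|&x| x % mod_value).collect();
            // ...then lock only to copy the finished range into place.
            output.lock().unwrap()[start..end].copy_from_slice(&result);
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    // Every thread has been joined, so this is the last Arc and try_unwrap succeeds.
    Arc::try_unwrap(output).unwrap().into_inner().unwrap()
}

fn main() {
    let (width, height) = (1024, 1024);
    let output = modular_filter_arc(vec![1234u16; width * height], width, height, 4, 73);
    assert!(output.iter().all(|&x| x == 1234 % 73));
}
```

Since the threads write disjoint ranges, the Mutex exists only to satisfy the type system; avoiding that overhead is exactly what scoped threads are for.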

啃猪蹄的小仙女
Post #3 · 2019-09-20 04:09

OK, I got it working with the following steps.

  1. Install Rust Nightly.
  2. Add #![feature(scoped)] to the top of the source code.
  3. Replace the for loop that calls modular_filter_chunk with the following code.

    let mut handles = Vec::new();
    for (input, output) in in_itr.zip(out_itr) {
        let h = std::thread::scoped(move || {
            modular_filter_chunk(input, width, height_per_slice, mod_value, output);
        });
        handles.push(h);
    }
    for handle in handles {
        handle.join();
    }
    

I initially thought thread::scoped might be a solution, but since it was unstable I couldn't get it to compile. I wonder if there is a way to solve this without using thread::scoped or unsafe.
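Since this was written, scoped threads have returned in a different form: Rust 1.63 stabilized `std::thread::scope`, which joins every thread spawned on the scope before it returns, so the closures may borrow non-`'static` data without nightly features or `unsafe`. Below is a sketch of the question's `modular_filter_multi` on top of it, assuming Rust 1.63 or later; it computes the row count from each chunk's actual length so that a shorter final chunk is handled.

```rust
use std::thread;

/// The per-chunk filter from the question.
fn modular_filter_chunk(input: &[u16], slice_width: usize, slice_height: usize, mod_value: u16, output: &mut [u16]) {
    let size = slice_width * slice_height;
    for i in 0..size {
        output[i] = input[i] % mod_value;
    }
}

/// Assumes output.len() == input.len() and slice_num <= height.
fn modular_filter_multi(input: &[u16], width: usize, height: usize, slice_num: usize, mod_value: u16, output: &mut [u16]) {
    let size_per_chunk = (height / slice_num) * width;
    // Every thread spawned on `s` is joined before thread::scope returns,
    // so the closures may borrow chunks of `input` and `output` directly.
    thread::scope(|s| {
        for (in_chunk, out_chunk) in input.chunks(size_per_chunk).zip(output.chunks_mut(size_per_chunk)) {
            s.spawn(move || {
                let rows = in_chunk.len() / width; // the last chunk may have fewer rows
                modular_filter_chunk(in_chunk, width, rows, mod_value, out_chunk);
            });
        }
    }); // all threads are joined here; a panic in any of them propagates
}

fn main() {
    let (width, height) = (1024, 1024);
    let input = vec![1234u16; width * height];
    let mut output = vec![0u16; width * height];
    modular_filter_multi(&input, width, height, 4, 73, &mut output);
    assert!(output.iter().all(|&x| x == 1234 % 73));
}
```

Because `chunks_mut` hands out non-overlapping `&mut` slices, each thread gets exclusive access to its part of the output, so neither `Arc` nor `Mutex` is needed.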
