How to give each CPU core mutable access to a port

Posted 2019-07-26 17:27

I've got an embarrassingly parallel bit of graphics rendering code that I would like to run across my CPU cores. I've coded up a test case (the function computed is nonsense) to explore how I might parallelize it. I'd like to code this using std Rust in order to learn about using std::thread. But, I don't understand how to give each thread a portion of the framebuffer. I'll put the full testcase code below, but I'll try to break it down first.

The sequential form is super simple:

let mut buffer0 = vec![vec![0i32; WIDTH]; HEIGHT];
for j in 0..HEIGHT {
    for i in 0..WIDTH {
        buffer0[j][i] = compute(i as i32,j as i32);
    }
}

I thought that it would help to make a buffer that was the same size, but re-arranged to be 3D & indexed by core first. This is the same computation, just a reordering of the data to show the workings.

let mut buffer1 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
for c in 0..num_logical_cores {
    for y in 0..y_per_core {
        let j = y*num_logical_cores + c;
        if j >= HEIGHT {
            break;
        }
        for i in 0..WIDTH {
            buffer1[c][y][i] = compute(i as i32,j as i32)
        }
    }
}

But, when I try to put the inner part of the code in a closure & create a thread, I get errors about the buffer & lifetimes. I basically don't understand what to do & could use some guidance. I want per_core_buffer to just temporarily refer to the data in buffer2 that belongs to that core & allow it to be written, synchronize all the threads & then read buffer2 afterwards. Is this possible?

let mut buffer2 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
let mut handles = Vec::new();
for c in 0..num_logical_cores {
    let per_core_buffer = &mut buffer2[c]; // <<< lifetime error
    let handle = thread::spawn(move || {
        for y in 0..y_per_core {
            let j = y*num_logical_cores + c;
            if j >= HEIGHT {
                break;
            }
            for i in 0..WIDTH {
                per_core_buffer[y][i] = compute(i as i32,j as i32)
            }
        }
    });
    handles.push(handle)
}
for handle in handles {
    handle.join().unwrap();
}

This is the error, which I don't understand:

error[E0597]: `buffer2` does not live long enough
  --> src/main.rs:50:36
   |
50 |         let per_core_buffer = &mut buffer2[c]; // <<< lifetime error
   |                                    ^^^^^^^ borrowed value does not live long enough
...
88 | }
   | - borrowed value only lives until here
   |
   = note: borrowed value must be valid for the static lifetime...
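
The `'static` note in this error comes from `thread::spawn`, which requires that the closure not borrow anything that might be dropped while the thread is still running. One std-only way around that, sketched here under assumptions, is to move each per-core slab into its thread and take it back from `join`. The helper name `render_by_ownership` and the small dimensions in `main` are hypothetical and chosen only for illustration.

```rust
use std::thread;

// Same nonsense function as in the question.
fn compute(x: i32, y: i32) -> i32 {
    (x * y) % (x + y + 10000)
}

// Hypothetical helper: fills a per-core 3D buffer by moving each 2D slab
// into its own thread and collecting the slabs back from `join`.
fn render_by_ownership(width: usize, height: usize, num_cores: usize) -> Vec<Vec<Vec<i32>>> {
    let y_per_core = height / num_cores + 1;
    let buffer = vec![vec![vec![0i32; width]; y_per_core]; num_cores];

    let handles: Vec<_> = buffer
        .into_iter()
        .enumerate()
        .map(|(c, mut per_core_buffer)| {
            // The closure owns its slab, so nothing is borrowed from the caller.
            thread::spawn(move || {
                for (y, row) in per_core_buffer.iter_mut().enumerate() {
                    let j = y * num_cores + c;
                    if j >= height {
                        break;
                    }
                    for (i, cell) in row.iter_mut().enumerate() {
                        *cell = compute(i as i32, j as i32);
                    }
                }
                per_core_buffer // hand the slab back to the caller
            })
        })
        .collect();

    // Joining in spawn order rebuilds the buffer indexed by core.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let (width, height, cores) = (8, 5, 3);
    let buffer = render_by_ownership(width, height, cores);
    // Same check as the question's sanity pass.
    for j in 0..height {
        for i in 0..width {
            assert_eq!(buffer[j % cores][j / cores][i], compute(i as i32, j as i32));
        }
    }
    println!("all rows match");
}
```

Moving a `Vec` only transfers its pointer, length and capacity, so the pixel data itself is not copied on the way in or out of the thread.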

The full testcase is:

extern crate num_cpus;
use std::time::Instant;
use std::thread;

fn compute(x: i32, y: i32) -> i32 {
    (x*y) % (x+y+10000)
}

fn main() {
    let num_logical_cores = num_cpus::get();
    const WIDTH: usize = 40000;
    const HEIGHT: usize = 10000;
    let y_per_core = HEIGHT/num_logical_cores + 1;

    // ------------------------------------------------------------
    // Serial Calculation...
    let mut buffer0 = vec![vec![0i32; WIDTH]; HEIGHT];
    let start0 = Instant::now();
    for j in 0..HEIGHT {
        for i in 0..WIDTH {
            buffer0[j][i] = compute(i as i32,j as i32);
        }
    }
    let dur0 = start0.elapsed();

    // ------------------------------------------------------------
    // On the way to Parallel Calculation...
    // Reorder the data buffer to be 3D with one 2D region per core.
    let mut buffer1 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
    let start1 = Instant::now();
    for c in 0..num_logical_cores {
        for y in 0..y_per_core {
            let j = y*num_logical_cores + c;
            if j >= HEIGHT {
                break;
            }
            for i in 0..WIDTH {
                buffer1[c][y][i] = compute(i as i32,j as i32)
            }
        }
    }
    let dur1 = start1.elapsed();

    // ------------------------------------------------------------
    // Actual Parallel Calculation...
    let mut buffer2 = vec![vec![vec![0i32; WIDTH]; y_per_core]; num_logical_cores];
    let mut handles = Vec::new();
    let start2 = Instant::now();
    for c in 0..num_logical_cores {
        let per_core_buffer = &mut buffer2[c]; // <<< lifetime error
        let handle = thread::spawn(move || {
            for y in 0..y_per_core {
                let j = y*num_logical_cores + c;
                if j >= HEIGHT {
                    break;
                }
                for i in 0..WIDTH {
                    per_core_buffer[y][i] = compute(i as i32,j as i32)
                }
            }
        });
        handles.push(handle)
    }
    for handle in handles {
        handle.join().unwrap();
    }
    let dur2 = start2.elapsed();

    println!("Runtime: Serial={0:.3}ms, AlmostParallel={1:.3}ms, Parallel={2:.3}ms",
             1000.*dur0.as_secs() as f64 + 1e-6*(dur0.subsec_nanos() as f64),
             1000.*dur1.as_secs() as f64 + 1e-6*(dur1.subsec_nanos() as f64),
             1000.*dur2.as_secs() as f64 + 1e-6*(dur2.subsec_nanos() as f64));

    // Sanity check
    for j in 0..HEIGHT {
        let c = j % num_logical_cores;
        let y = j / num_logical_cores;
        for i in 0..WIDTH {
            if buffer0[j][i] != buffer1[c][y][i] {
                println!("wtf1? {0} {1} {2} {3}",i,j,buffer0[j][i],buffer1[c][y][i])
            }
            if buffer0[j][i] != buffer2[c][y][i] {
                println!("wtf2? {0} {1} {2} {3}",i,j,buffer0[j][i],buffer2[c][y][i])
            }
        }
    }

}

Tags: rust
1 answer
兄弟一词,经得起流年.
Reply #2 · 2019-07-26 17:48

Thanks to @Shepmaster for the pointers and clarification that this is not an easy problem for Rust, and that I needed to consider crates to find a reasonable solution. I'm only just starting out in Rust, so this really wasn't clear to me.

I liked the ability to control the number of threads that scoped_threadpool gives, so I went with that. Translating my code from above directly, I tried to use the 3D buffer with core as the most-significant index, and that ran into trouble because the 3D vector does not implement the Copy trait. Needing Copy there made me concerned about performance, so I went back to the original problem and implemented it more directly, and found a reasonable speedup by making each row a separate job on the pool. As far as I can tell, only a mutable reference to each row is moved into its closure, so this does not add a large memory overhead.

The code that works for me is:

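use scoped_threadpool::Pool;

let mut buffer2 = vec![vec![0i32; WIDTH]; HEIGHT];
let mut pool = Pool::new(num_logical_cores as u32);
pool.scoped(|scope| {
    // One job per row; each closure takes the mutable reference to its own row.
    for (y, row) in buffer2.iter_mut().enumerate() {
        scope.execute(move || {
            for (x, cell) in row.iter_mut().enumerate() {
                *cell = compute(x as i32, y as i32);
            }
        });
    }
    // pool.scoped waits for all jobs to finish before returning.
});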

On a 6-core, 12-thread i7-8700K, for a 400000x4000 testcase, this runs in 3.2 seconds serially and 481ms in parallel, which is a reasonable speedup.
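
For readers on a newer toolchain: since Rust 1.63 the standard library has `std::thread::scope`, which allows the same kind of borrowing without an external crate. The following is a minimal sketch, not the code measured above. It assumes one contiguous band of rows per thread (rather than one job per row), and the helper name `render_scoped` and the small test dimensions are hypothetical.

```rust
use std::thread;

// Same nonsense function as in the question.
fn compute(x: i32, y: i32) -> i32 {
    (x * y) % (x + y + 10000)
}

// Hypothetical helper: fills `buffer` in place, one band of rows per thread.
fn render_scoped(buffer: &mut [Vec<i32>], num_threads: usize) {
    let height = buffer.len();
    if height == 0 {
        return;
    }
    let num_threads = num_threads.max(1);
    // Ceiling division so every row lands in some band.
    let rows_per_band = (height + num_threads - 1) / num_threads;

    thread::scope(|s| {
        // chunks_mut yields non-overlapping &mut slices, so each thread
        // gets exclusive access to its own rows.
        for (band_index, band) in buffer.chunks_mut(rows_per_band).enumerate() {
            s.spawn(move || {
                for (offset, row) in band.iter_mut().enumerate() {
                    let y = band_index * rows_per_band + offset;
                    for (x, cell) in row.iter_mut().enumerate() {
                        *cell = compute(x as i32, y as i32);
                    }
                }
            });
        }
        // Every thread spawned on `s` is joined when the scope ends.
    });
}

fn main() {
    const WIDTH: usize = 16;
    const HEIGHT: usize = 10;
    let mut buffer = vec![vec![0i32; WIDTH]; HEIGHT];
    render_scoped(&mut buffer, 4);
    for (y, row) in buffer.iter().enumerate() {
        for (x, &value) in row.iter().enumerate() {
            assert_eq!(value, compute(x as i32, y as i32));
        }
    }
    println!("all rows match");
}
```

Because the scope guarantees the threads finish before `render_scoped` returns, the borrow of `buffer` no longer needs to be `'static`, and the buffer can be read normally afterwards.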

EDIT: I continued to think about this issue and got a suggestion from Rustlang on Twitter that I should consider rayon. I converted my code to rayon and got a similar speedup with the following code.

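use rayon::prelude::*;

let mut buffer2 = vec![vec![0i32; WIDTH]; HEIGHT];
buffer2
    .par_iter_mut()
    .enumerate()
    // for_each runs the closure for its side effect, so no Vec of unit
    // values has to be collected.
    .for_each(|(y, row)| {
        for (x, cell) in row.iter_mut().enumerate() {
            *cell = compute(x as i32, y as i32);
        }
    });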