How do I run parallel threads of computation on a

2019-01-14 19:19发布

I'm trying to distribute an array across threads and have the threads sum up portions of the array in parallel. I want thread 0 to sum elements 0 1 2 and Thread 1 sum elements 3 4 5. Thread 2 to sum 6 and 7. and Thread 3 to sum 8 and 9.

I'm new to Rust but have coded with C/C++/Java before. I've literally thrown everything and the garbage sink at this program and I was hoping I could receive some guidance.

Sorry my code is sloppy but I will clean it up when it is a finished product. Please ignore all poorly named variables/inconsistent spacing/etc.

use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;

static NTHREADS: usize = 4;
static NPROCS: usize = 10;

fn main() {
    let mut a = [0; 10]; // a: [i32; 10]
    let mut endpoint = a.len() / NTHREADS;
    let mut remElements = a.len() % NTHREADS;

    for x in 0..a.len() {
        let secret_number = (rand::random::<i32>() % 100) + 1;
        a[x] = secret_number;
        println!("{}", a[x]);
    }
    let mut b = a;
    let mut x = 0;

    check_sum(&mut a);
    // serial_sum(&mut b);

    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut scale: usize = 0;

    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();
        // Each thread will send its id via the channel

        Thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            let numTougherThreads: usize = NPROCS % NTHREADS;
            let numTasksPerThread: usize = NPROCS / NTHREADS;
            let mut lsum = 0;

            if id < numTougherThreads {
                let mut q = numTasksPerThread+1;
                lsum = 0;

                while q > 0 {
                    lsum = lsum + a[scale];
                    scale+=1;
                    q = q-1;
                }
                println!("Less than numToughThreads lsum: {}", lsum);
            }
            if id >= numTougherThreads {
                let mut z = numTasksPerThread;
                lsum = 0;

                while z > 0 {
                    lsum = lsum + a[scale];
                    scale +=1;
                    z = z-1;
                }    
                println!("Greater than numToughthreads lsum: {}", lsum);
            }
            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
            thread_tx.send(lsum).unwrap();
        });
    }

    // Here, all the messages are collected
    let mut globalSum = 0;
    let mut ids = Vec::with_capacity(NTHREADS);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages      available
        ids.push(rx.recv());
    }
    println!("Global Sum: {}", globalSum);
    // Show the order in which the messages were sent

    println!("ids: {:?}", ids);
}

fn check_sum (arr: &mut [i32]) {
    let mut sum = 0;
    let mut i = 0;
    let mut size = arr.len();
    loop {
        sum += arr[i];
        i+=1;
        if i == size { break; }
    }
    println!("CheckSum is {}", sum);
}

So far I've gotten it to do this much. Can't figure out why threads 0 and 1 have the same sum as well as 2 and 3 doing the same thing:

 -5
 -49
 -32
 99
 45
 -65
 -64
 -29
 -56
 65
 CheckSum is -91
 Greater than numTough lsum: -54
 thread 2 finished
 Less than numTough lsum: -86
 thread 1 finished
 Less than numTough lsum: -86
 thread 0 finished
 Greater than numTough lsum: -54
 thread 3 finished
 Global Sum: 0
 ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]

I managed to rewrite it to work with even numbers by using the below code.

    while q > 0 {
        if id*s+scale == a.len() { break; }
        lsum = lsum + a[id*s+scale];
        scale +=1;
        q = q-1;
    }
    println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
    let mut z = numTasksPerThread;
    lsum = 0;
    let mut scale = 0;

    while z > 0 {
        if id*numTasksPerThread+scale == a.len() { break; }
        lsum = lsum + a[id*numTasksPerThread+scale];
        scale = scale + 1;
        z = z-1;
    }

3条回答
聊天终结者
2楼-- · 2019-01-14 19:56

Using a crate like crossbeam you can write this code:

extern crate rand;
extern crate crossbeam;

use rand::distributions::{IndependentSample, Range};

const NTHREADS: usize = 4;

fn random_vec(length: u32) -> Vec<i32> {
    let between = Range::new(1, 101); // Better distribution than a modulo
    let rng = &mut rand::thread_rng();
    (0..length).map(|_| between.ind_sample(rng)).collect()
}

fn main() {
    let numbers = random_vec(10);
    let num_tasks_per_thread = numbers.len() / NTHREADS;

    crossbeam::scope(|scope| {
        // The `collect` is important to eagerly start the threads!
        let threads: Vec<_> = numbers
            .chunks(num_tasks_per_thread)
            .map(|chunk| scope.spawn(move || chunk.iter().cloned().sum::<i32>()))
            .collect();

        let thread_sum: i32 = threads.into_iter().map(|t| t.join()).sum();
        let no_thread_sum: i32 = numbers.iter().cloned().sum();

        println!("global sum via threads    : {}", thread_sum);
        println!("global sum single-threaded: {}", no_thread_sum);
    });
}

Scoped threads allow you to pass in a reference that is guaranteed to outlive the thread. You can then use the return value of the thread directly, skipping channels (which are great, just not needed here!).

I used the rand::distributions::Range to give a proper uniform distribution. I also changed it to be the range [1,100], as I think that's what you meant. However, your original code is actually [-98,100], which you could also do.

Iterator::sum is used to sum up an iterator of numbers.

I threw in some rough performance numbers of the thread work, ignoring the vector construction, working on 100,000,000 numbers:

| threads | time (ns) | relative time (%) |
|       1 |  24529055 |            100.00 |
|       2 |  19201725 |             78.28 |
|       3 |  19689506 |             80.27 |
|       4 |  18920759 |             77.14 |
|       5 |  19004255 |             77.48 |
|       6 |  19914923 |             81.19 |
|       7 |  18787972 |             76.59 |
|       8 |  19284852 |             78.62 |
查看更多
戒情不戒烟
3楼-- · 2019-01-14 19:56

All your tasks get a copy of the scale variable. Thread 1 and 2 both do the same thing since each has scale with a value of 0 and modifies it in the same manner as the other thread. The same goes for Thread 3 and 4.

Rust prevents you from breaking thread safety. If scale were shared by the threads, you would have race conditions when accessing the variable.

Please read about closures, they explain the variable copying part, and about threading which explains when and how you can share variables between threads.

查看更多
萌系小妹纸
4楼-- · 2019-01-14 20:00

Welcome to Rust! :)

Yeah at first I didn't realize each thread gets it's own copy of scale

Not only that! It also gets its own copy of a!

What you are trying to do could look like the following code. I guess it's easier for you to see a complete working example since you seem to be a Rust beginner and asked for guidance. I deliberately replaced [i32; 10] with a Vec since a Vec is not implicitly Copyable. It requires an explicit clone(); we cannot copy it by accident. Please note all the larger and smaller differences. The code also got a little more functional (less mut). I commented most of the noteworthy things:

extern crate rand;

use std::sync::Arc;
use std::sync::mpsc;
use std::thread;

const NTHREADS: usize = 4; // I replaced `static` by `const`

// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
    let mut s = 0;
    for x in iter {
        s += x;
    }
    s
}

fn main() {
    // We don't want to clone the whole vector into every closure.
    // So we wrap it in an `Arc`. This allows sharing it.
    // I also got rid of `mut` here by moving the computations into
    // the initialization.
    let a: Arc<Vec<_>> =
        Arc::new(
            (0..10)
                .map(|_| {
                    (rand::random::<i32>() % 100) + 1
                })
                .collect()
        );

    let (tx, rx) = mpsc::channel(); // types will be inferred

    { // local scope, we don't need the following variables outside
        let num_tasks_per_thread = a.len() / NTHREADS; // same here
        let num_tougher_threads = a.len() % NTHREADS; // same here
        let mut offset = 0;
        for id in 0..NTHREADS {
            let chunksize =
                if id < num_tougher_threads {
                    num_tasks_per_thread + 1
                } else {
                    num_tasks_per_thread
                };
            let my_a = a.clone();  // refers to the *same* `Vec`
            let my_tx = tx.clone();
            thread::spawn(move || {
                let end = offset + chunksize;
                let partial_sum =
                    sum( (&my_a[offset..end]).iter().cloned() );
                my_tx.send(partial_sum).unwrap();
            });
            offset += chunksize;
        }
    }

    // We can close this Sender
    drop(tx);

    // Iterator magic! Yay! global_sum does not need to be mutable
    let global_sum = sum(rx.iter());
    println!("global sum via threads    : {}", global_sum);
    println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}
查看更多
登录 后发表回答