I'm trying to distribute an array across threads and have the threads sum up portions of the array in parallel. I want thread 0 to sum elements 0, 1 and 2, thread 1 to sum elements 3, 4 and 5, thread 2 to sum elements 6 and 7, and thread 3 to sum elements 8 and 9.
I'm new to Rust but have coded in C/C++/Java before. I've thrown everything and the kitchen sink at this program, and I was hoping I could receive some guidance.
Sorry my code is sloppy; I will clean it up once it is finished. Please ignore the poorly named variables, inconsistent spacing, etc.
use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;

static NTHREADS: usize = 4;
static NPROCS: usize = 10;

fn main() {
    let mut a = [0; 10]; // a: [i32; 10]
    let mut endpoint = a.len() / NTHREADS;
    let mut remElements = a.len() % NTHREADS;

    for x in 0..a.len() {
        let secret_number = (rand::random::<i32>() % 100) + 1;
        a[x] = secret_number;
        println!("{}", a[x]);
    }
    let mut b = a;
    let mut x = 0;
    check_sum(&mut a);
    // serial_sum(&mut b);

    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut scale: usize = 0;

    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();

        // Each thread will send its id via the channel
        Thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            let numTougherThreads: usize = NPROCS % NTHREADS;
            let numTasksPerThread: usize = NPROCS / NTHREADS;
            let mut lsum = 0;

            if id < numTougherThreads {
                let mut q = numTasksPerThread+1;
                lsum = 0;
                while q > 0 {
                    lsum = lsum + a[scale];
                    scale+=1;
                    q = q-1;
                }
                println!("Less than numToughThreads lsum: {}", lsum);
            }
            if id >= numTougherThreads {
                let mut z = numTasksPerThread;
                lsum = 0;
                while z > 0 {
                    lsum = lsum + a[scale];
                    scale +=1;
                    z = z-1;
                }
                println!("Greater than numToughthreads lsum: {}", lsum);
            }
            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
            thread_tx.send(lsum).unwrap();
        });
    }

    // Here, all the messages are collected
    let mut globalSum = 0;
    let mut ids = Vec::with_capacity(NTHREADS);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there no messages available
        ids.push(rx.recv());
    }
    println!("Global Sum: {}", globalSum);
    // Show the order in which the messages were sent
    println!("ids: {:?}", ids);
}

fn check_sum (arr: &mut [i32]) {
    let mut sum = 0;
    let mut i = 0;
    let mut size = arr.len();
    loop {
        sum += arr[i];
        i+=1;
        if i == size { break; }
    }
    println!("CheckSum is {}", sum);
}
So far I've gotten it to do this much, but I can't figure out why threads 0 and 1 report the same sum, and why threads 2 and 3 do as well:
-5
-49
-32
99
45
-65
-64
-29
-56
65
CheckSum is -91
Greater than numTough lsum: -54
thread 2 finished
Less than numTough lsum: -86
thread 1 finished
Less than numTough lsum: -86
thread 0 finished
Greater than numTough lsum: -54
thread 3 finished
Global Sum: 0
ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]
I managed to rewrite it to work with even numbers by using the code below.
        while q > 0 {
            if id*s+scale == a.len() { break; }
            lsum = lsum + a[id*s+scale];
            scale +=1;
            q = q-1;
        }
        println!("Less than numToughThreads lsum: {}", lsum);
    }
    if id >= numTougherThreads {
        let mut z = numTasksPerThread;
        lsum = 0;
        let mut scale = 0;
        while z > 0 {
            if id*numTasksPerThread+scale == a.len() { break; }
            lsum = lsum + a[id*numTasksPerThread+scale];
            scale = scale + 1;
            z = z-1;
        }
Using a crate like crossbeam you can write this code:
Scoped threads allow you to pass in a reference that is guaranteed to outlive the thread. You can then use the return value of the thread directly, skipping channels (which are great, just not needed here!).
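A minimal sketch of the scoped-thread idea, under a few assumptions: it uses `std::thread::scope` from the standard library (available since Rust 1.63) in place of crossbeam, takes the data as a slice instead of generating random numbers, and the name `parallel_sum` is made up for illustration. It splits with `chunks`, so 10 elements on 4 threads are divided 3/3/3/1 rather than the 3/3/2/2 from the question.

```rust
use std::thread;

/// Sums `data` by handing one chunk to each scoped thread.
/// Partial sums are widened to `i64` so large inputs cannot overflow `i32`.
fn parallel_sum(data: &[i32], n_threads: usize) -> i64 {
    if data.is_empty() || n_threads == 0 {
        return 0;
    }
    // Ceiling division, so at most `n_threads` chunks are produced.
    let chunk_len = (data.len() + n_threads - 1) / n_threads;

    thread::scope(|s| {
        // Each thread borrows its chunk of `data`; the scope guarantees
        // that every thread is joined before `data` can go away.
        let handles: Vec<_> = data
            .chunks(chunk_len)
            .map(|chunk| s.spawn(move || chunk.iter().map(|&x| i64::from(x)).sum::<i64>()))
            .collect();

        // The return value of each thread is its partial sum; no channel needed.
        handles
            .into_iter()
            .map(|h| h.join().unwrap())
            .sum::<i64>()
    })
}
```

Calling `parallel_sum(&a, 4)` on the ten numbers printed in the question should agree with the `CheckSum` line.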
I used `rand::distributions::Range` to give a proper uniform distribution. I also changed it to be the range [1,100], as I think that's what you meant. However, your original code actually produces the range [-98,100], which you could also do.
`Iterator::sum` is used to sum up an iterator of numbers.
I threw in some rough performance numbers of the thread work, ignoring the vector construction, working on 100,000,000 numbers:
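Going back to the range remark: here is a small sketch of why `(x % 100) + 1` lands in [-98,100] rather than [1,100], together with `Iterator::sum` on a slice. The function names are made up for illustration. Note that `rem_euclid` only fixes the range; a slight modulo bias remains, which is what a proper uniform range sampler is meant to avoid.

```rust
/// The mapping used in the question. `%` in Rust keeps the sign of the
/// left operand, so the remainder lies in -99..=99 and the result in -98..=100.
fn original_mapping(raw: i32) -> i32 {
    (raw % 100) + 1
}

/// Same idea with the Euclidean remainder, which is never negative,
/// so the result lies in 1..=100.
fn one_to_hundred(raw: i32) -> i32 {
    raw.rem_euclid(100) + 1
}

/// `Iterator::sum` folds an iterator of numbers into their total.
fn sum_slice(values: &[i32]) -> i32 {
    values.iter().sum()
}
```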
All your tasks get a copy of the `scale` variable. Threads 0 and 1 both do the same thing since each has its own `scale` with a value of `0` and modifies it in the same manner as the other thread. The same goes for threads 2 and 3.
Rust prevents you from breaking thread safety. If `scale` were shared by the threads, you would have race conditions when accessing the variable.
Please read about closures, which explain the variable copying part, and about threading, which explains when and how you can share variables between threads.
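To make the copying visible, here is a small sketch (the function name `counter_per_thread` is made up for illustration). Every `move` closure receives its own copy of `scale` because `usize` is `Copy`, so each thread counts from zero and the caller's variable never changes.

```rust
use std::thread;

/// Spawns `n_threads` threads that each bump `scale` three times.
/// Returns the value every thread ended up with, plus the caller's `scale`.
fn counter_per_thread(n_threads: usize) -> (Vec<usize>, usize) {
    let mut scale: usize = 0;
    let mut handles = Vec::new();

    for _ in 0..n_threads {
        // `move` copies `scale` into the closure; the increments below
        // only touch that private copy.
        handles.push(thread::spawn(move || {
            for _ in 0..3 {
                scale += 1;
            }
            scale
        }));
    }

    let seen: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    // The original `scale` is still 0 here.
    (seen, scale)
}
```

With four threads, every thread reports 3 and the outer `scale` stays 0, which is the same effect that made the threads in the question start summing at index 0.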
Welcome to Rust! :)
Not only that! It also gets its own copy of `a`!
What you are trying to do could look like the following code. I guess it's easier for you to see a complete working example since you seem to be a Rust beginner and asked for guidance. I deliberately replaced `[i32; 10]` with a `Vec` since a `Vec` is not implicitly `Copy`able. It requires an explicit `clone()`; we cannot copy it by accident. Please note all the larger and smaller differences. The code also got a little more functional (less `mut`). I commented most of the noteworthy things:
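A sketch along those lines, not a definitive version, assuming current stable Rust. The helper names `range_for` and `partial_sums` are made up for illustration, and sharing the `Vec` through an `Arc` is my assumption about how to avoid copying the data; cloning the `Vec` itself for each thread would also work. Each thread derives its start index from its `id` instead of from a shared counter.

```rust
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;

/// Half-open range `start..end` that thread `id` should sum.
/// The first `len % n_threads` threads get one extra element.
/// Assumes `n_threads > 0`.
fn range_for(id: usize, len: usize, n_threads: usize) -> (usize, usize) {
    let base = len / n_threads;
    let extra = len % n_threads;
    let start = id * base + id.min(extra);
    let end = start + base + if id < extra { 1 } else { 0 };
    (start, end)
}

/// Sums `data` on `n_threads` threads; returns the partial sums indexed by thread id.
fn partial_sums(data: Vec<i32>, n_threads: usize) -> Vec<i32> {
    let data = Arc::new(data);
    let (tx, rx) = mpsc::channel();

    for id in 0..n_threads {
        let thread_tx = tx.clone();
        // Explicit and cheap: this bumps a reference count, the numbers are not copied.
        let thread_data = Arc::clone(&data);
        thread::spawn(move || {
            let (start, end) = range_for(id, thread_data.len(), n_threads);
            let lsum: i32 = thread_data[start..end].iter().sum();
            // Send the id along so the receiver knows whose sum this is.
            thread_tx.send((id, lsum)).unwrap();
        });
    }
    // Drop the original sender so the receive loop ends once all threads are done.
    drop(tx);

    let mut sums = vec![0; n_threads];
    for (id, lsum) in rx {
        sums[id] = lsum;
    }
    sums
}
```

For 10 elements and 4 threads, `range_for` yields 0..3, 3..6, 6..8 and 8..10, which is the split described in the question, and adding up the returned partial sums gives the global sum.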