Consider the following code
extern crate futures; // v0.1 (old)
use std::sync::{atomic, Arc};
use futures::*;
struct F(Arc<atomic::AtomicBool>);
impl Future for F {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
println!("Check if flag is set");
if self.0.load(atomic::Ordering::Relaxed) {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
}
fn main() {
let flag = Arc::new(atomic::AtomicBool::new(false));
let future = F(flag.clone());
::std::thread::spawn(move || {
::std::thread::sleep_ms(10);
println!("set flag");
flag.store(true, atomic::Ordering::Relaxed);
});
// ::std::thread::sleep_ms(20);
let result = future.wait();
println!("result: {:?}", result);
}
The spawned thread sets a flag, which the future waits for.
We also sleep the spawned thread, so the initial .poll()
call from .wait()
is before the flag is set. This causes .wait()
to block (seemingly) indefinitely. If we uncomment the other thread::sleep_ms
, .wait()
returns, and prints out the result (()
).
I would expect the current thread to try to resolve the future by calling poll
multiple times, since we're blocking the current thread. However, this is not happening.
I have tried to read some docs, and it seems like the problem is that the thread is park
ed after getting NotReady
from the poll
the first time. However, it is not clear to me why this is, or how it is possible to work around this.
What am I missing?
Why would you need to park a waiting future instead of polling it repeatedly? The answer is rather obvious, IMHO. Because at the end of the day it's faster and more efficient!
To repeatedly poll a future (which might be dubbed "busy-waiting") the library would have to decide whether to do it often or seldom and neither answer is satisfactory. Do it often and you're wasting the CPU cycles, do it seldom and the code is slow to react.
So yeah, you need to park the task when you're waiting for something and then unpark it when you've done waiting. Like this:
#![allow(deprecated)]
extern crate futures;
use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};
struct Status {
ready: bool,
task: Option<Task>,
}
#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);
impl Future for F {
type Item = ();
type Error = ();
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
println!("Check if flag is set");
let mut status = self.0.lock().expect("!lock");
if status.ready {
Ok(Async::Ready(()))
} else {
status.task = Some(park());
Ok(Async::NotReady)
}
}
}
#[test]
fn test() {
let flag = Arc::new(Mutex::new(Status {
ready: false,
task: None,
}));
let future = F(flag.clone());
::std::thread::spawn(move || {
::std::thread::sleep_ms(10);
println!("set flag");
let mut status = flag.lock().expect("!lock");
status.ready = true;
if let Some(ref task) = status.task {
task.unpark()
}
});
let result = future.wait();
println!("result: {:?}", result);
}
Note that Future::poll
is doing several things here: it's checking for an external condition and it's parking the task, so it's possible to have a race, like when:
- the
poll
checks the variable and finds it to be false
;
- the outer code sets the variable to
true
;
- the outer code checks if the task is parked and finds that it's not;
- the
poll
parks the task, but boom! it is too late, nobody is going to unpark it any longer.
In order to avoid any races, I've used a Mutex
to synchronize these interactions.
P.S. If all you need is to wrap a thread result into a Future
then consider using the oneshot
channel: it has the Receiver
that implements the Future
interface already.