I'm using tokio::runtime::current_thread::Runtime
and I want to able to run a future and stop the reactor in the same thread. The example on the page doesn't show how to stop the runtime. Is there any way I can do that?
可以将文章内容翻译成中文,广告屏蔽插件可能会导致该功能失效(如失效,请关闭广告屏蔽插件后再试):
问题:
回答1:
The runtime will automatically shut down when when the future is complete if you use block_on
:
use std::time::{Duration, Instant};
use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15
fn main() {
let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");
let two_seconds_later = Instant::now() + Duration::from_secs(2);
runtime
.block_on({
Delay::new(two_seconds_later)
.inspect(|_| eprintln!("future complete"))
})
.expect("Unable to run future");
}
If you need to cancel a future, you can create something that will cause future poll
s to succeed. Here's a very simple (and probably not-very-performant) version of such a wrapper:
use std::{
sync::{Arc, Mutex},
thread,
time::{Duration, Instant},
};
use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15
fn main() {
let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");
let a_long_time = Instant::now() + Duration::from_secs(3600);
let future = Delay::new(a_long_time).inspect(|_| eprintln!("future complete"));
let (future, cancel) = Cancelable::new(future);
let another_thread = thread::spawn(|| {
eprintln!("Another thread started");
thread::sleep(Duration::from_secs(2));
eprintln!("Another thread canceling the future");
cancel();
eprintln!("Another thread exiting");
});
runtime.block_on(future).expect("Unable to run future");
another_thread.join().expect("The other thread panicked");
}
#[derive(Debug)]
struct Cancelable<F> {
inner: F,
info: Arc<Mutex<CancelInfo>>,
}
#[derive(Debug, Default)]
struct CancelInfo {
cancelled: bool,
task: Option<task::Task>,
}
impl<F> Cancelable<F> {
fn new(inner: F) -> (Self, impl FnOnce()) {
let info = Arc::new(Mutex::new(CancelInfo::default()));
let cancel = {
let info = info.clone();
move || {
let mut info = info.lock().unwrap();
info.cancelled = true;
if let Some(task) = &info.task {
task.notify();
}
}
};
let me = Cancelable { inner, info };
(me, cancel)
}
}
impl<F> Future for Cancelable<F>
where
F: Future<Item = ()>,
{
type Item = F::Item;
type Error = F::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut info = self.info.lock().unwrap();
if info.cancelled {
Ok(Async::Ready(()))
} else {
let r = self.inner.poll();
if let Ok(Async::NotReady) = r {
info.task = Some(task::current());
}
r
}
}
}