I'd like to spin up a Tokio event loop alongside a Rocket server, then add events to this loop later on. I read "Is there a way to launch a tokio::Delay on a new thread to allow the main loop to continue?", but it's still not clear to me how to achieve my goal.
Answer 1:
As the documentation states:
"While current_thread::Runtime does not implement Send and cannot safely be moved to other threads, it provides a Handle that can be sent to other threads and allows to spawn new tasks from there."
Here is an example of spinning up the event loop in one thread and having a second thread spawn tasks on it. The tasks all start at staggered intervals but finish at the same time:
    use std::{
        thread,
        time::{Duration, Instant},
    };
    use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15

    fn main() {
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
        let (handle_tx, handle_rx) = std::sync::mpsc::channel();

        let tokio_thread = thread::spawn(move || {
            let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");

            eprintln!("Runtime created");

            // Give a handle to the runtime to another thread.
            handle_tx
                .send(runtime.handle())
                .expect("Unable to give runtime handle to another thread");

            // Continue running until notified to shutdown
            runtime.spawn(shutdown_rx.map_err(|e| panic!("Error on the shutdown channel: {:?}", e)));

            // Finish all pending tasks
            runtime.run().expect("Unable to run the runtime");

            eprintln!("Runtime finished");
        });

        let another_thread = thread::spawn(move || {
            let handle = handle_rx
                .recv()
                .expect("Could not get a handle to the other thread's runtime");

            eprintln!("Another thread created");
            let two_seconds_after_creation = Instant::now() + Duration::from_secs(2);

            for value in 0..10 {
                // Run this future in the other thread's runtime
                handle
                    .spawn(future::lazy(move || {
                        eprintln!("Starting task for value {}", value);

                        Delay::new(two_seconds_after_creation)
                            .inspect(move |_| eprintln!("Finishing task for value {}", value))
                            .map(drop)
                            .map_err(drop)
                    }))
                    .expect("Unable to spawn a new task on the runtime");

                thread::sleep(Duration::from_millis(100));
            }

            eprintln!("Another thread finished");
        });

        another_thread.join().expect("Another thread panicked");

        shutdown_tx
            .send(())
            .expect("Unable to shutdown runtime thread");

        tokio_thread.join().expect("Tokio thread panicked");
    }
Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Another thread finished
Finishing task for value 0
Finishing task for value 1
Finishing task for value 2
Finishing task for value 3
Finishing task for value 4
Finishing task for value 5
Finishing task for value 6
Finishing task for value 7
Finishing task for value 8
Finishing task for value 9
Runtime finished
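
The handle pattern itself does not depend on Tokio. As a rough illustration only, here is a std-only sketch of the same shape: a loop object that is not Send stays on the thread that created it, and a small Send handle is passed out so other threads can queue work. MiniLoop, LoopHandle, and run_demo are hypothetical names made up for this sketch, not Tokio APIs, and the "tasks" are plain closures rather than futures.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::sync::mpsc;
use std::thread;

// A task is a boxed closure that may cross threads.
type Task = Box<dyn FnOnce() -> u32 + Send + 'static>;

// Hypothetical stand-in for the runtime's handle: `Clone` and `Send`.
#[derive(Clone)]
struct LoopHandle {
    tx: mpsc::Sender<Task>,
}

impl LoopHandle {
    // Queue a task on the loop; fails only if the loop has already exited.
    fn spawn<F>(&self, task: F) -> Result<(), mpsc::SendError<Task>>
    where
        F: FnOnce() -> u32 + Send + 'static,
    {
        self.tx.send(Box::new(task))
    }
}

// Hypothetical stand-in for the runtime. The `Rc` makes it `!Send`,
// so it has to be created and run on the same thread.
struct MiniLoop {
    rx: mpsc::Receiver<Task>,
    completed: Rc<Cell<usize>>,
}

impl MiniLoop {
    fn new() -> (MiniLoop, LoopHandle) {
        let (tx, rx) = mpsc::channel();
        let event_loop = MiniLoop { rx, completed: Rc::new(Cell::new(0)) };
        (event_loop, LoopHandle { tx })
    }

    // Run tasks until every handle has been dropped, then return the results.
    fn run(self) -> Vec<u32> {
        let mut results = Vec::new();
        for task in self.rx.iter() {
            results.push(task());
            self.completed.set(self.completed.get() + 1);
        }
        results
    }
}

fn run_demo() -> Vec<u32> {
    let (handle_tx, handle_rx) = mpsc::channel();

    // The loop is created and run entirely on its own thread.
    let loop_thread = thread::spawn(move || {
        let (event_loop, handle) = MiniLoop::new();
        handle_tx.send(handle).expect("Unable to send the handle");
        event_loop.run()
    });

    // This thread only ever sees the handle, never the loop itself.
    let handle: LoopHandle = handle_rx.recv().expect("Unable to receive the handle");
    for value in 0..3u32 {
        handle.spawn(move || value * 2).expect("Unable to spawn a task");
    }

    // Dropping the last handle plays the role of the shutdown signal.
    drop(handle);
    loop_thread.join().expect("Loop thread panicked")
}
```

Calling run_demo() from main returns the task results in the order they were queued. Unlike the Tokio example, this sketch shuts down when the last handle is dropped instead of using a separate oneshot channel.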