How do I add tasks to a Tokio event loop that is r

Posted 2019-07-10 02:09

Question:

I'd like to spin up a Tokio event loop alongside a Rocket server, then add events to this loop later on. I read "Is there a way to launch a tokio::Delay on a new thread to allow the main loop to continue?", but it's still not clear to me how to achieve my goal.

Answer 1:

As the documentation states:

While current_thread::Runtime does not implement Send and cannot safely be moved to other threads, it provides a Handle that can be sent to other threads and allows to spawn new tasks from there.
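To make the idea of a sendable handle concrete, here is a minimal sketch that uses only the standard library. It is an analogy, not how Tokio implements its Handle: the names LoopHandle, start_loop, and run_demo are hypothetical, and the "tasks" are plain closures rather than futures. The loop stays on its own thread, while a cloneable handle built on a channel sender can be moved elsewhere to queue work.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread;

// A task is any one-shot closure that may cross thread boundaries.
type Task = Box<dyn FnOnce() + Send + 'static>;

// Hypothetical stand-in for a runtime handle: cheap to clone and `Send`.
#[derive(Clone)]
struct LoopHandle {
    tasks: Sender<Task>,
}

impl LoopHandle {
    // Queue a task on the loop; fails if the loop thread has already exited.
    fn spawn<F>(&self, task: F) -> Result<(), &'static str>
    where
        F: FnOnce() + Send + 'static,
    {
        self.tasks
            .send(Box::new(task))
            .map_err(|_| "the loop has shut down")
    }
}

// Start the loop on its own thread and return a handle to it.
fn start_loop() -> (LoopHandle, thread::JoinHandle<()>) {
    let (tasks, queue) = mpsc::channel::<Task>();
    let worker = thread::spawn(move || {
        // The iterator ends once every `LoopHandle` has been dropped.
        for task in queue {
            task();
        }
    });
    (LoopHandle { tasks }, worker)
}

// Spawn `count` tasks from a second thread and collect the values
// in the order the loop executed them.
fn run_demo(count: u32) -> Vec<u32> {
    let (handle, worker) = start_loop();
    let (done_tx, done_rx) = mpsc::channel();

    let producer = thread::spawn(move || {
        for value in 0..count {
            let done_tx = done_tx.clone();
            handle
                .spawn(move || done_tx.send(value).expect("Result receiver dropped"))
                .expect("Unable to spawn a task on the loop");
        }
        // `handle` is dropped here, which lets the loop finish.
    });

    producer.join().expect("Producer thread panicked");
    worker.join().expect("Loop thread panicked");
    done_rx.iter().collect()
}

fn main() {
    println!("{:?}", run_demo(3));
}
```

In this sketch, dropping the last handle is what stops the loop. A Tokio runtime instead needs an explicit signal to know when to stop, such as a oneshot channel.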

Here is an example of spinning up the event loop in one thread and having a second thread spawn tasks on it. The tasks all start at staggered intervals but finish at the same time:

use std::{
    thread,
    time::{Duration, Instant},
};
use tokio::{prelude::*, runtime::current_thread, timer::Delay}; // 0.1.15

fn main() {
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let (handle_tx, handle_rx) = std::sync::mpsc::channel();

    let tokio_thread = thread::spawn(move || {
        let mut runtime = current_thread::Runtime::new().expect("Unable to create the runtime");

        eprintln!("Runtime created");

        // Give a handle to the runtime to another thread.
        handle_tx
            .send(runtime.handle())
            .expect("Unable to give runtime handle to another thread");

        // Keep the runtime alive until it is notified to shut down
        runtime.spawn(shutdown_rx.map_err(|e| panic!("Error on the shutdown channel: {:?}", e)));

        // Finish all pending tasks
        runtime.run().expect("Unable to run the runtime");

        eprintln!("Runtime finished");
    });

    let another_thread = thread::spawn(move || {
        let handle = handle_rx
            .recv()
            .expect("Could not get a handle to the other thread's runtime");

        eprintln!("Another thread created");
        let two_seconds_after_creation = Instant::now() + Duration::from_secs(2);

        for value in 0..10 {
            // Run this future in the other thread's runtime
            handle
                .spawn(future::lazy(move || {
                    eprintln!("Starting task for value {}", value);

                    Delay::new(two_seconds_after_creation)
                        .inspect(move |_| eprintln!("Finishing task for value {}", value))
                        .map(drop)
                        .map_err(drop)
                }))
                .expect("Unable to spawn a new task on the runtime");

            thread::sleep(Duration::from_millis(100));
        }

        eprintln!("Another thread finished");
    });

    another_thread.join().expect("Another thread panicked");

    shutdown_tx
        .send(())
        .expect("Unable to shutdown runtime thread");

    tokio_thread.join().expect("Tokio thread panicked");
}

Output:

Runtime created
Another thread created
Starting task for value 0
Starting task for value 1
Starting task for value 2
Starting task for value 3
Starting task for value 4
Starting task for value 5
Starting task for value 6
Starting task for value 7
Starting task for value 8
Starting task for value 9
Another thread finished
Finishing task for value 0
Finishing task for value 1
Finishing task for value 2
Finishing task for value 3
Finishing task for value 4
Finishing task for value 5
Finishing task for value 6
Finishing task for value 7
Finishing task for value 8
Finishing task for value 9
Runtime finished
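
The tasks finish together because every Delay targets the same absolute Instant, computed once before the loop, rather than a duration measured from each task's start. The following std-only sketch works through that arithmetic; remaining_wait and staggered_waits are hypothetical helper names, and no Tokio APIs are involved.

```rust
use std::time::{Duration, Instant};

// How long a task that starts at `started` still has to wait
// before a fixed `deadline` is reached (zero if it has passed).
fn remaining_wait(deadline: Instant, started: Instant) -> Duration {
    deadline.saturating_duration_since(started)
}

// Remaining wait, in milliseconds, for `count` tasks started `stagger`
// apart when they all share one deadline `timeout` after creation.
fn staggered_waits(count: u32, stagger: Duration, timeout: Duration) -> Vec<u128> {
    let created = Instant::now();
    let deadline = created + timeout;
    (0..count)
        .map(|i| remaining_wait(deadline, created + stagger * i).as_millis())
        .collect()
}

fn main() {
    // Mirrors the example: ten tasks, 100 ms apart, deadline 2 s after creation.
    let waits = staggered_waits(10, Duration::from_millis(100), Duration::from_secs(2));
    println!("{:?}", waits);
}
```

Each later task waits 100 ms less than the one before it, so they all complete at the same moment. Had each task built its own deadline from Instant::now() inside the closure, the finishes would be staggered as well.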