What is the best approach to encapsulate blocking I/O in a future?

Posted 2019-01-09 16:22

I read the tokio documentation and I wonder what the best approach is for encapsulating costly synchronous I/O in a future.

With the reactor framework, we get the advantage of a green threading model: a few OS threads handle a lot of concurrent tasks through an executor.

The future model of tokio is demand driven, which means the future itself will be polled to report on its internal state and completion, allowing backpressure and cancellation capabilities. As far as I understand, the polling phase of the future must be non-blocking to work well.

The I/O I want to encapsulate can be seen as a long atomic and costly operation. Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

The only two options I see are:

  • Include the blocking I/O in the poll function of the future.
  • Spawn an OS thread to perform the I/O and use the future mechanism to poll its state, as shown in the documentation.

As I understand it, neither solution is optimal: neither gets the full advantage of the green-threading model (the first is advised against in the documentation, and the second doesn't go through the executor provided by the reactor framework). Is there another solution?

1 Answer
虎瘦雄心在
Answered 2019-01-09 16:34

Ideally, an independent task would perform the I/O and the associated future would poll the I/O thread for the completion status.

Yes, this is what Tokio recommends and what crates like futures-cpupool and tokio-threadpool were created for. Note that this is not restricted to I/O, but is valid for any long-running synchronous task!

In this case, you schedule a closure to run in the pool. The pool itself does the work of checking whether the blocking closure has completed and fulfills the Future trait.

extern crate futures;
extern crate futures_cpupool;

use futures::{future, Future};
use futures_cpupool::CpuPool;
use std::thread;
use std::time::Duration;

fn main() {
    let pool = CpuPool::new(8);

    let a = pool.spawn_fn(|| {
        thread::sleep(Duration::from_secs(3));
        future::ok::<_, ()>(3)
    });
    let b = pool.spawn_fn(|| {
        thread::sleep(Duration::from_secs(1));
        future::ok::<_, ()>(1)
    });

    let c = a.join(b).map(|(a, b)| a + b);

    let result = c.wait();
    println!("{:?}", result);
}

Note that this is not an efficient way of sleeping, it's just a placeholder for some blocking operation. If you actually need to sleep, use something like futures-timer or tokio-timer.

You can see that the total time is only 3 seconds:

$ time ./target/debug/example
Ok(4)

real    0m3.021s
user    0m0.007s
sys     0m0.010s

Likewise, you can use tokio-threadpool for the same result:

extern crate tokio; // 0.1.7
extern crate tokio_threadpool; // 0.1.2

use std::{thread, time::Duration};
use tokio::{prelude::*, runtime::Runtime};

fn delay_for(seconds: u64) -> impl Future<Item = u64, Error = tokio_threadpool::BlockingError> {
    future::poll_fn(move || {
        tokio_threadpool::blocking(|| {
            thread::sleep(Duration::from_secs(seconds));
            seconds
        })
    })
}

fn main() {
    let a = delay_for(3);
    let b = delay_for(1);
    let sum = a.join(b).map(|(a, b)| a + b);

    let mut runtime = Runtime::new().expect("Unable to start the runtime");
    let result = runtime.block_on(sum);
    println!("{:?}", result);
}

neither solution is optimal: neither gets the full advantage of the green-threading model

That's correct - because you don't have something that is asynchronous! You are trying to combine two different methodologies and there has to be an ugly bit somewhere to translate between them.

the second doesn't go through the executor provided by the reactor framework

I'm not sure what you mean here. There's only one executor in the example above: the one implicitly created by wait. The thread pool has some internal logic that checks whether a thread is done, but that should only be triggered when the user's executor polls it.
