I want to terminate reading from a tokio::io::lines stream. I merged it with a oneshot future and terminated it, but tokio::run was still working.
    use futures::{sync::oneshot, *}; // 0.1.27
    use std::{io::BufReader, time::Duration};
    use tokio::prelude::*; // 0.1.21

    fn main() {
        let (tx, rx) = oneshot::channel::<()>();

        let lines = tokio::io::lines(BufReader::new(tokio::io::stdin()));
        let lines = lines.for_each(|item| {
            println!("> {:?}", item);
            Ok(())
        });

        // Signal shutdown from another thread after five seconds.
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(5000));
            println!("system shutting down");
            let _ = tx.send(());
        });

        // Complete as soon as either the line reader or the signal completes.
        let lines = lines.select2(rx);
        tokio::run(lines.map(|_| ()).map_err(|_| ()));
    }
How can I stop reading from this?
There's nothing wrong with your strategy, but it will only work with futures that don't execute a blocking operation via Tokio's blocking (the traditional kind of blocking should never be done inside a future).
You can test this by replacing the tokio::io::lines(..) stream with a simple interval stream:
    // Needs tokio::timer::Interval and std::time::Instant in scope.
    let lines = Interval::new(Instant::now(), Duration::from_secs(1));
The problem is that tokio::io::Stdin internally uses tokio_threadpool::blocking.
When you use Tokio thread pool blocking (emphasis mine):
NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select - the other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).
Since this will block every other future in the combinator, your Receiver will not be able to get a signal from the Sender until the blocking ends.
Please see "How can I read non-blocking from stdin?", or use tokio-stdin-stdout, which creates a channel to consume data from a dedicated stdin thread. It also has a line-by-line example.
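To illustrate the channel idea, here is a minimal sketch that uses only the standard library rather than Tokio or tokio-stdin-stdout. The helpers spawn_line_reader and read_until are hypothetical names made up for this example. The blocking read runs on its own thread, so the consumer can give up at a deadline even while that thread is still waiting for input. With futures 0.1 the same shape should carry over by swapping in futures::sync::mpsc and keeping the select2 on the oneshot receiver, but that variant is not shown or tested here.

```rust
use std::io::{self, BufRead};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: reads lines from `reader` on a dedicated thread
/// and forwards them over a channel, so the blocking read never stalls
/// the consumer.
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in reader.lines() {
            match line {
                Ok(line) => {
                    // The consumer dropped the receiver: stop reading.
                    if tx.send(line).is_err() {
                        break;
                    }
                }
                // Stop on the first I/O error.
                Err(_) => break,
            }
        }
    });
    rx
}

/// Hypothetical helper: passes each line to `on_line` until `deadline`
/// passes or the input ends, whichever comes first.
fn read_until(rx: &Receiver<String>, deadline: Instant, mut on_line: impl FnMut(String)) {
    loop {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        match rx.recv_timeout(deadline - now) {
            Ok(line) => on_line(line),
            // Timeout: the deadline was reached.
            // Disconnected: the reader thread finished (end of input or error).
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
}

fn main() {
    let rx = spawn_line_reader(io::BufReader::new(io::stdin()));
    let deadline = Instant::now() + Duration::from_secs(5);
    read_until(&rx, deadline, |line| println!("> {:?}", line));
    println!("system shutting down");
    // Returning from main ends the process even if the reader thread is
    // still blocked on stdin.
}
```

The reader thread is never cancelled in this sketch. It is simply abandoned when main returns, which is acceptable for a program that is exiting but would leak a thread in a long-running service.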
Thank you for your comment and for correcting my sentences. I tried to stop this non-blocking Future and succeeded.
let lines = Interval::new(Instant::now(), Duration::from_secs(1));
My understanding is that, for this case, it would work to wrap the blocking Future with tokio_threadpool::blocking. I'll try it later. Thank you very much.