I want to stop reading from a tokio::io::lines stream. I combined it with a oneshot future using select2 and fired the oneshot, but tokio::run kept running.

use futures::{sync::oneshot, *}; // 0.1.27
use std::{io::BufReader, time::Duration};
use tokio::prelude::*; // 0.1.21

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let lines = tokio::io::lines(BufReader::new(tokio::io::stdin()));
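    let lines = lines.for_each(|item| {
        println!("> {:?}", item);
        Ok(()) // for_each expects the closure to return a future (a Result works)
    });

    std::thread::spawn(move || {
        // Wait a while before requesting shutdown (the delay is illustrative).
        std::thread::sleep(Duration::from_secs(5));
        println!("system shutting down");
        let _ = tx.send(());
    });

    // Completes as soon as either the line stream or the receiver completes.
    let lines = lines.select2(rx);

    tokio::run(lines.map(|_| ()).map_err(|_| ()));
}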

How can I stop reading from this?

There's nothing wrong with your strategy, but it only works with futures that don't perform a blocking operation through tokio_threadpool::blocking (the traditional kind of blocking should never be done inside a future at all).

You can test this by replacing the tokio::io::lines(..) future with a simple interval future:

use std::time::Instant;
use tokio::timer::Interval;

let lines = Interval::new(Instant::now(), Duration::from_secs(1));

The problem is that tokio::io::Stdin internally uses tokio_threadpool::blocking.

The documentation for tokio_threadpool::blocking says (emphasis mine):

NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select - the other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).

Since this blocks every other future in the combinator, your Receiver cannot observe the signal from the Sender until the blocking call returns.
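To make the effect concrete, here is a minimal model that uses only the standard library rather than Tokio itself. The names select_on_one_thread, demo, and Winner are hypothetical, and the loop is only a rough stand-in for how a combinator such as select2 polls both sides from within one task; it is not how Tokio is implemented.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Which side of the hand-rolled "select" completed first.
#[derive(Debug, PartialEq)]
enum Winner {
    Input,
    Shutdown,
}

/// Polls both sides in turn on the current thread (a stand-in for one task).
/// If `poll_input` blocks, `poll_shutdown` is not even looked at until it returns.
fn select_on_one_thread(
    mut poll_input: impl FnMut() -> bool,
    mut poll_shutdown: impl FnMut() -> bool,
) -> Winner {
    loop {
        if poll_input() {
            return Winner::Input;
        }
        if poll_shutdown() {
            return Winner::Shutdown;
        }
        thread::yield_now();
    }
}

/// Runs the model with either a blocking or a non-blocking input poll.
fn demo(blocking_input: bool) -> Winner {
    let (input_tx, input_rx) = mpsc::channel::<String>();
    let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>();

    // The shutdown signal is already waiting before the loop starts.
    shutdown_tx.send(()).unwrap();

    // Input arrives much later, like a user who has not typed anything yet.
    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(300));
        let _ = input_tx.send("late line".to_string());
    });

    let winner = select_on_one_thread(
        || {
            if blocking_input {
                input_rx.recv().is_ok() // blocks the whole loop until input arrives
            } else {
                input_rx.try_recv().is_ok() // returns immediately if nothing is there
            }
        },
        || shutdown_rx.try_recv().is_ok(),
    );

    producer.join().unwrap();
    winner
}

fn main() {
    println!("blocking input:     {:?}", demo(true));
    println!("non-blocking input: {:?}", demo(false));
}
```

With the blocking poll, the pending shutdown signal is ignored until the input side finally returns, which mirrors what happens when stdin sits inside tokio_threadpool::blocking. With the non-blocking poll, the shutdown side gets its turn right away.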

Please see "How can I read non-blocking from stdin?". Alternatively, you can use tokio-stdin-stdout, which reads stdin on a separate thread and passes the data to you through a channel. It also has a line-by-line example.
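The general idea behind the thread-plus-channel approach can be sketched with the standard library alone. This is not the tokio-stdin-stdout API; spawn_line_reader and for_each_until_shutdown are hypothetical helpers, and the 5 second delay and 20 ms polling interval are arbitrary assumptions.

```rust
use std::io::{BufRead, BufReader};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Reads lines on a dedicated thread and forwards them over a channel,
/// so the blocking read never stalls the consumer.
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in reader.lines() {
            match line {
                // Stop if the consumer has gone away.
                Ok(line) => {
                    if tx.send(line).is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    });
    rx
}

/// Hands each line to `on_line` until a shutdown signal arrives or input ends.
fn for_each_until_shutdown(
    lines: Receiver<String>,
    shutdown: Receiver<()>,
    mut on_line: impl FnMut(String),
) {
    loop {
        // Check the shutdown signal first; it is never blocked by the reader.
        if shutdown.try_recv().is_ok() {
            break;
        }
        match lines.recv_timeout(Duration::from_millis(20)) {
            Ok(line) => on_line(line),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => break, // end of input
        }
    }
}

fn main() {
    let (shutdown_tx, shutdown_rx) = mpsc::channel();
    let lines = spawn_line_reader(BufReader::new(std::io::stdin()));

    thread::spawn(move || {
        thread::sleep(Duration::from_secs(5));
        println!("system shutting down");
        let _ = shutdown_tx.send(());
    });

    for_each_until_shutdown(lines, shutdown_rx, |line| println!("> {:?}", line));
    // Returning from main ends the process even if the reader thread
    // is still parked inside a blocking read on stdin.
}
```

The reader thread may stay blocked on stdin after shutdown, but that no longer matters because nothing else waits on it.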


Thank you for your comment and correcting my sentences.

I tried stopping this non-blocking future, and it worked:

let lines = Interval::new(Instant::now(), Duration::from_secs(1));

My understanding is that, in this case, wrapping the blocking future with tokio_threadpool::blocking would make it work. I'll try it later.

Thank you very much.

  • Actually, the idea is this: inside a future, wrap any blocking behavior with Tokio's blocking; otherwise, don't block inside a future at all. The one thing to keep in mind is that the closure you wrap with Tokio's blocking is permitted to block indefinitely, and in your case that is exactly what Stdin does. – Ömer Erden Jun 13 at 14:21
  • Thanks a lot! Now that I understand where my thinking stood, I will study more. – nkkr Jun 13 at 22:28

