I want to stop reading from a tokio::io::lines stream. I combined it with a oneshot future using select2 and completed the oneshot, but tokio::run kept running.

use futures::{sync::oneshot, *}; // 0.1.27
use std::{io::BufReader, time::Duration};
use tokio::prelude::*; // 0.1.21

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let lines = tokio::io::lines(BufReader::new(tokio::io::stdin()));
    let lines = lines.for_each(|item| {
        println!("> {:?}", item);
        Ok(())
    });

    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(5000));
        println!("system shutting down");
        let _ = tx.send(());
    });

    let lines = lines.select2(rx);

    tokio::run(lines.map(|_| ()).map_err(|_| ()));
}

How can I stop reading from this?

1

There's nothing wrong with your strategy, but it only works with futures that don't perform a blocking operation through Tokio's blocking function. (The traditional kind of blocking should never be done inside a future at all.)

You can test this by replacing the tokio::io::lines(..) future with a simple interval future:

let lines = tokio::timer::Interval::new(std::time::Instant::now(), Duration::from_secs(1));

The problem is that tokio::io::Stdin internally uses tokio_threadpool::blocking.

The documentation for tokio_threadpool::blocking says (emphasis mine):

NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select - the other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).

Since this blocks every other future in the combinator, your Receiver cannot get the signal from the Sender until the blocking call returns.

Please see "How can I read non-blocking from stdin?", or use tokio-stdin-stdout, which creates a channel to consume data from a dedicated stdin thread. It also has a line-by-line example.
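To illustrate the channel idea without depending on a particular Tokio version, here is a minimal std-only sketch. It is not how tokio-stdin-stdout is implemented; the helper names spawn_line_reader and for_each_until are hypothetical, and the 5 second deadline just mirrors the sleep in the question. The blocking read lives on its own thread, so the consumer can stop waiting whenever it likes.

```rust
use std::io::{self, BufRead, BufReader};
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: reads lines on a dedicated thread and forwards
/// them through a channel, so the blocking read never stalls the consumer.
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in reader.lines() {
            match line {
                Ok(text) => {
                    // Stop reading once the consumer has dropped the receiver.
                    if tx.send(text).is_err() {
                        break;
                    }
                }
                // Give up on the first read error.
                Err(_) => break,
            }
        }
    });
    rx
}

/// Hypothetical helper: passes each line to `handle` until `deadline`
/// passes or the input ends, whichever comes first.
fn for_each_until<F>(rx: &Receiver<String>, deadline: Instant, mut handle: F)
where
    F: FnMut(String),
{
    loop {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        match rx.recv_timeout(deadline - now) {
            Ok(line) => handle(line),
            // Timed out, or the reader thread finished and dropped the sender.
            Err(_) => break,
        }
    }
}

fn main() {
    let rx = spawn_line_reader(BufReader::new(io::stdin()));
    let deadline = Instant::now() + Duration::from_secs(5);
    for_each_until(&rx, deadline, |line| println!("> {:?}", line));
    println!("system shutting down");
}
```

When main returns, the process exits even though the reader thread may still be blocked on stdin. The same shape carries over to the futures world: keep the blocking read outside the task that waits on the shutdown signal.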

0

Thank you for your comment and correcting my sentences.

I tried to stop this non-blocking Future and succeeded.

let lines = Interval::new(Instant::now(), Duration::from_secs(1));

My understanding is that, for this case, it would work to wrap the blocking Future with tokio_threadpool::blocking. I'll try it later.

Thank you very much.

  • Actually, the idea is: inside a Future, wrap blocking behavior with Tokio's blocking; otherwise, don't block inside a Future at all. The one thing to know is that the closure you wrap with Tokio's blocking is permitted to block indefinitely, and in your case that's what Stdin does. – Ömer Erden Jun 13 at 14:21
  • Thanks a lot! Now that I understand my way of thinking, I will study more. – nkkr Jun 13 at 22:28
