How to cancel an infinite stream from within the stream itself?

I'm trying to cancel an interval (interval_timer) after emptying a queue, but I'm not sure what the right strategy is.

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

I tried drop, as suggested on Gitter, but that ended with an error:

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    if some_vars.len() == 1 {
        drop(interval_timer);
    }

    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

The error:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
   |
60 |     let mut interval_timer = tokio_timer::Timer::default();
   |         ------------------ captured outer variable
...
72 |                 drop(interval_timer);
   |                      ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
Confiscatory answered 12/3, 2018 at 12:57 Comment(0)

For cases where you want to cancel a stream from outside of the stream, see stream-cancel.


For your specific case, it's easiest to convert your collection into a stream and zip it together with the interval timer. This way, the resulting stream naturally stops when the collection is empty:

use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let some_vars = stream::iter_ok(some_vars.into_iter().rev());
        let combined = timer.zip(some_vars);

        combined.for_each(move |(_, item)| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
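
The reason the zipped stream ends can be seen without Tokio at all. The following is only a minimal sketch using plain std iterators as a stand-in for futures 0.1 streams; `drain_on_ticks` and `period` are hypothetical names made up for the illustration, and the blocking `thread::sleep` merely imitates the timer:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Pairs each item with a "tick" and returns the items in the order they
/// were handled. `period` stands in for the interval duration.
fn drain_on_ticks(items: Vec<i32>, period: Duration) -> Vec<i32> {
    // An endless source of ticks: each `next()` sleeps, then yields the time.
    let ticks = std::iter::repeat_with(move || {
        thread::sleep(period);
        Instant::now()
    });

    // `zip` stops as soon as either side is exhausted, so the endless tick
    // source ends together with the finite list of items.
    items
        .into_iter()
        .rev()
        .zip(ticks)
        .map(|(item, _tick)| item)
        .collect()
}
```

Calling `drain_on_ticks(vec![1, 2, 3], Duration::from_millis(100))` returns once the three items are used up, even though the tick source itself never ends.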

Otherwise, you can stop the stream by using and_then to both remove the value from the collection and control whether the stream should continue:

use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let limited = timer.and_then(move |_| {
            if some_vars.len() <= 4 {
                Err(())
            } else {
                some_vars.pop().ok_or(())
            }
        });

        limited.for_each(move |item| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
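
The control flow of the and_then version (the first Err ends the for_each) can be imitated with std iterators. This is a rough sketch rather than Tokio code: `pop_until` and its `keep` parameter are hypothetical, with `keep` standing in for the hard-coded 4, and `try_for_each` plays the role of `for_each` on a fallible stream:

```rust
/// Hands out values from the back of `vars`, one per tick, and stops at the
/// first `Err`, leaving `keep` values behind.
/// Returns the handled values and whatever is left in the collection.
fn pop_until(mut vars: Vec<i32>, keep: usize) -> (Vec<i32>, Vec<i32>) {
    let mut handled = Vec::new();

    // Endless tick source; a real timer would wait between ticks.
    let ticks = std::iter::repeat(());

    // Like `and_then`: turn each tick into `Ok(item)` or `Err(())`.
    let mut limited = ticks.map(|()| {
        if vars.len() <= keep {
            Err(())
        } else {
            vars.pop().ok_or(())
        }
    });

    // Like `for_each` on a fallible stream: the first `Err` ends the loop.
    let _ = limited.try_for_each(|res: Result<i32, ()>| res.map(|item| handled.push(item)));

    (handled, vars)
}
```

The error value is only a stop signal here, which is why it is discarded with `let _`.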
Lydell answered 5/11, 2019 at 15:49 Comment(0)

I created a copy of Tokio's Interval struct and added a reference to my application's session, so that it can check when to interrupt early.

In my case, I want to interrupt the Interval in order to shut down.

My Interval poll method looks like this:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
    if self.session.read().unwrap().shutdown {
        return Ok(Async::Ready(Some(Instant::now())));
    }

    // Wait for the delay to be done
    let _ = match self.delay.poll() {

Then you need to keep a handle on the task (call task = futures::task::current() when running inside the timeout task).

At any point you can then call task.notify() to kick the interval into action and hit your break out code, interrupting the Interval early.

Inside Interval there is a Delay struct that can be modified. You could create an Interval that you can interrupt and whose timeout you can change; this way you could interrupt once and then continue.
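
The general shape of "wait for the period, but wake up early when a shutdown flag is set" can be sketched with std threads. To be clear, this is an analogy and not the modified Tokio Interval described above: `Shutdown`, `trigger`, `tick` and `run_until_shutdown` are hypothetical names, with `trigger` playing the role of task.notify(). Unlike the poll method above, this sketch ends the loop on shutdown instead of yielding one more tick.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Shared shutdown flag plus a condition variable to wake the waiter early.
#[derive(Clone)]
struct Shutdown {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl Shutdown {
    fn new() -> Self {
        Shutdown { inner: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Sets the flag and wakes any waiter.
    fn trigger(&self) {
        let (flag, cvar) = &*self.inner;
        *flag.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Waits for one period. Returns `true` if the full period elapsed and
    /// `false` if shutdown was requested before or during the wait.
    fn tick(&self, period: Duration) -> bool {
        let (flag, cvar) = &*self.inner;
        let guard = flag.lock().unwrap();
        // Keep waiting while the flag is unset, for at most `period`.
        let (guard, _timeout) = cvar
            .wait_timeout_while(guard, period, |shutdown| !*shutdown)
            .unwrap();
        !*guard
    }
}

/// Runs ticks until shutdown and returns how many full periods elapsed.
fn run_until_shutdown(shutdown: &Shutdown, period: Duration) -> usize {
    let mut ticks = 0;
    while shutdown.tick(period) {
        ticks += 1;
    }
    ticks
}
```

A clone of `Shutdown` can be handed to another thread, which calls `trigger()` to cut the current wait short.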

Slavish answered 4/11, 2019 at 21:25 Comment(0)

tokio_timer::Interval implements futures::Stream, so try to use the take_while method:

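let s = timer
    // take_while passes a reference to each item (an Instant here), so ignore it with `_`.
    // The stream continues for as long as the returned future resolves to true.
    .take_while(|_| future::ok(is_net_completed()))
    .for_each(move |_| {
        println!("Woke up");
        // ...
    });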
Lemma answered 13/3, 2018 at 12:18 Comment(5)
Can you explain further how this cancels the repeating interval? - Lydell
I did try take_while. The issue I had was that I couldn't use some_vars in both the take_while closure and the (mutable) for_each closure. If the ownership issue could be solved, that would solve the immediate problem. - Confiscatory
@Lydell If you need to pause the interval stream, that could be implemented by using the filter method between the take_while and for_each parts, or directly in the for_each closure. - Lemma
@Confiscatory Then you need to synchronize the shared object, for example with types from std::sync::*. - Lemma
I tried take_while and it does not interrupt the interval; it still fires on the same schedule. If you put a println!() in the take_while closure, this is pretty clear. - Slavish
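
On the ownership question raised in the comments: one way to let both closures touch the same collection is to put it behind Arc<Mutex<_>> and give each closure its own clone of the Arc. Below is a minimal sketch with std iterators standing in for the stream; `drain_shared` is a hypothetical helper. With Tokio 0.1 each closure would take its clone by `move`, and Arc<Mutex<_>> rather than Rc<RefCell<_>> would be needed because tokio::run requires the future to be Send.

```rust
use std::sync::{Arc, Mutex};

/// Drains `vars` one value per tick, sharing the queue between the
/// `take_while` predicate and the `for_each` body.
fn drain_shared(vars: Vec<i32>) -> Vec<i32> {
    let queue = Arc::new(Mutex::new(vars));
    let for_predicate = Arc::clone(&queue);
    let mut handled = Vec::new();

    // Endless tick source; a real timer would wait between ticks.
    std::iter::repeat(())
        // Keep going only while something is left in the queue.
        .take_while(|_| !for_predicate.lock().unwrap().is_empty())
        .for_each(|_| {
            if let Some(item) = queue.lock().unwrap().pop() {
                handled.push(item);
            }
        });

    handled
}
```

Here the predicate reads the queue and the body mutates it, but neither closure owns the Vec itself, so the borrow checker has nothing to object to.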
