How can I test a future that is bound to a tokio TcpStream?
Asked Answered
U

3

7

I have a future which wraps a TCP stream in a Framed using the LinesCodec.

When I try to wrap this in a test, I get the future blocking around 20% of the time, but because I have nothing listening on the socket I'm trying to connect to, I expect to always get the error:

thread 'tokio-runtime-worker-0' panicked at 'error: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }', src/lib.rs:35:24 note: Run with 'RUST_BACKTRACE=1' for a backtrace.

This is the test code I have used:

#[macro_use(try_ready)]
extern crate futures; // 0.1.24
extern crate tokio;   // 0.1.8

use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::TcpStream;
use tokio::prelude::*;

struct MyFuture {
    addr: SocketAddr,
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;
    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(TcpStream::connect(&self.addr).poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture { addr: addr }
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

playground

I have seen patterns in other languages where the test binary itself offers a mechanism to return results asynchronously, but haven't found a good way of using a similar mechanism in Rust.

Uprising answered 19/9, 2018 at 9:3 Comment(0)
I
5

A simple way to test async code may be to use a dedicated runtime for each test: start it, wait for future completion and shutdown the runtime at the end of the test.

#[test]
fn my_case() {

    // setup future f
    // ...

    tokio::run(f);
}

I don't know if there are consolidated patterns already in the Rust ecosystem; see this discussion about the evolution of testing support for future based code.

Why your code does not work as expected

When you invoke poll(), the future is queried to check if a value is available.

If a value is not available, an interest is registered so that poll() will be invoked again when something happens that can resolve the future.

When your MyFuture::poll() is invoked:

  1. TcpStream::connect creates a new future TcpStreamNew
  2. TcpStreamNew::poll is invoked immediately only once on the future's creation at step 1.
  3. The future goes out of scope, so the next time you invoke MyFuture::poll you never resolve the previously created futures.

You have registered an interest for a future that, if not resolved the first time you poll it, you never ask back again (poll) for a resolved value or for an error.

The reason of the "nondeterministic" behavior is because the first poll sometimes resolve immediately with a ConnectionRefused error and sometimes it waits forever for a future connection event or a failure that it is never retrieved.

Look at mio::sys::unix::tcp::TcpStream used by Tokio:

 impl TcpStream {
     pub fn connect(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
         set_nonblock(stream.as_raw_fd())?;

         match stream.connect(addr) {
             Ok(..) => {}
             Err(ref e) if e.raw_os_error() == Some(libc::EINPROGRESS) => {}
             Err(e) => return Err(e),
         }

         Ok(TcpStream {
             inner: stream,
         })
     }

When you connect on a non-blocking socket, the system call may connect/fail immediately or return EINPROGRESS, in this last case a poll must be triggered for retrieving the value of the error.

Ibadan answered 25/9, 2018 at 9:18 Comment(1)
maybe this answer need a little upgrade? Using tokio 1.0 and compiler says: "tokio::run(f); (run) not found in tokio"Rutger
I
4

The issue is not with the test but with the implementation.

This working test case based on yours has no custom future implementation and only calls TcpStream::connect(). It works as you expect it to.

extern crate futures;
extern crate tokio;

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;
    use std::net::SocketAddr;
    use tokio::net::TcpStream;
    use tokio::prelude::*;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = TcpStream::connect(&addr)
            .and_then(|f| {
                println!("connected");
                f.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

playground

You are connecting to the same endpoint over and over again in your poll() method. That's not how a future works. The poll() method will be called repeatedly, with the expectation that at some point it will return either Ok(Async::Ready(..)) or Err(..).

If you initiate a new TCP connection every time poll() is called, it will be unlikely to complete in time.

Here is a modified example that does what you expect:

#[macro_use(try_ready)]
extern crate futures;
extern crate tokio;
use std::io;
use std::net::SocketAddr;
use tokio::codec::{Framed, LinesCodec};
use tokio::net::{ConnectFuture, TcpStream};
use tokio::prelude::*;

struct MyFuture {
    tcp: ConnectFuture,
}

impl MyFuture {
    fn new(addr: SocketAddr) -> MyFuture {
        MyFuture {
            tcp: TcpStream::connect(&addr),
        }
    }
}

impl Future for MyFuture {
    type Item = Framed<TcpStream, LinesCodec>;
    type Error = io::Error;

    fn poll(&mut self) -> Result<Async<Framed<TcpStream, LinesCodec>>, io::Error> {
        let strm = try_ready!(self.tcp.poll());
        Ok(Async::Ready(Framed::new(strm, LinesCodec::new())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::Shutdown;

    #[test]
    fn connect() {
        let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
        let fut = MyFuture::new(addr)
            .and_then(|f| {
                println!("connected");
                let cn = f.get_ref();
                cn.shutdown(Shutdown::Both)
            }).map_err(|e| panic!("error: {:?}", e));
        tokio::run(fut)
    }
}

I'm not certain what you intend your future to do, though; I can't comment if it's the right approach.

Incogitable answered 22/9, 2018 at 13:6 Comment(1)
While i think this answer is really high quality too, I thought the accepted one shows an approach to achieve running futures in a harness, which is more valuable to me, and better targeted at the question. I was about to accept your answer, when I saw the other one come in, and it was a close call, because I do recognize that you answered first and correctly. I am sorry if this is a disappointment to you, and am really grateful for your response.Uprising
D
1

to some degree, you can drop in tokio's test library to make this easier; it supports async/await in unit tests.

#[tokio::test]
async fn my_future_test() {
  let addr: SocketAddr = "127.0.0.1:4222".parse().unwrap();
  match MyFuture { addr }.poll().await {
    Ok(f) => assert!("something good")
    Err(e) => assert!("something bad")
  }
}

https://docs.rs/tokio/0.3.3/tokio/attr.test.html

Dumfound answered 3/11, 2020 at 10:16 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.