How can I efficiently extract the first element of a futures::Stream in a blocking manner?

I've got the following method:

pub fn load_names(&self, req: &super::MagicQueryType) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver<String>> {

My goal is to get the very first element of grpcio::ClientSStreamReceiver; I don't care about the other names:

let name: String = load_names(query)?.wait().nth(0)?;

It seems inefficient to call wait() before nth(0) as I believe wait() blocks the stream until it receives all the elements.

How can I write a more efficient solution (i.e., nth(0).wait()) without triggering build errors? Rust's build errors for futures::stream::Stream look extremely confusing to me.

The Rust playground doesn't support grpcio = "0.4.4" so I cannot provide a link.

Angy asked 2/3, 2019 at 22:55
Comment: Just because the Playground doesn't have a crate doesn't mean that you shouldn't provide an MCVE. (Mooncalf)

To extract the first element of a futures::Stream in a blocking manner, you should convert the Stream to an iterator by calling executor::block_on_stream and then call Iterator::next.

use futures::{executor, stream, Stream}; // 0.3.4
use std::iter;

fn example() -> impl Stream<Item = i32> {
    stream::iter(iter::repeat(42))
}

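fn main() {
    // block_on_stream returns a blocking Iterator; next() waits for a single
    // item, so this finishes even though the stream is infinite.
    let v = executor::block_on_stream(example()).next();
    println!("{:?}", v); // Some(42)
}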
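
On the efficiency worry in the question: a blocking iterator of this kind is pull-based, so asking for one item waits for one item only, not for the whole stream. The sketch below shows the same behavior with nothing but the standard library. It is an analogy, not the grpcio or futures API: this load_names is a hypothetical stand-in that feeds a channel from a background thread, and the names it sends are made up.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a streaming call: a background thread sends
// names over a channel, pausing between items.
fn load_names() -> mpsc::Receiver<String> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for name in ["alice", "bob", "carol"] {
            // A send error only means the receiver was dropped; stop producing.
            if tx.send(name.to_string()).is_err() {
                break;
            }
            thread::sleep(Duration::from_millis(50));
        }
    });
    rx
}

// Receiver::iter() is a blocking iterator: next() returns as soon as the
// first item arrives, and the receiver is dropped right afterwards.
fn first_name() -> Option<String> {
    load_names().iter().next()
}

fn main() {
    println!("{:?}", first_name());
}
```

Dropping the receiver after the first item is what tells the producer to stop; with a real stream, dropping it plays a similar role.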

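If you are using Tokio, you can convert the Stream into a Future with StreamExt::into_future, which resolves to a tuple of the next item (as an Option) and the rest of the stream. Annotating a function with #[tokio::main] makes it run on a Tokio runtime and block until it completes, so it can be called like an ordinary synchronous function: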

use futures::{stream, Stream, StreamExt}; // 0.3.4
use std::iter;
use tokio; // 0.2.13

fn example() -> impl Stream<Item = i32> {
    stream::iter(iter::repeat(42))
}

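// #[tokio::main] turns this async fn into a blocking, synchronous function.
#[tokio::main]
async fn just_one() -> Option<i32> {
    // Resolves to (next item, rest of the stream); the rest is dropped.
    let (i, _stream) = example().into_future().await;
    i
}

fn main() {
    println!("{:?}", just_one()); // Some(42)
}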

Mooncalf answered 3/3, 2019 at 0:47 Comment(0)
