Controlling the number of spawned futures to create backpressure
I am using a futures-rs powered version of the Rusoto AWS Kinesis library. I need to spawn a deep pipeline of AWS Kinesis requests to achieve high-throughput because Kinesis has a limit of 500 records per HTTP request. Combined with the 50ms latency of sending a request, I need to start generating many concurrent requests. I am looking to create somewhere on the order of 100 in-flight requests.

The Rusoto put_records function signature looks like this:

fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>

The RusotoFuture is a wrapper defined like this:

/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    inner: Box<Future<Item = T, Error = E> + 'static>,
}

The inner Future is boxed, but RusotoFuture still implements Future::poll(), so I believe it is compatible with the futures-rs ecosystem. RusotoFuture also provides a synchronization call:

impl<T, E> RusotoFuture<T, E> {
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> {
        self.wait()
    }
}

I can issue a request and sync() it, getting the result from AWS. I would like to create many requests, put them in some kind of queue/list, and gather finished requests. If a request errors, I need to reissue it (this is somewhat normal in Kinesis, especially when hitting the throughput limits on your shards). If a request completes successfully, I should issue a new request with new data. I could spawn a thread for each request and sync it, but that seems inefficient when I already have an async I/O thread running.
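For concreteness, the per-request retry behavior I want is roughly this shape (a rough sketch using futures' loop_fn; put_with_retry is just an illustrative name, and real code would inspect the error before blindly retrying):

use futures::future::{loop_fn, Loop};
use futures::Future;

fn put_with_retry(
    client: DefaultKinesisClient,
    input: PutRecordsInput,
) -> Box<Future<Item = PutRecordsOutput, Error = ()>> {
    Box::new(loop_fn((client, input), |(client, input)| {
        let request = client.put_records(&input);
        request.then(move |result| match result {
            // done: break out of the loop with the response
            Ok(output) => Ok::<_, ()>(Loop::Break(output)),
            // e.g. shard throughput exceeded: reissue the same request
            Err(_) => Ok(Loop::Continue((client, input))),
        })
    }))
}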

I have tried using futures::sync::mpsc::channel from my application thread (not running from inside the Tokio reactor) but whenever I clone the tx it generates its own buffer, eliminating any kind of backpressure on send:

fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
    use futures::sync::mpsc::{ channel, spawn };
    use futures::{ Sink, Future, Stream };
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data {
        // clone tx so it can be moved into send(); the returned future
        // is dropped here, and each clone brings its own buffer slot
        tx.clone().send(rec);
    }
}

Without the clone, I have the error:

error[E0382]: use of moved value: `tx`
   --> src/main.rs:150:9
    |
150 |         tx.send(rec);
    |         ^^ value moved here in previous iteration of loop
    |
    = note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait

I have also looked at futures::sync::mpsc::spawn based on recommendations, but it takes ownership of the rx (as a Stream) and does not solve my problem with the clones of tx causing unbounded behavior.

I'm hoping that if I can get the channel/spawn usage working, I will have a system that takes RusotoFutures, waits for them to complete, and then provides an easy way to grab completion results from my application thread.

Outermost answered 15/1, 2018 at 16:54 Comment(7)
Did you have a look at Stream::buffered yet? Combine it with a (possibly unsync) channel, and maybe it does what you need. You'll probably need to share the handle to the mpsc::Sender (through Rc<RefCell<..>> or Arc<Mutex<..>>) anyway.Lowrance
@Lowrance why do you need the Rc / Arc? Shouldn't you be able to clone the Sender?Sideburns
@Sideburns Right, it already does this itself, clone should be fine. I guess "whenever I clone the tx it generates its own buffer" confused me (that shouldn't be true, the buffer should get shared).Lowrance
Also channel(1) is a really small buffer considering you're building cyclic dependencies: if a request can trigger a new request, but has to wait to push a new one until it finishes, it will block forever. I'd use unbounded() instead.Lowrance
Good point about the request triggering a new request causing a deadlock. For now I'm just trying to get the core idea of a backpressure-pipeline of requests. I will take note of this as a future problem.Outermost
@Lowrance @Sideburns The "generates its own buffer" came from this bit of documentation: `The channel capacity is equal to buffer + num-senders. In other words, each sender gets a guaranteed slot in the channel capacity, and on top of that there are buffer`.Outermost
@Outermost can you edit your question to explain why using Stream::buffered doesn't work?Sideburns
As far as I can tell, your problem with channel is not that a single clone of the Sender increases the capacity by one; it is that you clone the Sender for every item you're trying to send.

The error you're seeing without clone comes from your incorrect usage of the Sink::send interface. With clone you actually should see the warning:

warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled

That is: your current code doesn't actually ever send anything!

In order to apply backpressure you need to chain those send calls; each one should wait until the previous one has finished (and you need to wait for the last one too!); on success you'll get the Sender back. The best way to do this is to generate a Stream from your iterator by using iter_ok and to pass it to send_all.
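For illustration, the manual chaining could look like this hypothetical helper (send_all below does essentially this for you):

extern crate futures;
use futures::future::ok;
use futures::sync::mpsc::{SendError, Sender};
use futures::{Future, Sink};

// hypothetical helper: chain one `send` per item so each waits for
// the previous one to complete before pushing the next
fn send_chained<T: 'static>(
    tx: Sender<T>,
    items: Vec<T>,
) -> Box<Future<Item = Sender<T>, Error = SendError<T>>> {
    let mut chain: Box<Future<Item = Sender<T>, Error = SendError<T>>> =
        Box::new(ok(tx));
    for item in items {
        // `send` resolves only once the channel has accepted the item,
        // which is what produces the backpressure
        chain = Box::new(chain.and_then(move |tx| tx.send(item)));
    }
    chain
}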

Now you have a single SendAll future that you need to "drive". If you ignore the result and panic on error (.then(|r| { r.unwrap(); Ok::<(), ()>(()) })) you can spawn it as a separate task, but maybe you want to integrate it into your main application (i.e. return it in a Box).

// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))
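If the goal is simply "at most N requests in flight", you can also skip the channel entirely and use Stream::buffered, as suggested in the comments. A sketch, assuming (hypothetically) that your data source yields whole PutRecordsInput batches; in your question it yields individual entries, so you'd chunk them first:

extern crate futures;
use futures::stream::iter_ok;
use futures::{Future, Stream};

fn kinesis_pipeline(
    client: DefaultKinesisClient,
    data: FauxData, // assumed here to yield `PutRecordsInput` batches
) -> Box<Future<Item = (), Error = ()>> {
    Box::new(
        iter_ok::<_, ()>(data)
            // create one request future per batch
            .map(move |input| client.put_records(&input).map_err(|_| ()))
            // keep at most 100 requests in flight; a new batch is only
            // pulled from `data` once a slot frees up
            .buffered(100)
            // inspection/retry of the outputs would go here
            .for_each(|_output| Ok(())),
    )
}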

RusotoFuture::sync and Future::wait

Don't use Future::wait: it is already deprecated in a branch, and it usually won't do what you're actually looking for. I doubt RusotoFuture is aware of the problems, so I recommend avoiding RusotoFuture::sync.
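If you really need to block from a non-async thread, drive the future on an event loop you own instead; a sketch with tokio-core (assuming, as rusoto did at the time, that the request I/O runs on its background reactor):

extern crate tokio_core;
use tokio_core::reactor::Core;

// hypothetical blocking helper for non-async callers; `core.run`
// polls the future to completion on this thread
fn put_records_blocking(
    client: &DefaultKinesisClient,
    input: &PutRecordsInput,
) -> Result<PutRecordsOutput, PutRecordsError> {
    let mut core = Core::new().expect("failed to create reactor");
    core.run(client.put_records(input))
}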

Cloning Sender increases channel capacity

As you correctly stated cloning Sender increases the capacity by one.

This seems to be done to improve performance: a Sender starts in the unblocked ("unparked") state; as long as a Sender is unparked it can send an item without waiting. But if the number of items in the queue hits the configured limit when a Sender sends an item, that Sender becomes blocked ("parked"). (Removing items from the queue will unpark the Sender again at some point.)

This means that after the inner queue hits the limit, each Sender can still send one item, which leads to the documented effect of increased capacity - but only if all the Senders are actually sending; unused Senders don't increase the observed capacity.

The performance boost comes from the fact that as long as you don't hit the limit it doesn't need to park and notify tasks (which is quite heavy).
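A minimal single-threaded demonstration of the documented capacity (buffer + num-senders, here 1 + 2 = 3), assuming futures 0.1; wait() is acceptable here only because every call resolves immediately:

extern crate futures;
use futures::sync::mpsc::channel;
use futures::{Future, Sink, Stream};

fn main() {
    let (tx1, rx) = channel::<u32>(1);
    let tx2 = tx1.clone();

    let tx1 = tx1.send(1).wait().unwrap(); // fills the shared buffer slot
    let _tx1 = tx1.send(2).wait().unwrap(); // uses tx1's guaranteed slot
    let _tx2 = tx2.send(3).wait().unwrap(); // uses tx2's guaranteed slot
    // both senders are now parked; another `send` would not resolve
    // until the receiver drains the queue

    let received: Vec<u32> = rx.take(3).collect().wait().unwrap();
    assert_eq!(received, vec![1, 2, 3]);
}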

The private documentation at the top of the mpsc module describes more of the details.

Lowrance answered 30/1, 2018 at 10:14 Comment(0)
