I am using a futures-rs powered version of the Rusoto AWS Kinesis library. To get high throughput I need to keep a deep pipeline of Kinesis requests in flight, because Kinesis limits each HTTP request to 500 records and each request takes roughly 50 ms. I am looking at somewhere on the order of 100 in-flight requests.
The Rusoto put_records function signature looks like this:
fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>
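For reference, a single call is built from a PutRecordsInput. The snippet below is only my reading of the rusoto_kinesis structs (client and stream_name stand for a KinesisClient handle and the target stream), so treat the exact shape as an assumption:

// `batch` is a hypothetical Vec<PutRecordsRequestEntry>, at most 500 entries.
let input = PutRecordsInput {
    stream_name: stream_name.clone(),
    records: batch,
};
// Returns immediately with a future; nothing blocks yet.
let pending = client.put_records(&input);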
The RusotoFuture is a wrapper defined like this:
/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> {
    inner: Box<Future<Item = T, Error = E> + 'static>,
}
The inner Future is wrapped, but RusotoFuture still implements Future::poll(), so I believe it is compatible with the futures-rs ecosystem. RusotoFuture also provides a synchronization call:
impl<T, E> RusotoFuture<T, E> {
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> {
        self.wait()
    }
}
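So a single blocking call looks something like this (using the input built above):

// Blocks the current thread until AWS responds; fine for one request,
// but not for a deep pipeline of them.
let result: Result<PutRecordsOutput, PutRecordsError> = client.put_records(&input).sync();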
I can issue a request and sync() it, getting the result from AWS. I would like to create many requests, put them in some kind of queue/list, and gather the finished ones. If a request errored I need to reissue it (this is fairly normal with Kinesis, especially when hitting the throughput limits on your shards). If a request completed successfully I should issue a new request with fresh data. I could spawn a thread for each request and sync() it, but that seems inefficient when the async I/O thread is already running.
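Roughly what I am picturing is something like this futures 0.1 sketch (initial_batches is a placeholder for ~100 prepared PutRecordsInput values; I have not verified this end to end):

use futures::stream::futures_unordered;
use futures::Stream;

// Seed ~100 requests up front and block on completions in arrival order.
let in_flight = futures_unordered(
    initial_batches.iter().map(|input| client.put_records(input)),
);
for completed in in_flight.wait() {
    match completed {
        Ok(_output) => { /* finished: issue a request with fresh data here */ }
        Err(_err)   => { /* throttled or failed: re-issue this batch */ }
    }
}

The part I am missing is how to keep feeding new requests into the set while it is being drained, which is what pushed me towards channels.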
I have tried using futures::sync::mpsc::channel from my application thread (not running inside the Tokio reactor), but whenever I clone the tx it generates its own buffer, eliminating any kind of backpressure on send:
fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) {
    use futures::sync::mpsc::{ channel, spawn };
    use futures::{ Sink, Future, Stream };
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data {
        tx.clone().send(rec);
    }
}
Without the clone, I get this error:
error[E0382]: use of moved value: `tx`
--> src/main.rs:150:9
|
150 | tx.send(rec);
| ^^ value moved here in previous iteration of loop
|
= note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait
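As far as I can tell, the futures 0.1 way to avoid both the move error and the per-clone buffer is to let send() hand the Sender back each iteration (a sketch, not verified against my pipeline):

use futures::sync::mpsc::channel;
use futures::{Future, Sink};

let (mut tx, rx) = channel(1); // rx feeds the consuming side of the pipeline
for rec in data {
    // send() consumes tx and returns a future that resolves back to the
    // Sender once the item is accepted; wait() blocks here when the bounded
    // buffer is full, which is the backpressure the clone-per-send version loses.
    tx = tx.send(rec).wait().expect("receiver was dropped");
}

But I am not sure how that composes with keeping ~100 requests in flight at once.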
I have also looked at futures::sync::mpsc::spawn based on recommendations, but it takes ownership of the rx (as a Stream) and does not solve my problem with the cloning of tx causing unbounded behavior.
I'm hoping that if I can get the channel/spawn usage working, I will have a system that takes RusotoFutures, waits for them to complete, and then gives me an easy way to grab completion results from my application thread.
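Putting it all together, the shape I am hoping for is roughly this (a sketch only: it assumes futures 0.1, that the rusoto reactor thread keeps driving the HTTP side while my thread blocks, and it reuses FauxData, client, and stream_name from the function above):

use std::thread;
use futures::sync::mpsc::channel;
use futures::{Future, Sink, Stream};

let (tx, rx) = channel::<PutRecordsRequestEntry>(1);

// Producer thread: push records one at a time; the bounded channel is what
// provides backpressure.
let producer = thread::spawn(move || {
    let mut tx = tx;
    for rec in FauxData::new() {
        tx = tx.send(rec).wait().expect("receiver dropped");
    }
});

// Application thread: batch records up to the 500-record limit, turn each
// batch into a put_records call, keep up to 100 calls in flight, and walk
// completions as they finish.
let completions = rx
    .chunks(500)
    .map(|records| {
        let input = PutRecordsInput {
            stream_name: stream_name.clone(),
            records,
        };
        client.put_records(&input).then(|res| Ok::<_, ()>(res))
    })
    .buffer_unordered(100)
    .wait();

for completed in completions {
    match completed.expect("mpsc receiver should not error") {
        Ok(_output) => { /* accepted: nothing to do */ }
        Err(_err)   => { /* throttled or failed: schedule these records for re-issue */ }
    }
}
producer.join().unwrap();

The producer thread exists only because send().wait() blocks; if the data source lived inside the reactor it could be a stream feeding the same chunks/buffer_unordered chain directly.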
Have you looked at Stream::buffered yet? Combine it with a (possibly unsync) channel, and maybe it does what you need. You'll probably need to share the handle to the mpsc::Sender (through Rc<RefCell<..>> or Arc<Mutex<..>>) anyway. – Lowrance

Rc/Arc? Shouldn't you be able to clone the Sender? – Sideburns

clone should be fine. I guess "whenever I clone the tx it generates its own buffer" confused me (that shouldn't be true, the buffer should get shared). – Lowrance

channel(1) is a really small buffer considering you're building cyclic dependencies: if a request can trigger a new request, but has to wait to push a new one until it finishes, it will block forever. I'd use unbounded() instead. – Lowrance

Stream::buffered doesn't work? – Sideburns