One solution is to create a stream combinator that ends the stream once some threshold of bytes has passed. Here's one possible implementation:
    // Written against futures 0.1 (`Async`, `Poll<T, E>`, `Stream::Error`).
    use futures::{Async, Poll, Stream};

    struct TakeBytes<S> {
        inner: S,
        seen: usize,  // bytes passed through so far
        limit: usize, // threshold after which the stream ends
    }

    impl<S> Stream for TakeBytes<S>
    where
        S: Stream<Item = Vec<u8>>,
    {
        type Item = Vec<u8>;
        type Error = S::Error;

        fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
            if self.seen >= self.limit {
                return Ok(Async::Ready(None)); // Stream is over
            }

            let inner = self.inner.poll();
            // Count the bytes of every chunk that is ready.
            if let Ok(Async::Ready(Some(ref v))) = inner {
                self.seen += v.len();
            }
            inner
        }
    }

    trait TakeBytesExt: Sized {
        fn take_bytes(self, limit: usize) -> TakeBytes<Self>;
    }

    impl<S> TakeBytesExt for S
    where
        S: Stream<Item = Vec<u8>>,
    {
        fn take_bytes(self, limit: usize) -> TakeBytes<Self> {
            TakeBytes {
                inner: self,
                limit,
                seen: 0,
            }
        }
    }
This can then be chained onto the stream before concat2:
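    use futures::Future;

    // `some_bytes()` stands in for whatever produces your stream of Vec<u8> chunks.
    fn limited() -> impl Future<Item = Vec<u8>, Error = ()> {
        some_bytes().take_bytes(999).concat2()
    }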
This implementation has caveats:
- It only works for Vec<u8>. You can introduce generics to make it more broadly applicable, of course.
- It allows more bytes than the limit to come in; it just stops the stream after that point. Those types of decisions are application-dependent.
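If you need a hard cap instead, one option is to truncate the chunk that crosses the limit. Here is a minimal sketch of that logic, written against a plain std Iterator rather than a futures Stream so that it runs without any external crate. The names TakeBytesExact and take_bytes_exact are made up for this illustration; the same bookkeeping would go inside poll in the stream version.

```rust
/// Iterator adapter that yields at most `limit` bytes in total,
/// truncating the chunk that crosses the limit.
/// (Hypothetical name; an Iterator analogue of `TakeBytes`.)
struct TakeBytesExact<I> {
    inner: I,
    seen: usize,
    limit: usize,
}

impl<I> Iterator for TakeBytesExact<I>
where
    I: Iterator<Item = Vec<u8>>,
{
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        if self.seen >= self.limit {
            return None; // limit reached: end the sequence
        }
        let mut chunk = self.inner.next()?;
        let remaining = self.limit - self.seen;
        // Drop any bytes beyond the limit (no-op if the chunk already fits).
        chunk.truncate(remaining);
        self.seen += chunk.len();
        Some(chunk)
    }
}

/// Hypothetical helper that wraps an iterator of chunks.
fn take_bytes_exact<I>(inner: I, limit: usize) -> TakeBytesExact<I>
where
    I: Iterator<Item = Vec<u8>>,
{
    TakeBytesExact { inner, seen: 0, limit }
}
```

Whether to truncate, return an error, or let the last chunk through untouched is the application-dependent part.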
Another thing to keep in mind is that you want to tackle this problem at as low a level as you can: if the source of the data has already allocated a gigabyte of memory, placing a limit afterwards won't help much.
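As an illustration of limiting at the source, here is a small sketch that assumes the data comes from a blocking std::io::Read rather than from the async stream above. The function name read_limited is hypothetical. The standard library's Read::take caps the reader itself, so the extra bytes are never read into memory in the first place.

```rust
use std::io::{self, Read};

/// Reads at most `limit` bytes from `source` into memory.
/// (Hypothetical helper; assumes a blocking `Read` source.)
fn read_limited<R: Read>(source: R, limit: u64) -> io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    // `take` caps the reader itself, so `read_to_end` stops after `limit` bytes.
    source.take(limit).read_to_end(&mut buf)?;
    Ok(buf)
}
```

For example, read_limited(&data[..], 10) on a 100-byte slice returns only the first 10 bytes.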
body.some_bytes(), I get "no method named some_bytes found for type hyper::Body in the current scope" – Timberland