rocket/util/reader_stream.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::{Bytes, BytesMut};
use futures::stream::Stream;
use pin_project_lite::pin_project;
use tokio::io::AsyncRead;
pin_project! {
/// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
///
/// This stream is fused. It performs the inverse operation of
/// [`StreamReader`].
///
/// # Example
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use tokio_stream::StreamExt;
/// use tokio_util::io::ReaderStream;
///
/// // Create a stream of data.
/// let data = b"hello, world!";
/// let mut stream = ReaderStream::new(&data[..]);
///
/// // Read all of the chunks into a vector.
/// let mut stream_contents = Vec::new();
/// while let Some(chunk) = stream.next().await {
/// stream_contents.extend_from_slice(&chunk?);
/// }
///
/// // Once the chunks are concatenated, we should have the
/// // original data.
/// assert_eq!(stream_contents, data);
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`StreamReader`]: crate::io::StreamReader
/// [`Stream`]: futures_core::Stream
#[derive(Debug)]
pub struct ReaderStream<R> {
// Reader itself.
//
// This value is `None` if the stream has terminated.
#[pin]
reader: R,
// Working buffer, used to optimize allocations.
buf: BytesMut,
capacity: usize,
done: bool,
}
}
impl<R: AsyncRead> ReaderStream<R> {
/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`,
/// with a specific read buffer initial capacity.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: futures_core::Stream
pub fn with_capacity(reader: R, capacity: usize) -> Self {
ReaderStream {
reader,
buf: BytesMut::with_capacity(capacity),
capacity,
done: false,
}
}
}
impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use tokio_util::io::poll_read_buf;
let mut this = self.as_mut().project();
if *this.done {
return Poll::Ready(None);
}
if this.buf.capacity() == 0 {
this.buf.reserve(*this.capacity);
}
let reader = this.reader;
match poll_read_buf(reader, cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
*this.done = true;
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
*this.done = true;
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
// fn size_hint(&self) -> (usize, Option<usize>) {
// self.reader.size_hint()
// }
}
impl<R: AsyncRead> hyper::body::Body for ReaderStream<R> {
type Data = bytes::Bytes;
type Error = std::io::Error;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
self.poll_next(cx).map_ok(hyper::body::Frame::data)
}
}