rocket/util/
reader_stream.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::{Bytes, BytesMut};
5use futures::stream::Stream;
6use pin_project_lite::pin_project;
7use tokio::io::AsyncRead;
8
pin_project! {
    /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
    ///
    /// This stream is fused: once it has yielded an error or the reader has
    /// reported end-of-input, every later poll returns `None`. It performs
    /// the inverse operation of [`StreamReader`].
    ///
    /// # Example
    ///
    /// NOTE(review): the example below exercises
    /// `tokio_util::io::ReaderStream` (and its `new` constructor), not the
    /// type defined here; the only constructor visible in this file is
    /// `with_capacity`. Confirm whether the example should be adapted.
    ///
    /// ```
    /// # #[tokio::main]
    /// # async fn main() -> std::io::Result<()> {
    /// use tokio_stream::StreamExt;
    /// use tokio_util::io::ReaderStream;
    ///
    /// // Create a stream of data.
    /// let data = b"hello, world!";
    /// let mut stream = ReaderStream::new(&data[..]);
    ///
    /// // Read all of the chunks into a vector.
    /// let mut stream_contents = Vec::new();
    /// while let Some(chunk) = stream.next().await {
    ///    stream_contents.extend_from_slice(&chunk?);
    /// }
    ///
    /// // Once the chunks are concatenated, we should have the
    /// // original data.
    /// assert_eq!(stream_contents, data);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`AsyncRead`]: tokio::io::AsyncRead
    /// [`StreamReader`]: crate::io::StreamReader
    /// [`Stream`]: futures_core::Stream
    #[derive(Debug)]
    pub struct ReaderStream<R> {
        // Reader itself, pinned structurally so it can be polled in place.
        //
        // The reader is kept for the whole life of the stream; termination
        // is recorded in `done` rather than by dropping the reader.
        #[pin]
        reader: R,
        // Working buffer, used to optimize allocations: each chunk handed
        // out is split off this buffer instead of being allocated afresh.
        buf: BytesMut,
        // Capacity requested at construction. `poll_next` reserves this much
        // again whenever `buf` has no capacity left.
        capacity: usize,
        // Set once the reader has returned an error or a zero-byte read;
        // from then on `poll_next` yields `None` without touching the reader.
        done: bool,
    }
}
56
57impl<R: AsyncRead> ReaderStream<R> {
58    /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59    /// `Result<Bytes, std::io::Error>`,
60    /// with a specific read buffer initial capacity.
61    ///
62    /// [`AsyncRead`]: tokio::io::AsyncRead
63    /// [`Stream`]: futures_core::Stream
64    pub fn with_capacity(reader: R, capacity: usize) -> Self {
65        ReaderStream {
66            reader,
67            buf: BytesMut::with_capacity(capacity),
68            capacity,
69            done: false,
70        }
71    }
72}
73
74impl<R: AsyncRead> Stream for ReaderStream<R> {
75    type Item = std::io::Result<Bytes>;
76
77    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78        use tokio_util::io::poll_read_buf;
79
80        let mut this = self.as_mut().project();
81
82        if *this.done {
83            return Poll::Ready(None);
84        }
85
86        if this.buf.capacity() == 0 {
87            this.buf.reserve(*this.capacity);
88        }
89
90        let reader = this.reader;
91        match poll_read_buf(reader, cx, &mut this.buf) {
92            Poll::Pending => Poll::Pending,
93            Poll::Ready(Err(err)) => {
94                *this.done = true;
95                Poll::Ready(Some(Err(err)))
96            }
97            Poll::Ready(Ok(0)) => {
98                *this.done = true;
99                Poll::Ready(None)
100            }
101            Poll::Ready(Ok(_)) => {
102                let chunk = this.buf.split();
103                Poll::Ready(Some(Ok(chunk.freeze())))
104            }
105        }
106    }
107
108    // fn size_hint(&self) -> (usize, Option<usize>) {
109    //     self.reader.size_hint()
110    // }
111}
112
113impl<R: AsyncRead> hyper::body::Body for ReaderStream<R> {
114    type Data = bytes::Bytes;
115
116    type Error = std::io::Error;
117
118    fn poll_frame(
119        self: Pin<&mut Self>,
120        cx: &mut Context<'_>,
121    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
122        self.poll_next(cx).map_ok(hyper::body::Frame::data)
123    }
124}