rocket/util/reader_stream.rs
1use std::pin::Pin;
2use std::task::{Context, Poll};
3
4use bytes::{Bytes, BytesMut};
5use futures::stream::Stream;
6use pin_project_lite::pin_project;
7use tokio::io::AsyncRead;
8
9pin_project! {
10 /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
11 ///
12 /// This stream is fused. It performs the inverse operation of
13 /// [`StreamReader`].
14 ///
15 /// # Example
16 ///
17 /// ```
18 /// # #[tokio::main]
19 /// # async fn main() -> std::io::Result<()> {
20 /// use tokio_stream::StreamExt;
21 /// use tokio_util::io::ReaderStream;
22 ///
23 /// // Create a stream of data.
24 /// let data = b"hello, world!";
25 /// let mut stream = ReaderStream::new(&data[..]);
26 ///
27 /// // Read all of the chunks into a vector.
28 /// let mut stream_contents = Vec::new();
29 /// while let Some(chunk) = stream.next().await {
30 /// stream_contents.extend_from_slice(&chunk?);
31 /// }
32 ///
33 /// // Once the chunks are concatenated, we should have the
34 /// // original data.
35 /// assert_eq!(stream_contents, data);
36 /// # Ok(())
37 /// # }
38 /// ```
39 ///
40 /// [`AsyncRead`]: tokio::io::AsyncRead
41 /// [`StreamReader`]: crate::io::StreamReader
42 /// [`Stream`]: futures_core::Stream
43 #[derive(Debug)]
44 pub struct ReaderStream<R> {
45 // Reader itself.
46 //
47 // This value is `None` if the stream has terminated.
48 #[pin]
49 reader: R,
50 // Working buffer, used to optimize allocations.
51 buf: BytesMut,
52 capacity: usize,
53 done: bool,
54 }
55}
56
57impl<R: AsyncRead> ReaderStream<R> {
58 /// Convert an [`AsyncRead`] into a [`Stream`] with item type
59 /// `Result<Bytes, std::io::Error>`,
60 /// with a specific read buffer initial capacity.
61 ///
62 /// [`AsyncRead`]: tokio::io::AsyncRead
63 /// [`Stream`]: futures_core::Stream
64 pub fn with_capacity(reader: R, capacity: usize) -> Self {
65 ReaderStream {
66 reader,
67 buf: BytesMut::with_capacity(capacity),
68 capacity,
69 done: false,
70 }
71 }
72}
73
74impl<R: AsyncRead> Stream for ReaderStream<R> {
75 type Item = std::io::Result<Bytes>;
76
77 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
78 use tokio_util::io::poll_read_buf;
79
80 let mut this = self.as_mut().project();
81
82 if *this.done {
83 return Poll::Ready(None);
84 }
85
86 if this.buf.capacity() == 0 {
87 this.buf.reserve(*this.capacity);
88 }
89
90 let reader = this.reader;
91 match poll_read_buf(reader, cx, &mut this.buf) {
92 Poll::Pending => Poll::Pending,
93 Poll::Ready(Err(err)) => {
94 *this.done = true;
95 Poll::Ready(Some(Err(err)))
96 }
97 Poll::Ready(Ok(0)) => {
98 *this.done = true;
99 Poll::Ready(None)
100 }
101 Poll::Ready(Ok(_)) => {
102 let chunk = this.buf.split();
103 Poll::Ready(Some(Ok(chunk.freeze())))
104 }
105 }
106 }
107
108 // fn size_hint(&self) -> (usize, Option<usize>) {
109 // self.reader.size_hint()
110 // }
111}
112
113impl<R: AsyncRead> hyper::body::Body for ReaderStream<R> {
114 type Data = bytes::Bytes;
115
116 type Error = std::io::Error;
117
118 fn poll_frame(
119 self: Pin<&mut Self>,
120 cx: &mut Context<'_>,
121 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
122 self.poll_next(cx).map_ok(hyper::body::Frame::data)
123 }
124}