rocket/response/stream/
reader.rs

1use std::{fmt, io};
2use std::task::{Context, Poll};
3use std::pin::Pin;
4
5use futures::stream::Stream;
6use tokio::io::{AsyncRead, ReadBuf};
7use pin_project_lite::pin_project;
8
9use crate::request::Request;
10use crate::response::{self, Response, Responder};
11use crate::response::stream::One;
12
pin_project! {
    /// An async reader that reads from a stream of async readers.
    ///
    /// A `ReaderStream` can be constructed from any [`Stream`] of items of type
    /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using
    /// [`ReaderStream::one()`]. The `AsyncRead` implementation of
    /// `ReaderStream` progresses the stream forward, returning the contents of
    /// the inner readers. Thus, a `ReaderStream` can be thought of as a
    /// _flattening_ of async readers.
    ///
    /// `ReaderStream` is designed to be used as a building-block for
    /// stream-based responders by acting as the `streamed_body` of a
    /// `Response`, though it may also be used as a responder itself.
    ///
    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
    ///
    /// ```rust
    /// use std::io::Cursor;
    ///
    /// use rocket::{Request, Response};
    /// use rocket::futures::stream::{Stream, StreamExt};
    /// use rocket::response::{self, Responder, stream::ReaderStream};
    /// use rocket::http::ContentType;
    ///
    /// struct MyStream<S>(S);
    ///
    /// impl<'r, S: Stream<Item = String>> Responder<'r, 'r> for MyStream<S>
    ///     where S: Send + 'r
    /// {
    ///     fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
    ///         Response::build()
    ///             .header(ContentType::Text)
    ///             .streamed_body(ReaderStream::from(self.0.map(Cursor::new)))
    ///             .ok()
    ///     }
    /// }
    /// ```
    ///
    /// # Responder
    ///
    /// `ReaderStream` is a (potentially infinite) responder. No `Content-Type`
    /// is set. The body is [unsized](crate::response::Body#unsized), and values
    /// are sent as soon as they are yielded by the internal stream.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use rocket::*;
    /// use rocket::response::stream::ReaderStream;
    /// use rocket::futures::stream::{repeat, StreamExt};
    /// use rocket::tokio::time::{self, Duration};
    /// use rocket::tokio::fs::File;
    ///
    /// // Stream the contents of `safe/path` followed by `another/safe/path`.
    /// #[get("/reader/stream")]
    /// fn stream() -> ReaderStream![File] {
    ///     ReaderStream! {
    ///         let paths = &["safe/path", "another/safe/path"];
    ///         for path in paths {
    ///             if let Ok(file) = File::open(path).await {
    ///                 yield file;
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// // Stream the contents of the file `safe/path`. This is identical to
    /// // returning `File` directly; Rocket responders stream and never buffer.
    /// #[get("/reader/stream/one")]
    /// async fn stream_one() -> std::io::Result<ReaderStream![File]> {
    ///     let file = File::open("safe/path").await?;
    ///     Ok(ReaderStream::one(file))
    /// }
    /// ```
    ///
    /// The syntax of [`ReaderStream!`] as an expression is identical to that of
    /// [`stream!`](crate::response::stream::stream).
    pub struct ReaderStream<S: Stream> {
        // The source of readers; polled for the next reader whenever no reader
        // is currently active.
        #[pin]
        stream: S,
        // Tracks whether we are waiting on the stream, reading from a yielded
        // reader, or finished. The active reader, if any, is stored inline.
        #[pin]
        state: State<S::Item>,
    }
}
97
// The state machine driving `ReaderStream`'s `AsyncRead` implementation.
pin_project! {
    #[project = StateProjection]
    #[derive(Debug)]
    enum State<R> {
        // No reader is active: the stream must be polled for the next one.
        Pending,
        // A reader yielded by the stream is being read from. It is pinned
        // structurally so that `R` need not be `Unpin`.
        Reading { #[pin] reader: R },
        // The stream yielded `None`: every subsequent read signals EOF.
        Done,
    }
}
107
108impl<R: Unpin> ReaderStream<One<R>> {
109    /// Create a `ReaderStream` that yields exactly one reader, streaming the
110    /// contents of the reader itself.
111    ///
112    /// # Example
113    ///
114    /// Stream the bytes from a remote TCP connection:
115    ///
116    /// ```rust
117    /// # use rocket::*;
118    /// use std::io;
119    /// use std::net::SocketAddr;
120    ///
121    /// use rocket::tokio::net::TcpStream;
122    /// use rocket::response::stream::ReaderStream;
123    ///
124    /// #[get("/stream")]
125    /// async fn stream() -> io::Result<ReaderStream![TcpStream]> {
126    ///     let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
127    ///     let stream = TcpStream::connect(addr).await?;
128    ///     Ok(ReaderStream::one(stream))
129    /// }
130    /// ```
131    pub fn one(reader: R) -> Self {
132        ReaderStream::from(One::from(reader))
133    }
134}
135
136impl<S: Stream> From<S> for ReaderStream<S> {
137    fn from(stream: S) -> Self {
138        ReaderStream { stream, state: State::Pending }
139    }
140}
141
142impl<'r, S: Stream> Responder<'r, 'r> for ReaderStream<S>
143    where S: Send + 'r, S::Item: AsyncRead + Send,
144{
145    fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
146        Response::build()
147            .streamed_body(self)
148            .ok()
149    }
150}
151
152impl<S: Stream> AsyncRead for ReaderStream<S>
153    where S::Item: AsyncRead + Send
154{
155    fn poll_read(
156        self: Pin<&mut Self>,
157        cx: &mut Context<'_>,
158        buf: &mut ReadBuf<'_>
159    ) -> Poll<io::Result<()>> {
160        let mut me = self.project();
161        loop {
162            match me.state.as_mut().project() {
163                StateProjection::Pending => match me.stream.as_mut().poll_next(cx) {
164                    Poll::Pending => return Poll::Pending,
165                    Poll::Ready(None) => me.state.set(State::Done),
166                    Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }),
167                },
168                StateProjection::Reading { reader } => {
169                    let init = buf.filled().len();
170                    match reader.poll_read(cx, buf) {
171                        Poll::Ready(Ok(())) if buf.filled().len() == init => {
172                            me.state.set(State::Pending);
173                        },
174                        Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
175                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
176                        Poll::Pending => return Poll::Pending,
177                    }
178                },
179                StateProjection::Done => return Poll::Ready(Ok(())),
180            }
181        }
182    }
183}
184
185impl<S: Stream + fmt::Debug> fmt::Debug for ReaderStream<S>
186    where S::Item: fmt::Debug
187{
188    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189        f.debug_struct("ReaderStream")
190            .field("stream", &self.stream)
191            .field("state", &self.state)
192            .finish()
193    }
194}
195
crate::export! {
    /// Type and stream expression macro for [`struct@ReaderStream`].
    ///
    /// See [`stream!`](crate::response::stream::stream) for the syntax
    /// supported by this macro.
    ///
    /// See [`struct@ReaderStream`] and the [module level
    /// docs](crate::response::stream#typed-streams) for usage details.
    macro_rules! ReaderStream {
        // Forward every token, unchanged, to `_typed_stream!`, passing
        // `ReaderStream` as the leading argument.
        ($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*));
    }
}