rocket/response/stream/reader.rs
1use std::{fmt, io};
2use std::task::{Context, Poll};
3use std::pin::Pin;
4
5use futures::stream::Stream;
6use tokio::io::{AsyncRead, ReadBuf};
7use pin_project_lite::pin_project;
8
9use crate::request::Request;
10use crate::response::{self, Response, Responder};
11use crate::response::stream::One;
12
13pin_project! {
14 /// An async reader that reads from a stream of async readers.
15 ///
16 /// A `ReaderStream` can be constructed from any [`Stream`] of items of type
17 /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using
18 /// [`ReaderStream::one()`]. The `AsyncRead` implementation of
19 /// `ReaderStream` progresses the stream forward, returning the contents of
20 /// the inner readers. Thus, a `ReaderStream` can be thought of as a
21 /// _flattening_ of async readers.
22 ///
23 /// `ReaderStream` is designed to be used as a building-block for
24 /// stream-based responders by acting as the `streamed_body` of a
25 /// `Response`, though it may also be used as a responder itself.
26 ///
27 /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
28 ///
29 /// ```rust
30 /// use std::io::Cursor;
31 ///
32 /// use rocket::{Request, Response};
33 /// use rocket::futures::stream::{Stream, StreamExt};
34 /// use rocket::response::{self, Responder, stream::ReaderStream};
35 /// use rocket::http::ContentType;
36 ///
37 /// struct MyStream<S>(S);
38 ///
39 /// impl<'r, S: Stream<Item = String>> Responder<'r, 'r> for MyStream<S>
40 /// where S: Send + 'r
41 /// {
42 /// fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
43 /// Response::build()
44 /// .header(ContentType::Text)
45 /// .streamed_body(ReaderStream::from(self.0.map(Cursor::new)))
46 /// .ok()
47 /// }
48 /// }
49 /// ```
50 ///
51 /// # Responder
52 ///
53 /// `ReaderStream` is a (potentially infinite) responder. No `Content-Type`
54 /// is set. The body is [unsized](crate::response::Body#unsized), and values
55 /// are sent as soon as they are yielded by the internal stream.
56 ///
57 /// # Example
58 ///
59 /// ```rust
60 /// # use rocket::*;
61 /// use rocket::response::stream::ReaderStream;
62 /// use rocket::futures::stream::{repeat, StreamExt};
63 /// use rocket::tokio::time::{self, Duration};
64 /// use rocket::tokio::fs::File;
65 ///
66 /// // Stream the contents of `safe/path` followed by `another/safe/path`.
67 /// #[get("/reader/stream")]
68 /// fn stream() -> ReaderStream![File] {
69 /// ReaderStream! {
70 /// let paths = &["safe/path", "another/safe/path"];
71 /// for path in paths {
72 /// if let Ok(file) = File::open(path).await {
73 /// yield file;
74 /// }
75 /// }
76 /// }
77 /// }
78 ///
79 /// // Stream the contents of the file `safe/path`. This is identical to
80 /// // returning `File` directly; Rocket responders stream and never buffer.
81 /// #[get("/reader/stream/one")]
82 /// async fn stream_one() -> std::io::Result<ReaderStream![File]> {
83 /// let file = File::open("safe/path").await?;
84 /// Ok(ReaderStream::one(file))
85 /// }
86 /// ```
87 ///
88 /// The syntax of [`ReaderStream!`] as an expression is identical to that of
89 /// [`stream!`](crate::response::stream::stream).
90 pub struct ReaderStream<S: Stream> {
91 #[pin]
92 stream: S,
93 #[pin]
94 state: State<S::Item>,
95 }
96}
97
98pin_project! {
99 #[project = StateProjection]
100 #[derive(Debug)]
101 enum State<R> {
102 Pending,
103 Reading { #[pin] reader: R },
104 Done,
105 }
106}
107
108impl<R: Unpin> ReaderStream<One<R>> {
109 /// Create a `ReaderStream` that yields exactly one reader, streaming the
110 /// contents of the reader itself.
111 ///
112 /// # Example
113 ///
114 /// Stream the bytes from a remote TCP connection:
115 ///
116 /// ```rust
117 /// # use rocket::*;
118 /// use std::io;
119 /// use std::net::SocketAddr;
120 ///
121 /// use rocket::tokio::net::TcpStream;
122 /// use rocket::response::stream::ReaderStream;
123 ///
124 /// #[get("/stream")]
125 /// async fn stream() -> io::Result<ReaderStream![TcpStream]> {
126 /// let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
127 /// let stream = TcpStream::connect(addr).await?;
128 /// Ok(ReaderStream::one(stream))
129 /// }
130 /// ```
131 pub fn one(reader: R) -> Self {
132 ReaderStream::from(One::from(reader))
133 }
134}
135
136impl<S: Stream> From<S> for ReaderStream<S> {
137 fn from(stream: S) -> Self {
138 ReaderStream { stream, state: State::Pending }
139 }
140}
141
142impl<'r, S: Stream> Responder<'r, 'r> for ReaderStream<S>
143 where S: Send + 'r, S::Item: AsyncRead + Send,
144{
145 fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
146 Response::build()
147 .streamed_body(self)
148 .ok()
149 }
150}
151
152impl<S: Stream> AsyncRead for ReaderStream<S>
153 where S::Item: AsyncRead + Send
154{
155 fn poll_read(
156 self: Pin<&mut Self>,
157 cx: &mut Context<'_>,
158 buf: &mut ReadBuf<'_>
159 ) -> Poll<io::Result<()>> {
160 let mut me = self.project();
161 loop {
162 match me.state.as_mut().project() {
163 StateProjection::Pending => match me.stream.as_mut().poll_next(cx) {
164 Poll::Pending => return Poll::Pending,
165 Poll::Ready(None) => me.state.set(State::Done),
166 Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }),
167 },
168 StateProjection::Reading { reader } => {
169 let init = buf.filled().len();
170 match reader.poll_read(cx, buf) {
171 Poll::Ready(Ok(())) if buf.filled().len() == init => {
172 me.state.set(State::Pending);
173 },
174 Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
175 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
176 Poll::Pending => return Poll::Pending,
177 }
178 },
179 StateProjection::Done => return Poll::Ready(Ok(())),
180 }
181 }
182 }
183}
184
185impl<S: Stream + fmt::Debug> fmt::Debug for ReaderStream<S>
186 where S::Item: fmt::Debug
187{
188 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
189 f.debug_struct("ReaderStream")
190 .field("stream", &self.stream)
191 .field("state", &self.state)
192 .finish()
193 }
194}
195
196crate::export! {
197 /// Type and stream expression macro for [`struct@ReaderStream`].
198 ///
199 /// See [`stream!`](crate::response::stream::stream) for the syntax
200 /// supported by this macro.
201 ///
202 /// See [`struct@ReaderStream`] and the [module level
203 /// docs](crate::response::stream#typed-streams) for usage details.
204 macro_rules! ReaderStream {
205 ($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*));
206 }
207}