1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
use std::{fmt, io};
use std::task::{Context, Poll};
use std::pin::Pin;

use futures::stream::Stream;
use tokio::io::{AsyncRead, ReadBuf};
use pin_project_lite::pin_project;

use crate::request::Request;
use crate::response::{self, Response, Responder};
use crate::response::stream::One;

pin_project! {
    /// An async reader that reads from a stream of async readers.
    ///
    /// A `ReaderStream` can be constructed from any [`Stream`] of items of type
    /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using
    /// [`ReaderStream::one()`]. The `AsyncRead` implementation of
    /// `ReaderStream` progresses the stream forward, returning the contents of
    /// the inner readers. Thus, a `ReaderStream` can be thought of as a
    /// _flattening_ of async readers.
    ///
    /// `ReaderStream` is designed to be used as a building block for
    /// stream-based responders by acting as the `streamed_body` of a
    /// `Response`, though it may also be used as a responder itself.
    ///
    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
    ///
    /// ```rust
    /// use std::io::Cursor;
    ///
    /// use rocket::{Request, Response};
    /// use rocket::futures::stream::{Stream, StreamExt};
    /// use rocket::response::{self, Responder, stream::ReaderStream};
    /// use rocket::http::ContentType;
    ///
    /// struct MyStream<S>(S);
    ///
    /// impl<'r, S: Stream<Item = String>> Responder<'r, 'r> for MyStream<S>
    ///     where S: Send + 'r
    /// {
    ///     fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
    ///         Response::build()
    ///             .header(ContentType::Text)
    ///             .streamed_body(ReaderStream::from(self.0.map(Cursor::new)))
    ///             .ok()
    ///     }
    /// }
    /// ```
    ///
    /// # Reading
    ///
    /// Readers are consumed one at a time, in the order in which the stream
    /// yields them. A reader is considered exhausted as soon as one of its
    /// reads succeeds without producing any bytes; the stream is then polled
    /// for the next reader. Once the stream itself ends, every subsequent read
    /// reports end-of-file. An I/O error from a reader is returned to the
    /// caller unchanged.
    ///
    /// # Responder
    ///
    /// `ReaderStream` is a (potentially infinite) responder. No `Content-Type`
    /// is set. The body is [unsized](crate::response::Body#unsized), and values
    /// are sent as soon as they are yielded by the internal stream.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use rocket::*;
    /// use rocket::response::stream::ReaderStream;
    /// use rocket::futures::stream::{repeat, StreamExt};
    /// use rocket::tokio::time::{self, Duration};
    /// use rocket::tokio::fs::File;
    ///
    /// // Stream the contents of `safe/path` followed by `another/safe/path`.
    /// #[get("/reader/stream")]
    /// fn stream() -> ReaderStream![File] {
    ///     ReaderStream! {
    ///         let paths = &["safe/path", "another/safe/path"];
    ///         for path in paths {
    ///             if let Ok(file) = File::open(path).await {
    ///                 yield file;
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// // Stream the contents of the file `safe/path`. This is identical to
    /// // returning `File` directly; Rocket responders stream and never buffer.
    /// #[get("/reader/stream/one")]
    /// async fn stream_one() -> std::io::Result<ReaderStream![File]> {
    ///     let file = File::open("safe/path").await?;
    ///     Ok(ReaderStream::one(file))
    /// }
    /// ```
    ///
    /// The syntax of [`ReaderStream!`] as an expression is identical to that of
    /// [`stream!`](crate::response::stream::stream).
    pub struct ReaderStream<S: Stream> {
        // The source of readers. It is polled only while no reader is active.
        #[pin]
        stream: S,
        // The current read phase, which also owns the active reader, if any.
        #[pin]
        state: State<S::Item>,
    }
}

pin_project! {
    // The read phase of a `ReaderStream` whose inner readers have type `R`.
    //
    // Plain comments are used here (rather than doc comments) so that no
    // extra attribute tokens are handed to `pin_project!`.
    #[project = StateProjection]
    #[derive(Debug)]
    enum State<R> {
        // No reader is active; the stream must be polled for the next one.
        Pending,
        // `reader` is the active reader; it is read from until exhausted.
        Reading { #[pin] reader: R },
        // The stream has ended; all further reads report end-of-file.
        Done,
    }
}

impl<R: Unpin> ReaderStream<One<R>> {
    /// Wraps a single `reader` in a `ReaderStream`.
    ///
    /// The resulting stream yields `reader` as its one and only item, so the
    /// bytes produced by the `ReaderStream` are the contents of `reader`.
    ///
    /// # Example
    ///
    /// Stream the bytes from a remote TCP connection:
    ///
    /// ```rust
    /// # use rocket::*;
    /// use std::io;
    /// use std::net::SocketAddr;
    ///
    /// use rocket::tokio::net::TcpStream;
    /// use rocket::response::stream::ReaderStream;
    ///
    /// #[get("/stream")]
    /// async fn stream() -> io::Result<ReaderStream![TcpStream]> {
    ///     let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
    ///     let stream = TcpStream::connect(addr).await?;
    ///     Ok(ReaderStream::one(stream))
    /// }
    /// ```
    pub fn one(reader: R) -> Self {
        // Lift the lone reader into a one-item stream, then wrap that stream.
        let single = One::from(reader);
        Self::from(single)
    }
}

impl<S> From<S> for ReaderStream<S>
    where S: Stream
{
    /// Wraps `stream` in a `ReaderStream` that has not yet requested its
    /// first reader.
    fn from(stream: S) -> Self {
        Self {
            stream,
            state: State::Pending,
        }
    }
}

impl<'r, S> Responder<'r, 'r> for ReaderStream<S>
    where S: Stream + Send + 'r,
          S::Item: AsyncRead + Send,
{
    /// Builds a response whose streamed body is this `ReaderStream`. Nothing
    /// else is set on the response builder.
    fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
        Response::build().streamed_body(self).ok()
    }
}

impl<S: Stream> AsyncRead for ReaderStream<S>
    where S::Item: AsyncRead + Send
{
    /// Reads from the active inner reader, moving on to the next reader
    /// yielded by the stream whenever the active one is exhausted.
    ///
    /// * Returns `Poll::Pending` when either the stream or the active reader
    ///   is not ready.
    /// * Returns `Poll::Ready(Ok(()))` with bytes added to `buf` as soon as
    ///   any reader produces data.
    /// * Returns `Poll::Ready(Ok(()))` with no bytes added once the stream has
    ///   ended (end-of-file), or when `buf` has no remaining capacity.
    /// * Returns `Poll::Ready(Err(_))` with the inner reader's error,
    ///   unchanged. The failing reader stays active.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>
    ) -> Poll<io::Result<()>> {
        // Exhaustion of a reader is detected below by a successful read that
        // adds no bytes. A read into a buffer without remaining capacity can
        // never add bytes, so without this guard it would be mistaken for
        // end-of-reader: the active reader (and each one after it) would be
        // dropped with its data unread.
        if buf.remaining() == 0 {
            return Poll::Ready(Ok(()));
        }

        let mut me = self.project();
        loop {
            match me.state.as_mut().project() {
                // No active reader: ask the stream for the next one.
                StateProjection::Pending => match me.stream.as_mut().poll_next(cx) {
                    Poll::Pending => return Poll::Pending,
                    Poll::Ready(None) => me.state.set(State::Done),
                    Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }),
                },
                StateProjection::Reading { reader } => {
                    // Remember how much was filled to tell data from EOF.
                    let init = buf.filled().len();
                    match reader.poll_read(cx, buf) {
                        // Nothing new was read: this reader is exhausted, so
                        // loop around and fetch the next one.
                        Poll::Ready(Ok(())) if buf.filled().len() == init => {
                            me.state.set(State::Pending);
                        },
                        Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                        Poll::Pending => return Poll::Pending,
                    }
                },
                // The stream is finished: report EOF by leaving `buf` as is.
                StateProjection::Done => return Poll::Ready(Ok(())),
            }
        }
    }
}

impl<S> fmt::Debug for ReaderStream<S>
    where S: Stream + fmt::Debug,
          S::Item: fmt::Debug,
{
    /// Formats the value as a `ReaderStream` struct with its `stream` and
    /// `state` fields, in that order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("ReaderStream");
        repr.field("stream", &self.stream);
        repr.field("state", &self.state);
        repr.finish()
    }
}

crate::export! {
    /// Type and stream expression macro for [`struct@ReaderStream`].
    ///
    /// In type position, `ReaderStream![T]` names a `ReaderStream` over
    /// readers of type `T`. In expression position, `ReaderStream! { .. }`
    /// builds one from a block that `yield`s readers.
    ///
    /// See [`stream!`](crate::response::stream::stream) for the syntax
    /// supported by this macro.
    ///
    /// See [`struct@ReaderStream`] and the [module level
    /// docs](crate::response::stream#typed-streams) for usage details.
    macro_rules! ReaderStream {
        // Every token is forwarded untouched to `_typed_stream!`, with
        // `ReaderStream` passed as the leading argument.
        ($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*));
    }
}