rocket/response/stream/mod.rs
1//! Potentially infinite async [`Stream`] response types.
2//!
3//! A [`Stream<Item = T>`] is the async analog of an `Iterator<Item = T>`: it
4//! generates a sequence of values asynchronously, otherwise known as an async
5//! _generator_. Types in this module allow for returning responses that are
6//! streams.
7//!
8//! [`Stream<Item = T>`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
9//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
10//!
11//! # Raw Streams
12//!
//! Rust does not yet natively support syntax for creating arbitrary generators,
//! and as such, for creating streams. To ameliorate this, Rocket exports
//! [`stream!`], which retrofits generator syntax, allowing raw `impl Stream`s to
//! be defined using `yield` and `for await` syntax:
17//!
18//! ```rust
19//! use rocket::futures::stream::Stream;
20//! use rocket::response::stream::stream;
21//!
22//! fn make_stream() -> impl Stream<Item = u8> {
23//! stream! {
24//! for i in 0..3 {
25//! yield i;
26//! }
27//! }
28//! }
29//! ```
30//!
31//! See [`stream!`] for full usage details.
32//!
33//! # Typed Streams
34//!
//! A raw stream is not a `Responder`, so it cannot be directly returned from a
//! route handler. Instead, one of four _typed_ streams may be used. Each typed
//! stream places type bounds on the `Item` of the stream, allowing for
//! `Responder` implementation on the stream itself.
39//!
40//! Each typed stream exists both as a type and as a macro. They are:
41//!
42//! * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead`
43//! * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>`
44//! * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef<str>`
45//! * [`struct@EventStream`] ([`EventStream!`]) - Server-Sent [`Event`] stream
46//!
47//! Each type implements `Responder`; each macro can be invoked to generate a
48//! typed stream, exactly like [`stream!`] above. Additionally, each macro is
49//! also a _type_ macro, expanding to a wrapped `impl Stream<Item = $T>`, where
50//! `$T` is the input to the macro.
51//!
52//! As a concrete example, the route below produces an infinite series of
53//! `"hello"`s, one per second:
54//!
55//! ```rust
56//! # use rocket::get;
57//! use rocket::tokio::time::{self, Duration};
58//! use rocket::response::stream::TextStream;
59//!
60//! /// Produce an infinite series of `"hello"`s, one per second.
61//! #[get("/infinite-hellos")]
62//! fn hello() -> TextStream![&'static str] {
63//! TextStream! {
64//! let mut interval = time::interval(Duration::from_secs(1));
65//! loop {
66//! yield "hello";
67//! interval.tick().await;
68//! }
69//! }
70//! }
71//! ```
72//!
73//! The `TextStream![&'static str]` invocation expands to:
74//!
75//! ```rust
76//! # use rocket::response::stream::TextStream;
77//! # use rocket::futures::stream::Stream;
78//! # use rocket::response::stream::stream;
79//! # fn f() ->
80//! TextStream<impl Stream<Item = &'static str>>
81//! # { TextStream::from(stream! { yield "hi" }) }
82//! ```
83//!
84//! While the inner `TextStream! { .. }` invocation expands to:
85//!
86//! ```rust
87//! # use rocket::response::stream::{TextStream, stream};
88//! TextStream::from(stream! { /* .. */ })
89//! # ;
90//! ```
91//!
92//! The expansions are identical for `ReaderStream` and `ByteStream`, with
93//! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively.
94//!
95//! ## Borrowing
96//!
97//! A stream can _yield_ borrowed values with no extra effort:
98//!
99//! ```rust
100//! # use rocket::get;
101//! use rocket::State;
102//! use rocket::response::stream::TextStream;
103//!
104//! /// Produce a single string borrowed from the request.
105//! #[get("/infinite-hellos")]
106//! fn hello(string: &State<String>) -> TextStream![&str] {
107//! TextStream! {
108//! yield string.as_str();
109//! }
110//! }
111//! ```
112//!
113//! If the stream _contains_ a borrowed value or uses one internally, Rust
114//! requires this fact be explicit with a lifetime annotation:
115//!
116//! ```rust
117//! # use rocket::get;
118//! use rocket::State;
119//! use rocket::response::stream::TextStream;
120//!
121//! #[get("/")]
122//! fn borrow1(ctxt: &State<bool>) -> TextStream![&'static str + '_] {
123//! TextStream! {
124//! // By using `ctxt` in the stream, the borrow is moved into it. Thus,
125//! // the stream object contains a borrow, prompting the '_ annotation.
126//! if *ctxt.inner() {
127//! yield "hello";
128//! }
129//! }
130//! }
131//!
//! // Just as before, but yielding an owned value.
//! #[get("/")]
//! fn borrow2(ctxt: &State<bool>) -> TextStream![String + '_] {
//!     TextStream! {
//!         if *ctxt.inner() {
//!             yield "hello".to_string();
//!         }
//!     }
//! }
141//!
142//! // As before but _also_ return a borrowed value. Without it, Rust gives:
143//! // - lifetime `'r` is missing in item created through this procedural macro
144//! #[get("/")]
145//! fn borrow3<'r>(ctxt: &'r State<bool>, s: &'r State<String>) -> TextStream![&'r str + 'r] {
146//! TextStream! {
147//! if *ctxt.inner() {
148//! yield s.as_str();
149//! }
150//! }
151//! }
152//! ```
153//!
154//! # Graceful Shutdown
155//!
156//! Infinite responders, like the one defined in `hello` above, will prolong
157//! shutdown initiated via [`Shutdown::notify()`](crate::Shutdown::notify()) for
158//! the defined grace period. After the grace period has elapsed, Rocket will
159//! abruptly terminate the responder.
160//!
161//! To avoid abrupt termination, graceful shutdown can be detected via the
162//! [`Shutdown`](crate::Shutdown) future, allowing the infinite responder to
163//! gracefully shut itself down. The following example modifies the previous
164//! `hello` with shutdown detection:
165//!
166//! ```rust
167//! # use rocket::get;
168//! use rocket::Shutdown;
169//! use rocket::response::stream::TextStream;
170//! use rocket::tokio::select;
171//! use rocket::tokio::time::{self, Duration};
172//!
173//! /// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
174//! #[get("/infinite-hellos")]
175//! fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
176//! TextStream! {
177//! let mut interval = time::interval(Duration::from_secs(1));
178//! loop {
179//! select! {
180//! _ = interval.tick() => yield "hello",
181//! _ = &mut shutdown => {
182//! yield "goodbye";
183//! break;
184//! }
185//! };
186//! }
187//! }
188//! }
189//! ```
190
191mod reader;
192mod bytes;
193mod text;
194mod one;
195mod sse;
196mod raw_sse;
197
198pub(crate) use self::raw_sse::*;
199
200pub use self::one::One;
201pub use self::text::TextStream;
202pub use self::bytes::ByteStream;
203pub use self::reader::ReaderStream;
204pub use self::sse::{Event, EventStream};
205
206crate::export! {
207 /// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax.
208 ///
209 /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
210 ///
211 /// This macro takes any series of statements and expands them into an
212 /// expression of type `impl Stream<Item = T>`, a stream that `yield`s
213 /// elements of type `T`. It supports any Rust statement syntax with the
214 /// following extensions:
215 ///
216 /// * `yield expr`
217 ///
218 /// Yields the result of evaluating `expr` to the caller (the stream
219 /// consumer). `expr` must be of type `T`.
220 ///
221 /// * `for await x in stream { .. }`
222 ///
223 /// `await`s the next element in `stream`, binds it to `x`, and
224 /// executes the block with the binding. `stream` must implement
225 /// `Stream<Item = T>`; the type of `x` is `T`.
226 ///
227 /// * `?` short-circuits stream termination on `Err`
228 ///
229 /// # Examples
230 ///
231 /// ```rust
232 /// use rocket::response::stream::stream;
233 /// use rocket::futures::stream::Stream;
234 ///
235 /// fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
236 /// stream! {
237 /// for s in &["hi", "there"]{
238 /// yield s.to_string();
239 /// }
240 ///
241 /// for await n in stream {
242 /// yield format!("n: {}", n);
243 /// }
244 /// }
245 /// }
246 ///
247 /// # rocket::async_test(async {
248 /// use rocket::futures::stream::{self, StreamExt};
249 ///
250 /// let stream = f(stream::iter(vec![3, 7, 11]));
251 /// let strings: Vec<_> = stream.collect().await;
252 /// assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
253 /// # });
254 /// ```
255 ///
256 /// Using `?` on an `Err` short-circuits stream termination:
257 ///
258 /// ```rust
259 /// use std::io;
260 ///
261 /// use rocket::response::stream::stream;
262 /// use rocket::futures::stream::Stream;
263 ///
264 /// fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
265 /// where S: Stream<Item = io::Result<&'static str>>
266 /// {
267 /// stream! {
268 /// for await s in stream {
269 /// let num = s?.parse();
270 /// let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
271 /// yield Ok(num);
272 /// }
273 /// }
274 /// }
275 ///
276 /// # rocket::async_test(async {
277 /// use rocket::futures::stream::{self, StreamExt};
278 ///
279 /// let e = io::Error::last_os_error();
280 /// let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
281 /// let results: Vec<_> = stream.collect().await;
282 /// assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
283 /// # });
284 /// ```
285 macro_rules! stream {
286 ($($t:tt)*) => ($crate::async_stream::stream!($($t)*));
287 }
288}
289
// Forwarding helper: hidden from rustdoc but exported with `#[macro_export]`.
//
// Input: `$S`, the identifier of an item under `$crate::response::stream`,
// followed by a comma and arbitrary trailing tokens.
//
// Expansion: a single call to `$crate::__typed_stream!` that receives, in
// order, the full path to `$S`, the path to the `stream` macro, the path to
// the `Stream` trait, and finally the trailing tokens exactly as written.
// What `__typed_stream!` does with them is defined elsewhere.
#[doc(hidden)]
#[macro_export]
macro_rules! _typed_stream {
    ($S:ident, $($t:tt)*) => (
        $crate::__typed_stream! {
            $crate::response::stream::$S,
            $crate::response::stream::stream,
            $crate::futures::stream::Stream,
            $($t)*
        }
    )
}
301}