rocket/response/stream/
mod.rs

1//! Potentially infinite async [`Stream`] response types.
2//!
3//! A [`Stream<Item = T>`] is the async analog of an `Iterator<Item = T>`: it
4//! generates a sequence of values asynchronously, otherwise known as an async
5//! _generator_. Types in this module allow for returning responses that are
6//! streams.
7//!
8//! [`Stream<Item = T>`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
9//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
10//!
11//! # Raw Streams
12//!
13//! Rust does not yet natively support syntax for creating arbitrary generators,
14//! and as such, for creating streams. To ameliorate this, Rocket exports
15//! [`stream!`], which retrofits generator syntax, allowing raw `impl Stream`s to
16//! be defined using `yield` and `for await` syntax:
17//!
18//! ```rust
19//! use rocket::futures::stream::Stream;
20//! use rocket::response::stream::stream;
21//!
22//! fn make_stream() -> impl Stream<Item = u8> {
23//!     stream! {
24//!         for i in 0..3 {
25//!             yield i;
26//!         }
27//!     }
28//! }
29//! ```
30//!
31//! See [`stream!`] for full usage details.
32//!
33//! # Typed Streams
34//!
35//! A raw stream is not a `Responder`, so it cannot be directly returned from a
36//! route handler. Instead, one of four _typed_ streams may be used. Each typed
37//! stream places type bounds on the `Item` of the stream, allowing for
38//! `Responder` implementation on the stream itself.
39//!
40//! Each typed stream exists both as a type and as a macro. They are:
41//!
42//!   * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead`
43//!   * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>`
44//!   * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef<str>`
45//!   * [`struct@EventStream`] ([`EventStream!`]) - Server-Sent [`Event`] stream
46//!
47//! Each type implements `Responder`; each macro can be invoked to generate a
48//! typed stream, exactly like [`stream!`] above. Additionally, each macro is
49//! also a _type_ macro, expanding to a wrapped `impl Stream<Item = $T>`, where
50//! `$T` is the input to the macro.
51//!
52//! As a concrete example, the route below produces an infinite series of
53//! `"hello"`s, one per second:
54//!
55//! ```rust
56//! # use rocket::get;
57//! use rocket::tokio::time::{self, Duration};
58//! use rocket::response::stream::TextStream;
59//!
60//! /// Produce an infinite series of `"hello"`s, one per second.
61//! #[get("/infinite-hellos")]
62//! fn hello() -> TextStream![&'static str] {
63//!     TextStream! {
64//!         let mut interval = time::interval(Duration::from_secs(1));
65//!         loop {
66//!             yield "hello";
67//!             interval.tick().await;
68//!         }
69//!     }
70//! }
71//! ```
72//!
73//! The `TextStream![&'static str]` invocation expands to:
74//!
75//! ```rust
76//! # use rocket::response::stream::TextStream;
77//! # use rocket::futures::stream::Stream;
78//! # use rocket::response::stream::stream;
79//! # fn f() ->
80//! TextStream<impl Stream<Item = &'static str>>
81//! # { TextStream::from(stream! { yield "hi" }) }
82//! ```
83//!
84//! While the inner `TextStream! { .. }` invocation expands to:
85//!
86//! ```rust
87//! # use rocket::response::stream::{TextStream, stream};
88//! TextStream::from(stream! { /* .. */ })
89//! # ;
90//! ```
91//!
92//! The expansions are identical for `ReaderStream` and `ByteStream`, with
93//! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively.
94//!
95//! ## Borrowing
96//!
97//! A stream can _yield_ borrowed values with no extra effort:
98//!
99//! ```rust
100//! # use rocket::get;
101//! use rocket::State;
102//! use rocket::response::stream::TextStream;
103//!
104//! /// Produce a single string borrowed from the request.
105//! #[get("/infinite-hellos")]
106//! fn hello(string: &State<String>) -> TextStream![&str] {
107//!     TextStream! {
108//!         yield string.as_str();
109//!     }
110//! }
111//! ```
112//!
113//! If the stream _contains_ a borrowed value or uses one internally, Rust
114//! requires this fact be explicit with a lifetime annotation:
115//!
116//! ```rust
117//! # use rocket::get;
118//! use rocket::State;
119//! use rocket::response::stream::TextStream;
120//!
121//! #[get("/")]
122//! fn borrow1(ctxt: &State<bool>) -> TextStream![&'static str + '_] {
123//!     TextStream! {
124//!         // By using `ctxt` in the stream, the borrow is moved into it. Thus,
125//!         // the stream object contains a borrow, prompting the '_ annotation.
126//!         if *ctxt.inner() {
127//!             yield "hello";
128//!         }
129//!     }
130//! }
131//!
132//! // Just as before, but yielding an owned value.
133//! #[get("/")]
134//! fn borrow2(ctxt: &State<bool>) -> TextStream![String + '_] {
135//!     TextStream! {
136//!         if *ctxt.inner() {
137//!             yield "hello".to_string();
138//!         }
139//!     }
140//! }
141//!
142//! // As before, but _also_ yield a borrowed value. Without `'r`, Rust gives:
143//! // - lifetime `'r` is missing in item created through this procedural macro
144//! #[get("/")]
145//! fn borrow3<'r>(ctxt: &'r State<bool>, s: &'r State<String>) -> TextStream![&'r str + 'r] {
146//!     TextStream! {
147//!         if *ctxt.inner() {
148//!             yield s.as_str();
149//!         }
150//!     }
151//! }
152//! ```
153//!
154//! # Graceful Shutdown
155//!
156//! Infinite responders, like the one defined in `hello` above, will prolong
157//! shutdown initiated via [`Shutdown::notify()`](crate::Shutdown::notify()) for
158//! the defined grace period. After the grace period has elapsed, Rocket will
159//! abruptly terminate the responder.
160//!
161//! To avoid abrupt termination, graceful shutdown can be detected via the
162//! [`Shutdown`](crate::Shutdown) future, allowing the infinite responder to
163//! gracefully shut itself down. The following example modifies the previous
164//! `hello` with shutdown detection:
165//!
166//! ```rust
167//! # use rocket::get;
168//! use rocket::Shutdown;
169//! use rocket::response::stream::TextStream;
170//! use rocket::tokio::select;
171//! use rocket::tokio::time::{self, Duration};
172//!
173//! /// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
174//! #[get("/infinite-hellos")]
175//! fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
176//!     TextStream! {
177//!         let mut interval = time::interval(Duration::from_secs(1));
178//!         loop {
179//!             select! {
180//!                 _ = interval.tick() => yield "hello",
181//!                 _ = &mut shutdown => {
182//!                     yield "goodbye";
183//!                     break;
184//!                 }
185//!             };
186//!         }
187//!     }
188//! }
189//! ```
190
191mod reader;
192mod bytes;
193mod text;
194mod one;
195mod sse;
196mod raw_sse;
197
198pub(crate) use self::raw_sse::*;
199
200pub use self::one::One;
201pub use self::text::TextStream;
202pub use self::bytes::ByteStream;
203pub use self::reader::ReaderStream;
204pub use self::sse::{Event, EventStream};
205
206crate::export! {
207    /// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax.
208    ///
209    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
210    ///
211    /// This macro takes any series of statements and expands them into an
212    /// expression of type `impl Stream<Item = T>`, a stream that `yield`s
213    /// elements of type `T`. It supports any Rust statement syntax with the
214    /// following extensions:
215    ///
216    ///   * `yield expr`
217    ///
218    ///      Yields the result of evaluating `expr` to the caller (the stream
219    ///      consumer). `expr` must be of type `T`.
220    ///
221    ///   * `for await x in stream { .. }`
222    ///
223    ///      `await`s the next element in `stream`, binds it to `x`, and
224    ///      executes the block with the binding. `stream` must implement
225    ///      `Stream<Item = U>`; the type of `x` is `U`, which may differ from `T`.
226    ///
227    ///   * `?` short-circuits: on `Err`, the error is yielded and the stream ends
228    ///
229    /// # Examples
230    ///
231    /// ```rust
232    /// use rocket::response::stream::stream;
233    /// use rocket::futures::stream::Stream;
234    ///
235    /// fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
236    ///     stream! {
237    ///         for s in &["hi", "there"] {
238    ///             yield s.to_string();
239    ///         }
240    ///
241    ///         for await n in stream {
242    ///             yield format!("n: {}", n);
243    ///         }
244    ///     }
245    /// }
246    ///
247    /// # rocket::async_test(async {
248    /// use rocket::futures::stream::{self, StreamExt};
249    ///
250    /// let stream = f(stream::iter(vec![3, 7, 11]));
251    /// let strings: Vec<_> = stream.collect().await;
252    /// assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
253    /// # });
254    /// ```
255    ///
256    /// Using `?` on an `Err` yields the error and terminates the stream early:
257    ///
258    /// ```rust
259    /// use std::io;
260    ///
261    /// use rocket::response::stream::stream;
262    /// use rocket::futures::stream::Stream;
263    ///
264    /// fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
265    ///     where S: Stream<Item = io::Result<&'static str>>
266    /// {
267    ///     stream! {
268    ///         for await s in stream {
269    ///             let num = s?.parse();
270    ///             let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
271    ///             yield Ok(num);
272    ///         }
273    ///     }
274    /// }
275    ///
276    /// # rocket::async_test(async {
277    /// use rocket::futures::stream::{self, StreamExt};
278    ///
279    /// let e = io::Error::last_os_error();
280    /// let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
281    /// let results: Vec<_> = stream.collect().await;
282    /// assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
283    /// # });
284    /// ```
285    macro_rules! stream {
286        ($($t:tt)*) => ($crate::async_stream::stream!($($t)*)); // forward every token unchanged
287    }
288}
289
290#[doc(hidden)]
291#[macro_export]
292macro_rules! _typed_stream { // Internal glue: forwards to `__typed_stream!` using `$crate`-rooted paths.
293    ($S:ident, $($t:tt)*) => ( // `$S`: name under `response::stream` (presumably a typed stream such as `TextStream` - confirm at call sites).
294        $crate::__typed_stream! {
295            $crate::response::stream::$S, // path to the `$S` item
296            $crate::response::stream::stream, // path to the `stream!` macro exported from this module
297            $crate::futures::stream::Stream, // path to the `Stream` trait
298            $($t)* // caller's remaining tokens, passed through verbatim
299        }
300    )
301}