rocket/shutdown/
handle.rs

1use std::future::Future;
2use std::task::{Context, Poll};
3use std::pin::Pin;
4
5use futures::{FutureExt, StreamExt};
6
7use crate::shutdown::{ShutdownConfig, TripWire};
8use crate::request::{FromRequest, Outcome, Request};
9
10/// A request guard and future for graceful shutdown.
11///
12/// A server shutdown is manually requested by calling [`Shutdown::notify()`]
13/// or, if enabled, through [automatic triggers] like `Ctrl-C`. Rocket will stop
14/// accepting new requests, finish handling any pending requests, wait a grace
15/// period before cancelling any outstanding I/O, and return `Ok()` to the
16/// caller of [`Rocket::launch()`]. Graceful shutdown is configured via
17/// [`ShutdownConfig`](crate::config::ShutdownConfig).
18///
19/// [`Rocket::launch()`]: crate::Rocket::launch()
20/// [automatic triggers]: crate::shutdown::Shutdown#triggers
21///
22/// # Detecting Shutdown
23///
24/// `Shutdown` is also a future that resolves when [`Shutdown::notify()`] is
25/// called. This can be used to detect shutdown in any part of the application:
26///
27/// ```rust
28/// # use rocket::*;
29/// use rocket::Shutdown;
30///
31/// #[get("/wait/for/shutdown")]
32/// async fn wait_for_shutdown(shutdown: Shutdown) -> &'static str {
33///     shutdown.await;
34///     "Somewhere, shutdown was requested."
35/// }
36/// ```
37///
38/// See the [`stream`](crate::response::stream#graceful-shutdown) docs for an
39/// example of detecting shutdown in an infinite responder.
40///
41/// Additionally, a completed shutdown request resolves the future returned from
42/// [`Rocket::launch()`](crate::Rocket::launch()):
43///
44/// ```rust,no_run
45/// # #[macro_use] extern crate rocket;
46/// #
47/// use rocket::Shutdown;
48///
49/// #[get("/shutdown")]
50/// fn shutdown(shutdown: Shutdown) -> &'static str {
51///     shutdown.notify();
52///     "Shutting down..."
53/// }
54///
55/// #[rocket::main]
56/// async fn main() {
57///     let result = rocket::build()
58///         .mount("/", routes![shutdown])
59///         .launch()
60///         .await;
61///
62///     // If the server shut down (by visiting `/shutdown`), `result` is `Ok`.
63///     result.expect("server failed unexpectedly");
64/// }
65/// ```
66#[derive(Debug, Clone)]
67#[must_use = "`Shutdown` does nothing unless polled or `notify`ed"]
68pub struct Shutdown {
69    wire: TripWire,
70}
71
72#[derive(Debug, Clone)]
73pub struct Stages {
74    pub start: Shutdown,
75    pub grace: Shutdown,
76    pub mercy: Shutdown,
77}
78
79impl Shutdown {
80    fn new() -> Self {
81        Shutdown {
82            wire: TripWire::new(),
83        }
84    }
85
86    /// Notify the application to shut down gracefully.
87    ///
88    /// This function returns immediately; pending requests will continue to run
89    /// until completion or expiration of the grace period, which ever comes
90    /// first, before the actual shutdown occurs. The grace period can be
91    /// configured via [`ShutdownConfig`]'s `grace` field.
92    ///
93    /// ```rust
94    /// # use rocket::*;
95    /// use rocket::Shutdown;
96    ///
97    /// #[get("/shutdown")]
98    /// fn shutdown(shutdown: Shutdown) -> &'static str {
99    ///     shutdown.notify();
100    ///     "Shutting down..."
101    /// }
102    /// ```
103    #[inline(always)]
104    pub fn notify(&self) {
105        self.wire.trip();
106    }
107
108    /// Returns `true` if `Shutdown::notify()` has already been called.
109    ///
110    /// # Example
111    ///
112    /// ```rust
113    /// # use rocket::*;
114    /// use rocket::Shutdown;
115    ///
116    /// #[get("/shutdown")]
117    /// fn shutdown(shutdown: Shutdown) {
118    ///     shutdown.notify();
119    ///     assert!(shutdown.notified());
120    /// }
121    /// ```
122    #[must_use]
123    #[inline(always)]
124    pub fn notified(&self) -> bool {
125        self.wire.tripped()
126    }
127}
128
129impl Future for Shutdown {
130    type Output = ();
131
132    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133        self.wire.poll_unpin(cx)
134    }
135}
136
137#[crate::async_trait]
138impl<'r> FromRequest<'r> for Shutdown {
139    type Error = std::convert::Infallible;
140
141    #[inline]
142    async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
143        Outcome::Success(request.rocket().shutdown())
144    }
145}
146
147impl Stages {
148    pub fn new() -> Self {
149        Stages {
150            start: Shutdown::new(),
151            grace: Shutdown::new(),
152            mercy: Shutdown::new(),
153        }
154    }
155
156    pub(crate) fn spawn_listener(&self, config: &ShutdownConfig) {
157        use futures::stream;
158        use futures::future::{select, Either};
159
160        let mut signal = match config.signal_stream() {
161            Some(stream) => Either::Left(stream.chain(stream::pending())),
162            None => Either::Right(stream::pending()),
163        };
164
165        let start  = self.start.clone();
166        let (grace, grace_duration)  = (self.grace.clone(), config.grace());
167        let (mercy, mercy_duration)  = (self.mercy.clone(), config.mercy());
168        tokio::spawn(async move {
169            if let Either::Left((sig, start)) = select(signal.next(), start).await {
170                warn!("Received {}. Shutdown started.", sig.unwrap());
171                start.notify();
172            }
173
174            tokio::time::sleep(grace_duration).await;
175            warn!("Shutdown grace period elapsed. Shutting down I/O.");
176            grace.notify();
177
178            tokio::time::sleep(mercy_duration).await;
179            warn!("Mercy period elapsed. Terminating I/O.");
180            mercy.notify();
181        });
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use super::Shutdown;
188
189    #[test]
190    fn ensure_is_send_sync_clone_unpin() {
191        fn is_send_sync_clone_unpin<T: Send + Sync + Clone + Unpin>() {}
192        is_send_sync_clone_unpin::<Shutdown>();
193    }
194}