rocket/shutdown/
handle.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use std::future::Future;
use std::task::{Context, Poll};
use std::pin::Pin;

use futures::{FutureExt, StreamExt};

use crate::shutdown::{ShutdownConfig, TripWire};
use crate::request::{FromRequest, Outcome, Request};

/// A request guard and future for graceful shutdown.
///
/// A server shutdown is manually requested by calling [`Shutdown::notify()`]
/// or, if enabled, through [automatic triggers] like `Ctrl-C`. Rocket will stop
/// accepting new requests, finish handling any pending requests, wait a grace
/// period before cancelling any outstanding I/O, and return `Ok()` to the
/// caller of [`Rocket::launch()`]. Graceful shutdown is configured via
/// [`ShutdownConfig`](crate::config::ShutdownConfig).
///
/// [`Rocket::launch()`]: crate::Rocket::launch()
/// [automatic triggers]: crate::shutdown::Shutdown#triggers
///
/// # Detecting Shutdown
///
/// `Shutdown` is also a future that resolves when [`Shutdown::notify()`] is
/// called. This can be used to detect shutdown in any part of the application:
///
/// ```rust
/// # use rocket::*;
/// use rocket::Shutdown;
///
/// #[get("/wait/for/shutdown")]
/// async fn wait_for_shutdown(shutdown: Shutdown) -> &'static str {
///     shutdown.await;
///     "Somewhere, shutdown was requested."
/// }
/// ```
///
/// See the [`stream`](crate::response::stream#graceful-shutdown) docs for an
/// example of detecting shutdown in an infinite responder.
///
/// Additionally, a completed shutdown request resolves the future returned from
/// [`Rocket::launch()`](crate::Rocket::launch()):
///
/// ```rust,no_run
/// # #[macro_use] extern crate rocket;
/// #
/// use rocket::Shutdown;
///
/// #[get("/shutdown")]
/// fn shutdown(shutdown: Shutdown) -> &'static str {
///     shutdown.notify();
///     "Shutting down..."
/// }
///
/// #[rocket::main]
/// async fn main() {
///     let result = rocket::build()
///         .mount("/", routes![shutdown])
///         .launch()
///         .await;
///
///     // If the server shut down (by visiting `/shutdown`), `result` is `Ok`.
///     result.expect("server failed unexpectedly");
/// }
/// ```
#[derive(Debug, Clone)]
#[must_use = "`Shutdown` does nothing unless polled or `notify`ed"]
pub struct Shutdown {
    wire: TripWire,
}

#[derive(Debug, Clone)]
pub struct Stages {
    pub start: Shutdown,
    pub grace: Shutdown,
    pub mercy: Shutdown,
}

impl Shutdown {
    fn new() -> Self {
        Shutdown {
            wire: TripWire::new(),
        }
    }

    /// Notify the application to shut down gracefully.
    ///
    /// This function returns immediately; pending requests will continue to run
    /// until completion or expiration of the grace period, which ever comes
    /// first, before the actual shutdown occurs. The grace period can be
    /// configured via [`ShutdownConfig`]'s `grace` field.
    ///
    /// ```rust
    /// # use rocket::*;
    /// use rocket::Shutdown;
    ///
    /// #[get("/shutdown")]
    /// fn shutdown(shutdown: Shutdown) -> &'static str {
    ///     shutdown.notify();
    ///     "Shutting down..."
    /// }
    /// ```
    #[inline(always)]
    pub fn notify(&self) {
        self.wire.trip();
    }

    /// Returns `true` if `Shutdown::notify()` has already been called.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use rocket::*;
    /// use rocket::Shutdown;
    ///
    /// #[get("/shutdown")]
    /// fn shutdown(shutdown: Shutdown) {
    ///     shutdown.notify();
    ///     assert!(shutdown.notified());
    /// }
    /// ```
    #[must_use]
    #[inline(always)]
    pub fn notified(&self) -> bool {
        self.wire.tripped()
    }
}

impl Future for Shutdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.wire.poll_unpin(cx)
    }
}

#[crate::async_trait]
impl<'r> FromRequest<'r> for Shutdown {
    type Error = std::convert::Infallible;

    #[inline]
    async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        Outcome::Success(request.rocket().shutdown())
    }
}

impl Stages {
    pub fn new() -> Self {
        Stages {
            start: Shutdown::new(),
            grace: Shutdown::new(),
            mercy: Shutdown::new(),
        }
    }

    pub(crate) fn spawn_listener(&self, config: &ShutdownConfig) {
        use futures::stream;
        use futures::future::{select, Either};

        let mut signal = match config.signal_stream() {
            Some(stream) => Either::Left(stream.chain(stream::pending())),
            None => Either::Right(stream::pending()),
        };

        let start  = self.start.clone();
        let (grace, grace_duration)  = (self.grace.clone(), config.grace());
        let (mercy, mercy_duration)  = (self.mercy.clone(), config.mercy());
        tokio::spawn(async move {
            if let Either::Left((sig, start)) = select(signal.next(), start).await {
                warn!("Received {}. Shutdown started.", sig.unwrap());
                start.notify();
            }

            tokio::time::sleep(grace_duration).await;
            warn!("Shutdown grace period elapsed. Shutting down I/O.");
            grace.notify();

            tokio::time::sleep(mercy_duration).await;
            warn!("Mercy period elapsed. Terminating I/O.");
            mercy.notify();
        });
    }
}

#[cfg(test)]
mod tests {
    use super::Shutdown;

    #[test]
    fn ensure_is_send_sync_clone_unpin() {
        fn is_send_sync_clone_unpin<T: Send + Sync + Clone + Unpin>() {}
        is_send_sync_clone_unpin::<Shutdown>();
    }
}