rocket/
server.rs

1use std::io;
2use std::pin::pin;
3use std::sync::Arc;
4use std::time::Duration;
5
6use hyper::service::service_fn;
7use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
8use hyper_util::server::conn::auto::Builder;
9use futures::{Future, TryFutureExt};
10use tokio::io::{AsyncRead, AsyncWrite};
11
12use crate::{Ignite, Orbit, Request, Rocket};
13use crate::request::ConnectionMeta;
14use crate::erased::{ErasedRequest, ErasedResponse, ErasedIoHandler};
15use crate::listener::{Listener, Connection, BouncedExt, CancellableExt};
16use crate::error::log_server_error;
17use crate::data::{IoStream, RawStream};
18use crate::util::{spawn_inspect, FutureExt, ReaderStream};
19use crate::http::Status;
20use crate::trace::{Trace, TraceAll};
21
22type Result<T, E = crate::Error> = std::result::Result<T, E>;
23
24impl Rocket<Orbit> {
25    #[tracing::instrument("request", skip_all, fields(
26        method = %parts.method,
27        uri = %parts.uri,
28        autohandled
29    ))]
30    async fn service<T: for<'a> Into<RawStream<'a>>>(
31        self: Arc<Self>,
32        parts: http::request::Parts,
33        stream: T,
34        upgrade: Option<hyper::upgrade::OnUpgrade>,
35        connection: ConnectionMeta,
36    ) -> Result<hyper::Response<ReaderStream<ErasedResponse>>, http::Error> {
37        connection.trace_debug();
38        let request = ErasedRequest::new(self, parts, |rocket, parts| {
39            Request::from_hyp(rocket, parts, connection).unwrap_or_else(|e| e)
40        });
41
42        span_debug!("request headers" => request.inner().headers().iter().trace_all_debug());
43        let mut response = request.into_response(
44            stream,
45            |rocket, request, data| Box::pin(rocket.preprocess(request, data)),
46            |token, rocket, request, data| Box::pin(async move {
47                if !request.errors.is_empty() {
48                    return rocket.dispatch_error(Status::BadRequest, request).await;
49                }
50
51                rocket.dispatch(token, request, data).await
52            })
53        ).await;
54
55        // TODO: Should upgrades be handled in dispatch?
56        response.inner().trace_info();
57        span_debug!("response headers" => response.inner().headers().iter().trace_all_debug());
58        let io_handler = response.make_io_handler(Rocket::extract_io_handler);
59        if let (Some((proto, handler)), Some(upgrade)) = (io_handler, upgrade) {
60            let upgrade = upgrade.map_ok(IoStream::from).map_err(io::Error::other);
61            tokio::task::spawn(io_handler_task(proto, upgrade, handler));
62        }
63
64        let mut builder = hyper::Response::builder();
65        builder = builder.status(response.inner().status().code);
66        for header in response.inner().headers().iter() {
67            builder = builder.header(header.name().as_str(), header.value());
68        }
69
70        let chunk_size = response.inner().body().max_chunk_size();
71        builder.body(ReaderStream::with_capacity(response, chunk_size))
72    }
73
74    pub(crate) fn alt_svc(&self) -> Option<&'static str> {
75        cfg!(feature = "http3-preview").then(|| {
76            static ALT_SVC: state::InitCell<Option<String>> = state::InitCell::new();
77
78            ALT_SVC.get_or_init(|| {
79                let addr = self.endpoints().find_map(|v| v.quic())?;
80                Some(format!("h3=\":{}\"", addr.port()))
81            }).as_deref()
82        })?
83    }
84}
85
86#[tracing::instrument("upgrade", skip_all, fields(protocol = proto))]
87async fn io_handler_task<S>(proto: String, stream: S, mut handler: ErasedIoHandler)
88    where S: Future<Output = io::Result<IoStream>>
89{
90    let stream = match stream.await {
91        Ok(stream) => stream,
92        Err(e) => return warn!(error = %e, "i/o upgrade failed"),
93    };
94
95    debug!("i/o upgrade succeeded");
96    if let Err(e) = handler.take().io(stream).await {
97        match e.kind() {
98            io::ErrorKind::BrokenPipe => warn!("i/o handler closed"),
99            _ => warn!(error = %e, "i/o handler terminated unsuccessfully"),
100        }
101    }
102}
103
104impl Rocket<Ignite> {
105    pub(crate) async fn listen_and_serve<L, R>(
106        self,
107        listener: L,
108        orbit_callback: impl FnOnce(Rocket<Orbit>) -> R,
109    ) -> Result<Arc<Rocket<Orbit>>>
110        where L: Listener + 'static,
111              R: Future<Output = Result<Arc<Rocket<Orbit>>>>
112    {
113        let endpoint = listener.endpoint()?;
114
115        #[cfg(feature = "http3-preview")]
116        if let (Some(addr), Some(tls)) = (endpoint.tcp(), endpoint.tls_config()) {
117            use crate::error::ErrorKind;
118
119            let h3listener = crate::listener::quic::QuicListener::bind(addr, tls.clone())
120                .map_err(|e| ErrorKind::Bind(Some(endpoint.clone()), Box::new(e)))
121                .await?;
122
123            let rocket = self.into_orbit(vec![h3listener.endpoint()?, endpoint]);
124            let rocket = orbit_callback(rocket).await?;
125
126            let http12 = tokio::task::spawn(rocket.clone().serve12(listener));
127            let http3 = tokio::task::spawn(rocket.clone().serve3(h3listener));
128            let (r1, r2) = tokio::join!(http12, http3);
129            r1.map_err(|e| ErrorKind::Liftoff(Err(rocket.clone()), e))??;
130            r2.map_err(|e| ErrorKind::Liftoff(Err(rocket.clone()), e))??;
131            return Ok(rocket);
132        }
133
134        if cfg!(feature = "http3-preview") {
135            warn!("HTTP/3 cannot start without a valid TCP + TLS configuration.\n\
136                Falling back to HTTP/1 + HTTP/2 server.");
137        }
138
139        let rocket = self.into_orbit(vec![endpoint]);
140        let rocket = orbit_callback(rocket).await?;
141        rocket.clone().serve12(listener).await?;
142        Ok(rocket)
143    }
144}
145
146impl Rocket<Orbit> {
147    pub(crate) async fn serve12<L>(self: Arc<Self>, listener: L) -> Result<()>
148        where L: Listener + 'static,
149              L::Connection: AsyncRead + AsyncWrite
150    {
151        let mut builder = Builder::new(TokioExecutor::new());
152        let keep_alive = Duration::from_secs(self.config.keep_alive.into());
153        builder.http1()
154            .half_close(true)
155            .timer(TokioTimer::new())
156            .keep_alive(keep_alive > Duration::ZERO)
157            .preserve_header_case(true)
158            .header_read_timeout(Duration::from_secs(15));
159
160        #[cfg(feature = "http2")] {
161            builder.http2().timer(TokioTimer::new());
162            if keep_alive > Duration::ZERO {
163                builder.http2()
164                    .timer(TokioTimer::new())
165                    .keep_alive_interval(keep_alive / 4)
166                    .keep_alive_timeout(keep_alive);
167            }
168        }
169
170        let (listener, server) = (Arc::new(listener.bounced()), Arc::new(builder));
171        while let Some(accept) = listener.accept().race(self.shutdown()).await.left().transpose()? {
172            let (listener, rocket, server) = (listener.clone(), self.clone(), server.clone());
173            spawn_inspect(|e| log_server_error(&**e), async move {
174                let conn = listener.connect(accept).race_io(rocket.shutdown()).await?;
175                let meta = ConnectionMeta::new(conn.endpoint(), conn.certificates());
176                let service = service_fn(|mut req| {
177                    let upgrade = hyper::upgrade::on(&mut req);
178                    let (parts, incoming) = req.into_parts();
179                    rocket.clone().service(parts, incoming, Some(upgrade), meta.clone())
180                });
181
182                let io = TokioIo::new(conn.cancellable(rocket.shutdown.clone()));
183                let mut server = pin!(server.serve_connection_with_upgrades(io, service));
184                match server.as_mut().race(rocket.shutdown()).await.left() {
185                    Some(result) => result,
186                    None => {
187                        server.as_mut().graceful_shutdown();
188                        server.await
189                    },
190                }
191            });
192        }
193
194        Ok(())
195    }
196
197    #[cfg(feature = "http3-preview")]
198    async fn serve3(self: Arc<Self>, listener: crate::listener::quic::QuicListener) -> Result<()> {
199        let rocket = self.clone();
200        let listener = Arc::new(listener);
201        while let Some(Some(accept)) = listener.accept().race(rocket.shutdown()).await.left() {
202            let (listener, rocket) = (listener.clone(), rocket.clone());
203            spawn_inspect(|e: &io::Error| log_server_error(e), async move {
204                let mut stream = listener.connect(accept).race_io(rocket.shutdown()).await?;
205                while let Some(mut conn) = stream.accept().race_io(rocket.shutdown()).await? {
206                    let rocket = rocket.clone();
207                    spawn_inspect(|e: &io::Error| log_server_error(e), async move {
208                        let meta = ConnectionMeta::new(conn.endpoint(), None);
209                        let rx = conn.rx.cancellable(rocket.shutdown.clone());
210                        let response = rocket.clone()
211                            .service(conn.parts, rx, None, meta)
212                            .map_err(io::Error::other)
213                            .race_io(rocket.shutdown.mercy.clone())
214                            .await?;
215
216                        let grace = rocket.shutdown.grace.clone();
217                        match conn.tx.send_response(response).race(grace).await.left() {
218                            Some(result) => result,
219                            None => Ok(conn.tx.cancel()),
220                        }
221                    });
222                }
223
224                Ok(())
225            });
226        }
227
228        Ok(())
229    }
230}