1use std::io;
2use std::pin::pin;
3use std::sync::Arc;
4use std::time::Duration;
5
6use hyper::service::service_fn;
7use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
8use hyper_util::server::conn::auto::Builder;
9use futures::{Future, TryFutureExt};
10use tokio::io::{AsyncRead, AsyncWrite};
11
12use crate::{Ignite, Orbit, Request, Rocket};
13use crate::request::ConnectionMeta;
14use crate::erased::{ErasedRequest, ErasedResponse, ErasedIoHandler};
15use crate::listener::{Listener, Connection, BouncedExt, CancellableExt};
16use crate::error::log_server_error;
17use crate::data::{IoStream, RawStream};
18use crate::util::{spawn_inspect, FutureExt, ReaderStream};
19use crate::http::Status;
20use crate::trace::{Trace, TraceAll};
21
22type Result<T, E = crate::Error> = std::result::Result<T, E>;
23
24impl Rocket<Orbit> {
25 #[tracing::instrument("request", skip_all, fields(
26 method = %parts.method,
27 uri = %parts.uri,
28 autohandled
29 ))]
30 async fn service<T: for<'a> Into<RawStream<'a>>>(
31 self: Arc<Self>,
32 parts: http::request::Parts,
33 stream: T,
34 upgrade: Option<hyper::upgrade::OnUpgrade>,
35 connection: ConnectionMeta,
36 ) -> Result<hyper::Response<ReaderStream<ErasedResponse>>, http::Error> {
37 connection.trace_debug();
38 let request = ErasedRequest::new(self, parts, |rocket, parts| {
39 Request::from_hyp(rocket, parts, connection).unwrap_or_else(|e| e)
40 });
41
42 span_debug!("request headers" => request.inner().headers().iter().trace_all_debug());
43 let mut response = request.into_response(
44 stream,
45 |rocket, request, data| Box::pin(rocket.preprocess(request, data)),
46 |token, rocket, request, data| Box::pin(async move {
47 if !request.errors.is_empty() {
48 return rocket.dispatch_error(Status::BadRequest, request).await;
49 }
50
51 rocket.dispatch(token, request, data).await
52 })
53 ).await;
54
55 response.inner().trace_info();
57 span_debug!("response headers" => response.inner().headers().iter().trace_all_debug());
58 let io_handler = response.make_io_handler(Rocket::extract_io_handler);
59 if let (Some((proto, handler)), Some(upgrade)) = (io_handler, upgrade) {
60 let upgrade = upgrade.map_ok(IoStream::from).map_err(io::Error::other);
61 tokio::task::spawn(io_handler_task(proto, upgrade, handler));
62 }
63
64 let mut builder = hyper::Response::builder();
65 builder = builder.status(response.inner().status().code);
66 for header in response.inner().headers().iter() {
67 builder = builder.header(header.name().as_str(), header.value());
68 }
69
70 let chunk_size = response.inner().body().max_chunk_size();
71 builder.body(ReaderStream::with_capacity(response, chunk_size))
72 }
73
74 pub(crate) fn alt_svc(&self) -> Option<&'static str> {
75 cfg!(feature = "http3-preview").then(|| {
76 static ALT_SVC: state::InitCell<Option<String>> = state::InitCell::new();
77
78 ALT_SVC.get_or_init(|| {
79 let addr = self.endpoints().find_map(|v| v.quic())?;
80 Some(format!("h3=\":{}\"", addr.port()))
81 }).as_deref()
82 })?
83 }
84}
85
86#[tracing::instrument("upgrade", skip_all, fields(protocol = proto))]
87async fn io_handler_task<S>(proto: String, stream: S, mut handler: ErasedIoHandler)
88 where S: Future<Output = io::Result<IoStream>>
89{
90 let stream = match stream.await {
91 Ok(stream) => stream,
92 Err(e) => return warn!(error = %e, "i/o upgrade failed"),
93 };
94
95 debug!("i/o upgrade succeeded");
96 if let Err(e) = handler.take().io(stream).await {
97 match e.kind() {
98 io::ErrorKind::BrokenPipe => warn!("i/o handler closed"),
99 _ => warn!(error = %e, "i/o handler terminated unsuccessfully"),
100 }
101 }
102}
103
104impl Rocket<Ignite> {
105 pub(crate) async fn listen_and_serve<L, R>(
106 self,
107 listener: L,
108 orbit_callback: impl FnOnce(Rocket<Orbit>) -> R,
109 ) -> Result<Arc<Rocket<Orbit>>>
110 where L: Listener + 'static,
111 R: Future<Output = Result<Arc<Rocket<Orbit>>>>
112 {
113 let endpoint = listener.endpoint()?;
114
115 #[cfg(feature = "http3-preview")]
116 if let (Some(addr), Some(tls)) = (endpoint.tcp(), endpoint.tls_config()) {
117 use crate::error::ErrorKind;
118
119 let h3listener = crate::listener::quic::QuicListener::bind(addr, tls.clone())
120 .map_err(|e| ErrorKind::Bind(Some(endpoint.clone()), Box::new(e)))
121 .await?;
122
123 let rocket = self.into_orbit(vec![h3listener.endpoint()?, endpoint]);
124 let rocket = orbit_callback(rocket).await?;
125
126 let http12 = tokio::task::spawn(rocket.clone().serve12(listener));
127 let http3 = tokio::task::spawn(rocket.clone().serve3(h3listener));
128 let (r1, r2) = tokio::join!(http12, http3);
129 r1.map_err(|e| ErrorKind::Liftoff(Err(rocket.clone()), e))??;
130 r2.map_err(|e| ErrorKind::Liftoff(Err(rocket.clone()), e))??;
131 return Ok(rocket);
132 }
133
134 if cfg!(feature = "http3-preview") {
135 warn!("HTTP/3 cannot start without a valid TCP + TLS configuration.\n\
136 Falling back to HTTP/1 + HTTP/2 server.");
137 }
138
139 let rocket = self.into_orbit(vec![endpoint]);
140 let rocket = orbit_callback(rocket).await?;
141 rocket.clone().serve12(listener).await?;
142 Ok(rocket)
143 }
144}
145
146impl Rocket<Orbit> {
147 pub(crate) async fn serve12<L>(self: Arc<Self>, listener: L) -> Result<()>
148 where L: Listener + 'static,
149 L::Connection: AsyncRead + AsyncWrite
150 {
151 let mut builder = Builder::new(TokioExecutor::new());
152 let keep_alive = Duration::from_secs(self.config.keep_alive.into());
153 builder.http1()
154 .half_close(true)
155 .timer(TokioTimer::new())
156 .keep_alive(keep_alive > Duration::ZERO)
157 .preserve_header_case(true)
158 .header_read_timeout(Duration::from_secs(15));
159
160 #[cfg(feature = "http2")] {
161 builder.http2().timer(TokioTimer::new());
162 if keep_alive > Duration::ZERO {
163 builder.http2()
164 .timer(TokioTimer::new())
165 .keep_alive_interval(keep_alive / 4)
166 .keep_alive_timeout(keep_alive);
167 }
168 }
169
170 let (listener, server) = (Arc::new(listener.bounced()), Arc::new(builder));
171 while let Some(accept) = listener.accept().race(self.shutdown()).await.left().transpose()? {
172 let (listener, rocket, server) = (listener.clone(), self.clone(), server.clone());
173 spawn_inspect(|e| log_server_error(&**e), async move {
174 let conn = listener.connect(accept).race_io(rocket.shutdown()).await?;
175 let meta = ConnectionMeta::new(conn.endpoint(), conn.certificates());
176 let service = service_fn(|mut req| {
177 let upgrade = hyper::upgrade::on(&mut req);
178 let (parts, incoming) = req.into_parts();
179 rocket.clone().service(parts, incoming, Some(upgrade), meta.clone())
180 });
181
182 let io = TokioIo::new(conn.cancellable(rocket.shutdown.clone()));
183 let mut server = pin!(server.serve_connection_with_upgrades(io, service));
184 match server.as_mut().race(rocket.shutdown()).await.left() {
185 Some(result) => result,
186 None => {
187 server.as_mut().graceful_shutdown();
188 server.await
189 },
190 }
191 });
192 }
193
194 Ok(())
195 }
196
197 #[cfg(feature = "http3-preview")]
198 async fn serve3(self: Arc<Self>, listener: crate::listener::quic::QuicListener) -> Result<()> {
199 let rocket = self.clone();
200 let listener = Arc::new(listener);
201 while let Some(Some(accept)) = listener.accept().race(rocket.shutdown()).await.left() {
202 let (listener, rocket) = (listener.clone(), rocket.clone());
203 spawn_inspect(|e: &io::Error| log_server_error(e), async move {
204 let mut stream = listener.connect(accept).race_io(rocket.shutdown()).await?;
205 while let Some(mut conn) = stream.accept().race_io(rocket.shutdown()).await? {
206 let rocket = rocket.clone();
207 spawn_inspect(|e: &io::Error| log_server_error(e), async move {
208 let meta = ConnectionMeta::new(conn.endpoint(), None);
209 let rx = conn.rx.cancellable(rocket.shutdown.clone());
210 let response = rocket.clone()
211 .service(conn.parts, rx, None, meta)
212 .map_err(io::Error::other)
213 .race_io(rocket.shutdown.mercy.clone())
214 .await?;
215
216 let grace = rocket.shutdown.grace.clone();
217 match conn.tx.send_response(response).race(grace).await.left() {
218 Some(result) => result,
219 None => Ok(conn.tx.cancel()),
220 }
221 });
222 }
223
224 Ok(())
225 });
226 }
227
228 Ok(())
229 }
230}