diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 366c7ef1e9..16bdc1983b 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -14,6 +14,7 @@ mod local_conn_pool; mod sql_over_http; mod websocket; +use std::future::Future; use std::net::{IpAddr, SocketAddr}; use std::pin::{pin, Pin}; use std::sync::Arc; @@ -30,8 +31,7 @@ use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Empty}; use hyper::body::Incoming; use hyper::upgrade::OnUpgrade; -use hyper_util::rt::TokioExecutor; -use hyper_util::server::conn::auto::Builder; +use hyper_util::rt::{TokioExecutor, TokioIo}; use rand::rngs::StdRng; use rand::SeedableRng; use smallvec::SmallVec; @@ -43,7 +43,7 @@ use tokio::time::timeout; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use tracing::{info, warn, Instrument}; +use tracing::{debug, info, warn, Instrument}; use utils::http::error::ApiError; use crate::cancellation::CancellationHandlerMain; @@ -296,6 +296,57 @@ async fn connection_startup( Some((conn, peer_addr, alpn)) } +trait GracefulShutdown: Future> + Send { + fn graceful_shutdown(self: Pin<&mut Self>); +} + +impl GracefulShutdown + for hyper::server::conn::http1::UpgradeableConnection, S> +where + S: hyper::service::HttpService + Send, + S::Future: Send + 'static, + B: hyper::body::Body + Send + 'static, + B::Data: Send + 'static, + + S::Error: Into>, + B::Error: Into>, +{ + fn graceful_shutdown(self: Pin<&mut Self>) { + self.graceful_shutdown(); + } +} + +impl GracefulShutdown for hyper::server::conn::http1::Connection, S> +where + S: hyper::service::HttpService + Send, + S::Future: Send + 'static, + B: hyper::body::Body + Send + 'static, + B::Data: Send + 'static, + + S::Error: Into>, + B::Error: Into>, +{ + fn graceful_shutdown(self: Pin<&mut Self>) { + self.graceful_shutdown(); + } +} + +impl GracefulShutdown + for hyper::server::conn::http2::Connection, S, TokioExecutor> +where + S: hyper::service::HttpService + Send, + S::Future: Send + 'static, + B: hyper::body::Body + Send + 'static, + B::Data: Send + 'static, + + S::Error: Into>, + B::Error: Into>, +{ + fn graceful_shutdown(self: Pin<&mut Self>) { + hyper::server::conn::http2::Connection::graceful_shutdown(self); + } +} + /// Handles HTTP connection /// 1. With graceful shutdowns /// 2. With graceful request cancellation with connection failure @@ -310,7 +361,7 @@ async fn connection_handler( cancellation_token: CancellationToken, conn: AsyncRW, peer_addr: IpAddr, - _alpn: Alpn, + alpn: Alpn, session_id: uuid::Uuid, ) { let session_id = AtomicTake::new(session_id); @@ -322,6 +373,15 @@ async fn connection_handler( let (ws_tx, mut ws_rx) = mpsc::channel(1); let auth_backend = backend.auth_backend; + let http2 = match &*alpn { + b"h2" => true, + b"http/1.1" => false, + _ => { + debug!("no alpn negotiated"); + false + } + }; + let service = hyper::service::service_fn(move |req: hyper::Request| { // First HTTP request shares the same session ID let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4); @@ -372,12 +432,24 @@ async fn connection_handler( } }); - let server = Builder::new(TokioExecutor::new()); + let io = hyper_util::rt::TokioIo::new(conn); + let conn = if http2 { + let conn = hyper::server::conn::http2::Builder::new(TokioExecutor::new()) + .serve_connection(io, service); - let conn = server.serve_connection_with_upgrades(hyper_util::rt::TokioIo::new(conn), service); + Box::pin(conn) as Pin> + } else { + let serve = hyper::server::conn::http1::Builder::new().serve_connection(io, service); + + if config.http_config.accept_websockets { + Box::pin(serve.with_upgrades()) as Pin> + } else { + Box::pin(serve) as Pin> + } + }; // On cancellation, trigger the HTTP connection handler to shut down. - let res = match select(pin!(cancellation_token.cancelled()), pin!(conn)).await { + let res = match select(pin!(cancellation_token.cancelled()), conn).await { Either::Left((_cancelled, mut conn)) => { tracing::debug!(%peer_addr, "cancelling connection"); conn.as_mut().graceful_shutdown();