From fa5d907032f5d55f25ff97f79d09c3f5c2e512f8 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Wed, 30 Oct 2024 11:51:47 +0000 Subject: [PATCH] move up ws handling even more --- proxy/src/serverless/mod.rs | 92 ++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 48 deletions(-) diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 5f2ea773a1..4e22c5a89c 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -177,31 +177,49 @@ pub async fn task_main( .client_connections .guard(crate::metrics::Protocol::Http); - let startup_result = Box::pin(connection_startup( - config, - tls_acceptor, - session_id, - conn, - peer_addr, - )) - .await; - let Some((conn, peer_addr, alpn)) = startup_result else { + let startup_result = + connection_startup(config, tls_acceptor, session_id, conn, peer_addr) + .boxed() + .await; + let Some(conn) = startup_result else { return; }; + let (_, peer_addr, _) = conn; - Box::pin(connection_handler( + let ws_upgrade = http_connection_handler( config, backend, connections2, - cancellation_handler, - endpoint_rate_limiter, conn_token, conn, - peer_addr, - alpn, session_id, - )) + ) + .boxed() .await; + + if let Some((session_id, host, websocket)) = ws_upgrade { + let ctx = RequestMonitoring::new( + session_id, + peer_addr, + crate::metrics::Protocol::Ws, + &config.region, + ); + + let ws = websocket::serve_websocket( + config, + auth_backend, + ctx, + websocket, + cancellation_handler, + endpoint_rate_limiter, + host, + ) + .boxed(); + + if let Err(e) = ws.await { + warn!("error in websocket connection: {e:#}"); + } + } } .instrument(http_conn_span), ); @@ -244,6 +262,7 @@ impl MaybeTlsAcceptor for NoTls { } type Alpn = SmallVec<[u8; 8]>; +type ConnWithInfo = (AsyncRW, IpAddr, Alpn); /// Handles the TCP startup lifecycle. /// 1. Parses PROXY protocol V2 @@ -254,7 +273,7 @@ async fn connection_startup( session_id: uuid::Uuid, conn: TcpStream, peer_addr: SocketAddr, -) -> Option<(AsyncRW, IpAddr, Alpn)> { +) -> Option { // handle PROXY protocol let (conn, peer) = match read_proxy_protocol(conn).await { Ok(c) => c, @@ -328,19 +347,15 @@ impl GracefulShutdown /// 1. With graceful shutdowns /// 2. With graceful request cancellation with connection failure /// 3. With websocket upgrade support. -#[allow(clippy::too_many_arguments)] -async fn connection_handler( +async fn http_connection_handler( config: &'static ProxyConfig, backend: Arc, connections: TaskTracker, - cancellation_handler: Arc, - endpoint_rate_limiter: Arc, cancellation_token: CancellationToken, - conn: AsyncRW, - peer_addr: IpAddr, - alpn: Alpn, + conn: ConnWithInfo, session_id: uuid::Uuid, -) { +) -> Option { + let (conn, peer_addr, alpn) = conn; let session_id = AtomicTake::new(session_id); // Cancel all current inflight HTTP requests if the HTTP connection is closed. @@ -349,7 +364,6 @@ async fn connection_handler( let (ws_tx, ws_rx) = oneshot::channel(); let ws_tx = Arc::new(AtomicTake::new(ws_tx)); - let auth_backend = backend.auth_backend; let http2 = match &*alpn { b"h2" => true, @@ -403,35 +417,17 @@ async fn connection_handler( match res { Ok(()) => { - if let Ok((session_id, host, websocket)) = ws_rx.await { + if let Ok(ws_upgrade) = ws_rx.await { tracing::info!(%peer_addr, "connection upgraded to websockets"); - let ctx = RequestMonitoring::new( - session_id, - peer_addr, - crate::metrics::Protocol::Ws, - &config.region, - ); - - if let Err(e) = websocket::serve_websocket( - config, - auth_backend, - ctx, - websocket, - cancellation_handler, - endpoint_rate_limiter, - host, - ) - .await - { - warn!("error in websocket connection: {e:#}"); - } - } else { - tracing::info!(%peer_addr, "HTTP connection closed"); + return Some(ws_upgrade); } + tracing::info!(%peer_addr, "HTTP connection closed"); } Err(e) => tracing::warn!(%peer_addr, "HTTP connection error {e}"), } + + None } struct ProxyService {