From dbadb0f9bbb3409f366a9f44ead3854d60676fd3 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 25 Oct 2024 15:34:19 +0100 Subject: [PATCH] proxy: propagate session IDs (#9509) fixes #9367 by sending session IDs to local_proxy, and also returns session IDs to the client for easier debugging. --- proxy/src/serverless/mod.rs | 23 +++++++++++++++++++++-- proxy/src/serverless/sql_over_http.rs | 10 ++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 29ff7b9d91..8fb7a771d9 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -32,6 +32,7 @@ use hyper_util::rt::TokioExecutor; use hyper_util::server::conn::auto::Builder; use rand::rngs::StdRng; use rand::SeedableRng; +use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio::time::timeout; @@ -309,7 +310,18 @@ async fn connection_handler( hyper_util::rt::TokioIo::new(conn), hyper::service::service_fn(move |req: hyper::Request| { // First HTTP request shares the same session ID - let session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4); + let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4); + + if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) { + // take session_id from request, if given. + if let Some(id) = req + .headers() + .get(&NEON_REQUEST_ID) + .and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok()) + { + session_id = id; + } + } // Cancel the current inflight HTTP request if the requets stream is closed. // This is slightly different to `_cancel_connection` in that @@ -335,8 +347,15 @@ async fn connection_handler( .map_ok_or_else(api_error_into_response, |r| r), ); async move { - let res = handler.await; + let mut res = handler.await; cancel_request.disarm(); + + // add the session ID to the response + if let Ok(resp) = &mut res { + resp.headers_mut() + .append(&NEON_REQUEST_ID, uuid_to_header_value(session_id)); + } + res } }), diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 8e2d4c126a..1f3eec6d19 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -23,6 +23,7 @@ use typed_json::json; use url::Url; use urlencoding; use utils::http::error::ApiError; +use uuid::Uuid; use super::backend::{LocalProxyConnError, PoolingBackend}; use super::conn_pool::{AuthData, ConnInfoWithAuth}; @@ -63,6 +64,8 @@ enum Payload { Batch(BatchQueryData), } +pub(super) static NEON_REQUEST_ID: HeaderName = HeaderName::from_static("neon-request-id"); + static CONN_STRING: HeaderName = HeaderName::from_static("neon-connection-string"); static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output"); static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode"); @@ -706,6 +709,12 @@ static HEADERS_TO_FORWARD: &[&HeaderName] = &[ &TXN_DEFERRABLE, ]; +pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue { + let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH]; + HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..])) + .expect("uuid hyphenated format should be all valid header characters") +} + async fn handle_auth_broker_inner( ctx: &RequestMonitoring, request: Request, @@ -732,6 +741,7 @@ async fn handle_auth_broker_inner( req = req.header(h, hv); } } + req = req.header(&NEON_REQUEST_ID, uuid_to_header_value(ctx.session_id())); let req = req .body(body)