From 26be13067c9eda206ad9df6d3f54d55a05566e51 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 18 Jul 2025 22:21:48 +0100 Subject: [PATCH] [proxy] refactor logging ID system --- Cargo.lock | 14 ++ proxy/Cargo.toml | 1 + proxy/src/binary/pg_sni_router.rs | 7 +- proxy/src/cache/project_info.rs | 3 +- proxy/src/cancellation.rs | 5 +- proxy/src/compute/mod.rs | 5 + proxy/src/console_redirect_proxy.rs | 23 ++-- proxy/src/context/mod.rs | 24 +++- proxy/src/context/parquet.rs | 2 +- proxy/src/control_plane/mod.rs | 6 +- proxy/src/id.rs | 33 +++++ proxy/src/lib.rs | 1 + proxy/src/pglb/mod.rs | 25 ++-- proxy/src/pglb/passthrough.rs | 3 + proxy/src/proxy/connect_compute.rs | 4 +- proxy/src/proxy/mod.rs | 3 + proxy/src/serverless/backend.rs | 41 +++--- proxy/src/serverless/cancel_set.rs | 13 +- proxy/src/serverless/conn_pool.rs | 20 +-- proxy/src/serverless/conn_pool_lib.rs | 19 +-- proxy/src/serverless/http_conn_pool.rs | 20 +-- proxy/src/serverless/http_util.rs | 7 +- proxy/src/serverless/local_conn_pool.rs | 15 ++- proxy/src/serverless/mod.rs | 167 +++++++++++++----------- 24 files changed, 289 insertions(+), 172 deletions(-) create mode 100644 proxy/src/id.rs diff --git a/Cargo.lock b/Cargo.lock index 137b883a6d..0f2e95fda2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5406,6 +5406,7 @@ dependencies = [ "tracing-test", "tracing-utils", "try-lock", + "type-safe-id", "typed-json", "url", "urlencoding", @@ -8087,6 +8088,19 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "type-safe-id" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd9267f90719e0433aae095640b294ff36ccbf89649ecb9ee34464ec504be157" +dependencies = [ + "arrayvec", + "rand 0.9.1", + "serde", + "thiserror 2.0.11", + "uuid", +] + [[package]] name = "typed-json" version = "0.1.1" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 82fe6818e3..3e97898360 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -98,6 +98,7 @@ tracing-log.workspace = true tracing-opentelemetry.workspace = true try-lock.workspace = true typed-json.workspace = true +type-safe-id = { version = "0.3.3", features = ["serde"] } url.workspace = true urlencoding.workspace = true utils.workspace = true diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 4ac8b6a995..28b7b94327 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -26,6 +26,7 @@ use utils::project_git_version; use utils::sentry_init::init_sentry; use crate::context::RequestContext; +use crate::id::{ClientConnId, RequestId}; use crate::metrics::{Metrics, ThreadPoolMetrics}; use crate::pglb::TlsRequired; use crate::pqproto::FeStartupPacket; @@ -219,7 +220,8 @@ pub(super) async fn task_main( { let (socket, peer_addr) = accept_result?; - let session_id = uuid::Uuid::new_v4(); + let conn_id = ClientConnId::new(); + let session_id = RequestId::from_uuid(conn_id.uuid()); let tls_config = Arc::clone(&tls_config); let dest_suffix = Arc::clone(&dest_suffix); let compute_tls_config = compute_tls_config.clone(); @@ -231,6 +233,7 @@ pub(super) async fn task_main( .context("failed to set socket option")?; let ctx = RequestContext::new( + conn_id, session_id, ConnectionInfo { addr: peer_addr, @@ -252,7 +255,7 @@ pub(super) async fn task_main( // Acknowledge that the task has finished with an error. 
error!("per-client task finished with an error: {e:#}"); }) - .instrument(tracing::info_span!("handle_client", ?session_id)), + .instrument(tracing::info_span!("handle_client", %session_id)), ); } diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 0ef09a8a9a..a6fc04cc43 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -363,11 +363,12 @@ impl ProjectInfoCacheImpl { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::control_plane::messages::{Details, EndpointRateLimitConfig, ErrorInfo, Status}; use crate::control_plane::{AccessBlockerFlags, AuthSecret}; use crate::scram::ServerSecret; - use std::sync::Arc; #[tokio::test] async fn test_project_info_cache_settings() { diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index f25121331f..6310391dd0 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -23,6 +23,7 @@ use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; use crate::error::ReportableError; use crate::ext::LockExt; +use crate::id::RequestId; use crate::metrics::{CancelChannelSizeGuard, CancellationRequest, Metrics, RedisMsgKind}; use crate::pqproto::CancelKeyData; use crate::rate_limiter::LeakyBucketRateLimiter; @@ -486,7 +487,7 @@ impl Session { /// This is not cancel safe pub(crate) async fn maintain_cancel_key( &self, - session_id: uuid::Uuid, + session_id: RequestId, cancel: tokio::sync::oneshot::Receiver, cancel_closure: &CancelClosure, compute_config: &ComputeConfig, @@ -599,7 +600,7 @@ impl Session { .await { tracing::warn!( - ?session_id, + %session_id, ?err, "could not cancel the query in the database" ); diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index 7b9183b05e..ee0e15d0ed 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -25,6 +25,7 @@ use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::messages::MetricsAuxInfo; use crate::error::{ReportableError, UserFacingError}; +use crate::id::ComputeConnId; use crate::metrics::{Metrics, NumDbConnectionsGuard}; use crate::pqproto::StartupMessageParams; use crate::proxy::neon_option; @@ -356,6 +357,7 @@ pub struct PostgresSettings { } pub struct ComputeConnection { + pub compute_conn_id: ComputeConnId, /// Socket connected to a compute node. pub stream: MaybeTlsStream, /// Labels for proxy's metrics. @@ -373,6 +375,7 @@ impl ConnectInfo { ctx: &RequestContext, aux: &MetricsAuxInfo, config: &ComputeConfig, + compute_conn_id: ComputeConnId, ) -> Result { let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute); let (socket_addr, stream) = self.connect_raw(config).await?; @@ -382,6 +385,7 @@ impl ConnectInfo { // TODO: lots of useful info but maybe we can move it elsewhere (eg traces?) 
info!( + %compute_conn_id, cold_start_info = ctx.cold_start_info().as_str(), "connected to compute node at {} ({socket_addr}) sslmode={:?}, latency={}, query_id={}", self.host, @@ -391,6 +395,7 @@ impl ConnectInfo { ); let connection = ComputeConnection { + compute_conn_id, stream, socket_addr, hostname: self.host.clone(), diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 041a56e032..02f3d66b0e 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -10,7 +10,8 @@ use crate::cancellation::CancellationHandler; use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; use crate::error::ReportableError; -use crate::metrics::{Metrics, NumClientConnectionsGuard}; +use crate::id::{ClientConnId, ComputeConnId, RequestId}; +use crate::metrics::{Metrics, NumClientConnectionsGuard, Protocol}; use crate::pglb::ClientRequestError; use crate::pglb::handshake::{HandshakeData, handshake}; use crate::pglb::passthrough::ProxyPassthrough; @@ -42,12 +43,10 @@ pub async fn task_main( { let (socket, peer_addr) = accept_result?; - let conn_gauge = Metrics::get() - .proxy - .client_connections - .guard(crate::metrics::Protocol::Tcp); + let conn_gauge = Metrics::get().proxy.client_connections.guard(Protocol::Tcp); - let session_id = uuid::Uuid::new_v4(); + let conn_id = ClientConnId::new(); + let session_id = RequestId::from_uuid(conn_id.uuid()); let cancellation_handler = Arc::clone(&cancellation_handler); let cancellations = cancellations.clone(); @@ -90,7 +89,7 @@ pub async fn task_main( } } - let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp); + let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Tcp); let res = handle_client( config, @@ -120,13 +119,13 @@ pub async fn task_main( Ok(()) => {} Err(ErrorSource::Client(e)) => { error!( - ?session_id, + %session_id, "per-client task finished with an IO error from the client: {e:#}" ); } Err(ErrorSource::Compute(e)) => { error!( - ?session_id, + %session_id, "per-client task finished with an IO error from the compute: {e:#}" ); } @@ -214,10 +213,14 @@ pub(crate) async fn handle_client( }; auth_info.set_startup_params(¶ms, true); + // for TCP/WS, we have client_id=session_id=compute_id for now. 
+ let compute_conn_id = ComputeConnId::from_uuid(ctx.session_id().uuid()); + let mut node = connect_to_compute( ctx, &TcpMechanism { locks: &config.connect_compute_locks, + compute_conn_id, }, &node_info, config.wake_compute_retry_config, @@ -250,6 +253,8 @@ pub(crate) async fn handle_client( }); Ok(Some(ProxyPassthrough { + compute_conn_id: node.compute_conn_id, + client: stream, compute: node.stream, diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 3a8828e70c..b1f7004ea9 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -9,11 +9,11 @@ use tokio::sync::mpsc; use tracing::field::display; use tracing::{Span, error, info_span}; use try_lock::TryLock; -use uuid::Uuid; use self::parquet::RequestData; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::error::ErrorKind; +use crate::id::{ClientConnId, RequestId}; use crate::intern::{BranchIdInt, ProjectIdInt}; use crate::metrics::{LatencyAccumulated, LatencyTimer, Metrics, Protocol, Waiting}; use crate::pqproto::StartupMessageParams; @@ -40,7 +40,7 @@ pub struct RequestContext( struct RequestContextInner { pub(crate) conn_info: ConnectionInfo, - pub(crate) session_id: Uuid, + pub(crate) session_id: RequestId, pub(crate) protocol: Protocol, first_packet: chrono::DateTime, pub(crate) span: Span, @@ -116,12 +116,18 @@ impl Clone for RequestContext { } impl RequestContext { - pub fn new(session_id: Uuid, conn_info: ConnectionInfo, protocol: Protocol) -> Self { + pub fn new( + conn_id: ClientConnId, + session_id: RequestId, + conn_info: ConnectionInfo, + protocol: Protocol, + ) -> Self { // TODO: be careful with long lived spans let span = info_span!( "connect_request", %protocol, - ?session_id, + %session_id, + %conn_id, %conn_info, ep = tracing::field::Empty, role = tracing::field::Empty, @@ -164,7 +170,13 @@ impl RequestContext { let ip = IpAddr::from([127, 0, 0, 1]); let addr = SocketAddr::new(ip, 5432); let conn_info = ConnectionInfo { addr, extra: None }; - RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp) + let uuid = uuid::Uuid::now_v7(); + RequestContext::new( + ClientConnId::from_uuid(uuid), + RequestId::from_uuid(uuid), + conn_info, + Protocol::Tcp, + ) } pub(crate) fn console_application_name(&self) -> String { @@ -311,7 +323,7 @@ impl RequestContext { self.0.try_lock().expect("should not deadlock").span.clone() } - pub(crate) fn session_id(&self) -> Uuid { + pub(crate) fn session_id(&self) -> RequestId { self.0.try_lock().expect("should not deadlock").session_id } diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 4d8df19476..5ef2cd2074 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -124,7 +124,7 @@ impl serde::Serialize for Options<'_> { impl From<&RequestContextInner> for RequestData { fn from(value: &RequestContextInner) -> Self { Self { - session_id: value.session_id, + session_id: value.session_id.uuid(), peer_addr: value.conn_info.addr.ip().to_string(), timestamp: value.first_packet.naive_utc(), username: value.user.as_deref().map(String::from), diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 9bbd3f4fb7..e3f5cd1131 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -20,6 +20,7 @@ use crate::cache::{Cached, TimedLru}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo}; +use crate::id::ComputeConnId; use 
crate::intern::{AccountIdInt, EndpointIdInt, ProjectIdInt};
use crate::protocol2::ConnectionInfoExtra;
use crate::rate_limiter::{EndpointRateLimiter, LeakyBucketConfig};
@@ -77,8 +78,11 @@ impl NodeInfo {
 &self,
 ctx: &RequestContext,
 config: &ComputeConfig,
+ compute_conn_id: ComputeConnId,
 ) -> Result {
- self.conn_info.connect(ctx, &self.aux, config).await
+ self.conn_info
+ .connect(ctx, &self.aux, config, compute_conn_id)
+ .await
 }
}
diff --git a/proxy/src/id.rs b/proxy/src/id.rs
new file mode 100644
index 0000000000..5fccbc23c1
--- /dev/null
+++ b/proxy/src/id.rs
@@ -0,0 +1,33 @@
+//! Various ID types used by proxy.
+
+use type_safe_id::{StaticType, TypeSafeId};
+
+/// The ID used for the client connection
+pub type ClientConnId = TypeSafeId<ClientConn>;
+
+#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
+pub struct ClientConn;
+
+impl StaticType for ClientConn {
+ // This is visible to customers, so we use 'neon' here instead of 'client'.
+ const TYPE: &'static str = "neon_conn";
+}
+
+/// The ID used for the compute connection
+pub type ComputeConnId = TypeSafeId<ComputeConn>;
+
+#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
+pub struct ComputeConn;
+
+impl StaticType for ComputeConn {
+ const TYPE: &'static str = "compute_conn";
+}
+
+/// The ID used for each authentication request
+pub type RequestId = TypeSafeId<Request>;
+#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
+pub struct Request;
+
+impl StaticType for Request {
+ const TYPE: &'static str = "request";
+}
diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs
index 263d784e78..1606f5f2f1 100644
--- a/proxy/src/lib.rs
+++ b/proxy/src/lib.rs
@@ -91,6 +91,7 @@ mod control_plane;
 mod error;
 mod ext;
 mod http;
+mod id;
 mod intern;
 mod jemalloc;
 mod logging;
diff --git a/proxy/src/pglb/mod.rs b/proxy/src/pglb/mod.rs
index c4cab155c5..2cc399a981 100644
--- a/proxy/src/pglb/mod.rs
+++ b/proxy/src/pglb/mod.rs
@@ -17,7 +17,8 @@ use crate::cancellation::{self, CancellationHandler};
 use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
 use crate::context::RequestContext;
 use crate::error::{ReportableError, UserFacingError};
-use crate::metrics::{Metrics, NumClientConnectionsGuard};
+use crate::id::{ClientConnId, RequestId};
+use crate::metrics::{Metrics, NumClientConnectionsGuard, Protocol};
 pub use crate::pglb::copy_bidirectional::ErrorSource;
 use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
 use crate::pglb::passthrough::ProxyPassthrough;
@@ -65,12 +66,11 @@ pub async fn task_main(
 {
 let (socket, peer_addr) = accept_result?;
- let conn_gauge = Metrics::get()
- .proxy
- .client_connections
- .guard(crate::metrics::Protocol::Tcp);
+ let conn_gauge = Metrics::get().proxy.client_connections.guard(Protocol::Tcp);
+
+ let conn_id = ClientConnId::new();
+ let session_id = RequestId::from_uuid(conn_id.uuid());
- let session_id = uuid::Uuid::new_v4();
 let cancellation_handler = Arc::clone(&cancellation_handler);
 let cancellations = cancellations.clone();
@@ -114,7 +114,7 @@
 }
 }
- let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Tcp);
+ let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Tcp);
 let res = handle_connection(
 config,
@@ -142,17 +142,22 @@
 Ok(Some(p)) => {
 ctx.set_success();
 let _disconnect = ctx.log_connect();
+ let compute_conn_id = p.compute_conn_id;
 match p.proxy_pass().await {
 Ok(()) => {}
 Err(ErrorSource::Client(e)) => {
 warn!(
- ?session_id,
+ %conn_id,
+ %session_id,
+ %compute_conn_id,
 "per-client task finished
with an IO error from the client: {e:#}" ); } Err(ErrorSource::Compute(e)) => { error!( - ?session_id, + %conn_id, + %session_id, + %compute_conn_id, "per-client task finished with an IO error from the compute: {e:#}" ); } @@ -318,6 +323,8 @@ pub(crate) async fn handle_connection( }; Ok(Some(ProxyPassthrough { + compute_conn_id: node.compute_conn_id, + client, compute: node.stream, diff --git a/proxy/src/pglb/passthrough.rs b/proxy/src/pglb/passthrough.rs index d4c029f6d9..7e67af0d2e 100644 --- a/proxy/src/pglb/passthrough.rs +++ b/proxy/src/pglb/passthrough.rs @@ -8,6 +8,7 @@ use utils::measured_stream::MeasuredStream; use super::copy_bidirectional::ErrorSource; use crate::compute::MaybeRustlsStream; use crate::control_plane::messages::MetricsAuxInfo; +use crate::id::ComputeConnId; use crate::metrics::{ Direction, Metrics, NumClientConnectionsGuard, NumConnectionRequestsGuard, NumDbConnectionsGuard, @@ -65,6 +66,8 @@ pub(crate) async fn proxy_pass( } pub(crate) struct ProxyPassthrough { + pub(crate) compute_conn_id: ComputeConnId, + pub(crate) client: Stream, pub(crate) compute: MaybeRustlsStream, diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index ce9774e3eb..d181cefe17 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -9,6 +9,7 @@ use crate::control_plane::errors::WakeComputeError; use crate::control_plane::locks::ApiLocks; use crate::control_plane::{self, NodeInfo}; use crate::error::ReportableError; +use crate::id::ComputeConnId; use crate::metrics::{ ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType, }; @@ -51,6 +52,7 @@ pub(crate) trait ConnectMechanism { pub(crate) struct TcpMechanism { /// connect_to_compute concurrency lock pub(crate) locks: &'static ApiLocks, + pub(crate) compute_conn_id: ComputeConnId, } #[async_trait] @@ -70,7 +72,7 @@ impl ConnectMechanism for TcpMechanism { config: &ComputeConfig, ) -> Result { let permit = self.locks.get_permit(&node_info.conn_info.host).await?; - permit.release_result(node_info.connect(ctx, config).await) + permit.release_result(node_info.connect(ctx, config, self.compute_conn_id).await) } } diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 02651109e0..5a1e93a5dc 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -24,6 +24,7 @@ use crate::compute::ComputeConnection; use crate::config::ProxyConfig; use crate::context::RequestContext; use crate::control_plane::client::ControlPlaneClient; +use crate::id::ComputeConnId; pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute}; use crate::pglb::{ClientMode, ClientRequestError}; use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams}; @@ -94,6 +95,8 @@ pub(crate) async fn handle_client( let mut attempt = 0; let connect = TcpMechanism { locks: &config.connect_compute_locks, + // for TCP/WS, we have client_id=session_id=compute_id for now. 
+ compute_conn_id: ComputeConnId::from_uuid(ctx.session_id().uuid()), }; let backend = auth::Backend::ControlPlane(cplane, creds.info); diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index daa6429039..3b46f80cae 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -33,6 +33,7 @@ use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError}; use crate::control_plane::locks::ApiLocks; use crate::error::{ErrorKind, ReportableError, UserFacingError}; +use crate::id::{ComputeConnId, RequestId}; use crate::intern::EndpointIdInt; use crate::proxy::connect_compute::ConnectMechanism; use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute}; @@ -161,7 +162,7 @@ impl PoolingBackend { #[tracing::instrument(skip_all, fields( pid = tracing::field::Empty, compute_id = tracing::field::Empty, - conn_id = tracing::field::Empty, + compute_conn_id = tracing::field::Empty, ))] pub(crate) async fn connect_to_compute( &self, @@ -181,14 +182,14 @@ impl PoolingBackend { if let Some(client) = maybe_client { return Ok(client); } - let conn_id = uuid::Uuid::new_v4(); - tracing::Span::current().record("conn_id", display(conn_id)); - info!(%conn_id, "pool: opening a new connection '{conn_info}'"); + let compute_conn_id = ComputeConnId::new(); + tracing::Span::current().record("compute_conn_id", display(compute_conn_id)); + info!(%compute_conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| keys.info); crate::proxy::connect_compute::connect_to_compute( ctx, &TokioMechanism { - conn_id, + compute_conn_id, conn_info, pool: self.pool.clone(), locks: &self.config.connect_compute_locks, @@ -204,7 +205,7 @@ impl PoolingBackend { // Wake up the destination if needed #[tracing::instrument(skip_all, fields( compute_id = tracing::field::Empty, - conn_id = tracing::field::Empty, + compute_conn_id = tracing::field::Empty, ))] pub(crate) async fn connect_to_local_proxy( &self, @@ -216,9 +217,9 @@ impl PoolingBackend { return Ok(client); } - let conn_id = uuid::Uuid::new_v4(); - tracing::Span::current().record("conn_id", display(conn_id)); - debug!(%conn_id, "pool: opening a new connection '{conn_info}'"); + let compute_conn_id = ComputeConnId::new(); + tracing::Span::current().record("compute_conn_id", display(compute_conn_id)); + debug!(%compute_conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| ComputeUserInfo { user: conn_info.user_info.user.clone(), endpoint: EndpointId::from(format!( @@ -230,7 +231,7 @@ impl PoolingBackend { crate::proxy::connect_compute::connect_to_compute( ctx, &HyperMechanism { - conn_id, + compute_conn_id, conn_info, pool: self.http_conn_pool.clone(), locks: &self.config.connect_compute_locks, @@ -251,7 +252,7 @@ impl PoolingBackend { /// Panics if called with a non-local_proxy backend. 
#[tracing::instrument(skip_all, fields(
 pid = tracing::field::Empty,
- conn_id = tracing::field::Empty,
+ compute_conn_id = tracing::field::Empty,
 ))]
 pub(crate) async fn connect_to_local_postgres(
 &self,
@@ -303,9 +304,9 @@
 }
 }
- let conn_id = uuid::Uuid::new_v4();
- tracing::Span::current().record("conn_id", display(conn_id));
- info!(%conn_id, "local_pool: opening a new connection '{conn_info}'");
+ let compute_conn_id = ComputeConnId::new();
+ tracing::Span::current().record("compute_conn_id", display(compute_conn_id));
+ info!(%compute_conn_id, "local_pool: opening a new connection '{conn_info}'");
 let (key, jwk) = create_random_jwk();
@@ -340,7 +341,7 @@
 client,
 connection,
 key,
- conn_id,
+ compute_conn_id,
 local_backend.node_info.aux.clone(),
 );
@@ -378,7 +379,7 @@ fn create_random_jwk() -> (SigningKey, jose_jwk::Key) {
 #[derive(Debug, thiserror::Error)]
 pub(crate) enum HttpConnError {
 #[error("pooled connection closed at inconsistent state")]
- ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<uuid::Uuid>),
+ ConnectionClosedAbruptly(#[from] tokio::sync::watch::error::SendError<RequestId>),
 #[error("could not connect to postgres in compute")]
 PostgresConnectionError(#[from] postgres_client::Error),
 #[error("could not connect to local-proxy in compute")]
@@ -509,7 +510,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
 struct TokioMechanism {
 pool: Arc>>,
 conn_info: ConnInfo,
- conn_id: uuid::Uuid,
+ compute_conn_id: ComputeConnId,
 keys: ComputeCredentialKeys,
 /// connect_to_compute concurrency lock
@@ -561,7 +562,7 @@
 self.conn_info.clone(),
 client,
 connection,
- self.conn_id,
+ self.compute_conn_id,
 node_info.aux.clone(),
 ))
 }
}
 struct HyperMechanism {
 pool: Arc>>,
 conn_info: ConnInfo,
- conn_id: uuid::Uuid,
+ compute_conn_id: ComputeConnId,
 /// connect_to_compute concurrency lock
 locks: &'static ApiLocks,
@@ -620,7 +621,7 @@
 &self.conn_info,
 client,
 connection,
- self.conn_id,
+ self.compute_conn_id,
 node_info.aux.clone(),
 ))
 }
}
diff --git a/proxy/src/serverless/cancel_set.rs b/proxy/src/serverless/cancel_set.rs
index ba8945afc5..6d19951d82 100644
--- a/proxy/src/serverless/cancel_set.rs
+++ b/proxy/src/serverless/cancel_set.rs
@@ -10,7 +10,8 @@ use rand::{Rng, thread_rng};
 use rustc_hash::FxHasher;
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
-use uuid::Uuid;
+
+use crate::id::ClientConnId;
 type Hasher = BuildHasherDefault<FxHasher>;
 pub struct CancelSet {
 }
 pub(crate) struct CancelShard {
- tokens: IndexMap<uuid::Uuid, (Instant, CancellationToken), Hasher>,
+ tokens: IndexMap<ClientConnId, (Instant, CancellationToken), Hasher>,
 }
 impl CancelSet {
@@ -53,7 +54,7 @@
 .and_then(|len| self.shards[rng % len].lock().take(rng / len))
 }
- pub(crate) fn insert(&self, id: uuid::Uuid, token: CancellationToken) -> CancelGuard<'_> {
+ pub(crate) fn insert(&self, id: ClientConnId, token: CancellationToken) -> CancelGuard<'_> {
 let shard = NonZeroUsize::new(self.shards.len()).map(|len| {
 let hash = self.hasher.hash_one(id) as usize;
 let shard = &self.shards[hash % len];
@@ -77,18 +78,18 @@
 })
 }
- fn remove(&mut self, id: uuid::Uuid) {
+ fn remove(&mut self, id: ClientConnId) {
 self.tokens.swap_remove(&id);
 }
- fn insert(&mut self, id: uuid::Uuid, token: CancellationToken) {
+ fn insert(&mut self, id: ClientConnId, token: CancellationToken) {
 self.tokens.insert(id, (Instant::now(), token));
 }
}
 pub(crate) struct CancelGuard<'a> {
 shard:
Option<&'a Mutex<CancelShard>>,
- id: Uuid,
+ id: ClientConnId,
 }
 impl Drop for CancelGuard<'_> {
diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs
index 672e59f81f..71c826fd51 100644
--- a/proxy/src/serverless/conn_pool.rs
+++ b/proxy/src/serverless/conn_pool.rs
@@ -26,6 +26,7 @@ use super::conn_pool_lib::{
 use crate::config::ComputeConfig;
 use crate::context::RequestContext;
 use crate::control_plane::messages::MetricsAuxInfo;
+use crate::id::{ComputeConnId, RequestId};
 use crate::metrics::Metrics;
 type TlsStream = >::Stream;
@@ -62,14 +63,14 @@ pub(crate) fn poll_client(
 conn_info: ConnInfo,
 client: C,
 mut connection: postgres_client::Connection,
- conn_id: uuid::Uuid,
+ compute_conn_id: ComputeConnId,
 aux: MetricsAuxInfo,
 ) -> Client {
 let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
 let mut session_id = ctx.session_id();
 let (tx, mut rx) = tokio::sync::watch::channel(session_id);
- let span = info_span!(parent: None, "connection", %conn_id);
+ let span = info_span!(parent: None, "connection", %compute_conn_id);
 let cold_start_info = ctx.cold_start_info();
 span.in_scope(|| {
 info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
@@ -117,7 +118,7 @@
 if let Some(pool) = pool.clone().upgrade() {
 // remove client from pool - should close the connection if it's idle.
 // does nothing if the client is currently checked-out and in-use
- if pool.write().remove_client(db_user.clone(), conn_id) {
+ if pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("idle connection removed");
 }
 }
@@ -149,7 +150,7 @@
 // remove from connection pool
 if let Some(pool) = pool.clone().upgrade()
- && pool.write().remove_client(db_user.clone(), conn_id) {
+ && pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("closed connection removed");
 }
@@ -161,7 +162,7 @@
 let inner = ClientInnerCommon {
 inner: client,
 aux,
- conn_id,
+ compute_conn_id,
 data: ClientDataEnum::Remote(ClientDataRemote {
 session: tx,
 cancel,
@@ -173,12 +174,12 @@
 #[derive(Clone)]
 pub(crate) struct ClientDataRemote {
- session: tokio::sync::watch::Sender<uuid::Uuid>,
+ session: tokio::sync::watch::Sender<RequestId>,
 cancel: CancellationToken,
 }
 impl ClientDataRemote {
- pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
+ pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<RequestId> {
 &mut self.session
 }
@@ -192,6 +193,7 @@ mod tests {
 use std::sync::atomic::AtomicBool;
 use super::*;
+ use crate::id::ComputeConnId;
 use crate::proxy::NeonOptions;
 use crate::serverless::cancel_set::CancelSet;
 use crate::types::{BranchId, EndpointId, ProjectId};
@@ -225,9 +227,9 @@
 compute_id: "compute".into(),
 cold_start_info: crate::control_plane::messages::ColdStartInfo::Warm,
 },
- conn_id: uuid::Uuid::new_v4(),
+ compute_conn_id: ComputeConnId::new(),
 data: ClientDataEnum::Remote(ClientDataRemote {
- session: tokio::sync::watch::Sender::new(uuid::Uuid::new_v4()),
+ session: tokio::sync::watch::Sender::new(RequestId::new()),
 cancel: CancellationToken::new(),
 }),
 }
diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs
index 42a3ea17a2..ab83bc28f2 100644
--- a/proxy/src/serverless/conn_pool_lib.rs
+++ b/proxy/src/serverless/conn_pool_lib.rs
@@ -19,6 +19,7 @@ use super::local_conn_pool::ClientDataLocal;
 use crate::auth::backend::ComputeUserInfo;
 use crate::context::RequestContext;
 use
crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; +use crate::id::ComputeConnId; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::protocol2::ConnectionInfoExtra; use crate::types::{DbName, EndpointCacheKey, RoleName}; @@ -58,7 +59,7 @@ pub(crate) enum ClientDataEnum { pub(crate) struct ClientInnerCommon { pub(crate) inner: C, pub(crate) aux: MetricsAuxInfo, - pub(crate) conn_id: uuid::Uuid, + pub(crate) compute_conn_id: ComputeConnId, pub(crate) data: ClientDataEnum, // custom client data like session, key, jti } @@ -77,8 +78,8 @@ impl Drop for ClientInnerCommon { } impl ClientInnerCommon { - pub(crate) fn get_conn_id(&self) -> uuid::Uuid { - self.conn_id + pub(crate) fn get_conn_id(&self) -> ComputeConnId { + self.compute_conn_id } pub(crate) fn get_data(&mut self) -> &mut ClientDataEnum { @@ -144,7 +145,7 @@ impl EndpointConnPool { pub(crate) fn remove_client( &mut self, db_user: (DbName, RoleName), - conn_id: uuid::Uuid, + conn_id: ComputeConnId, ) -> bool { let Self { pools, @@ -189,7 +190,7 @@ impl EndpointConnPool { } pub(crate) fn put(pool: &RwLock, conn_info: &ConnInfo, client: ClientInnerCommon) { - let conn_id = client.get_conn_id(); + let compute_conn_id = client.get_conn_id(); let (max_conn, conn_count, pool_name) = { let pool = pool.read(); ( @@ -201,12 +202,12 @@ impl EndpointConnPool { }; if client.inner.is_closed() { - info!(%conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name); + info!(%compute_conn_id, "{}: throwing away connection '{conn_info}' because connection is closed", pool_name); return; } if conn_count >= max_conn { - info!(%conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name); + info!(%compute_conn_id, "{}: throwing away connection '{conn_info}' because pool is full", pool_name); return; } @@ -241,9 +242,9 @@ impl EndpointConnPool { // do logging outside of the mutex if returned { - debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); + debug!(%compute_conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); } else { - info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); + info!(%compute_conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); } } } diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index 7acd816026..f0108f096f 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -18,6 +18,7 @@ use super::conn_pool_lib::{ }; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; +use crate::id::ComputeConnId; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::protocol2::ConnectionInfoExtra; use crate::types::EndpointCacheKey; @@ -65,7 +66,7 @@ impl HttpConnPool { } } - fn remove_conn(&mut self, conn_id: uuid::Uuid) -> bool { + fn remove_conn(&mut self, conn_id: ComputeConnId) -> bool { let Self { conns, global_connections_count, @@ -73,7 +74,7 @@ impl HttpConnPool { } = self; let old_len = conns.len(); - conns.retain(|entry| entry.conn.conn_id != conn_id); + conns.retain(|entry| entry.conn.compute_conn_id != conn_id); let new_len = conns.len(); let removed = old_len - new_len; if removed > 0 { @@ -135,7 
+136,10 @@ impl GlobalConnPool<Send, HttpConnPool<Send>> {
 return result;
 };
- tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
+ tracing::Span::current().record(
+ "compute_conn_id",
+ tracing::field::display(client.conn.compute_conn_id),
+ );
 debug!(
 cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
 "pool: reusing connection '{conn_info}'"
 );
@@ -194,13 +198,13 @@ pub(crate) fn poll_http2_client(
 conn_info: &ConnInfo,
 client: Send,
 connection: Connect,
- conn_id: uuid::Uuid,
+ compute_conn_id: ComputeConnId,
 aux: MetricsAuxInfo,
 ) -> Client {
 let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
 let session_id = ctx.session_id();
- let span = info_span!(parent: None, "connection", %conn_id);
+ let span = info_span!(parent: None, "connection", %compute_conn_id);
 let cold_start_info = ctx.cold_start_info();
 span.in_scope(|| {
 info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
 });
@@ -212,7 +216,7 @@
 let client = ClientInnerCommon {
 inner: client.clone(),
 aux: aux.clone(),
- conn_id,
+ compute_conn_id,
 data: ClientDataEnum::Http(ClientDataHttp()),
 };
 pool.write().conns.push_back(ConnPoolEntry {
@@ -241,7 +245,7 @@
 // remove from connection pool
 if let Some(pool) = pool.clone().upgrade()
- && pool.write().remove_conn(conn_id)
+ && pool.write().remove_conn(compute_conn_id)
 {
 info!("closed connection removed");
 }
@@ -252,7 +256,7 @@
 let client = ClientInnerCommon {
 inner: client,
 aux,
- conn_id,
+ compute_conn_id,
 data: ClientDataEnum::Http(ClientDataHttp()),
 };
diff --git a/proxy/src/serverless/http_util.rs b/proxy/src/serverless/http_util.rs
index 0c91ac6835..4543bdb5d7 100644
--- a/proxy/src/serverless/http_util.rs
+++ b/proxy/src/serverless/http_util.rs
@@ -10,7 +10,6 @@ use http_body_util::{BodyExt, Full};
 use http_utils::error::ApiError;
 use serde::Serialize;
 use url::Url;
-use uuid::Uuid;
 use super::conn_pool::{AuthData, ConnInfoWithAuth};
 use super::conn_pool_lib::ConnInfo;
@@ -18,6 +17,7 @@ use super::error::{ConnInfoError, Credentials};
 use crate::auth::backend::ComputeUserInfo;
 use crate::config::AuthenticationConfig;
 use crate::context::RequestContext;
+use crate::id::RequestId;
 use crate::metrics::{Metrics, SniGroup, SniKind};
 use crate::pqproto::StartupMessageParams;
 use crate::proxy::NeonOptions;
@@ -34,9 +34,8 @@
 pub(super) static TXN_ISOLATION_LEVEL: HeaderName =
 pub(super) static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
 pub(super) static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable");
-pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue {
- let mut uuid = [0; uuid::fmt::Hyphenated::LENGTH];
- HeaderValue::from_str(id.as_hyphenated().encode_lower(&mut uuid[..]))
- .expect("uuid hyphenated format should be all valid header characters")
+pub(crate) fn uuid_to_header_value(id: RequestId) -> HeaderValue {
+ HeaderValue::from_maybe_shared(Bytes::from(id.to_string().into_bytes()))
+ .expect("typeid format should be all valid header characters")
 }
diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs
index e4cbd02bfe..db6060db0f 100644
--- a/proxy/src/serverless/local_conn_pool.rs
+++ b/proxy/src/serverless/local_conn_pool.rs
@@ -40,6 +40,7 @@ use super::conn_pool_lib::{
 use super::sql_over_http::SqlOverHttpError;
 use crate::context::RequestContext;
 use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
+use crate::id::{ComputeConnId, RequestId};
 use
crate::metrics::Metrics;
 pub(crate) const EXT_NAME: &str = "pg_session_jwt";
@@ -48,14 +49,14 @@ pub(crate) const EXT_SCHEMA: &str = "auth";
 #[derive(Clone)]
 pub(crate) struct ClientDataLocal {
- session: tokio::sync::watch::Sender<uuid::Uuid>,
+ session: tokio::sync::watch::Sender<RequestId>,
 cancel: CancellationToken,
 key: SigningKey,
 jti: u64,
 }
 impl ClientDataLocal {
- pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<uuid::Uuid> {
+ pub fn session(&mut self) -> &mut tokio::sync::watch::Sender<RequestId> {
 &mut self.session
 }
@@ -167,14 +168,14 @@ pub(crate) fn poll_client(
 client: C,
 mut connection: postgres_client::Connection,
 key: SigningKey,
- conn_id: uuid::Uuid,
+ compute_conn_id: ComputeConnId,
 aux: MetricsAuxInfo,
 ) -> Client {
 let conn_gauge = Metrics::get().proxy.db_connections.guard(ctx.protocol());
 let mut session_id = ctx.session_id();
 let (tx, mut rx) = tokio::sync::watch::channel(session_id);
- let span = info_span!(parent: None, "connection", %conn_id);
+ let span = info_span!(parent: None, "connection", %compute_conn_id);
 let cold_start_info = ctx.cold_start_info();
 span.in_scope(|| {
 info!(cold_start_info = cold_start_info.as_str(), %conn_info, %session_id, "new connection");
 });
@@ -218,7 +219,7 @@
 if let Some(pool) = pool.clone().upgrade() {
 // remove client from pool - should close the connection if it's idle.
 // does nothing if the client is currently checked-out and in-use
- if pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
+ if pool.global_pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("idle connection removed");
 }
 }
@@ -250,7 +251,7 @@
 // remove from connection pool
 if let Some(pool) = pool.clone().upgrade()
- && pool.global_pool.write().remove_client(db_user.clone(), conn_id) {
+ && pool.global_pool.write().remove_client(db_user.clone(), compute_conn_id) {
 info!("closed connection removed");
 }
@@ -263,7 +264,7 @@
 let inner = ClientInnerCommon {
 inner: client,
 aux,
- conn_id,
+ compute_conn_id,
 data: ClientDataEnum::Local(ClientDataLocal {
 session: tx,
 cancel,
diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs
index 5b7289c53d..9b7b13c8f4 100644
--- a/proxy/src/serverless/mod.rs
+++ b/proxy/src/serverless/mod.rs
@@ -16,16 +16,16 @@ mod websocket;
 use std::net::{IpAddr, SocketAddr};
 use std::pin::{Pin, pin};
+use std::str::FromStr;
 use std::sync::Arc;
 use anyhow::Context;
 use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
-use atomic_take::AtomicTake;
 use bytes::Bytes;
 pub use conn_pool_lib::GlobalConnPoolOptions;
-use futures::TryFutureExt;
 use futures::future::{Either, select};
+use futures::{FutureExt, TryFutureExt};
 use http::{Method, Response, StatusCode};
 use http_body_util::combinators::BoxBody;
 use http_body_util::{BodyExt, Empty};
@@ -48,7 +48,8 @@ use crate::cancellation::CancellationHandler;
 use crate::config::{ProxyConfig, ProxyProtocolV2};
 use crate::context::RequestContext;
 use crate::ext::TaskExt;
-use crate::metrics::Metrics;
+use crate::id::{ClientConnId, RequestId};
+use crate::metrics::{Metrics, Protocol};
 use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
 use crate::rate_limiter::EndpointRateLimiter;
 use crate::serverless::backend::PoolingBackend;
@@ -131,13 +132,12 @@
 tracing::error!("could not set nodelay: {e}");
 continue;
 }
- let conn_id = uuid::Uuid::new_v4();
- let http_conn_span = tracing::info_span!("http_conn", ?conn_id);
+ let conn_id = ClientConnId::new();
 let n_connections
= Metrics::get() .proxy .client_connections - .sample(crate::metrics::Protocol::Http); + .sample(Protocol::Http); tracing::trace!(?n_connections, threshold = ?config.http_config.client_conn_threshold, "check"); if n_connections > config.http_config.client_conn_threshold { tracing::trace!("attempting to cancel a random connection"); @@ -154,46 +154,41 @@ pub async fn task_main( let cancellation_handler = cancellation_handler.clone(); let endpoint_rate_limiter = endpoint_rate_limiter.clone(); let cancellations = cancellations.clone(); - connections.spawn( - async move { - let conn_token2 = conn_token.clone(); - let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token2); + connections.spawn(async move { + let conn_token2 = conn_token.clone(); + let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token2); - let session_id = uuid::Uuid::new_v4(); + let _gauge = Metrics::get() + .proxy + .client_connections + .guard(Protocol::Http); - let _gauge = Metrics::get() - .proxy - .client_connections - .guard(crate::metrics::Protocol::Http); + let startup_result = Box::pin(connection_startup( + config, + tls_acceptor, + conn_id, + conn, + peer_addr, + )) + .await; + let Some((conn, conn_info)) = startup_result else { + return; + }; - let startup_result = Box::pin(connection_startup( - config, - tls_acceptor, - session_id, - conn, - peer_addr, - )) - .await; - let Some((conn, conn_info)) = startup_result else { - return; - }; - - Box::pin(connection_handler( - config, - backend, - connections2, - cancellations, - cancellation_handler, - endpoint_rate_limiter, - conn_token, - conn, - conn_info, - session_id, - )) - .await; - } - .instrument(http_conn_span), - ); + Box::pin(connection_handler( + config, + backend, + connections2, + cancellations, + cancellation_handler, + endpoint_rate_limiter, + conn_token, + conn, + conn_info, + conn_id, + )) + .await; + }); } connections.wait().await; @@ -230,7 +225,7 @@ impl MaybeTlsAcceptor for &'static ArcSwapOption { async fn connection_startup( config: &ProxyConfig, tls_acceptor: Arc, - session_id: uuid::Uuid, + conn_id: ClientConnId, conn: TcpStream, peer_addr: SocketAddr, ) -> Option<(AsyncRW, ConnectionInfo)> { @@ -265,12 +260,12 @@ async fn connection_startup( IpAddr::V4(ip) => ip.is_private(), IpAddr::V6(_) => false, }; - info!(?session_id, %conn_info, "accepted new TCP connection"); + info!(%conn_id, %conn_info, "accepted new TCP connection"); // try upgrade to TLS, but with a timeout. 
let conn = match timeout(config.handshake_timeout, tls_acceptor.accept(conn)).await {
 Ok(Ok(conn)) => {
- info!(?session_id, %conn_info, "accepted new TLS connection");
+ info!(%conn_id, %conn_info, "accepted new TLS connection");
 conn
 }
 // The handshake failed
@@ -278,7 +273,7 @@
 if !has_private_peer_addr {
 Metrics::get().proxy.tls_handshake_failures.inc();
 }
- warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
+ warn!(%conn_id, %conn_info, "failed to accept TLS connection: {e:?}");
 return None;
 }
 // The handshake timed out
@@ -286,7 +281,7 @@
 if !has_private_peer_addr {
 Metrics::get().proxy.tls_handshake_failures.inc();
 }
- warn!(?session_id, %conn_info, "failed to accept TLS connection: {e:?}");
+ warn!(%conn_id, %conn_info, "failed to accept TLS connection: {e:?}");
 return None;
 }
 };
@@ -309,10 +304,8 @@ async fn connection_handler(
 cancellation_token: CancellationToken,
 conn: AsyncRW,
 conn_info: ConnectionInfo,
- session_id: uuid::Uuid,
+ conn_id: ClientConnId,
 ) {
- let session_id = AtomicTake::new(session_id);
-
 // Cancel all current inflight HTTP requests if the HTTP connection is closed.
 let http_cancellation_token = CancellationToken::new();
 let _cancel_connection = http_cancellation_token.clone().drop_guard();
@@ -322,20 +315,6 @@
 let conn = server.serve_connection_with_upgrades(
 hyper_util::rt::TokioIo::new(conn),
 hyper::service::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
- // First HTTP request shares the same session ID
- let mut session_id = session_id.take().unwrap_or_else(uuid::Uuid::new_v4);
-
- if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
- // take session_id from request, if given.
- if let Some(id) = req
- .headers()
- .get(&NEON_REQUEST_ID)
- .and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
- {
- session_id = id;
- }
- }
-
 // Cancel the current inflight HTTP request if the request stream is closed.
 // This is slightly different to `_cancel_connection` in that
 // h2 can cancel individual requests with a `RST_STREAM`.
@@ -352,7 +331,7 @@
 backend.clone(),
 connections.clone(),
 cancellation_handler.clone(),
- session_id,
+ conn_id,
 conn_info2.clone(),
 http_request_token,
 endpoint_rate_limiter.clone(),
@@ -362,15 +341,8 @@
 .map_ok_or_else(api_error_into_response, |r| r),
 );
 async move {
- let mut res = handler.await;
+ let res = handler.await;
 cancel_request.disarm();
-
- // add the session ID to the response
- if let Ok(resp) = &mut res {
- resp.headers_mut()
- .append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
- }
-
 res
 }
 }),
@@ -392,6 +364,44 @@
 }
}
+fn get_request_id(backend: &PoolingBackend, req: &hyper::Request<hyper::body::Incoming>) -> RequestId {
+ if matches!(backend.auth_backend, crate::auth::Backend::Local(_)) {
+ // take session_id from request, if given.
+
+ if let Some(id) = req
+ .headers()
+ .get(&NEON_REQUEST_ID)
+ .and_then(|id| uuid::Uuid::try_parse_ascii(id.as_bytes()).ok())
+ {
+ return RequestId::from_uuid(id);
+ }
+
+ if let Some(id) = req
+ .headers()
+ .get(&NEON_REQUEST_ID)
+ .and_then(|id| id.to_str().ok())
+ .and_then(|id| RequestId::from_str(id).ok())
+ {
+ return id;
+ }
+ }
+
+ RequestId::new()
+}
+
+fn set_request_id<B, E>(
+ mut res: Result<Response<B>, E>,
+ session_id: RequestId,
+) -> Result<Response<B>, E> {
+ // add the session ID to the response
+ if let Ok(resp) = &mut res {
+ resp.headers_mut()
+ .append(&NEON_REQUEST_ID, uuid_to_header_value(session_id));
+ }
+
+ res
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn request_handler(
 mut request: hyper::Request<hyper::body::Incoming>,
@@ -399,7 +409,7 @@
 backend: Arc,
 ws_connections: TaskTracker,
 cancellation_handler: Arc,
- session_id: uuid::Uuid,
+ conn_id: ClientConnId,
 conn_info: ConnectionInfo,
 // used to cancel in-flight HTTP requests. not used to cancel websockets
 http_cancellation_token: CancellationToken,
@@ -417,7 +427,8 @@
 if config.http_config.accept_websockets
 && framed_websockets::upgrade::is_upgrade_request(&request)
 {
- let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Ws);
+ let session_id = RequestId::from_uuid(conn_id.uuid());
+ let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Ws);
 ctx.set_user_agent(
 request
@@ -457,7 +468,8 @@
 // Return the response so the spawned future can continue.
 Ok(response.map(|b| b.map_err(|x| match x {}).boxed()))
 } else if request.uri().path() == "/sql" && *request.method() == Method::POST {
- let ctx = RequestContext::new(session_id, conn_info, crate::metrics::Protocol::Http);
+ let session_id = get_request_id(&backend, &request);
+ let ctx = RequestContext::new(conn_id, session_id, conn_info, Protocol::Http);
 let span = ctx.span();
 let testodrome_id = request
@@ -473,6 +485,7 @@
 sql_over_http::handle(config, ctx, request, backend, http_cancellation_token)
 .instrument(span)
+ .map(|res| set_request_id(res, session_id))
 .await
 } else if request.uri().path() == "/sql" && *request.method() == Method::OPTIONS {
 Response::builder()
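---

Reviewer note (not part of the patch): for anyone unfamiliar with the `type-safe-id` crate, the minimal sketch below shows how the new ID types in `proxy/src/id.rs` behave. It uses only the APIs the patch itself exercises — `new`, `from_uuid`, `uuid`, `Display`, and `FromStr` — with the same marker types and `StaticType::TYPE` prefixes; the example TypeID suffix in the comment is illustrative.

```rust
use std::str::FromStr;

use type_safe_id::{StaticType, TypeSafeId};

#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
struct ClientConn;
impl StaticType for ClientConn {
    const TYPE: &'static str = "neon_conn";
}

#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
struct Request;
impl StaticType for Request {
    const TYPE: &'static str = "request";
}

type ClientConnId = TypeSafeId<ClientConn>;
type RequestId = TypeSafeId<Request>;

fn main() {
    // As in pglb/console-redirect task_main: mint one UUID per accepted
    // socket and share it between the client connection ID and the first
    // session/request ID.
    let conn_id = ClientConnId::new();
    let session_id = RequestId::from_uuid(conn_id.uuid());
    assert!(conn_id.uuid() == session_id.uuid());

    // Display renders `<prefix>_<base32>`, so logs stay greppable by kind,
    // e.g. `neon_conn_01h455vb4pex5vsknk084sn02q`.
    println!("{conn_id} / {session_id}");

    // A TypeID string round-trips via FromStr...
    let parsed = RequestId::from_str(&session_id.to_string()).unwrap();
    assert!(parsed == session_id);

    // ...but only with the matching prefix: a `neon_conn_*` string is not
    // a valid RequestId, which is what makes the IDs "type safe".
    assert!(RequestId::from_str(&conn_id.to_string()).is_err());
}
```

Sharing one UUID between `ClientConnId` and the initial `RequestId` keeps TCP/WS behaviour equivalent to the old single `session_id`, while the per-type prefixes let logs distinguish the two roles.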
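The `Neon-Request-Id` handling in `get_request_id` accepts two wire formats for backwards compatibility. A standalone sketch of that fallback order, using a hypothetical `parse_request_id` helper (the real code reads the header from a `hyper::Request`, uses `try_parse_ascii`, and only honours the header for the local-proxy backend):

```rust
use std::str::FromStr;

use type_safe_id::{StaticType, TypeSafeId};

#[derive(Copy, Clone, Default, Hash, PartialEq, Eq)]
struct Request;
impl StaticType for Request {
    const TYPE: &'static str = "request";
}

type RequestId = TypeSafeId<Request>;

/// Mirrors the fallback order in `get_request_id`: first a bare UUID (the
/// format the old code emitted in responses), then a full TypeID string.
fn parse_request_id(header: &str) -> Option<RequestId> {
    if let Ok(id) = uuid::Uuid::try_parse(header) {
        return Some(RequestId::from_uuid(id));
    }
    RequestId::from_str(header).ok()
}

fn main() {
    // New-style header: full TypeID string.
    let modern = RequestId::new();
    assert!(parse_request_id(&modern.to_string()) == Some(modern));

    // Old-style header: bare hyphenated UUID.
    let legacy = uuid::Uuid::new_v4();
    assert!(parse_request_id(&legacy.to_string()) == Some(RequestId::from_uuid(legacy)));

    // Anything else is rejected; the real code then mints a fresh ID
    // with `RequestId::new()`.
    assert!(parse_request_id("not-an-id").is_none());
}
```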