From bf7d859a8bdb26a6ac4ce1c17fec948d7bcecdcb Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Wed, 20 Nov 2024 13:50:36 +0100 Subject: [PATCH] proxy: Rename RequestMonitoring to RequestContext (#9805) ## Problem It is called context/ctx everywhere and the Monitoring suffix needlessly confuses with proper monitoring code. ## Summary of changes * Rename RequestMonitoring to RequestContext * Rename RequestMonitoringInner to RequestContextInner --- proxy/src/auth/backend/classic.rs | 4 +-- proxy/src/auth/backend/console_redirect.rs | 8 +++--- proxy/src/auth/backend/hacks.rs | 6 ++-- proxy/src/auth/backend/jwt.rs | 26 +++++++++--------- proxy/src/auth/backend/local.rs | 4 +-- proxy/src/auth/backend/mod.rs | 32 +++++++++++----------- proxy/src/auth/credentials.rs | 30 ++++++++++---------- proxy/src/auth/flow.rs | 4 +-- proxy/src/bin/pg_sni_router.rs | 8 +++--- proxy/src/cache/endpoints.rs | 4 +-- proxy/src/compute.rs | 4 +-- proxy/src/console_redirect_proxy.rs | 6 ++-- proxy/src/context/mod.rs | 22 +++++++-------- proxy/src/context/parquet.rs | 6 ++-- proxy/src/control_plane/client/mock.rs | 10 +++---- proxy/src/control_plane/client/mod.rs | 12 ++++---- proxy/src/control_plane/client/neon.rs | 16 +++++------ proxy/src/control_plane/mod.rs | 12 ++++---- proxy/src/proxy/connect_compute.rs | 10 +++---- proxy/src/proxy/handshake.rs | 4 +-- proxy/src/proxy/mod.rs | 6 ++-- proxy/src/proxy/tests/mitm.rs | 2 +- proxy/src/proxy/tests/mod.rs | 27 +++++++++--------- proxy/src/proxy/wake_compute.rs | 4 +-- proxy/src/serverless/backend.rs | 16 +++++------ proxy/src/serverless/conn_pool.rs | 4 +-- proxy/src/serverless/conn_pool_lib.rs | 4 +-- proxy/src/serverless/http_conn_pool.rs | 6 ++-- proxy/src/serverless/local_conn_pool.rs | 6 ++-- proxy/src/serverless/mod.rs | 6 ++-- proxy/src/serverless/sql_over_http.rs | 12 ++++---- proxy/src/serverless/websocket.rs | 4 +-- 32 files changed, 162 insertions(+), 163 deletions(-) diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index 87a02133c8..491b272ac4 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -5,13 +5,13 @@ use super::{ComputeCredentials, ComputeUserInfo}; use crate::auth::backend::ComputeCredentialKeys; use crate::auth::{self, AuthFlow}; use crate::config::AuthenticationConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::AuthSecret; use crate::stream::{PqStream, Stream}; use crate::{compute, sasl}; pub(super) async fn authenticate( - ctx: &RequestMonitoring, + ctx: &RequestContext, creds: ComputeUserInfo, client: &mut PqStream>, config: &'static AuthenticationConfig, diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index e25dc3d45e..5772471486 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -8,7 +8,7 @@ use tracing::{info, info_span}; use super::ComputeCredentialKeys; use crate::cache::Cached; use crate::config::AuthenticationConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; use crate::error::{ReportableError, UserFacingError}; use crate::proxy::connect_compute::ComputeConnectBackend; @@ -71,7 +71,7 @@ impl ConsoleRedirectBackend { pub(crate) async fn authenticate( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, auth_config: &'static AuthenticationConfig, client: &mut PqStream, ) -> auth::Result { @@ -87,7 +87,7 @@ pub struct ConsoleRedirectNodeInfo(pub(super) NodeInfo); impl ComputeConnectBackend for ConsoleRedirectNodeInfo { async fn wake_compute( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, ) -> Result { Ok(Cached::new_uncached(self.0.clone())) } @@ -98,7 +98,7 @@ impl ComputeConnectBackend for ConsoleRedirectNodeInfo { } async fn authenticate( - ctx: &RequestMonitoring, + ctx: &RequestContext, auth_config: &'static AuthenticationConfig, link_uri: &reqwest::Url, client: &mut PqStream, diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index e651df1d34..3316543022 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -4,7 +4,7 @@ use tracing::{debug, info}; use super::{ComputeCredentials, ComputeUserInfo, ComputeUserInfoNoEndpoint}; use crate::auth::{self, AuthFlow}; use crate::config::AuthenticationConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::AuthSecret; use crate::intern::EndpointIdInt; use crate::sasl; @@ -15,7 +15,7 @@ use crate::stream::{self, Stream}; /// These properties are benefical for serverless JS workers, so we /// use this mechanism for websocket connections. pub(crate) async fn authenticate_cleartext( - ctx: &RequestMonitoring, + ctx: &RequestContext, info: ComputeUserInfo, client: &mut stream::PqStream>, secret: AuthSecret, @@ -57,7 +57,7 @@ pub(crate) async fn authenticate_cleartext( /// Similar to [`authenticate_cleartext`], but there's a specific password format, /// and passwords are not yet validated (we don't know how to validate them!) pub(crate) async fn password_hack_no_authentication( - ctx: &RequestMonitoring, + ctx: &RequestContext, info: ComputeUserInfoNoEndpoint, client: &mut stream::PqStream>, ) -> auth::Result<(ComputeUserInfo, Vec)> { diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index bfc674139b..f721d81aa2 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -17,7 +17,7 @@ use thiserror::Error; use tokio::time::Instant; use crate::auth::backend::ComputeCredentialKeys; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::errors::GetEndpointJwksError; use crate::http::read_body_with_limit; use crate::intern::RoleNameInt; @@ -39,7 +39,7 @@ const JWKS_FETCH_RETRIES: u32 = 3; pub(crate) trait FetchAuthRules: Clone + Send + Sync + 'static { fn fetch_auth_rules( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, ) -> impl Future, FetchAuthRulesError>> + Send; } @@ -144,7 +144,7 @@ impl JwkCacheEntryLock { async fn renew_jwks( &self, _permit: JwkRenewalPermit<'_>, - ctx: &RequestMonitoring, + ctx: &RequestContext, client: &reqwest_middleware::ClientWithMiddleware, endpoint: EndpointId, auth_rules: &F, @@ -261,7 +261,7 @@ impl JwkCacheEntryLock { async fn get_or_update_jwk_cache( self: &Arc, - ctx: &RequestMonitoring, + ctx: &RequestContext, client: &reqwest_middleware::ClientWithMiddleware, endpoint: EndpointId, fetch: &F, @@ -314,7 +314,7 @@ impl JwkCacheEntryLock { async fn check_jwt( self: &Arc, - ctx: &RequestMonitoring, + ctx: &RequestContext, jwt: &str, client: &reqwest_middleware::ClientWithMiddleware, endpoint: EndpointId, @@ -409,7 +409,7 @@ impl JwkCacheEntryLock { impl JwkCache { pub(crate) async fn check_jwt( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, role_name: &RoleName, fetch: &F, @@ -941,7 +941,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL impl FetchAuthRules for Fetch { async fn fetch_auth_rules( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _endpoint: EndpointId, ) -> Result, FetchAuthRulesError> { Ok(self.0.clone()) @@ -1039,7 +1039,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL for token in &tokens { jwk_cache .check_jwt( - &RequestMonitoring::test(), + &RequestContext::test(), endpoint.clone(), role, &fetch, @@ -1097,7 +1097,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL jwk_cache .check_jwt( - &RequestMonitoring::test(), + &RequestContext::test(), endpoint.clone(), &role_name, &fetch, @@ -1136,7 +1136,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL let ep = EndpointId::from("ep"); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let err = jwk_cache .check_jwt(&ctx, ep, &role, &fetch, &bad_jwt) .await @@ -1175,7 +1175,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL // this role_name is not accepted let bad_role_name = RoleName::from("cloud_admin"); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let err = jwk_cache .check_jwt(&ctx, ep, &bad_role_name, &fetch, &jwt) .await @@ -1268,7 +1268,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL let ep = EndpointId::from("ep"); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); for test in table { let jwt = new_custom_ec_jwt("1".into(), &key, test.body); @@ -1336,7 +1336,7 @@ X0n5X2/pBLJzxZc62ccvZYVnctBiFs6HbSnxpuMQCfkt/BcR/ttIepBQQIW86wHL jwk_cache .check_jwt( - &RequestMonitoring::test(), + &RequestContext::test(), endpoint.clone(), &role_name, &fetch, diff --git a/proxy/src/auth/backend/local.rs b/proxy/src/auth/backend/local.rs index f9cb085daf..32e0f53615 100644 --- a/proxy/src/auth/backend/local.rs +++ b/proxy/src/auth/backend/local.rs @@ -7,7 +7,7 @@ use super::jwt::{AuthRule, FetchAuthRules}; use crate::auth::backend::jwt::FetchAuthRulesError; use crate::compute::ConnCfg; use crate::compute_ctl::ComputeCtlApi; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}; use crate::control_plane::NodeInfo; use crate::http; @@ -56,7 +56,7 @@ pub static JWKS_ROLE_MAP: ArcSwapOption = ArcSwapOption::c impl FetchAuthRules for StaticAuthRules { async fn fetch_auth_rules( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _endpoint: EndpointId, ) -> Result, FetchAuthRulesError> { let mappings = JWKS_ROLE_MAP.load(); diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index 83c72e7be0..57ecd5e499 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -20,7 +20,7 @@ use crate::auth::credentials::check_peer_addr_is_in_list; use crate::auth::{self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint}; use crate::cache::Cached; use crate::config::AuthenticationConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::client::ControlPlaneClient; use crate::control_plane::errors::GetAuthInfoError; use crate::control_plane::{ @@ -210,7 +210,7 @@ impl RateBucketInfo { impl AuthenticationConfig { pub(crate) fn check_rate_limit( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, secret: AuthSecret, endpoint: &EndpointId, is_cleartext: bool, @@ -265,7 +265,7 @@ impl AuthenticationConfig { /// /// All authentication flows will emit an AuthenticationOk message if successful. async fn auth_quirks( - ctx: &RequestMonitoring, + ctx: &RequestContext, api: &impl control_plane::ControlPlaneApi, user_info: ComputeUserInfoMaybeEndpoint, client: &mut stream::PqStream>, @@ -343,7 +343,7 @@ async fn auth_quirks( } async fn authenticate_with_secret( - ctx: &RequestMonitoring, + ctx: &RequestContext, secret: AuthSecret, info: ComputeUserInfo, client: &mut stream::PqStream>, @@ -396,7 +396,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> { #[tracing::instrument(fields(allow_cleartext = allow_cleartext), skip_all)] pub(crate) async fn authenticate( self, - ctx: &RequestMonitoring, + ctx: &RequestContext, client: &mut stream::PqStream>, allow_cleartext: bool, config: &'static AuthenticationConfig, @@ -436,7 +436,7 @@ impl<'a> Backend<'a, ComputeUserInfoMaybeEndpoint> { impl Backend<'_, ComputeUserInfo> { pub(crate) async fn get_role_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, ) -> Result { match self { Self::ControlPlane(api, user_info) => api.get_role_secret(ctx, user_info).await, @@ -446,7 +446,7 @@ impl Backend<'_, ComputeUserInfo> { pub(crate) async fn get_allowed_ips_and_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { match self { Self::ControlPlane(api, user_info) => { @@ -461,7 +461,7 @@ impl Backend<'_, ComputeUserInfo> { impl ComputeConnectBackend for Backend<'_, ComputeCredentials> { async fn wake_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, ) -> Result { match self { Self::ControlPlane(api, creds) => api.wake_compute(ctx, &creds.info).await, @@ -497,7 +497,7 @@ mod tests { use crate::auth::backend::MaskedIp; use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern}; use crate::config::AuthenticationConfig; - use crate::context::RequestMonitoring; + use crate::context::RequestContext; use crate::control_plane::{self, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret}; use crate::proxy::NeonOptions; use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo}; @@ -513,7 +513,7 @@ mod tests { impl control_plane::ControlPlaneApi for Auth { async fn get_role_secret( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _user_info: &super::ComputeUserInfo, ) -> Result { Ok(CachedRoleSecret::new_uncached(Some(self.secret.clone()))) @@ -521,7 +521,7 @@ mod tests { async fn get_allowed_ips_and_secret( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _user_info: &super::ComputeUserInfo, ) -> Result< (CachedAllowedIps, Option), @@ -535,7 +535,7 @@ mod tests { async fn get_endpoint_jwks( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _endpoint: crate::types::EndpointId, ) -> Result, control_plane::errors::GetEndpointJwksError> { @@ -544,7 +544,7 @@ mod tests { async fn wake_compute( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _user_info: &super::ComputeUserInfo, ) -> Result { unimplemented!() @@ -623,7 +623,7 @@ mod tests { let (mut client, server) = tokio::io::duplex(1024); let mut stream = PqStream::new(Stream::from_raw(server)); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let api = Auth { ips: vec![], secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), @@ -700,7 +700,7 @@ mod tests { let (mut client, server) = tokio::io::duplex(1024); let mut stream = PqStream::new(Stream::from_raw(server)); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let api = Auth { ips: vec![], secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), @@ -752,7 +752,7 @@ mod tests { let (mut client, server) = tokio::io::duplex(1024); let mut stream = PqStream::new(Stream::from_raw(server)); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let api = Auth { ips: vec![], secret: AuthSecret::Scram(ServerSecret::build("my-secret-password").await.unwrap()), diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index dab9007400..f6bce9f2d8 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -10,7 +10,7 @@ use thiserror::Error; use tracing::{debug, warn}; use crate::auth::password_hack::parse_endpoint_param; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::error::{ReportableError, UserFacingError}; use crate::metrics::{Metrics, SniKind}; use crate::proxy::NeonOptions; @@ -86,7 +86,7 @@ pub(crate) fn endpoint_sni( impl ComputeUserInfoMaybeEndpoint { pub(crate) fn parse( - ctx: &RequestMonitoring, + ctx: &RequestContext, params: &StartupMessageParams, sni: Option<&str>, common_names: Option<&HashSet>, @@ -260,7 +260,7 @@ mod tests { fn parse_bare_minimum() -> anyhow::Result<()> { // According to postgresql, only `user` should be required. let options = StartupMessageParams::new([("user", "john_doe")]); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.endpoint_id, None); @@ -275,7 +275,7 @@ mod tests { ("database", "world"), // should be ignored ("foo", "bar"), // should be ignored ]); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.endpoint_id, None); @@ -290,7 +290,7 @@ mod tests { let sni = Some("foo.localhost"); let common_names = Some(["localhost".into()].into()); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; assert_eq!(user_info.user, "john_doe"); @@ -307,7 +307,7 @@ mod tests { ("options", "-ckey=1 project=bar -c geqo=off"), ]); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.endpoint_id.as_deref(), Some("bar")); @@ -322,7 +322,7 @@ mod tests { ("options", "-ckey=1 endpoint=bar -c geqo=off"), ]); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; assert_eq!(user_info.user, "john_doe"); assert_eq!(user_info.endpoint_id.as_deref(), Some("bar")); @@ -340,7 +340,7 @@ mod tests { ), ]); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; assert_eq!(user_info.user, "john_doe"); assert!(user_info.endpoint_id.is_none()); @@ -355,7 +355,7 @@ mod tests { ("options", "-ckey=1 endpoint=bar project=foo -c geqo=off"), ]); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, None, None)?; assert_eq!(user_info.user, "john_doe"); assert!(user_info.endpoint_id.is_none()); @@ -370,7 +370,7 @@ mod tests { let sni = Some("baz.localhost"); let common_names = Some(["localhost".into()].into()); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; assert_eq!(user_info.user, "john_doe"); @@ -385,14 +385,14 @@ mod tests { let common_names = Some(["a.com".into(), "b.com".into()].into()); let sni = Some("p1.a.com"); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; assert_eq!(user_info.endpoint_id.as_deref(), Some("p1")); let common_names = Some(["a.com".into(), "b.com".into()].into()); let sni = Some("p1.b.com"); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; assert_eq!(user_info.endpoint_id.as_deref(), Some("p1")); @@ -408,7 +408,7 @@ mod tests { let sni = Some("second.localhost"); let common_names = Some(["localhost".into()].into()); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref()) .expect_err("should fail"); match err { @@ -427,7 +427,7 @@ mod tests { let sni = Some("project.localhost"); let common_names = Some(["example.com".into()].into()); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let err = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref()) .expect_err("should fail"); match err { @@ -447,7 +447,7 @@ mod tests { let sni = Some("project.localhost"); let common_names = Some(["localhost".into()].into()); - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let user_info = ComputeUserInfoMaybeEndpoint::parse(&ctx, &options, sni, common_names.as_ref())?; assert_eq!(user_info.endpoint_id.as_deref(), Some("project")); diff --git a/proxy/src/auth/flow.rs b/proxy/src/auth/flow.rs index 1740b59b14..9c6ce151cb 100644 --- a/proxy/src/auth/flow.rs +++ b/proxy/src/auth/flow.rs @@ -11,7 +11,7 @@ use tracing::info; use super::backend::ComputeCredentialKeys; use super::{AuthError, PasswordHackPayload}; use crate::config::TlsServerEndPoint; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::AuthSecret; use crate::intern::EndpointIdInt; use crate::sasl; @@ -32,7 +32,7 @@ pub(crate) struct Begin; /// Use [SCRAM](crate::scram)-based auth in [`AuthFlow`]. pub(crate) struct Scram<'a>( pub(crate) &'a scram::ServerSecret, - pub(crate) &'a RequestMonitoring, + pub(crate) &'a RequestContext, ); impl AuthMethod for Scram<'_> { diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs index ef5b5e8509..623a0fd3b2 100644 --- a/proxy/src/bin/pg_sni_router.rs +++ b/proxy/src/bin/pg_sni_router.rs @@ -11,7 +11,7 @@ use futures::future::Either; use futures::TryFutureExt; use itertools::Itertools; use proxy::config::TlsServerEndPoint; -use proxy::context::RequestMonitoring; +use proxy::context::RequestContext; use proxy::metrics::{Metrics, ThreadPoolMetrics}; use proxy::protocol2::ConnectionInfo; use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; @@ -177,7 +177,7 @@ async fn task_main( .context("failed to set socket option")?; info!(%peer_addr, "serving"); - let ctx = RequestMonitoring::new( + let ctx = RequestContext::new( session_id, ConnectionInfo { addr: peer_addr, @@ -208,7 +208,7 @@ async fn task_main( const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; async fn ssl_handshake( - ctx: &RequestMonitoring, + ctx: &RequestContext, raw_stream: S, tls_config: Arc, tls_server_end_point: TlsServerEndPoint, @@ -259,7 +259,7 @@ async fn ssl_handshake( } async fn handle_client( - ctx: RequestMonitoring, + ctx: RequestContext, dest_suffix: Arc, tls_config: Arc, tls_server_end_point: TlsServerEndPoint, diff --git a/proxy/src/cache/endpoints.rs b/proxy/src/cache/endpoints.rs index 07769e053c..20db1fbb14 100644 --- a/proxy/src/cache/endpoints.rs +++ b/proxy/src/cache/endpoints.rs @@ -11,7 +11,7 @@ use tokio_util::sync::CancellationToken; use tracing::info; use crate::config::EndpointCacheConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt}; use crate::metrics::{Metrics, RedisErrors, RedisEventsCount}; use crate::rate_limiter::GlobalRateLimiter; @@ -75,7 +75,7 @@ impl EndpointsCache { } } - pub(crate) fn is_valid(&self, ctx: &RequestMonitoring, endpoint: &EndpointId) -> bool { + pub(crate) fn is_valid(&self, ctx: &RequestContext, endpoint: &EndpointId) -> bool { if !self.ready.load(Ordering::Acquire) { // the endpoint cache is not yet fully initialised. return true; diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index b8876b44eb..e7fbe9ab47 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -18,7 +18,7 @@ use tracing::{debug, error, info, warn}; use crate::auth::parse_endpoint_param; use crate::cancellation::CancelClosure; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::messages::MetricsAuxInfo; @@ -286,7 +286,7 @@ impl ConnCfg { /// Connect to a corresponding compute node. pub(crate) async fn connect( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, allow_self_signed_compute: bool, aux: MetricsAuxInfo, timeout: Duration, diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 8e71f552a5..c88b2936db 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -8,7 +8,7 @@ use tracing::{debug, error, info, Instrument}; use crate::auth::backend::ConsoleRedirectBackend; use crate::cancellation::{CancellationHandlerMain, CancellationHandlerMainInternal}; use crate::config::{ProxyConfig, ProxyProtocolV2}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo}; @@ -82,7 +82,7 @@ pub async fn task_main( } }; - let ctx = RequestMonitoring::new( + let ctx = RequestContext::new( session_id, peer_addr, crate::metrics::Protocol::Tcp, @@ -141,7 +141,7 @@ pub async fn task_main( pub(crate) async fn handle_client( config: &'static ProxyConfig, backend: &'static ConsoleRedirectBackend, - ctx: &RequestMonitoring, + ctx: &RequestContext, cancellation_handler: Arc, stream: S, conn_gauge: NumClientConnectionsGuard<'static>, diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index d057ee0bfd..6d2d2d51ce 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -32,15 +32,15 @@ pub(crate) static LOG_CHAN_DISCONNECT: OnceCell, + TryLock, ); -struct RequestMonitoringInner { +struct RequestContextInner { pub(crate) conn_info: ConnectionInfo, pub(crate) session_id: Uuid, pub(crate) protocol: Protocol, @@ -81,10 +81,10 @@ pub(crate) enum AuthMethod { Cleartext, } -impl Clone for RequestMonitoring { +impl Clone for RequestContext { fn clone(&self) -> Self { let inner = self.0.try_lock().expect("should not deadlock"); - let new = RequestMonitoringInner { + let new = RequestContextInner { conn_info: inner.conn_info.clone(), session_id: inner.session_id, protocol: inner.protocol, @@ -115,7 +115,7 @@ impl Clone for RequestMonitoring { } } -impl RequestMonitoring { +impl RequestContext { pub fn new( session_id: Uuid, conn_info: ConnectionInfo, @@ -132,7 +132,7 @@ impl RequestMonitoring { role = tracing::field::Empty, ); - let inner = RequestMonitoringInner { + let inner = RequestContextInner { conn_info, session_id, protocol, @@ -168,7 +168,7 @@ impl RequestMonitoring { let ip = IpAddr::from([127, 0, 0, 1]); let addr = SocketAddr::new(ip, 5432); let conn_info = ConnectionInfo { addr, extra: None }; - RequestMonitoring::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test") + RequestContext::new(Uuid::now_v7(), conn_info, Protocol::Tcp, "test") } pub(crate) fn console_application_name(&self) -> String { @@ -325,7 +325,7 @@ impl RequestMonitoring { } pub(crate) struct LatencyTimerPause<'a> { - ctx: &'a RequestMonitoring, + ctx: &'a RequestContext, start: tokio::time::Instant, waiting_for: Waiting, } @@ -341,7 +341,7 @@ impl Drop for LatencyTimerPause<'_> { } } -impl RequestMonitoringInner { +impl RequestContextInner { fn set_cold_start_info(&mut self, info: ColdStartInfo) { self.cold_start_info = info; self.latency_timer.cold_start_info(info); @@ -430,7 +430,7 @@ impl RequestMonitoringInner { } } -impl Drop for RequestMonitoringInner { +impl Drop for RequestContextInner { fn drop(&mut self) { if self.sender.is_some() { self.log_connect(); diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 4112de646f..9bf3a275bb 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -20,7 +20,7 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, info, Span}; use utils::backoff; -use super::{RequestMonitoringInner, LOG_CHAN}; +use super::{RequestContextInner, LOG_CHAN}; use crate::config::remote_storage_from_toml; use crate::context::LOG_CHAN_DISCONNECT; @@ -117,8 +117,8 @@ impl serde::Serialize for Options<'_> { } } -impl From<&RequestMonitoringInner> for RequestData { - fn from(value: &RequestMonitoringInner) -> Self { +impl From<&RequestContextInner> for RequestData { + fn from(value: &RequestContextInner) -> Self { Self { session_id: value.session_id, peer_addr: value.conn_info.addr.ip().to_string(), diff --git a/proxy/src/control_plane/client/mock.rs b/proxy/src/control_plane/client/mock.rs index fd333d2aac..500acad50f 100644 --- a/proxy/src/control_plane/client/mock.rs +++ b/proxy/src/control_plane/client/mock.rs @@ -13,7 +13,7 @@ use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; use crate::cache::Cached; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::client::{CachedAllowedIps, CachedRoleSecret}; use crate::control_plane::errors::{ ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, @@ -206,7 +206,7 @@ impl super::ControlPlaneApi for MockControlPlane { #[tracing::instrument(skip_all)] async fn get_role_secret( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { Ok(CachedRoleSecret::new_uncached( @@ -216,7 +216,7 @@ impl super::ControlPlaneApi for MockControlPlane { async fn get_allowed_ips_and_secret( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { Ok(( @@ -229,7 +229,7 @@ impl super::ControlPlaneApi for MockControlPlane { async fn get_endpoint_jwks( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, endpoint: EndpointId, ) -> Result, GetEndpointJwksError> { self.do_get_endpoint_jwks(endpoint).await @@ -238,7 +238,7 @@ impl super::ControlPlaneApi for MockControlPlane { #[tracing::instrument(skip_all)] async fn wake_compute( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _user_info: &ComputeUserInfo, ) -> Result { self.do_wake_compute().map_ok(Cached::new_uncached).await diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index 50903e2f1e..f8f74372f0 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -15,7 +15,7 @@ use crate::auth::backend::ComputeUserInfo; use crate::cache::endpoints::EndpointsCache; use crate::cache::project_info::ProjectInfoCacheImpl; use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::{ errors, CachedAllowedIps, CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache, }; @@ -41,7 +41,7 @@ pub enum ControlPlaneClient { impl ControlPlaneApi for ControlPlaneClient { async fn get_role_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { match self { @@ -57,7 +57,7 @@ impl ControlPlaneApi for ControlPlaneClient { async fn get_allowed_ips_and_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result<(CachedAllowedIps, Option), errors::GetAuthInfoError> { match self { @@ -71,7 +71,7 @@ impl ControlPlaneApi for ControlPlaneClient { async fn get_endpoint_jwks( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, ) -> Result, errors::GetEndpointJwksError> { match self { @@ -85,7 +85,7 @@ impl ControlPlaneApi for ControlPlaneClient { async fn wake_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { match self { @@ -271,7 +271,7 @@ impl WakeComputePermit { impl FetchAuthRules for ControlPlaneClient { async fn fetch_auth_rules( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, ) -> Result, FetchAuthRulesError> { self.get_endpoint_jwks(ctx, endpoint) diff --git a/proxy/src/control_plane/client/neon.rs b/proxy/src/control_plane/client/neon.rs index 8f4ae13f33..53f9234926 100644 --- a/proxy/src/control_plane/client/neon.rs +++ b/proxy/src/control_plane/client/neon.rs @@ -14,7 +14,7 @@ use super::super::messages::{ControlPlaneErrorMessage, GetRoleSecret, WakeComput use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::ComputeUserInfo; use crate::cache::Cached; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::caches::ApiCaches; use crate::control_plane::errors::{ ControlPlaneError, GetAuthInfoError, GetEndpointJwksError, WakeComputeError, @@ -65,7 +65,7 @@ impl NeonControlPlaneClient { async fn do_get_auth_info( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { if !self @@ -141,7 +141,7 @@ impl NeonControlPlaneClient { async fn do_get_endpoint_jwks( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, ) -> Result, GetEndpointJwksError> { if !self @@ -200,7 +200,7 @@ impl NeonControlPlaneClient { async fn do_wake_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { let request_id = ctx.session_id().to_string(); @@ -263,7 +263,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { #[tracing::instrument(skip_all)] async fn get_role_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { let normalized_ep = &user_info.endpoint.normalize(); @@ -297,7 +297,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { async fn get_allowed_ips_and_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result<(CachedAllowedIps, Option), GetAuthInfoError> { let normalized_ep = &user_info.endpoint.normalize(); @@ -339,7 +339,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { #[tracing::instrument(skip_all)] async fn get_endpoint_jwks( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, ) -> Result, GetEndpointJwksError> { self.do_get_endpoint_jwks(ctx, endpoint).await @@ -348,7 +348,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { #[tracing::instrument(skip_all)] async fn wake_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result { let key = user_info.endpoint_cache_key(); diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 70607ac0d0..41972e4e44 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -17,7 +17,7 @@ use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; use crate::auth::IpPattern; use crate::cache::project_info::ProjectInfoCacheImpl; use crate::cache::{Cached, TimedLru}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::messages::{ControlPlaneErrorMessage, MetricsAuxInfo}; use crate::intern::ProjectIdInt; use crate::types::{EndpointCacheKey, EndpointId}; @@ -75,7 +75,7 @@ pub(crate) struct NodeInfo { impl NodeInfo { pub(crate) async fn connect( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, timeout: Duration, ) -> Result { self.config @@ -116,26 +116,26 @@ pub(crate) trait ControlPlaneApi { /// We still have to mock the scram to avoid leaking information that user doesn't exist. async fn get_role_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result; async fn get_allowed_ips_and_secret( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result<(CachedAllowedIps, Option), errors::GetAuthInfoError>; async fn get_endpoint_jwks( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, endpoint: EndpointId, ) -> Result, errors::GetEndpointJwksError>; /// Wake up the compute node and return the corresponding connection info. async fn wake_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, ) -> Result; } diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 659b7afa68..b30aec09c1 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -7,7 +7,7 @@ use super::retry::ShouldRetryWakeCompute; use crate::auth::backend::ComputeCredentialKeys; use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT}; use crate::config::RetryConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::locks::ApiLocks; use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; @@ -47,7 +47,7 @@ pub(crate) trait ConnectMechanism { type Error: From; async fn connect_once( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, node_info: &control_plane::CachedNodeInfo, timeout: time::Duration, ) -> Result; @@ -59,7 +59,7 @@ pub(crate) trait ConnectMechanism { pub(crate) trait ComputeConnectBackend { async fn wake_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, ) -> Result; fn get_keys(&self) -> &ComputeCredentialKeys; @@ -82,7 +82,7 @@ impl ConnectMechanism for TcpMechanism<'_> { #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] async fn connect_once( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, node_info: &control_plane::CachedNodeInfo, timeout: time::Duration, ) -> Result { @@ -99,7 +99,7 @@ impl ConnectMechanism for TcpMechanism<'_> { /// Try to connect to the compute node, retrying if necessary. #[tracing::instrument(skip_all)] pub(crate) async fn connect_to_compute( - ctx: &RequestMonitoring, + ctx: &RequestContext, mechanism: &M, user_info: &B, allow_self_signed_compute: bool, diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index a67f1b8112..3ada3a9995 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -9,7 +9,7 @@ use tracing::{info, warn}; use crate::auth::endpoint_sni; use crate::config::{TlsConfig, PG_ALPN_PROTOCOL}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::Metrics; use crate::proxy::ERR_INSECURE_CONNECTION; @@ -66,7 +66,7 @@ pub(crate) enum HandshakeData { /// we also take an extra care of propagating only the select handshake errors to client. #[tracing::instrument(skip_all)] pub(crate) async fn handshake( - ctx: &RequestMonitoring, + ctx: &RequestContext, stream: S, mut tls: Option<&TlsConfig>, record_handshake_error: bool, diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 17721c23d5..4be4006d15 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -25,7 +25,7 @@ use self::connect_compute::{connect_to_compute, TcpMechanism}; use self::passthrough::ProxyPassthrough; use crate::cancellation::{self, CancellationHandlerMain, CancellationHandlerMainInternal}; use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo}; @@ -117,7 +117,7 @@ pub async fn task_main( } }; - let ctx = RequestMonitoring::new( + let ctx = RequestContext::new( session_id, conn_info, crate::metrics::Protocol::Tcp, @@ -247,7 +247,7 @@ impl ReportableError for ClientRequestError { pub(crate) async fn handle_client( config: &'static ProxyConfig, auth_backend: &'static auth::Backend<'static, ()>, - ctx: &RequestMonitoring, + ctx: &RequestContext, cancellation_handler: Arc, stream: S, mode: ClientMode, diff --git a/proxy/src/proxy/tests/mitm.rs b/proxy/src/proxy/tests/mitm.rs index df9f79a7e3..fe211adfeb 100644 --- a/proxy/src/proxy/tests/mitm.rs +++ b/proxy/src/proxy/tests/mitm.rs @@ -36,7 +36,7 @@ async fn proxy_mitm( // begin handshake with end_server let end_server = connect_tls(server2, client_config2.make_tls_connect().unwrap()).await; let (end_client, startup) = match handshake( - &RequestMonitoring::test(), + &RequestContext::test(), client1, Some(&server_config1), false, diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index be821925b5..3de8ca8736 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -162,7 +162,7 @@ impl TestAuth for Scram { stream: &mut PqStream>, ) -> anyhow::Result<()> { let outcome = auth::AuthFlow::new(stream) - .begin(auth::Scram(&self.0, &RequestMonitoring::test())) + .begin(auth::Scram(&self.0, &RequestContext::test())) .await? .authenticate() .await?; @@ -182,11 +182,10 @@ async fn dummy_proxy( auth: impl TestAuth + Send, ) -> anyhow::Result<()> { let (client, _) = read_proxy_protocol(client).await?; - let mut stream = - match handshake(&RequestMonitoring::test(), client, tls.as_ref(), false).await? { - HandshakeData::Startup(stream, _) => stream, - HandshakeData::Cancel(_) => bail!("cancellation not supported"), - }; + let mut stream = match handshake(&RequestContext::test(), client, tls.as_ref(), false).await? { + HandshakeData::Startup(stream, _) => stream, + HandshakeData::Cancel(_) => bail!("cancellation not supported"), + }; auth.authenticate(&mut stream).await?; @@ -466,7 +465,7 @@ impl ConnectMechanism for TestConnectMechanism { async fn connect_once( &self, - _ctx: &RequestMonitoring, + _ctx: &RequestContext, _node_info: &control_plane::CachedNodeInfo, _timeout: std::time::Duration, ) -> Result { @@ -581,7 +580,7 @@ fn helper_create_connect_info( async fn connect_to_compute_success() { let _ = env_logger::try_init(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); let config = RetryConfig { @@ -599,7 +598,7 @@ async fn connect_to_compute_success() { async fn connect_to_compute_retry() { let _ = env_logger::try_init(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); let config = RetryConfig { @@ -618,7 +617,7 @@ async fn connect_to_compute_retry() { async fn connect_to_compute_non_retry_1() { let _ = env_logger::try_init(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Fail]); let user_info = helper_create_connect_info(&mechanism); let config = RetryConfig { @@ -637,7 +636,7 @@ async fn connect_to_compute_non_retry_1() { async fn connect_to_compute_non_retry_2() { let _ = env_logger::try_init(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Fail, Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); let config = RetryConfig { @@ -657,7 +656,7 @@ async fn connect_to_compute_non_retry_3() { let _ = env_logger::try_init(); tokio::time::pause(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Retry, Retry, Retry, Retry, Retry]); let user_info = helper_create_connect_info(&mechanism); @@ -689,7 +688,7 @@ async fn connect_to_compute_non_retry_3() { async fn wake_retry() { let _ = env_logger::try_init(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![WakeRetry, Wake, Connect]); let user_info = helper_create_connect_info(&mechanism); let config = RetryConfig { @@ -708,7 +707,7 @@ async fn wake_retry() { async fn wake_non_retry() { let _ = env_logger::try_init(); use ConnectAction::*; - let ctx = RequestMonitoring::test(); + let ctx = RequestContext::test(); let mechanism = TestConnectMechanism::new(vec![WakeRetry, WakeFail]); let user_info = helper_create_connect_info(&mechanism); let config = RetryConfig { diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index f9f46bb66c..d09e0b1f41 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -2,7 +2,7 @@ use tracing::{error, info, warn}; use super::connect_compute::ComputeConnectBackend; use crate::config::RetryConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::errors::WakeComputeError; use crate::control_plane::CachedNodeInfo; use crate::error::ReportableError; @@ -13,7 +13,7 @@ use crate::proxy::retry::{retry_after, should_retry}; pub(crate) async fn wake_compute( num_retries: &mut u32, - ctx: &RequestMonitoring, + ctx: &RequestContext, api: &B, config: RetryConfig, ) -> Result { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 5e9fd151ae..d9dcf6fbb7 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -23,7 +23,7 @@ use crate::compute_ctl::{ ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest, }; use crate::config::ProxyConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError}; use crate::control_plane::locks::ApiLocks; @@ -48,7 +48,7 @@ pub(crate) struct PoolingBackend { impl PoolingBackend { pub(crate) async fn authenticate_with_password( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, password: &[u8], ) -> Result { @@ -110,7 +110,7 @@ impl PoolingBackend { pub(crate) async fn authenticate_with_jwt( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, user_info: &ComputeUserInfo, jwt: String, ) -> Result { @@ -161,7 +161,7 @@ impl PoolingBackend { #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] pub(crate) async fn connect_to_compute( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: ConnInfo, keys: ComputeCredentials, force_new: bool, @@ -201,7 +201,7 @@ impl PoolingBackend { #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] pub(crate) async fn connect_to_local_proxy( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: ConnInfo, ) -> Result, HttpConnError> { info!("pool: looking for an existing connection"); @@ -249,7 +249,7 @@ impl PoolingBackend { #[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)] pub(crate) async fn connect_to_local_postgres( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: ConnInfo, ) -> Result, HttpConnError> { if let Some(client) = self.local_pool.get(ctx, &conn_info)? { @@ -490,7 +490,7 @@ impl ConnectMechanism for TokioMechanism { async fn connect_once( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, node_info: &CachedNodeInfo, timeout: Duration, ) -> Result { @@ -540,7 +540,7 @@ impl ConnectMechanism for HyperMechanism { async fn connect_once( &self, - ctx: &RequestMonitoring, + ctx: &RequestContext, node_info: &CachedNodeInfo, timeout: Duration, ) -> Result { diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 1845603bf7..07ba1ae9af 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -21,7 +21,7 @@ use { use super::conn_pool_lib::{ Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool, }; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::messages::MetricsAuxInfo; use crate::metrics::Metrics; @@ -53,7 +53,7 @@ impl fmt::Display for ConnInfo { pub(crate) fn poll_client( global_pool: Arc>, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: ConnInfo, client: C, mut connection: tokio_postgres::Connection, diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 61c39c32c9..fe3c422c3b 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -15,7 +15,7 @@ use super::conn_pool::ClientDataRemote; use super::http_conn_pool::ClientDataHttp; use super::local_conn_pool::ClientDataLocal; use crate::auth::backend::ComputeUserInfo; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::types::{DbName, EndpointCacheKey, RoleName}; @@ -380,7 +380,7 @@ impl GlobalConnPool { pub(crate) fn get( self: &Arc, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: &ConnInfo, ) -> Result>, HttpConnError> { let mut client: Option> = None; diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index a1d4473b01..bc86c4b1cd 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -12,7 +12,7 @@ use tracing::{debug, error, info, info_span, Instrument}; use super::backend::HttpConnError; use super::conn_pool_lib::{ClientInnerExt, ConnInfo}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; use crate::types::EndpointCacheKey; @@ -212,7 +212,7 @@ impl GlobalConnPool { #[expect(unused_results)] pub(crate) fn get( self: &Arc, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: &ConnInfo, ) -> Result>, HttpConnError> { let result: Result>, HttpConnError>; @@ -280,7 +280,7 @@ impl GlobalConnPool { pub(crate) fn poll_http2_client( global_pool: Arc>, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: &ConnInfo, client: Send, connection: Connect, diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 99d4329f88..cadcbd7530 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -36,7 +36,7 @@ use super::conn_pool_lib::{ Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, DbUserConn, EndpointConnPool, }; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::Metrics; @@ -88,7 +88,7 @@ impl LocalConnPool { pub(crate) fn get( self: &Arc, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: &ConnInfo, ) -> Result>, HttpConnError> { let client = self @@ -159,7 +159,7 @@ impl LocalConnPool { #[allow(clippy::too_many_arguments)] pub(crate) fn poll_client( global_pool: Arc>, - ctx: &RequestMonitoring, + ctx: &RequestContext, conn_info: ConnInfo, client: C, mut connection: tokio_postgres::Connection, diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index cf758855fa..59247f03bf 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -45,7 +45,7 @@ use utils::http::error::ApiError; use crate::cancellation::CancellationHandlerMain; use crate::config::{ProxyConfig, ProxyProtocolV2}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::metrics::Metrics; use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo}; use crate::proxy::run_until_cancelled; @@ -423,7 +423,7 @@ async fn request_handler( if config.http_config.accept_websockets && framed_websockets::upgrade::is_upgrade_request(&request) { - let ctx = RequestMonitoring::new( + let ctx = RequestContext::new( session_id, conn_info, crate::metrics::Protocol::Ws, @@ -458,7 +458,7 @@ async fn request_handler( // Return the response so the spawned future can continue. Ok(response.map(|b| b.map_err(|x| match x {}).boxed())) } else if request.uri().path() == "/sql" && *request.method() == Method::POST { - let ctx = RequestMonitoring::new( + let ctx = RequestContext::new( session_id, conn_info, crate::metrics::Protocol::Http, diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index f0975617d4..36d8595902 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -34,7 +34,7 @@ use super::json::{json_to_pg_text, pg_text_row_to_json, JsonConversionError}; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::metrics::{HttpDirection, Metrics}; use crate::proxy::{run_until_cancelled, NeonOptions}; @@ -133,7 +133,7 @@ impl UserFacingError for ConnInfoError { fn get_conn_info( config: &'static AuthenticationConfig, - ctx: &RequestMonitoring, + ctx: &RequestContext, headers: &HeaderMap, tls: Option<&TlsConfig>, ) -> Result { @@ -240,7 +240,7 @@ fn get_conn_info( pub(crate) async fn handle( config: &'static ProxyConfig, - ctx: RequestMonitoring, + ctx: RequestContext, request: Request, backend: Arc, cancel: CancellationToken, @@ -516,7 +516,7 @@ fn map_isolation_level_to_headers(level: IsolationLevel) -> Option async fn handle_inner( cancel: CancellationToken, config: &'static ProxyConfig, - ctx: &RequestMonitoring, + ctx: &RequestContext, request: Request, backend: Arc, ) -> Result>, SqlOverHttpError> { @@ -562,7 +562,7 @@ async fn handle_inner( async fn handle_db_inner( cancel: CancellationToken, config: &'static ProxyConfig, - ctx: &RequestMonitoring, + ctx: &RequestContext, request: Request, conn_info: ConnInfo, auth: AuthData, @@ -733,7 +733,7 @@ pub(crate) fn uuid_to_header_value(id: Uuid) -> HeaderValue { } async fn handle_auth_broker_inner( - ctx: &RequestMonitoring, + ctx: &RequestContext, request: Request, conn_info: ConnInfo, jwt: String, diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index ba36116c2c..4088fea835 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -14,7 +14,7 @@ use tracing::warn; use crate::cancellation::CancellationHandlerMain; use crate::config::ProxyConfig; -use crate::context::RequestMonitoring; +use crate::context::RequestContext; use crate::error::{io_error, ReportableError}; use crate::metrics::Metrics; use crate::proxy::{handle_client, ClientMode, ErrorSource}; @@ -126,7 +126,7 @@ impl AsyncBufRead for WebSocketRw { pub(crate) async fn serve_websocket( config: &'static ProxyConfig, auth_backend: &'static crate::auth::Backend<'static, ()>, - ctx: RequestMonitoring, + ctx: RequestContext, websocket: OnUpgrade, cancellation_handler: Arc, endpoint_rate_limiter: Arc,