From 735c66dc65f1163591a2745934f4be766072c88c Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Fri, 10 Jan 2025 09:36:51 +0000 Subject: [PATCH] fix(proxy): propagate the existing ComputeUserInfo to connect for cancellation (#10322) ## Problem We were incorrectly constructing the ComputeUserInfo, used for cancellation checks, based on the return parameters from postgres. This didn't contain the correct info. ## Summary of changes Propagate down the existing ComputeUserInfo. --- proxy/src/auth/backend/console_redirect.rs | 30 +++++++++++++++++----- proxy/src/compute.rs | 27 ++----------------- proxy/src/console_redirect_proxy.rs | 5 ++-- proxy/src/control_plane/mod.rs | 5 +++- proxy/src/proxy/connect_compute.rs | 6 +++-- proxy/src/proxy/mod.rs | 13 ++++++---- 6 files changed, 45 insertions(+), 41 deletions(-) diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index dbfda588cc..1cbf91d3ae 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -1,7 +1,8 @@ +use std::fmt; + use async_trait::async_trait; use postgres_client::config::SslMode; use pq_proto::BeMessage as Be; -use std::fmt; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{info, info_span}; @@ -12,10 +13,13 @@ use crate::auth::IpPattern; use crate::cache::Cached; use crate::config::AuthenticationConfig; use crate::context::RequestContext; -use crate::control_plane::{self, client::cplane_proxy_v1, CachedNodeInfo, NodeInfo}; +use crate::control_plane::client::cplane_proxy_v1; +use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; use crate::error::{ReportableError, UserFacingError}; use crate::proxy::connect_compute::ComputeConnectBackend; +use crate::proxy::NeonOptions; use crate::stream::PqStream; +use crate::types::RoleName; use crate::{auth, compute, waiters}; #[derive(Debug, Error)] @@ -105,10 +109,16 @@ impl ConsoleRedirectBackend { ctx: &RequestContext, auth_config: &'static AuthenticationConfig, client: &mut PqStream, - ) -> auth::Result<(ConsoleRedirectNodeInfo, Option>)> { + ) -> auth::Result<( + ConsoleRedirectNodeInfo, + ComputeUserInfo, + Option>, + )> { authenticate(ctx, auth_config, &self.console_uri, client) .await - .map(|(node_info, ip_allowlist)| (ConsoleRedirectNodeInfo(node_info), ip_allowlist)) + .map(|(node_info, user_info, ip_allowlist)| { + (ConsoleRedirectNodeInfo(node_info), user_info, ip_allowlist) + }) } } @@ -133,7 +143,7 @@ async fn authenticate( auth_config: &'static AuthenticationConfig, link_uri: &reqwest::Url, client: &mut PqStream, -) -> auth::Result<(NodeInfo, Option>)> { +) -> auth::Result<(NodeInfo, ComputeUserInfo, Option>)> { ctx.set_auth_method(crate::context::AuthMethod::ConsoleRedirect); // registering waiter can fail if we get unlucky with rng. @@ -188,8 +198,15 @@ async fn authenticate( let mut config = compute::ConnCfg::new(db_info.host.to_string(), db_info.port); config.dbname(&db_info.dbname).user(&db_info.user); + let user: RoleName = db_info.user.into(); + let user_info = ComputeUserInfo { + endpoint: db_info.aux.endpoint_id.as_str().into(), + user: user.clone(), + options: NeonOptions::default(), + }; + ctx.set_dbname(db_info.dbname.into()); - ctx.set_user(db_info.user.into()); + ctx.set_user(user); ctx.set_project(db_info.aux.clone()); info!("woken up a compute node"); @@ -212,6 +229,7 @@ async fn authenticate( config, aux: db_info.aux, }, + user_info, db_info.allowed_ips, )) } diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index 788bd63fee..aff796bbab 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -24,10 +24,8 @@ use crate::control_plane::messages::MetricsAuxInfo; use crate::error::{ReportableError, UserFacingError}; use crate::metrics::{Metrics, NumDbConnectionsGuard}; use crate::proxy::neon_option; -use crate::proxy::NeonOptions; use crate::tls::postgres_rustls::MakeRustlsConnect; use crate::types::Host; -use crate::types::{EndpointId, RoleName}; pub const COULD_NOT_CONNECT: &str = "Couldn't connect to compute node"; @@ -253,6 +251,7 @@ impl ConnCfg { ctx: &RequestContext, aux: MetricsAuxInfo, config: &ComputeConfig, + user_info: ComputeUserInfo, ) -> Result { let pause = ctx.latency_timer_pause(crate::metrics::Waiting::Compute); let (socket_addr, stream, host) = self.connect_raw(config.timeout).await?; @@ -287,28 +286,6 @@ impl ConnCfg { self.0.get_ssl_mode() ); - let compute_info = match parameters.get("user") { - Some(user) => { - match parameters.get("database") { - Some(database) => { - ComputeUserInfo { - user: RoleName::from(user), - options: NeonOptions::default(), // just a shim, we don't need options - endpoint: EndpointId::from(database), - } - } - None => { - warn!("compute node didn't return database name"); - ComputeUserInfo::default() - } - } - } - None => { - warn!("compute node didn't return user name"); - ComputeUserInfo::default() - } - }; - // NB: CancelToken is supposed to hold socket_addr, but we use connect_raw. // Yet another reason to rework the connection establishing code. let cancel_closure = CancelClosure::new( @@ -321,7 +298,7 @@ impl ConnCfg { }, vec![], // TODO: deprecated, will be removed host.to_string(), - compute_info, + user_info, ); let connection = PostgresConnection { diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 846f55f9e1..0c6755063f 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -195,7 +195,7 @@ pub(crate) async fn handle_client( ctx.set_db_options(params.clone()); - let (user_info, ip_allowlist) = match backend + let (node_info, user_info, ip_allowlist) = match backend .authenticate(ctx, &config.authentication_config, &mut stream) .await { @@ -208,11 +208,12 @@ pub(crate) async fn handle_client( let mut node = connect_to_compute( ctx, &TcpMechanism { + user_info, params_compat: true, params: ¶ms, locks: &config.connect_compute_locks, }, - &user_info, + &node_info, config.wake_compute_retry_config, &config.connect_to_compute, ) diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index c65041df0e..1dca26d686 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -74,8 +74,11 @@ impl NodeInfo { &self, ctx: &RequestContext, config: &ComputeConfig, + user_info: ComputeUserInfo, ) -> Result { - self.config.connect(ctx, self.aux.clone(), config).await + self.config + .connect(ctx, self.aux.clone(), config, user_info) + .await } pub(crate) fn reuse_settings(&mut self, other: Self) { diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 8a80494860..dd145e6bb2 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -4,7 +4,7 @@ use tokio::time; use tracing::{debug, info, warn}; use super::retry::ShouldRetryWakeCompute; -use crate::auth::backend::ComputeCredentialKeys; +use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT}; use crate::config::{ComputeConfig, RetryConfig}; use crate::context::RequestContext; @@ -71,6 +71,8 @@ pub(crate) struct TcpMechanism<'a> { /// connect_to_compute concurrency lock pub(crate) locks: &'static ApiLocks, + + pub(crate) user_info: ComputeUserInfo, } #[async_trait] @@ -88,7 +90,7 @@ impl ConnectMechanism for TcpMechanism<'_> { ) -> Result { let host = node_info.config.get_host(); let permit = self.locks.get_permit(&host).await?; - permit.release_result(node_info.connect(ctx, config).await) + permit.release_result(node_info.connect(ctx, config, self.user_info.clone()).await) } fn update_connect_config(&self, config: &mut compute::ConnCfg) { diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 1f7dba2f9a..63f93f0a91 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -332,16 +332,19 @@ pub(crate) async fn handle_client( } }; - let params_compat = match &user_info { - auth::Backend::ControlPlane(_, info) => { - info.info.options.get(NeonOptions::PARAMS_COMPAT).is_some() - } - auth::Backend::Local(_) => false, + let compute_user_info = match &user_info { + auth::Backend::ControlPlane(_, info) => &info.info, + auth::Backend::Local(_) => unreachable!("local proxy does not run tcp proxy service"), }; + let params_compat = compute_user_info + .options + .get(NeonOptions::PARAMS_COMPAT) + .is_some(); let mut node = connect_to_compute( ctx, &TcpMechanism { + user_info: compute_user_info.clone(), params_compat, params: ¶ms, locks: &config.connect_compute_locks,