From 41a5b8524a60fdf65fc2590eaa1c928e89dc4bf7 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Tue, 10 Jun 2025 22:21:23 -0700 Subject: [PATCH] remove user_info from connect --- proxy/src/compute/mod.rs | 34 +++++++++++++---------------- proxy/src/console_redirect_proxy.rs | 12 +++++++--- proxy/src/control_plane/mod.rs | 3 +-- proxy/src/proxy/connect_compute.rs | 8 +------ proxy/src/proxy/mod.rs | 14 ++++++++---- 5 files changed, 36 insertions(+), 35 deletions(-) diff --git a/proxy/src/compute/mod.rs b/proxy/src/compute/mod.rs index 5dd264b35e..24d294a762 100644 --- a/proxy/src/compute/mod.rs +++ b/proxy/src/compute/mod.rs @@ -15,9 +15,8 @@ use thiserror::Error; use tokio::net::{TcpStream, lookup_host}; use tracing::{debug, error, info, warn}; -use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; +use crate::auth::backend::ComputeCredentialKeys; use crate::auth::parse_endpoint_param; -use crate::cancellation::CancelClosure; use crate::compute::tls::TlsError; use crate::config::ComputeConfig; use crate::context::RequestContext; @@ -273,8 +272,11 @@ pub(crate) struct PostgresConnection { pub(crate) stream: MaybeTlsStream, /// PostgreSQL connection parameters. pub(crate) params: std::collections::HashMap, - /// Query cancellation token. - pub(crate) cancel_closure: CancelClosure, + + pub socket_addr: SocketAddr, + pub cancel_token: RawCancelToken, + pub hostname: String, + /// Labels for proxy's metrics. pub(crate) aux: MetricsAuxInfo, /// Notices received from compute after authenticating @@ -291,7 +293,6 @@ impl ConnectInfo { aux: MetricsAuxInfo, auth: &AuthInfo, config: &ComputeConfig, - user_info: ComputeUserInfo, ) -> Result { let mut tmp_config = auth.enrich(self.to_postgres_client_config()); // we setup SSL early in `ConnectInfo::connect_raw`. @@ -324,24 +325,19 @@ impl ConnectInfo { ctx.get_testodrome_id().unwrap_or_default(), ); - // NB: CancelToken is supposed to hold socket_addr, but we use connect_raw. - // Yet another reason to rework the connection establishing code. - let cancel_closure = CancelClosure::new( - socket_addr, - RawCancelToken { - ssl_mode: self.ssl_mode, - process_id, - secret_key, - }, - self.host.to_string(), - user_info, - ); - let connection = PostgresConnection { stream, params: parameters, delayed_notice, - cancel_closure, + + socket_addr, + cancel_token: RawCancelToken { + ssl_mode: self.ssl_mode, + process_id, + secret_key, + }, + hostname: self.host.to_string(), + aux, guage: Metrics::get().proxy.db_connections.guard(ctx.protocol()), }; diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 89adfc9049..863156296d 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -6,7 +6,7 @@ use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info}; use crate::auth::backend::ConsoleRedirectBackend; -use crate::cancellation::CancellationHandler; +use crate::cancellation::{CancelClosure, CancellationHandler}; use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; use crate::error::ReportableError; @@ -221,7 +221,6 @@ pub(crate) async fn handle_client( let node = connect_to_compute( ctx, &TcpMechanism { - user_info, auth: auth_info, locks: &config.connect_compute_locks, }, @@ -240,11 +239,18 @@ pub(crate) async fn handle_client( let session_id = ctx.session_id(); let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel(); tokio::spawn(async move { + let cancel_closure = CancelClosure::new( + node.socket_addr, + node.cancel_token, + node.hostname, + user_info, + ); + session .maintain_cancel_key( session_id, cancel, - &node.cancel_closure, + &cancel_closure, &config.connect_to_compute, ) .await; diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index ed83e98bfe..d80caee9f3 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -78,10 +78,9 @@ impl NodeInfo { ctx: &RequestContext, auth: &compute::AuthInfo, config: &ComputeConfig, - user_info: ComputeUserInfo, ) -> Result { self.conn_info - .connect(ctx, self.aux.clone(), auth, config, user_info) + .connect(ctx, self.aux.clone(), auth, config) .await } } diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 92ed84f50f..39d789eda8 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use tokio::time; use tracing::{debug, info, warn}; -use crate::auth::backend::ComputeUserInfo; use crate::compute::{self, AuthInfo, COULD_NOT_CONNECT, PostgresConnection}; use crate::config::{ComputeConfig, RetryConfig}; use crate::context::RequestContext; @@ -53,7 +52,6 @@ pub(crate) struct TcpMechanism { pub(crate) auth: AuthInfo, /// connect_to_compute concurrency lock pub(crate) locks: &'static ApiLocks, - pub(crate) user_info: ComputeUserInfo, } #[async_trait] @@ -73,11 +71,7 @@ impl ConnectMechanism for TcpMechanism { config: &ComputeConfig, ) -> Result { let permit = self.locks.get_permit(&node_info.conn_info.host).await?; - permit.release_result( - node_info - .connect(ctx, &self.auth, config, self.user_info.clone()) - .await, - ) + permit.release_result(node_info.connect(ctx, &self.auth, config).await) } } diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 6c332b0829..8aa398b20f 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -18,7 +18,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, debug, error, info, warn}; -use crate::cancellation::{self, CancellationHandler}; +use crate::cancellation::{self, CancelClosure, CancellationHandler}; use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig}; use crate::context::RequestContext; use crate::error::{ReportableError, UserFacingError}; @@ -357,11 +357,10 @@ pub(crate) async fn handle_client( let res = connect_to_compute( ctx, &TcpMechanism { - user_info: creds.info.clone(), auth: auth_info, locks: &config.connect_compute_locks, }, - &auth::Backend::ControlPlane(cplane, creds.info), + &auth::Backend::ControlPlane(cplane, creds.info.clone()), config.wake_compute_retry_config, &config.connect_to_compute, ) @@ -380,11 +379,18 @@ pub(crate) async fn handle_client( let session_id = ctx.session_id(); let (cancel_on_shutdown, cancel) = tokio::sync::oneshot::channel(); tokio::spawn(async move { + let cancel_closure = CancelClosure::new( + node.socket_addr, + node.cancel_token, + node.hostname, + creds.info, + ); + session .maintain_cancel_key( session_id, cancel, - &node.cancel_closure, + &cancel_closure, &config.connect_to_compute, ) .await;