From 68550d3040dcf02479f1603378c02eb25a834a46 Mon Sep 17 00:00:00 2001
From: Folke Behrens
Date: Fri, 6 Jun 2025 19:55:00 +0200
Subject: [PATCH] Move connect_to_compute back into proxy mod

Most of its code is responsible for waking, retrying, checking the cache,
and updating metrics. None of that is part of pglb.
---
 proxy/src/console_redirect_proxy.rs |   6 +-
 proxy/src/pglb/connect_compute.rs   | 117 +--------------------------
 proxy/src/proxy/mod.rs              | 118 +++++++++++++++++++++++++++-
 proxy/src/serverless/backend.rs     |   4 +-
 4 files changed, 125 insertions(+), 120 deletions(-)

diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs
index 89641019e0..8ea24fbffb 100644
--- a/proxy/src/console_redirect_proxy.rs
+++ b/proxy/src/console_redirect_proxy.rs
@@ -11,11 +11,13 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
 use crate::context::RequestContext;
 use crate::error::ReportableError;
 use crate::metrics::{Metrics, NumClientConnectionsGuard};
-use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
+use crate::pglb::connect_compute::TcpMechanism;
 use crate::pglb::handshake::{HandshakeData, handshake};
 use crate::pglb::passthrough::ProxyPassthrough;
 use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
-use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection};
+use crate::proxy::{
+    ClientRequestError, ErrorSource, connect_to_compute, prepare_client_connection,
+};
 use crate::util::run_until_cancelled;
 
 pub async fn task_main(
diff --git a/proxy/src/pglb/connect_compute.rs b/proxy/src/pglb/connect_compute.rs
index 1807cdff0e..f6942f500d 100644
--- a/proxy/src/pglb/connect_compute.rs
+++ b/proxy/src/pglb/connect_compute.rs
@@ -1,20 +1,14 @@
 use async_trait::async_trait;
-use tokio::time;
-use tracing::{debug, info, warn};
+use tracing::warn;
 
 use crate::auth::backend::ComputeUserInfo;
-use crate::compute::{self, AuthInfo, COULD_NOT_CONNECT, PostgresConnection};
-use crate::config::{ComputeConfig, RetryConfig};
+use crate::compute::{self, AuthInfo, PostgresConnection};
+use crate::config::ComputeConfig;
 use crate::context::RequestContext;
-use crate::control_plane::errors::WakeComputeError;
 use crate::control_plane::locks::ApiLocks;
 use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
 use crate::error::ReportableError;
-use crate::metrics::{
-    ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
-};
-use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
-use crate::proxy::wake_compute::wake_compute;
+use crate::metrics::{ConnectionFailureKind, Metrics};
 use crate::types::Host;
 
 /// If we couldn't connect, a cached connection info might be to blame
@@ -88,106 +82,3 @@ impl ConnectMechanism for TcpMechanism {
         )
     }
 }
-
-/// Try to connect to the compute node, retrying if necessary.
-#[tracing::instrument(skip_all)]
-pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
-    ctx: &RequestContext,
-    mechanism: &M,
-    user_info: &B,
-    wake_compute_retry_config: RetryConfig,
-    compute: &ComputeConfig,
-) -> Result<M::Connection, M::Error>
-where
-    M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
-    M::Error: From<WakeComputeError>,
-{
-    let mut num_retries = 0;
-    let node_info =
-        wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
-
-    // try once
-    let err = match mechanism.connect_once(ctx, &node_info, compute).await {
-        Ok(res) => {
-            ctx.success();
-            Metrics::get().proxy.retries_metric.observe(
-                RetriesMetricGroup {
-                    outcome: ConnectOutcome::Success,
-                    retry_type: RetryType::ConnectToCompute,
-                },
-                num_retries.into(),
-            );
-            return Ok(res);
-        }
-        Err(e) => e,
-    };
-
-    debug!(error = ?err, COULD_NOT_CONNECT);
-
-    let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
-        // If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
-        // Do not need to retrieve a new node_info, just return the old one.
-        if should_retry(&err, num_retries, compute.retry) {
-            Metrics::get().proxy.retries_metric.observe(
-                RetriesMetricGroup {
-                    outcome: ConnectOutcome::Failed,
-                    retry_type: RetryType::ConnectToCompute,
-                },
-                num_retries.into(),
-            );
-            return Err(err.into());
-        }
-        node_info
-    } else {
-        // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
-        debug!("compute node's state has likely changed; requesting a wake-up");
-        invalidate_cache(node_info);
-        // TODO: increment num_retries?
-        wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
-    };
-
-    // now that we have a new node, try connect to it repeatedly.
-    // this can error for a few reasons, for instance:
-    // * DNS connection settings haven't quite propagated yet
-    debug!("wake_compute success. attempting to connect");
-    num_retries = 1;
-    loop {
-        match mechanism.connect_once(ctx, &node_info, compute).await {
-            Ok(res) => {
-                ctx.success();
-                Metrics::get().proxy.retries_metric.observe(
-                    RetriesMetricGroup {
-                        outcome: ConnectOutcome::Success,
-                        retry_type: RetryType::ConnectToCompute,
-                    },
-                    num_retries.into(),
-                );
-                // TODO: is this necessary? We have a metric.
-                info!(?num_retries, "connected to compute node after");
-                return Ok(res);
-            }
-            Err(e) => {
-                if !should_retry(&e, num_retries, compute.retry) {
-                    // Don't log an error here, caller will print the error
-                    Metrics::get().proxy.retries_metric.observe(
-                        RetriesMetricGroup {
-                            outcome: ConnectOutcome::Failed,
-                            retry_type: RetryType::ConnectToCompute,
-                        },
-                        num_retries.into(),
-                    );
-                    return Err(e.into());
-                }
-
-                warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
-            }
-        }
-
-        let wait_duration = retry_after(num_retries, compute.retry);
-        num_retries += 1;
-
-        let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
-        time::sleep(wait_duration).await;
-        drop(pause);
-    }
-}
diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs
index 23bb873b2f..5faf0d3426 100644
--- a/proxy/src/proxy/mod.rs
+++ b/proxy/src/proxy/mod.rs
@@ -14,20 +14,29 @@ use serde::{Deserialize, Serialize};
 use smol_str::{SmolStr, ToSmolStr, format_smolstr};
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::time;
 use tokio_util::sync::CancellationToken;
 use tracing::{Instrument, debug, error, info, warn};
 
 use crate::cancellation::{self, CancellationHandler};
-use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
+use crate::compute::COULD_NOT_CONNECT;
+use crate::config::{ComputeConfig, ProxyConfig, ProxyProtocolV2, RetryConfig, TlsConfig};
 use crate::context::RequestContext;
+use crate::control_plane::errors::WakeComputeError;
 use crate::error::{ReportableError, UserFacingError};
-use crate::metrics::{Metrics, NumClientConnectionsGuard};
-use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
+use crate::metrics::{
+    ConnectOutcome, Metrics, NumClientConnectionsGuard, RetriesMetricGroup, RetryType,
+};
+use crate::pglb::connect_compute::{
+    ComputeConnectBackend, ConnectMechanism, TcpMechanism, invalidate_cache,
+};
 pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
 use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
 use crate::pglb::passthrough::ProxyPassthrough;
 use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
 use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
+use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
+use crate::proxy::wake_compute::wake_compute;
 use crate::rate_limiter::EndpointRateLimiter;
 use crate::stream::{PqStream, Stream};
 use crate::types::EndpointCacheKey;
@@ -396,6 +405,109 @@ pub(crate) async fn handle_client(
     }))
 }
 
+/// Try to connect to the compute node, retrying if necessary.
+#[tracing::instrument(skip_all)]
+pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
+    ctx: &RequestContext,
+    mechanism: &M,
+    user_info: &B,
+    wake_compute_retry_config: RetryConfig,
+    compute: &ComputeConfig,
+) -> Result<M::Connection, M::Error>
+where
+    M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
+    M::Error: From<WakeComputeError>,
+{
+    let mut num_retries = 0;
+    let node_info =
+        wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
+
+    // try once
+    let err = match mechanism.connect_once(ctx, &node_info, compute).await {
+        Ok(res) => {
+            ctx.success();
+            Metrics::get().proxy.retries_metric.observe(
+                RetriesMetricGroup {
+                    outcome: ConnectOutcome::Success,
+                    retry_type: RetryType::ConnectToCompute,
+                },
+                num_retries.into(),
+            );
+            return Ok(res);
+        }
+        Err(e) => e,
+    };
+
+    debug!(error = ?err, COULD_NOT_CONNECT);
+
+    let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
+        // If we just received this from cplane and didn't get it from cache, we shouldn't retry.
+        // Do not need to retrieve a new node_info, just return the old one.
+        if should_retry(&err, num_retries, compute.retry) {
+            Metrics::get().proxy.retries_metric.observe(
+                RetriesMetricGroup {
+                    outcome: ConnectOutcome::Failed,
+                    retry_type: RetryType::ConnectToCompute,
+                },
+                num_retries.into(),
+            );
+            return Err(err.into());
+        }
+        node_info
+    } else {
+        // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
+        debug!("compute node's state has likely changed; requesting a wake-up");
+        invalidate_cache(node_info);
+        // TODO: increment num_retries?
+        wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
+    };
+
+    // now that we have a new node, try to connect to it repeatedly.
+    // this can error for a few reasons, for instance:
+    // * DNS connection settings haven't quite propagated yet
+    debug!("wake_compute success. attempting to connect");
+    num_retries = 1;
+    loop {
+        match mechanism.connect_once(ctx, &node_info, compute).await {
+            Ok(res) => {
+                ctx.success();
+                Metrics::get().proxy.retries_metric.observe(
+                    RetriesMetricGroup {
+                        outcome: ConnectOutcome::Success,
+                        retry_type: RetryType::ConnectToCompute,
+                    },
+                    num_retries.into(),
+                );
+                // TODO: is this necessary? We have a metric.
+                info!(?num_retries, "connected to compute node after");
+                return Ok(res);
+            }
+            Err(e) => {
+                if !should_retry(&e, num_retries, compute.retry) {
+                    // Don't log an error here, caller will print the error
+                    Metrics::get().proxy.retries_metric.observe(
+                        RetriesMetricGroup {
+                            outcome: ConnectOutcome::Failed,
+                            retry_type: RetryType::ConnectToCompute,
+                        },
+                        num_retries.into(),
+                    );
+                    return Err(e.into());
+                }
+
+                warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
+            }
+        }
+
+        let wait_duration = retry_after(num_retries, compute.retry);
+        num_retries += 1;
+
+        let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
+        time::sleep(wait_duration).await;
+        drop(pause);
+    }
+}
+
 /// Finish client connection initialization: confirm auth success, send params, etc.
 pub(crate) fn prepare_client_connection(
     node: &compute::PostgresConnection,
diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs
index a0e782dab0..0b991a652d 100644
--- a/proxy/src/serverless/backend.rs
+++ b/proxy/src/serverless/backend.rs
@@ -181,7 +181,7 @@ impl PoolingBackend {
         tracing::Span::current().record("conn_id", display(conn_id));
         info!(%conn_id, "pool: opening a new connection '{conn_info}'");
         let backend = self.auth_backend.as_ref().map(|()| keys);
-        crate::pglb::connect_compute::connect_to_compute(
+        crate::proxy::connect_to_compute(
             ctx,
             &TokioMechanism {
                 conn_id,
@@ -225,7 +225,7 @@ impl PoolingBackend {
             },
             keys: crate::auth::backend::ComputeCredentialKeys::None,
         });
-        crate::pglb::connect_compute::connect_to_compute(
+        crate::proxy::connect_to_compute(
            ctx,
            &HyperMechanism {
                conn_id,