diff --git a/proxy/src/pglb/mod.rs b/proxy/src/pglb/mod.rs index 9967562d85..ef652972f1 100644 --- a/proxy/src/pglb/mod.rs +++ b/proxy/src/pglb/mod.rs @@ -21,7 +21,7 @@ pub use crate::pglb::copy_bidirectional::ErrorSource; use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake}; use crate::pglb::passthrough::ProxyPassthrough; use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol}; -use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute}; +use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute_pglb}; use crate::proxy::{NeonOptions, prepare_client_connection}; use crate::rate_limiter::EndpointRateLimiter; use crate::stream::Stream; @@ -347,7 +347,7 @@ pub(crate) async fn handle_client( let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys); auth_info.set_startup_params(¶ms, params_compat); - let res = connect_to_compute( + let res = connect_to_compute_pglb( ctx, &TcpMechanism { user_info: creds.info.clone(), diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 92ed84f50f..80387fe9a5 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -183,3 +183,105 @@ where drop(pause); } } + +#[tracing::instrument(skip_all)] +pub(crate) async fn connect_to_compute_pglb( + ctx: &RequestContext, + mechanism: &M, + user_info: &B, + wake_compute_retry_config: RetryConfig, + compute: &ComputeConfig, +) -> Result +where + M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug, + M::Error: From, +{ + let mut num_retries = 0; + let node_info = + wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; + + // try once + let err = match mechanism.connect_once(ctx, &node_info, compute).await { + Ok(res) => { + ctx.success(); + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Success, + retry_type: RetryType::ConnectToCompute, + }, + num_retries.into(), + ); + return Ok(res); + } + Err(e) => e, + }; + + debug!(error = ?err, COULD_NOT_CONNECT); + + let node_info = if !node_info.cached() || !err.should_retry_wake_compute() { + // If we just recieved this from cplane and didn't get it from cache, we shouldn't retry. + // Do not need to retrieve a new node_info, just return the old one. + if should_retry(&err, num_retries, compute.retry) { + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Failed, + retry_type: RetryType::ConnectToCompute, + }, + num_retries.into(), + ); + return Err(err.into()); + } + node_info + } else { + // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node + debug!("compute node's state has likely changed; requesting a wake-up"); + invalidate_cache(node_info); + // TODO: increment num_retries? + wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await? + }; + + // now that we have a new node, try connect to it repeatedly. + // this can error for a few reasons, for instance: + // * DNS connection settings haven't quite propagated yet + debug!("wake_compute success. attempting to connect"); + num_retries = 1; + loop { + match mechanism.connect_once(ctx, &node_info, compute).await { + Ok(res) => { + ctx.success(); + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Success, + retry_type: RetryType::ConnectToCompute, + }, + num_retries.into(), + ); + // TODO: is this necessary? We have a metric. + info!(?num_retries, "connected to compute node after"); + return Ok(res); + } + Err(e) => { + if !should_retry(&e, num_retries, compute.retry) { + // Don't log an error here, caller will print the error + Metrics::get().proxy.retries_metric.observe( + RetriesMetricGroup { + outcome: ConnectOutcome::Failed, + retry_type: RetryType::ConnectToCompute, + }, + num_retries.into(), + ); + return Err(e.into()); + } + + warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT); + } + } + + let wait_duration = retry_after(num_retries, compute.retry); + num_retries += 1; + + let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout); + time::sleep(wait_duration).await; + drop(pause); + } +}