Move connect_to_compute back into proxy mod

Most of its code is responsible for waking, retrying, checking cache
and updating metrics. All not part of pglb.
This commit is contained in:
Folke Behrens
2025-06-06 19:55:00 +02:00
parent f8d530d031
commit 68550d3040
4 changed files with 125 additions and 120 deletions

View File

@@ -11,11 +11,13 @@ use crate::config::{ProxyConfig, ProxyProtocolV2};
use crate::context::RequestContext;
use crate::error::ReportableError;
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
use crate::pglb::connect_compute::TcpMechanism;
use crate::pglb::handshake::{HandshakeData, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol};
use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection};
use crate::proxy::{
ClientRequestError, ErrorSource, connect_to_compute, prepare_client_connection,
};
use crate::util::run_until_cancelled;
pub async fn task_main(

View File

@@ -1,20 +1,14 @@
use async_trait::async_trait;
use tokio::time;
use tracing::{debug, info, warn};
use tracing::warn;
use crate::auth::backend::ComputeUserInfo;
use crate::compute::{self, AuthInfo, COULD_NOT_CONNECT, PostgresConnection};
use crate::config::{ComputeConfig, RetryConfig};
use crate::compute::{self, AuthInfo, PostgresConnection};
use crate::config::ComputeConfig;
use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError;
use crate::control_plane::locks::ApiLocks;
use crate::control_plane::{self, CachedNodeInfo, NodeInfo};
use crate::error::ReportableError;
use crate::metrics::{
ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType,
};
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
use crate::proxy::wake_compute::wake_compute;
use crate::metrics::{ConnectionFailureKind, Metrics};
use crate::types::Host;
/// If we couldn't connect, a cached connection info might be to blame
@@ -88,106 +82,3 @@ impl ConnectMechanism for TcpMechanism {
)
}
}
/// Try to connect to the compute node, retrying if necessary.
#[tracing::instrument(skip_all)]
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
ctx: &RequestContext,
mechanism: &M,
user_info: &B,
wake_compute_retry_config: RetryConfig,
compute: &ComputeConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
let node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
// try once
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Ok(res);
}
Err(e) => e,
};
debug!(error = ?err, COULD_NOT_CONNECT);
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
debug!("compute node's state has likely changed; requesting a wake-up");
invalidate_cache(node_info);
// TODO: increment num_retries?
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
};
// now that we have a new node, try connect to it repeatedly.
// this can error for a few reasons, for instance:
// * DNS connection settings haven't quite propagated yet
debug!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
// TODO: is this necessary? We have a metric.
info!(?num_retries, "connected to compute node after");
return Ok(res);
}
Err(e) => {
if !should_retry(&e, num_retries, compute.retry) {
// Don't log an error here, caller will print the error
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Err(e.into());
}
warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
}
}
let wait_duration = retry_after(num_retries, compute.retry);
num_retries += 1;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
time::sleep(wait_duration).await;
drop(pause);
}
}

View File

@@ -14,20 +14,29 @@ use serde::{Deserialize, Serialize};
use smol_str::{SmolStr, ToSmolStr, format_smolstr};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, error, info, warn};
use crate::cancellation::{self, CancellationHandler};
use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig};
use crate::compute::COULD_NOT_CONNECT;
use crate::config::{ComputeConfig, ProxyConfig, ProxyProtocolV2, RetryConfig, TlsConfig};
use crate::context::RequestContext;
use crate::control_plane::errors::WakeComputeError;
use crate::error::{ReportableError, UserFacingError};
use crate::metrics::{Metrics, NumClientConnectionsGuard};
use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute};
use crate::metrics::{
ConnectOutcome, Metrics, NumClientConnectionsGuard, RetriesMetricGroup, RetryType,
};
use crate::pglb::connect_compute::{
ComputeConnectBackend, ConnectMechanism, TcpMechanism, invalidate_cache,
};
pub use crate::pglb::copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute};
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::pqproto::{BeMessage, CancelKeyData, StartupMessageParams};
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::retry::{CouldRetry, ShouldRetryWakeCompute, retry_after, should_retry};
use crate::proxy::wake_compute::wake_compute;
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::{PqStream, Stream};
use crate::types::EndpointCacheKey;
@@ -396,6 +405,109 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
}))
}
/// Try to connect to the compute node, retrying if necessary.
#[tracing::instrument(skip_all)]
pub(crate) async fn connect_to_compute<M: ConnectMechanism, B: ComputeConnectBackend>(
ctx: &RequestContext,
mechanism: &M,
user_info: &B,
wake_compute_retry_config: RetryConfig,
compute: &ComputeConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
let node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
// try once
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Ok(res);
}
Err(e) => e,
};
debug!(error = ?err, COULD_NOT_CONNECT);
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
debug!("compute node's state has likely changed; requesting a wake-up");
invalidate_cache(node_info);
// TODO: increment num_retries?
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
};
// now that we have a new node, try connect to it repeatedly.
// this can error for a few reasons, for instance:
// * DNS connection settings haven't quite propagated yet
debug!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
// TODO: is this necessary? We have a metric.
info!(?num_retries, "connected to compute node after");
return Ok(res);
}
Err(e) => {
if !should_retry(&e, num_retries, compute.retry) {
// Don't log an error here, caller will print the error
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Err(e.into());
}
warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
}
}
let wait_duration = retry_after(num_retries, compute.retry);
num_retries += 1;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
time::sleep(wait_duration).await;
drop(pause);
}
}
/// Finish client connection initialization: confirm auth success, send params, etc.
pub(crate) fn prepare_client_connection(
node: &compute::PostgresConnection,

View File

@@ -181,7 +181,7 @@ impl PoolingBackend {
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| keys);
crate::pglb::connect_compute::connect_to_compute(
crate::proxy::connect_to_compute(
ctx,
&TokioMechanism {
conn_id,
@@ -225,7 +225,7 @@ impl PoolingBackend {
},
keys: crate::auth::backend::ComputeCredentialKeys::None,
});
crate::pglb::connect_compute::connect_to_compute(
crate::proxy::connect_to_compute(
ctx,
&HyperMechanism {
conn_id,