Make a copy of connect_to_compute for pglb

This commit is contained in:
Folke Behrens
2025-06-10 15:01:12 +02:00
parent 6cea6560e9
commit 0957c8ea69
2 changed files with 104 additions and 2 deletions

View File

@@ -21,7 +21,7 @@ pub use crate::pglb::copy_bidirectional::ErrorSource;
use crate::pglb::handshake::{HandshakeData, HandshakeError, handshake};
use crate::pglb::passthrough::ProxyPassthrough;
use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute};
use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute_pglb};
use crate::proxy::{NeonOptions, prepare_client_connection};
use crate::rate_limiter::EndpointRateLimiter;
use crate::stream::Stream;
@@ -347,7 +347,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin + Send>(
let mut auth_info = compute::AuthInfo::with_auth_keys(creds.keys);
auth_info.set_startup_params(&params, params_compat);
let res = connect_to_compute(
let res = connect_to_compute_pglb(
ctx,
&TcpMechanism {
user_info: creds.info.clone(),

View File

@@ -183,3 +183,105 @@ where
drop(pause);
}
}
#[tracing::instrument(skip_all)]
pub(crate) async fn connect_to_compute_pglb<M: ConnectMechanism, B: WakeComputeBackend>(
ctx: &RequestContext,
mechanism: &M,
user_info: &B,
wake_compute_retry_config: RetryConfig,
compute: &ComputeConfig,
) -> Result<M::Connection, M::Error>
where
M::ConnectError: CouldRetry + ShouldRetryWakeCompute + std::fmt::Debug,
M::Error: From<WakeComputeError>,
{
let mut num_retries = 0;
let node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
// try once
let err = match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Ok(res);
}
Err(e) => e,
};
debug!(error = ?err, COULD_NOT_CONNECT);
let node_info = if !node_info.cached() || !err.should_retry_wake_compute() {
// If we just recieved this from cplane and didn't get it from cache, we shouldn't retry.
// Do not need to retrieve a new node_info, just return the old one.
if should_retry(&err, num_retries, compute.retry) {
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Err(err.into());
}
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
debug!("compute node's state has likely changed; requesting a wake-up");
invalidate_cache(node_info);
// TODO: increment num_retries?
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?
};
// now that we have a new node, try connect to it repeatedly.
// this can error for a few reasons, for instance:
// * DNS connection settings haven't quite propagated yet
debug!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
match mechanism.connect_once(ctx, &node_info, compute).await {
Ok(res) => {
ctx.success();
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
// TODO: is this necessary? We have a metric.
info!(?num_retries, "connected to compute node after");
return Ok(res);
}
Err(e) => {
if !should_retry(&e, num_retries, compute.retry) {
// Don't log an error here, caller will print the error
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
return Err(e.into());
}
warn!(error = ?e, num_retries, retriable = true, COULD_NOT_CONNECT);
}
}
let wait_duration = retry_after(num_retries, compute.retry);
num_retries += 1;
let pause = ctx.latency_timer_pause(crate::metrics::Waiting::RetryTimeout);
time::sleep(wait_duration).await;
drop(pause);
}
}