Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 01:12:56 +00:00
proxy refactor large files (#6153)
## Problem

The `src/proxy.rs` file is far too large.

## Summary of changes

Creates 3 new files:

```
src/metrics.rs
src/proxy/retry.rs
src/proxy/connect_compute.rs
```
Import-path updates across the crate:

```diff
@@ -11,7 +11,8 @@ use crate::auth::validate_password_and_exchange;
 use crate::console::errors::GetAuthInfoError;
 use crate::console::provider::AuthInfo;
 use crate::console::AuthSecret;
-use crate::proxy::{handle_try_wake, retry_after, LatencyTimer};
+use crate::proxy::connect_compute::handle_try_wake;
+use crate::proxy::retry::retry_after;
 use crate::scram;
 use crate::stream::Stream;
 use crate::{
@@ -22,6 +23,7 @@ use crate::{
         provider::{CachedNodeInfo, ConsoleReqExtra},
         Api,
     },
+    metrics::LatencyTimer,
     stream, url,
 };
 use futures::TryFutureExt;
@@ -4,7 +4,7 @@ use crate::{
     compute,
     config::AuthenticationConfig,
     console::AuthSecret,
-    proxy::LatencyTimer,
+    metrics::LatencyTimer,
     sasl,
     stream::{PqStream, Stream},
 };
@@ -4,7 +4,7 @@ use super::{
 use crate::{
     auth::{self, AuthFlow},
     console::AuthSecret,
-    proxy::LatencyTimer,
+    metrics::LatencyTimer,
     sasl,
     stream::{self, Stream},
 };
@@ -1,9 +1,8 @@
 //! User credentials used in authentication.
 
 use crate::{
-    auth::password_hack::parse_endpoint_param,
-    error::UserFacingError,
-    proxy::{neon_options_str, NUM_CONNECTION_ACCEPTED_BY_SNI},
+    auth::password_hack::parse_endpoint_param, error::UserFacingError,
+    metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::neon_options_str,
 };
 use itertools::Itertools;
 use pq_proto::StartupMessageParams;
@@ -1,9 +1,6 @@
 use crate::{
-    auth::parse_endpoint_param,
-    cancellation::CancelClosure,
-    console::errors::WakeComputeError,
-    error::UserFacingError,
-    proxy::{neon_option, NUM_DB_CONNECTIONS_GAUGE},
+    auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError,
+    error::UserFacingError, metrics::NUM_DB_CONNECTIONS_GAUGE, proxy::neon_option,
 };
 use futures::{FutureExt, TryFutureExt};
 use itertools::Itertools;
@@ -21,7 +21,7 @@ pub mod errors {
     use crate::{
         error::{io_error, UserFacingError},
         http,
-        proxy::ShouldRetry,
+        proxy::retry::ShouldRetry,
     };
     use thiserror::Error;
 
@@ -5,7 +5,7 @@ use super::{
     errors::{ApiError, GetAuthInfoError, WakeComputeError},
     ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
 };
-use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
+use crate::metrics::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
 use crate::{auth::backend::ComputeUserInfo, compute, http, scram};
 use async_trait::async_trait;
 use futures::TryFutureExt;
@@ -13,7 +13,7 @@ pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
 use tokio::time::Instant;
 use tracing::trace;
 
-use crate::{proxy::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
+use crate::{metrics::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl};
 use reqwest_middleware::RequestBuilder;
 
 /// This is the preferred way to create new http clients,
@@ -16,6 +16,7 @@ pub mod console;
 pub mod error;
 pub mod http;
 pub mod logging;
+pub mod metrics;
 pub mod parse;
 pub mod protocol2;
 pub mod proxy;
```
proxy/src/metrics.rs (new file, 232 lines):

```rust
use ::metrics::{
    exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec,
    IntCounterPairVec, IntCounterVec,
};
use prometheus::{
    register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec,
    IntGaugeVec,
};

use once_cell::sync::Lazy;
use tokio::time;

pub static NUM_DB_CONNECTIONS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
    register_int_counter_pair_vec!(
        "proxy_opened_db_connections_total",
        "Number of opened connections to a database.",
        "proxy_closed_db_connections_total",
        "Number of closed connections to a database.",
        &["protocol"],
    )
    .unwrap()
});

pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
    register_int_counter_pair_vec!(
        "proxy_opened_client_connections_total",
        "Number of opened connections from a client.",
        "proxy_closed_client_connections_total",
        "Number of closed connections from a client.",
        &["protocol"],
    )
    .unwrap()
});

pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
    register_int_counter_pair_vec!(
        "proxy_accepted_connections_total",
        "Number of client connections accepted.",
        "proxy_closed_connections_total",
        "Number of client connections closed.",
        &["protocol"],
    )
    .unwrap()
});

pub static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "proxy_compute_connection_latency_seconds",
        "Time it took for proxy to establish a connection to the compute endpoint",
        // http/ws/tcp, true/false, true/false, success/failure
        // 3 * 2 * 2 * 2 = 24 counters
        &["protocol", "cache_miss", "pool_miss", "outcome"],
        // largest bucket = 2^16 * 0.5ms = 32s
        exponential_buckets(0.0005, 2.0, 16).unwrap(),
    )
    .unwrap()
});

pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
    register_histogram_vec!(
        "proxy_console_request_latency",
        "Time it took for proxy to establish a connection to the compute endpoint",
        // proxy_wake_compute/proxy_get_role_info
        &["request"],
        // largest bucket = 2^16 * 0.2ms = 13s
        exponential_buckets(0.0002, 2.0, 16).unwrap(),
    )
    .unwrap()
});

pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_allowed_ips_cache_misses",
        "Number of cache hits/misses for allowed ips",
        // hit/miss
        &["outcome"],
    )
    .unwrap()
});

pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "proxy_control_plane_token_acquire_seconds",
        "Time it took for proxy to establish a connection to the compute endpoint",
        // largest bucket = 3^16 * 0.05ms = 2.15s
        exponential_buckets(0.00005, 3.0, 16).unwrap(),
    )
    .unwrap()
});

pub static RATE_LIMITER_LIMIT: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "semaphore_control_plane_limit",
        "Current limit of the semaphore control plane",
        &["limit"], // 2 counters
    )
    .unwrap()
});

pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_accepted_connections_by_sni",
        "Number of connections (per sni).",
        &["kind"],
    )
    .unwrap()
});

pub static ALLOWED_IPS_NUMBER: Lazy<Histogram> = Lazy::new(|| {
    register_histogram!(
        "proxy_allowed_ips_number",
        "Number of allowed ips",
        vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0],
    )
    .unwrap()
});

pub struct LatencyTimer {
    // time since the stopwatch was started
    start: Option<time::Instant>,
    // accumulated time on the stopwatch
    accumulated: std::time::Duration,
    // label data
    protocol: &'static str,
    cache_miss: bool,
    pool_miss: bool,
    outcome: &'static str,
}

pub struct LatencyTimerPause<'a> {
    timer: &'a mut LatencyTimer,
}

impl LatencyTimer {
    pub fn new(protocol: &'static str) -> Self {
        Self {
            start: Some(time::Instant::now()),
            accumulated: std::time::Duration::ZERO,
            protocol,
            cache_miss: false,
            // by default we don't do pooling
            pool_miss: true,
            // assume failed unless otherwise specified
            outcome: "failed",
        }
    }

    pub fn pause(&mut self) -> LatencyTimerPause<'_> {
        // stop the stopwatch and record the time that we have accumulated
        let start = self.start.take().expect("latency timer should be started");
        self.accumulated += start.elapsed();
        LatencyTimerPause { timer: self }
    }

    pub fn cache_miss(&mut self) {
        self.cache_miss = true;
    }

    pub fn pool_hit(&mut self) {
        self.pool_miss = false;
    }

    pub fn success(mut self) {
        self.outcome = "success";
    }
}

impl Drop for LatencyTimerPause<'_> {
    fn drop(&mut self) {
        // start the stopwatch again
        self.timer.start = Some(time::Instant::now());
    }
}

impl Drop for LatencyTimer {
    fn drop(&mut self) {
        let duration =
            self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated;
        COMPUTE_CONNECTION_LATENCY
            .with_label_values(&[
                self.protocol,
                bool_to_str(self.cache_miss),
                bool_to_str(self.pool_miss),
                self.outcome,
            ])
            .observe(duration.as_secs_f64())
    }
}

pub static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_connection_failures_total",
        "Number of connection failures (per kind).",
        &["kind"],
    )
    .unwrap()
});

pub static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_connection_failures_breakdown",
        "Number of wake-up failures (per kind).",
        &["retry", "kind"],
    )
    .unwrap()
});

pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_io_bytes_per_client",
        "Number of bytes sent/received between client and backend.",
        crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS,
    )
    .unwrap()
});

pub static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        "proxy_io_bytes",
        "Number of bytes sent/received between all clients and backends.",
        &["direction"],
    )
    .unwrap()
});

pub const fn bool_to_str(x: bool) -> &'static str {
    if x {
        "true"
    } else {
        "false"
    }
}
```
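The `LatencyTimer` here is a pausable stopwatch: `pause()` hands out a guard that stops the clock, dropping the guard restarts it, and dropping the timer itself records the accumulated time into `COMPUTE_CONNECTION_LATENCY`. A minimal sketch of the intended call pattern (the surrounding handler is an assumption, not part of this commit):

```rust
// Assumed usage inside a connection handler in this crate; illustration only.
use crate::metrics::LatencyTimer;

async fn handle_connection_sketch() {
    let mut timer = LatencyTimer::new("tcp"); // stopwatch starts immediately

    {
        // Time spent waiting on the client (e.g. for a password) should not
        // count as compute connection latency, so stop the clock meanwhile.
        let _paused = timer.pause();
        // ... await client input ...
    } // guard dropped: the stopwatch resumes

    timer.cache_miss(); // label the sample: cached node info was stale
    timer.success(); // consumes the timer; Drop observes outcome="success"
}
```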
`src/proxy.rs`:

```diff
@@ -1,265 +1,41 @@
 #[cfg(test)]
 mod tests;
 
+pub mod connect_compute;
+pub mod retry;
+
 use crate::{
     auth,
     cancellation::{self, CancelMap},
-    compute::{self, PostgresConnection},
+    compute,
     config::{AuthenticationConfig, ProxyConfig, TlsConfig},
-    console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api},
-    http::StatusCode,
+    console::{self, messages::MetricsAuxInfo},
+    metrics::{
+        LatencyTimer, NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER,
+        NUM_CLIENT_CONNECTION_GAUGE, NUM_CONNECTION_REQUESTS_GAUGE,
+    },
     protocol2::WithClientIp,
     rate_limiter::EndpointRateLimiter,
     stream::{PqStream, Stream},
     usage_metrics::{Ids, USAGE_METRICS},
 };
 use anyhow::{bail, Context};
-use async_trait::async_trait;
 use futures::TryFutureExt;
 use itertools::Itertools;
-use metrics::{
-    exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec,
-    IntCounterPairVec, IntCounterVec,
-};
-use once_cell::sync::{Lazy, OnceCell};
+use once_cell::sync::OnceCell;
 use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams};
-use prometheus::{
-    register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec,
-    IntGaugeVec,
-};
 use regex::Regex;
-use std::{error::Error, io, net::IpAddr, ops::ControlFlow, sync::Arc, time::Instant};
-use tokio::{
-    io::{AsyncRead, AsyncWrite, AsyncWriteExt},
-    time,
-};
+use std::{net::IpAddr, sync::Arc};
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
 use tokio_util::sync::CancellationToken;
-use tracing::{error, info, info_span, warn, Instrument};
+use tracing::{error, info, info_span, Instrument};
 use utils::measured_stream::MeasuredStream;
 
-/// Number of times we should retry the `/proxy_wake_compute` http request.
-/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0
-pub const NUM_RETRIES_CONNECT: u32 = 16;
-const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
-const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25);
-const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
+use self::connect_compute::{connect_to_compute, TcpMechanism};
 
 const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
 const ERR_PROTO_VIOLATION: &str = "protocol violation";
 
-pub static NUM_DB_CONNECTIONS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
-    register_int_counter_pair_vec!(
-        "proxy_opened_db_connections_total",
-        "Number of opened connections to a database.",
-        "proxy_closed_db_connections_total",
-        "Number of closed connections to a database.",
-        &["protocol"],
-    )
-    .unwrap()
-});
-
-pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
-    register_int_counter_pair_vec!(
-        "proxy_opened_client_connections_total",
-        "Number of opened connections from a client.",
-        "proxy_closed_client_connections_total",
-        "Number of closed connections from a client.",
-        &["protocol"],
-    )
-    .unwrap()
-});
-
-pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy<IntCounterPairVec> = Lazy::new(|| {
-    register_int_counter_pair_vec!(
-        "proxy_accepted_connections_total",
-        "Number of client connections accepted.",
-        "proxy_closed_connections_total",
-        "Number of client connections closed.",
-        &["protocol"],
-    )
-    .unwrap()
-});
-
-static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "proxy_compute_connection_latency_seconds",
-        "Time it took for proxy to establish a connection to the compute endpoint",
-        // http/ws/tcp, true/false, true/false, success/failure
-        // 3 * 2 * 2 * 2 = 24 counters
-        &["protocol", "cache_miss", "pool_miss", "outcome"],
-        // largest bucket = 2^16 * 0.5ms = 32s
-        exponential_buckets(0.0005, 2.0, 16).unwrap(),
-    )
-    .unwrap()
-});
-
-pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
-    register_histogram_vec!(
-        "proxy_console_request_latency",
-        "Time it took for proxy to establish a connection to the compute endpoint",
-        // proxy_wake_compute/proxy_get_role_info
-        &["request"],
-        // largest bucket = 2^16 * 0.2ms = 13s
-        exponential_buckets(0.0002, 2.0, 16).unwrap(),
-    )
-    .unwrap()
-});
-
-pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_allowed_ips_cache_misses",
-        "Number of cache hits/misses for allowed ips",
-        // hit/miss
-        &["outcome"],
-    )
-    .unwrap()
-});
-
-pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy<Histogram> = Lazy::new(|| {
-    register_histogram!(
-        "proxy_control_plane_token_acquire_seconds",
-        "Time it took for proxy to establish a connection to the compute endpoint",
-        // largest bucket = 3^16 * 0.05ms = 2.15s
-        exponential_buckets(0.00005, 3.0, 16).unwrap(),
-    )
-    .unwrap()
-});
-
-pub static RATE_LIMITER_LIMIT: Lazy<IntGaugeVec> = Lazy::new(|| {
-    register_int_gauge_vec!(
-        "semaphore_control_plane_limit",
-        "Current limit of the semaphore control plane",
-        &["limit"], // 2 counters
-    )
-    .unwrap()
-});
-
-pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_accepted_connections_by_sni",
-        "Number of connections (per sni).",
-        &["kind"],
-    )
-    .unwrap()
-});
-
-pub static ALLOWED_IPS_NUMBER: Lazy<Histogram> = Lazy::new(|| {
-    register_histogram!(
-        "proxy_allowed_ips_number",
-        "Number of allowed ips",
-        vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0],
-    )
-    .unwrap()
-});
-
-pub struct LatencyTimer {
-    // time since the stopwatch was started
-    start: Option<Instant>,
-    // accumulated time on the stopwatch
-    accumulated: std::time::Duration,
-    // label data
-    protocol: &'static str,
-    cache_miss: bool,
-    pool_miss: bool,
-    outcome: &'static str,
-}
-
-pub struct LatencyTimerPause<'a> {
-    timer: &'a mut LatencyTimer,
-}
-
-impl LatencyTimer {
-    pub fn new(protocol: &'static str) -> Self {
-        Self {
-            start: Some(Instant::now()),
-            accumulated: std::time::Duration::ZERO,
-            protocol,
-            cache_miss: false,
-            // by default we don't do pooling
-            pool_miss: true,
-            // assume failed unless otherwise specified
-            outcome: "failed",
-        }
-    }
-
-    pub fn pause(&mut self) -> LatencyTimerPause<'_> {
-        // stop the stopwatch and record the time that we have accumulated
-        let start = self.start.take().expect("latency timer should be started");
-        self.accumulated += start.elapsed();
-        LatencyTimerPause { timer: self }
-    }
-
-    pub fn cache_miss(&mut self) {
-        self.cache_miss = true;
-    }
-
-    pub fn pool_hit(&mut self) {
-        self.pool_miss = false;
-    }
-
-    pub fn success(mut self) {
-        self.outcome = "success";
-    }
-}
-
-impl Drop for LatencyTimerPause<'_> {
-    fn drop(&mut self) {
-        // start the stopwatch again
-        self.timer.start = Some(Instant::now());
-    }
-}
-
-impl Drop for LatencyTimer {
-    fn drop(&mut self) {
-        let duration =
-            self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated;
-        COMPUTE_CONNECTION_LATENCY
-            .with_label_values(&[
-                self.protocol,
-                bool_to_str(self.cache_miss),
-                bool_to_str(self.pool_miss),
-                self.outcome,
-            ])
-            .observe(duration.as_secs_f64())
-    }
-}
-
-static NUM_CONNECTION_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_connection_failures_total",
-        "Number of connection failures (per kind).",
-        &["kind"],
-    )
-    .unwrap()
-});
-
-static NUM_WAKEUP_FAILURES: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_connection_failures_breakdown",
-        "Number of wake-up failures (per kind).",
-        &["retry", "kind"],
-    )
-    .unwrap()
-});
-
-static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_io_bytes_per_client",
-        "Number of bytes sent/received between client and backend.",
-        crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS,
-    )
-    .unwrap()
-});
-
-static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
-    register_int_counter_vec!(
-        "proxy_io_bytes",
-        "Number of bytes sent/received between all clients and backends.",
-        &["direction"],
-    )
-    .unwrap()
-});
-
 pub async fn run_until_cancelled<F: std::future::Future>(
     f: F,
     cancellation_token: &CancellationToken,
@@ -539,296 +315,6 @@ async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
     }
 }
 
-/// If we couldn't connect, a cached connection info might be to blame
-/// (e.g. the compute node's address might've changed at the wrong time).
-/// Invalidate the cache entry (if any) to prevent subsequent errors.
-#[tracing::instrument(name = "invalidate_cache", skip_all)]
-pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg {
-    let is_cached = node_info.cached();
-    if is_cached {
-        warn!("invalidating stalled compute node info cache entry");
-    }
-    let label = match is_cached {
-        true => "compute_cached",
-        false => "compute_uncached",
-    };
-    NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();
-
-    node_info.invalidate().config
-}
-
-/// Try to connect to the compute node once.
-#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
-async fn connect_to_compute_once(
-    node_info: &console::CachedNodeInfo,
-    timeout: time::Duration,
-    proto: &'static str,
-) -> Result<PostgresConnection, compute::ConnectionError> {
-    let allow_self_signed_compute = node_info.allow_self_signed_compute;
-
-    node_info
-        .config
-        .connect(allow_self_signed_compute, timeout, proto)
-        .await
-}
-
-#[async_trait]
-pub trait ConnectMechanism {
-    type Connection;
-    type ConnectError;
-    type Error: From<Self::ConnectError>;
-    async fn connect_once(
-        &self,
-        node_info: &console::CachedNodeInfo,
-        timeout: time::Duration,
-    ) -> Result<Self::Connection, Self::ConnectError>;
-
-    fn update_connect_config(&self, conf: &mut compute::ConnCfg);
-}
-
-pub struct TcpMechanism<'a> {
-    /// KV-dictionary with PostgreSQL connection params.
-    pub params: &'a StartupMessageParams,
-    pub proto: &'static str,
-}
-
-#[async_trait]
-impl ConnectMechanism for TcpMechanism<'_> {
-    type Connection = PostgresConnection;
-    type ConnectError = compute::ConnectionError;
-    type Error = compute::ConnectionError;
-
-    async fn connect_once(
-        &self,
-        node_info: &console::CachedNodeInfo,
-        timeout: time::Duration,
-    ) -> Result<PostgresConnection, Self::Error> {
-        connect_to_compute_once(node_info, timeout, self.proto).await
-    }
-
-    fn update_connect_config(&self, config: &mut compute::ConnCfg) {
-        config.set_startup_params(self.params);
-    }
-}
-
-const fn bool_to_str(x: bool) -> &'static str {
-    if x {
-        "true"
-    } else {
-        "false"
-    }
-}
-
-fn report_error(e: &WakeComputeError, retry: bool) {
-    use crate::console::errors::ApiError;
-    let retry = bool_to_str(retry);
-    let kind = match e {
-        WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
-        WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
-        WakeComputeError::ApiError(ApiError::Console {
-            status: StatusCode::LOCKED,
-            ref text,
-        }) if text.contains("written data quota exceeded")
-            || text.contains("the limit for current plan reached") =>
-        {
-            "quota_exceeded"
-        }
-        WakeComputeError::ApiError(ApiError::Console {
-            status: StatusCode::LOCKED,
-            ..
-        }) => "api_console_locked",
-        WakeComputeError::ApiError(ApiError::Console {
-            status: StatusCode::BAD_REQUEST,
-            ..
-        }) => "api_console_bad_request",
-        WakeComputeError::ApiError(ApiError::Console { status, .. })
-            if status.is_server_error() =>
-        {
-            "api_console_other_server_error"
-        }
-        WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
-        WakeComputeError::TimeoutError => "timeout_error",
-    };
-    NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
-}
-
-/// Try to connect to the compute node, retrying if necessary.
-/// This function might update `node_info`, so we take it by `&mut`.
-#[tracing::instrument(skip_all)]
-pub async fn connect_to_compute<M: ConnectMechanism>(
-    mechanism: &M,
-    mut node_info: console::CachedNodeInfo,
-    extra: &console::ConsoleReqExtra,
-    creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>,
-    mut latency_timer: LatencyTimer,
-) -> Result<M::Connection, M::Error>
-where
-    M::ConnectError: ShouldRetry + std::fmt::Debug,
-    M::Error: From<WakeComputeError>,
-{
-    mechanism.update_connect_config(&mut node_info.config);
-
-    // try once
-    let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
-        Ok(res) => {
-            latency_timer.success();
-            return Ok(res);
-        }
-        Err(e) => {
-            error!(error = ?e, "could not connect to compute node");
-            (invalidate_cache(node_info), e)
-        }
-    };
-
-    latency_timer.cache_miss();
-
-    let mut num_retries = 1;
-
-    // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
-    info!("compute node's state has likely changed; requesting a wake-up");
-    let node_info = loop {
-        let wake_res = match creds {
-            auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
-            #[cfg(feature = "testing")]
-            auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
-            // nothing to do?
-            auth::BackendType::Link(_) => return Err(err.into()),
-            // test backend
-            #[cfg(test)]
-            auth::BackendType::Test(x) => x.wake_compute(),
-        };
-
-        match handle_try_wake(wake_res, num_retries) {
-            Err(e) => {
-                error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
-                report_error(&e, false);
-                return Err(e.into());
-            }
-            // failed to wake up but we can continue to retry
-            Ok(ControlFlow::Continue(e)) => {
-                report_error(&e, true);
-                warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
-            }
-            // successfully woke up a compute node and can break the wakeup loop
-            Ok(ControlFlow::Break(mut node_info)) => {
-                node_info.config.reuse_password(&config);
-                mechanism.update_connect_config(&mut node_info.config);
-                break node_info;
-            }
-        }
-
-        let wait_duration = retry_after(num_retries);
-        num_retries += 1;
-
-        time::sleep(wait_duration).await;
-    };
-
-    // now that we have a new node, try connect to it repeatedly.
-    // this can error for a few reasons, for instance:
-    // * DNS connection settings haven't quite propagated yet
-    info!("wake_compute success. attempting to connect");
-    loop {
-        match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
-            Ok(res) => {
-                latency_timer.success();
-                return Ok(res);
-            }
-            Err(e) => {
-                let retriable = e.should_retry(num_retries);
-                if !retriable {
-                    error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
-                    return Err(e.into());
-                }
-                warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
-            }
-        }
-
-        let wait_duration = retry_after(num_retries);
-        num_retries += 1;
-
-        time::sleep(wait_duration).await;
-    }
-}
-
-/// Attempts to wake up the compute node.
-/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
-/// * Returns Ok(Break(node)) if the wakeup succeeded
-/// * Returns Err(e) if there was an error
-pub fn handle_try_wake(
-    result: Result<console::CachedNodeInfo, WakeComputeError>,
-    num_retries: u32,
-) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
-    match result {
-        Err(err) => match &err {
-            WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
-                Ok(ControlFlow::Continue(err))
-            }
-            _ => Err(err),
-        },
-        // Ready to try again.
-        Ok(new) => Ok(ControlFlow::Break(new)),
-    }
-}
-
-pub trait ShouldRetry {
-    fn could_retry(&self) -> bool;
-    fn should_retry(&self, num_retries: u32) -> bool {
-        match self {
-            _ if num_retries >= NUM_RETRIES_CONNECT => false,
-            err => err.could_retry(),
-        }
-    }
-}
-
-impl ShouldRetry for io::Error {
-    fn could_retry(&self) -> bool {
-        use std::io::ErrorKind;
-        matches!(
-            self.kind(),
-            ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
-        )
-    }
-}
-
-impl ShouldRetry for tokio_postgres::error::DbError {
-    fn could_retry(&self) -> bool {
-        use tokio_postgres::error::SqlState;
-        matches!(
-            self.code(),
-            &SqlState::CONNECTION_FAILURE
-                | &SqlState::CONNECTION_EXCEPTION
-                | &SqlState::CONNECTION_DOES_NOT_EXIST
-                | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
-        )
-    }
-}
-
-impl ShouldRetry for tokio_postgres::Error {
-    fn could_retry(&self) -> bool {
-        if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
-            io::Error::could_retry(io_err)
-        } else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
-            tokio_postgres::error::DbError::could_retry(db_err)
-        } else {
-            false
-        }
-    }
-}
-
-impl ShouldRetry for compute::ConnectionError {
-    fn could_retry(&self) -> bool {
-        match self {
-            compute::ConnectionError::Postgres(err) => err.could_retry(),
-            compute::ConnectionError::CouldNotConnect(err) => err.could_retry(),
-            _ => false,
-        }
-    }
-}
-
-pub fn retry_after(num_retries: u32) -> time::Duration {
-    BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1))
-}
 
 /// Finish client connection initialization: confirm auth success, send params, etc.
 #[tracing::instrument(skip_all)]
 async fn prepare_client_connection(
```
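With the moved definitions re-imported through `use self::connect_compute::{connect_to_compute, TcpMechanism};`, call sites in `proxy.rs` keep working with only a path change. A hedged sketch of such a call site (the bindings `params`, `node_info`, `extra`, and `creds` are assumed context, not shown in this diff):

```rust
// Sketch only; binding names are assumptions, not taken from this commit.
let connection = connect_to_compute(
    &TcpMechanism { params: &params, proto: "tcp" },
    node_info,                // console::CachedNodeInfo from wake_compute
    &extra,                   // console::ConsoleReqExtra
    &creds,                   // auth::BackendType<'_, ComputeUserInfo>
    LatencyTimer::new("tcp"), // consumed; records latency when dropped
)
.await?;
```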
proxy/src/proxy/connect_compute.rs (new file, 238 lines):

```rust
use crate::{
    auth,
    compute::{self, PostgresConnection},
    console::{self, errors::WakeComputeError, Api},
    metrics::{bool_to_str, LatencyTimer, NUM_CONNECTION_FAILURES, NUM_WAKEUP_FAILURES},
    proxy::retry::{retry_after, ShouldRetry},
};
use async_trait::async_trait;
use hyper::StatusCode;
use pq_proto::StartupMessageParams;
use std::ops::ControlFlow;
use tokio::time;
use tracing::{error, info, warn};

const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);

/// If we couldn't connect, a cached connection info might be to blame
/// (e.g. the compute node's address might've changed at the wrong time).
/// Invalidate the cache entry (if any) to prevent subsequent errors.
#[tracing::instrument(name = "invalidate_cache", skip_all)]
pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg {
    let is_cached = node_info.cached();
    if is_cached {
        warn!("invalidating stalled compute node info cache entry");
    }
    let label = match is_cached {
        true => "compute_cached",
        false => "compute_uncached",
    };
    NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc();

    node_info.invalidate().config
}

/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute_once(
    node_info: &console::CachedNodeInfo,
    timeout: time::Duration,
    proto: &'static str,
) -> Result<PostgresConnection, compute::ConnectionError> {
    let allow_self_signed_compute = node_info.allow_self_signed_compute;

    node_info
        .config
        .connect(allow_self_signed_compute, timeout, proto)
        .await
}

#[async_trait]
pub trait ConnectMechanism {
    type Connection;
    type ConnectError;
    type Error: From<Self::ConnectError>;
    async fn connect_once(
        &self,
        node_info: &console::CachedNodeInfo,
        timeout: time::Duration,
    ) -> Result<Self::Connection, Self::ConnectError>;

    fn update_connect_config(&self, conf: &mut compute::ConnCfg);
}

pub struct TcpMechanism<'a> {
    /// KV-dictionary with PostgreSQL connection params.
    pub params: &'a StartupMessageParams,
    pub proto: &'static str,
}

#[async_trait]
impl ConnectMechanism for TcpMechanism<'_> {
    type Connection = PostgresConnection;
    type ConnectError = compute::ConnectionError;
    type Error = compute::ConnectionError;

    async fn connect_once(
        &self,
        node_info: &console::CachedNodeInfo,
        timeout: time::Duration,
    ) -> Result<PostgresConnection, Self::Error> {
        connect_to_compute_once(node_info, timeout, self.proto).await
    }

    fn update_connect_config(&self, config: &mut compute::ConnCfg) {
        config.set_startup_params(self.params);
    }
}

fn report_error(e: &WakeComputeError, retry: bool) {
    use crate::console::errors::ApiError;
    let retry = bool_to_str(retry);
    let kind = match e {
        WakeComputeError::BadComputeAddress(_) => "bad_compute_address",
        WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error",
        WakeComputeError::ApiError(ApiError::Console {
            status: StatusCode::LOCKED,
            ref text,
        }) if text.contains("written data quota exceeded")
            || text.contains("the limit for current plan reached") =>
        {
            "quota_exceeded"
        }
        WakeComputeError::ApiError(ApiError::Console {
            status: StatusCode::LOCKED,
            ..
        }) => "api_console_locked",
        WakeComputeError::ApiError(ApiError::Console {
            status: StatusCode::BAD_REQUEST,
            ..
        }) => "api_console_bad_request",
        WakeComputeError::ApiError(ApiError::Console { status, .. })
            if status.is_server_error() =>
        {
            "api_console_other_server_error"
        }
        WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error",
        WakeComputeError::TimeoutError => "timeout_error",
    };
    NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc();
}

/// Try to connect to the compute node, retrying if necessary.
/// This function might update `node_info`, so we take it by `&mut`.
#[tracing::instrument(skip_all)]
pub async fn connect_to_compute<M: ConnectMechanism>(
    mechanism: &M,
    mut node_info: console::CachedNodeInfo,
    extra: &console::ConsoleReqExtra,
    creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>,
    mut latency_timer: LatencyTimer,
) -> Result<M::Connection, M::Error>
where
    M::ConnectError: ShouldRetry + std::fmt::Debug,
    M::Error: From<WakeComputeError>,
{
    mechanism.update_connect_config(&mut node_info.config);

    // try once
    let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
        Ok(res) => {
            latency_timer.success();
            return Ok(res);
        }
        Err(e) => {
            error!(error = ?e, "could not connect to compute node");
            (invalidate_cache(node_info), e)
        }
    };

    latency_timer.cache_miss();

    let mut num_retries = 1;

    // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
    info!("compute node's state has likely changed; requesting a wake-up");
    let node_info = loop {
        let wake_res = match creds {
            auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
            #[cfg(feature = "testing")]
            auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
            // nothing to do?
            auth::BackendType::Link(_) => return Err(err.into()),
            // test backend
            #[cfg(test)]
            auth::BackendType::Test(x) => x.wake_compute(),
        };

        match handle_try_wake(wake_res, num_retries) {
            Err(e) => {
                error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
                report_error(&e, false);
                return Err(e.into());
            }
            // failed to wake up but we can continue to retry
            Ok(ControlFlow::Continue(e)) => {
                report_error(&e, true);
                warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node");
            }
            // successfully woke up a compute node and can break the wakeup loop
            Ok(ControlFlow::Break(mut node_info)) => {
                node_info.config.reuse_password(&config);
                mechanism.update_connect_config(&mut node_info.config);
                break node_info;
            }
        }

        let wait_duration = retry_after(num_retries);
        num_retries += 1;

        time::sleep(wait_duration).await;
    };

    // now that we have a new node, try connect to it repeatedly.
    // this can error for a few reasons, for instance:
    // * DNS connection settings haven't quite propagated yet
    info!("wake_compute success. attempting to connect");
    loop {
        match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await {
            Ok(res) => {
                latency_timer.success();
                return Ok(res);
            }
            Err(e) => {
                let retriable = e.should_retry(num_retries);
                if !retriable {
                    error!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
                    return Err(e.into());
                }
                warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node");
            }
        }

        let wait_duration = retry_after(num_retries);
        num_retries += 1;

        time::sleep(wait_duration).await;
    }
}

/// Attempts to wake up the compute node.
/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable
/// * Returns Ok(Break(node)) if the wakeup succeeded
/// * Returns Err(e) if there was an error
pub fn handle_try_wake(
    result: Result<console::CachedNodeInfo, WakeComputeError>,
    num_retries: u32,
) -> Result<ControlFlow<console::CachedNodeInfo, WakeComputeError>, WakeComputeError> {
    match result {
        Err(err) => match &err {
            WakeComputeError::ApiError(api) if api.should_retry(num_retries) => {
                Ok(ControlFlow::Continue(err))
            }
            _ => Err(err),
        },
        // Ready to try again.
        Ok(new) => Ok(ControlFlow::Break(new)),
    }
}
```
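`ConnectMechanism` is the seam that keeps the retry loop above transport-agnostic: TCP uses `TcpMechanism`, the serverless pool plugs in its own `TokioMechanism` (visible in a later hunk of this commit), and the tests stub it out. A minimal sketch of a custom mechanism, assuming the same crate context (`NoopMechanism` is hypothetical, not part of this commit):

```rust
// Hypothetical mechanism; illustrates the trait contract only.
struct NoopMechanism;

#[async_trait]
impl ConnectMechanism for NoopMechanism {
    type Connection = ();
    // Error must implement From<ConnectError>; reusing one type satisfies
    // that bound through the identity impl From<T> for T.
    type ConnectError = compute::ConnectionError;
    type Error = compute::ConnectionError;

    async fn connect_once(
        &self,
        _node_info: &console::CachedNodeInfo,
        _timeout: time::Duration,
    ) -> Result<Self::Connection, Self::ConnectError> {
        Ok(()) // a real mechanism would dial the compute node here
    }

    fn update_connect_config(&self, _conf: &mut compute::ConnCfg) {}
}
```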
proxy/src/proxy/retry.rs (new file, 68 lines):

```rust
use crate::compute;
use std::{error::Error, io};
use tokio::time;

/// Number of times we should retry the `/proxy_wake_compute` http request.
/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0
pub const NUM_RETRIES_CONNECT: u32 = 16;
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25);
const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;

pub trait ShouldRetry {
    fn could_retry(&self) -> bool;
    fn should_retry(&self, num_retries: u32) -> bool {
        match self {
            _ if num_retries >= NUM_RETRIES_CONNECT => false,
            err => err.could_retry(),
        }
    }
}

impl ShouldRetry for io::Error {
    fn could_retry(&self) -> bool {
        use std::io::ErrorKind;
        matches!(
            self.kind(),
            ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut
        )
    }
}

impl ShouldRetry for tokio_postgres::error::DbError {
    fn could_retry(&self) -> bool {
        use tokio_postgres::error::SqlState;
        matches!(
            self.code(),
            &SqlState::CONNECTION_FAILURE
                | &SqlState::CONNECTION_EXCEPTION
                | &SqlState::CONNECTION_DOES_NOT_EXIST
                | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
        )
    }
}

impl ShouldRetry for tokio_postgres::Error {
    fn could_retry(&self) -> bool {
        if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) {
            io::Error::could_retry(io_err)
        } else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) {
            tokio_postgres::error::DbError::could_retry(db_err)
        } else {
            false
        }
    }
}

impl ShouldRetry for compute::ConnectionError {
    fn could_retry(&self) -> bool {
        match self {
            compute::ConnectionError::Postgres(err) => err.could_retry(),
            compute::ConnectionError::CouldNotConnect(err) => err.could_retry(),
            _ => false,
        }
    }
}

pub fn retry_after(num_retries: u32) -> time::Duration {
    BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1))
}
```
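The schedule encoded by these constants doubles the wait every second retry (exponent base sqrt(2)): roughly 25ms, 35ms, 50ms, 71ms, and so on, topping out around 4.5s for the 16th and final attempt, about 15s of total waiting. A standalone sketch that reproduces the math:

```rust
// Standalone reproduction of retry_after; constants copied from retry.rs above.
use std::time::Duration;

const BASE_RETRY_WAIT_DURATION: Duration = Duration::from_millis(25);
const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;

fn retry_after(num_retries: u32) -> Duration {
    BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi(num_retries as i32 - 1))
}

fn main() {
    // Prints ~25ms, ~35ms, ~71ms, ~283ms, ~4.5s.
    for n in [1u32, 2, 4, 8, 16] {
        println!("retry {n}: wait {:?}", retry_after(n));
    }
}
```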
Call-site updates in the rest of the crate:

```diff
@@ -2,10 +2,13 @@
 
 mod mitm;
 
+use super::connect_compute::ConnectMechanism;
+use super::retry::ShouldRetry;
 use super::*;
 use crate::auth::backend::{ComputeUserInfo, TestBackend};
 use crate::config::CertResolver;
 use crate::console::{CachedNodeInfo, NodeInfo};
+use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT};
 use crate::{auth, http, sasl, scram};
 use async_trait::async_trait;
 use rstest::rstest;
@@ -423,7 +426,7 @@ impl ConnectMechanism for TestConnectMechanism {
     async fn connect_once(
         &self,
         _node_info: &console::CachedNodeInfo,
-        _timeout: time::Duration,
+        _timeout: std::time::Duration,
     ) -> Result<Self::Connection, Self::ConnectError> {
         let mut counter = self.counter.lock().unwrap();
         let action = self.sequence[*counter];
@@ -120,7 +120,7 @@ where
 struct PgFrame;
 impl Decoder for PgFrame {
     type Item = Bytes;
-    type Error = io::Error;
+    type Error = std::io::Error;
 
     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
         if src.len() < 5 {
@@ -136,7 +136,7 @@ impl Decoder for PgFrame {
     }
 }
 impl Encoder<Bytes> for PgFrame {
-    type Error = io::Error;
+    type Error = std::io::Error;
 
     fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
         dst.extend_from_slice(&item);
@@ -393,10 +393,10 @@ impl Limiter {
             }
             new_limit
         };
-        crate::proxy::RATE_LIMITER_LIMIT
+        crate::metrics::RATE_LIMITER_LIMIT
            .with_label_values(&["expected"])
            .set(new_limit as i64);
-        crate::proxy::RATE_LIMITER_LIMIT
+        crate::metrics::RATE_LIMITER_LIMIT
            .with_label_values(&["actual"])
            .set(actual_limit as i64);
        self.limits.store(new_limit, Ordering::Release);
@@ -470,7 +470,7 @@ impl reqwest_middleware::Middleware for Limiter {
            )
        })?;
        info!(duration = ?start.elapsed(), "waiting for token to connect to the control plane");
-        crate::proxy::RATE_LIMITER_ACQUIRE_LATENCY.observe(start.elapsed().as_secs_f64());
+        crate::metrics::RATE_LIMITER_ACQUIRE_LATENCY.observe(start.elapsed().as_secs_f64());
        match next.run(req, extensions).await {
            Ok(response) => {
                self.release(token, Some(Outcome::from_reqwest_response(&response)))
@@ -13,8 +13,8 @@ pub use reqwest_middleware::{ClientWithMiddleware, Error};
 pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
 use tokio_util::task::TaskTracker;
 
+use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE;
 use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
-use crate::proxy::NUM_CLIENT_CONNECTION_GAUGE;
 use crate::rate_limiter::EndpointRateLimiter;
 use crate::{cancellation::CancelMap, config::ProxyConfig};
 use futures::StreamExt;
@@ -24,13 +24,12 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus};
 use crate::{
     auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list},
     console,
-    proxy::{neon_options, LatencyTimer, NUM_DB_CONNECTIONS_GAUGE},
+    metrics::{LatencyTimer, NUM_DB_CONNECTIONS_GAUGE},
+    proxy::{connect_compute::ConnectMechanism, neon_options},
     usage_metrics::{Ids, MetricCounter, USAGE_METRICS},
 };
 use crate::{compute, config};
 
-use crate::proxy::ConnectMechanism;
-
 use tracing::{error, warn, Span};
 use tracing::{info, info_span, Instrument};
@@ -444,7 +443,7 @@ async fn connect_to_compute(
         .await?
         .context("missing cache entry from wake_compute")?;
 
-    crate::proxy::connect_to_compute(
+    crate::proxy::connect_compute::connect_to_compute(
         &TokioMechanism {
             conn_id,
             conn_info,
@@ -29,7 +29,7 @@ use utils::http::error::ApiError;
 use utils::http::json::json_response;
 
 use crate::config::HttpConfig;
-use crate::proxy::NUM_CONNECTION_REQUESTS_GAUGE;
+use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
 
 use super::conn_pool::ConnInfo;
 use super::conn_pool::GlobalConnPool;
```