diff --git a/proxy/src/auth/backend.rs b/proxy/src/auth/backend.rs index ba054b53eb..3b09e05bd2 100644 --- a/proxy/src/auth/backend.rs +++ b/proxy/src/auth/backend.rs @@ -11,7 +11,8 @@ use crate::auth::validate_password_and_exchange; use crate::console::errors::GetAuthInfoError; use crate::console::provider::AuthInfo; use crate::console::AuthSecret; -use crate::proxy::{handle_try_wake, retry_after, LatencyTimer}; +use crate::proxy::connect_compute::handle_try_wake; +use crate::proxy::retry::retry_after; use crate::scram; use crate::stream::Stream; use crate::{ @@ -22,6 +23,7 @@ use crate::{ provider::{CachedNodeInfo, ConsoleReqExtra}, Api, }, + metrics::LatencyTimer, stream, url, }; use futures::TryFutureExt; diff --git a/proxy/src/auth/backend/classic.rs b/proxy/src/auth/backend/classic.rs index ce52daf16c..5c394ec649 100644 --- a/proxy/src/auth/backend/classic.rs +++ b/proxy/src/auth/backend/classic.rs @@ -4,7 +4,7 @@ use crate::{ compute, config::AuthenticationConfig, console::AuthSecret, - proxy::LatencyTimer, + metrics::LatencyTimer, sasl, stream::{PqStream, Stream}, }; diff --git a/proxy/src/auth/backend/hacks.rs b/proxy/src/auth/backend/hacks.rs index abbd25008b..5dde514bca 100644 --- a/proxy/src/auth/backend/hacks.rs +++ b/proxy/src/auth/backend/hacks.rs @@ -4,7 +4,7 @@ use super::{ use crate::{ auth::{self, AuthFlow}, console::AuthSecret, - proxy::LatencyTimer, + metrics::LatencyTimer, sasl, stream::{self, Stream}, }; diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index 72149e8e29..c04769a199 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -1,9 +1,8 @@ //! User credentials used in authentication. use crate::{ - auth::password_hack::parse_endpoint_param, - error::UserFacingError, - proxy::{neon_options_str, NUM_CONNECTION_ACCEPTED_BY_SNI}, + auth::password_hack::parse_endpoint_param, error::UserFacingError, + metrics::NUM_CONNECTION_ACCEPTED_BY_SNI, proxy::neon_options_str, }; use itertools::Itertools; use pq_proto::StartupMessageParams; diff --git a/proxy/src/compute.rs b/proxy/src/compute.rs index f5f7270bf4..a54ba56e43 100644 --- a/proxy/src/compute.rs +++ b/proxy/src/compute.rs @@ -1,9 +1,6 @@ use crate::{ - auth::parse_endpoint_param, - cancellation::CancelClosure, - console::errors::WakeComputeError, - error::UserFacingError, - proxy::{neon_option, NUM_DB_CONNECTIONS_GAUGE}, + auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError, + error::UserFacingError, metrics::NUM_DB_CONNECTIONS_GAUGE, proxy::neon_option, }; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; diff --git a/proxy/src/console/provider.rs b/proxy/src/console/provider.rs index deab966d9e..8d399f26ea 100644 --- a/proxy/src/console/provider.rs +++ b/proxy/src/console/provider.rs @@ -21,7 +21,7 @@ pub mod errors { use crate::{ error::{io_error, UserFacingError}, http, - proxy::ShouldRetry, + proxy::retry::ShouldRetry, }; use thiserror::Error; diff --git a/proxy/src/console/provider/neon.rs b/proxy/src/console/provider/neon.rs index 192252a0df..f748c9a41f 100644 --- a/proxy/src/console/provider/neon.rs +++ b/proxy/src/console/provider/neon.rs @@ -5,7 +5,7 @@ use super::{ errors::{ApiError, GetAuthInfoError, WakeComputeError}, ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo, }; -use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; +use crate::metrics::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER}; use crate::{auth::backend::ComputeUserInfo, compute, http, scram}; use async_trait::async_trait; use futures::TryFutureExt; diff --git a/proxy/src/http.rs b/proxy/src/http.rs index 09423eca77..59e1492ed4 100644 --- a/proxy/src/http.rs +++ b/proxy/src/http.rs @@ -13,7 +13,7 @@ pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio::time::Instant; use tracing::trace; -use crate::{proxy::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl}; +use crate::{metrics::CONSOLE_REQUEST_LATENCY, rate_limiter, url::ApiUrl}; use reqwest_middleware::RequestBuilder; /// This is the preferred way to create new http clients, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index a22600cbb3..2da1eaf482 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -16,6 +16,7 @@ pub mod console; pub mod error; pub mod http; pub mod logging; +pub mod metrics; pub mod parse; pub mod protocol2; pub mod proxy; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs new file mode 100644 index 0000000000..8e2a6105b1 --- /dev/null +++ b/proxy/src/metrics.rs @@ -0,0 +1,232 @@ +use ::metrics::{ + exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec, + IntCounterPairVec, IntCounterVec, +}; +use prometheus::{ + register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec, + IntGaugeVec, +}; + +use once_cell::sync::Lazy; +use tokio::time; + +pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "proxy_opened_db_connections_total", + "Number of opened connections to a database.", + "proxy_closed_db_connections_total", + "Number of closed connections to a database.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "proxy_opened_client_connections_total", + "Number of opened connections from a client.", + "proxy_closed_client_connections_total", + "Number of closed connections from a client.", + &["protocol"], + ) + .unwrap() +}); + +pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy = Lazy::new(|| { + register_int_counter_pair_vec!( + "proxy_accepted_connections_total", + "Number of client connections accepted.", + "proxy_closed_connections_total", + "Number of client connections closed.", + &["protocol"], + ) + .unwrap() +}); + +pub static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { + register_histogram_vec!( + "proxy_compute_connection_latency_seconds", + "Time it took for proxy to establish a connection to the compute endpoint", + // http/ws/tcp, true/false, true/false, success/failure + // 3 * 2 * 2 * 2 = 24 counters + &["protocol", "cache_miss", "pool_miss", "outcome"], + // largest bucket = 2^16 * 0.5ms = 32s + exponential_buckets(0.0005, 2.0, 16).unwrap(), + ) + .unwrap() +}); + +pub static CONSOLE_REQUEST_LATENCY: Lazy = Lazy::new(|| { + register_histogram_vec!( + "proxy_console_request_latency", + "Time it took for proxy to establish a connection to the compute endpoint", + // proxy_wake_compute/proxy_get_role_info + &["request"], + // largest bucket = 2^16 * 0.2ms = 13s + exponential_buckets(0.0002, 2.0, 16).unwrap(), + ) + .unwrap() +}); + +pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_allowed_ips_cache_misses", + "Number of cache hits/misses for allowed ips", + // hit/miss + &["outcome"], + ) + .unwrap() +}); + +pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy = Lazy::new(|| { + register_histogram!( + "proxy_control_plane_token_acquire_seconds", + "Time it took for proxy to establish a connection to the compute endpoint", + // largest bucket = 3^16 * 0.05ms = 2.15s + exponential_buckets(0.00005, 3.0, 16).unwrap(), + ) + .unwrap() +}); + +pub static RATE_LIMITER_LIMIT: Lazy = Lazy::new(|| { + register_int_gauge_vec!( + "semaphore_control_plane_limit", + "Current limit of the semaphore control plane", + &["limit"], // 2 counters + ) + .unwrap() +}); + +pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_accepted_connections_by_sni", + "Number of connections (per sni).", + &["kind"], + ) + .unwrap() +}); + +pub static ALLOWED_IPS_NUMBER: Lazy = Lazy::new(|| { + register_histogram!( + "proxy_allowed_ips_number", + "Number of allowed ips", + vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0], + ) + .unwrap() +}); + +pub struct LatencyTimer { + // time since the stopwatch was started + start: Option, + // accumulated time on the stopwatch + accumulated: std::time::Duration, + // label data + protocol: &'static str, + cache_miss: bool, + pool_miss: bool, + outcome: &'static str, +} + +pub struct LatencyTimerPause<'a> { + timer: &'a mut LatencyTimer, +} + +impl LatencyTimer { + pub fn new(protocol: &'static str) -> Self { + Self { + start: Some(time::Instant::now()), + accumulated: std::time::Duration::ZERO, + protocol, + cache_miss: false, + // by default we don't do pooling + pool_miss: true, + // assume failed unless otherwise specified + outcome: "failed", + } + } + + pub fn pause(&mut self) -> LatencyTimerPause<'_> { + // stop the stopwatch and record the time that we have accumulated + let start = self.start.take().expect("latency timer should be started"); + self.accumulated += start.elapsed(); + LatencyTimerPause { timer: self } + } + + pub fn cache_miss(&mut self) { + self.cache_miss = true; + } + + pub fn pool_hit(&mut self) { + self.pool_miss = false; + } + + pub fn success(mut self) { + self.outcome = "success"; + } +} + +impl Drop for LatencyTimerPause<'_> { + fn drop(&mut self) { + // start the stopwatch again + self.timer.start = Some(time::Instant::now()); + } +} + +impl Drop for LatencyTimer { + fn drop(&mut self) { + let duration = + self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated; + COMPUTE_CONNECTION_LATENCY + .with_label_values(&[ + self.protocol, + bool_to_str(self.cache_miss), + bool_to_str(self.pool_miss), + self.outcome, + ]) + .observe(duration.as_secs_f64()) + } +} + +pub static NUM_CONNECTION_FAILURES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_connection_failures_total", + "Number of connection failures (per kind).", + &["kind"], + ) + .unwrap() +}); + +pub static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_connection_failures_breakdown", + "Number of wake-up failures (per kind).", + &["retry", "kind"], + ) + .unwrap() +}); + +pub static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_io_bytes_per_client", + "Number of bytes sent/received between client and backend.", + crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS, + ) + .unwrap() +}); + +pub static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_io_bytes", + "Number of bytes sent/received between all clients and backends.", + &["direction"], + ) + .unwrap() +}); + +pub const fn bool_to_str(x: bool) -> &'static str { + if x { + "true" + } else { + "false" + } +} diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index da65065179..17e910860c 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -1,265 +1,41 @@ #[cfg(test)] mod tests; +pub mod connect_compute; +pub mod retry; + use crate::{ auth, cancellation::{self, CancelMap}, - compute::{self, PostgresConnection}, + compute, config::{AuthenticationConfig, ProxyConfig, TlsConfig}, - console::{self, errors::WakeComputeError, messages::MetricsAuxInfo, Api}, - http::StatusCode, + console::{self, messages::MetricsAuxInfo}, + metrics::{ + LatencyTimer, NUM_BYTES_PROXIED_COUNTER, NUM_BYTES_PROXIED_PER_CLIENT_COUNTER, + NUM_CLIENT_CONNECTION_GAUGE, NUM_CONNECTION_REQUESTS_GAUGE, + }, protocol2::WithClientIp, rate_limiter::EndpointRateLimiter, stream::{PqStream, Stream}, usage_metrics::{Ids, USAGE_METRICS}, }; use anyhow::{bail, Context}; -use async_trait::async_trait; use futures::TryFutureExt; use itertools::Itertools; -use metrics::{ - exponential_buckets, register_int_counter_pair_vec, register_int_counter_vec, - IntCounterPairVec, IntCounterVec, -}; -use once_cell::sync::{Lazy, OnceCell}; +use once_cell::sync::OnceCell; use pq_proto::{BeMessage as Be, FeStartupPacket, StartupMessageParams}; -use prometheus::{ - register_histogram, register_histogram_vec, register_int_gauge_vec, Histogram, HistogramVec, - IntGaugeVec, -}; use regex::Regex; -use std::{error::Error, io, net::IpAddr, ops::ControlFlow, sync::Arc, time::Instant}; -use tokio::{ - io::{AsyncRead, AsyncWrite, AsyncWriteExt}, - time, -}; +use std::{net::IpAddr, sync::Arc}; +use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{error, info, info_span, Instrument}; use utils::measured_stream::MeasuredStream; -/// Number of times we should retry the `/proxy_wake_compute` http request. -/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0 -pub const NUM_RETRIES_CONNECT: u32 = 16; -const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2); -const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25); -const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; +use self::connect_compute::{connect_to_compute, TcpMechanism}; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; const ERR_PROTO_VIOLATION: &str = "protocol violation"; -pub static NUM_DB_CONNECTIONS_GAUGE: Lazy = Lazy::new(|| { - register_int_counter_pair_vec!( - "proxy_opened_db_connections_total", - "Number of opened connections to a database.", - "proxy_closed_db_connections_total", - "Number of closed connections to a database.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CLIENT_CONNECTION_GAUGE: Lazy = Lazy::new(|| { - register_int_counter_pair_vec!( - "proxy_opened_client_connections_total", - "Number of opened connections from a client.", - "proxy_closed_client_connections_total", - "Number of closed connections from a client.", - &["protocol"], - ) - .unwrap() -}); - -pub static NUM_CONNECTION_REQUESTS_GAUGE: Lazy = Lazy::new(|| { - register_int_counter_pair_vec!( - "proxy_accepted_connections_total", - "Number of client connections accepted.", - "proxy_closed_connections_total", - "Number of client connections closed.", - &["protocol"], - ) - .unwrap() -}); - -static COMPUTE_CONNECTION_LATENCY: Lazy = Lazy::new(|| { - register_histogram_vec!( - "proxy_compute_connection_latency_seconds", - "Time it took for proxy to establish a connection to the compute endpoint", - // http/ws/tcp, true/false, true/false, success/failure - // 3 * 2 * 2 * 2 = 24 counters - &["protocol", "cache_miss", "pool_miss", "outcome"], - // largest bucket = 2^16 * 0.5ms = 32s - exponential_buckets(0.0005, 2.0, 16).unwrap(), - ) - .unwrap() -}); - -pub static CONSOLE_REQUEST_LATENCY: Lazy = Lazy::new(|| { - register_histogram_vec!( - "proxy_console_request_latency", - "Time it took for proxy to establish a connection to the compute endpoint", - // proxy_wake_compute/proxy_get_role_info - &["request"], - // largest bucket = 2^16 * 0.2ms = 13s - exponential_buckets(0.0002, 2.0, 16).unwrap(), - ) - .unwrap() -}); - -pub static ALLOWED_IPS_BY_CACHE_OUTCOME: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_allowed_ips_cache_misses", - "Number of cache hits/misses for allowed ips", - // hit/miss - &["outcome"], - ) - .unwrap() -}); - -pub static RATE_LIMITER_ACQUIRE_LATENCY: Lazy = Lazy::new(|| { - register_histogram!( - "proxy_control_plane_token_acquire_seconds", - "Time it took for proxy to establish a connection to the compute endpoint", - // largest bucket = 3^16 * 0.05ms = 2.15s - exponential_buckets(0.00005, 3.0, 16).unwrap(), - ) - .unwrap() -}); - -pub static RATE_LIMITER_LIMIT: Lazy = Lazy::new(|| { - register_int_gauge_vec!( - "semaphore_control_plane_limit", - "Current limit of the semaphore control plane", - &["limit"], // 2 counters - ) - .unwrap() -}); - -pub static NUM_CONNECTION_ACCEPTED_BY_SNI: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_accepted_connections_by_sni", - "Number of connections (per sni).", - &["kind"], - ) - .unwrap() -}); - -pub static ALLOWED_IPS_NUMBER: Lazy = Lazy::new(|| { - register_histogram!( - "proxy_allowed_ips_number", - "Number of allowed ips", - vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 50.0, 100.0], - ) - .unwrap() -}); - -pub struct LatencyTimer { - // time since the stopwatch was started - start: Option, - // accumulated time on the stopwatch - accumulated: std::time::Duration, - // label data - protocol: &'static str, - cache_miss: bool, - pool_miss: bool, - outcome: &'static str, -} - -pub struct LatencyTimerPause<'a> { - timer: &'a mut LatencyTimer, -} - -impl LatencyTimer { - pub fn new(protocol: &'static str) -> Self { - Self { - start: Some(Instant::now()), - accumulated: std::time::Duration::ZERO, - protocol, - cache_miss: false, - // by default we don't do pooling - pool_miss: true, - // assume failed unless otherwise specified - outcome: "failed", - } - } - - pub fn pause(&mut self) -> LatencyTimerPause<'_> { - // stop the stopwatch and record the time that we have accumulated - let start = self.start.take().expect("latency timer should be started"); - self.accumulated += start.elapsed(); - LatencyTimerPause { timer: self } - } - - pub fn cache_miss(&mut self) { - self.cache_miss = true; - } - - pub fn pool_hit(&mut self) { - self.pool_miss = false; - } - - pub fn success(mut self) { - self.outcome = "success"; - } -} - -impl Drop for LatencyTimerPause<'_> { - fn drop(&mut self) { - // start the stopwatch again - self.timer.start = Some(Instant::now()); - } -} - -impl Drop for LatencyTimer { - fn drop(&mut self) { - let duration = - self.start.map(|start| start.elapsed()).unwrap_or_default() + self.accumulated; - COMPUTE_CONNECTION_LATENCY - .with_label_values(&[ - self.protocol, - bool_to_str(self.cache_miss), - bool_to_str(self.pool_miss), - self.outcome, - ]) - .observe(duration.as_secs_f64()) - } -} - -static NUM_CONNECTION_FAILURES: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_connection_failures_total", - "Number of connection failures (per kind).", - &["kind"], - ) - .unwrap() -}); - -static NUM_WAKEUP_FAILURES: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_connection_failures_breakdown", - "Number of wake-up failures (per kind).", - &["retry", "kind"], - ) - .unwrap() -}); - -static NUM_BYTES_PROXIED_PER_CLIENT_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_io_bytes_per_client", - "Number of bytes sent/received between client and backend.", - crate::console::messages::MetricsAuxInfo::TRAFFIC_LABELS, - ) - .unwrap() -}); - -static NUM_BYTES_PROXIED_COUNTER: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "proxy_io_bytes", - "Number of bytes sent/received between all clients and backends.", - &["direction"], - ) - .unwrap() -}); - pub async fn run_until_cancelled( f: F, cancellation_token: &CancellationToken, @@ -539,296 +315,6 @@ async fn handshake( } } -/// If we couldn't connect, a cached connection info might be to blame -/// (e.g. the compute node's address might've changed at the wrong time). -/// Invalidate the cache entry (if any) to prevent subsequent errors. -#[tracing::instrument(name = "invalidate_cache", skip_all)] -pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg { - let is_cached = node_info.cached(); - if is_cached { - warn!("invalidating stalled compute node info cache entry"); - } - let label = match is_cached { - true => "compute_cached", - false => "compute_uncached", - }; - NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); - - node_info.invalidate().config -} - -/// Try to connect to the compute node once. -#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)] -async fn connect_to_compute_once( - node_info: &console::CachedNodeInfo, - timeout: time::Duration, - proto: &'static str, -) -> Result { - let allow_self_signed_compute = node_info.allow_self_signed_compute; - - node_info - .config - .connect(allow_self_signed_compute, timeout, proto) - .await -} - -#[async_trait] -pub trait ConnectMechanism { - type Connection; - type ConnectError; - type Error: From; - async fn connect_once( - &self, - node_info: &console::CachedNodeInfo, - timeout: time::Duration, - ) -> Result; - - fn update_connect_config(&self, conf: &mut compute::ConnCfg); -} - -pub struct TcpMechanism<'a> { - /// KV-dictionary with PostgreSQL connection params. - pub params: &'a StartupMessageParams, - pub proto: &'static str, -} - -#[async_trait] -impl ConnectMechanism for TcpMechanism<'_> { - type Connection = PostgresConnection; - type ConnectError = compute::ConnectionError; - type Error = compute::ConnectionError; - - async fn connect_once( - &self, - node_info: &console::CachedNodeInfo, - timeout: time::Duration, - ) -> Result { - connect_to_compute_once(node_info, timeout, self.proto).await - } - - fn update_connect_config(&self, config: &mut compute::ConnCfg) { - config.set_startup_params(self.params); - } -} - -const fn bool_to_str(x: bool) -> &'static str { - if x { - "true" - } else { - "false" - } -} - -fn report_error(e: &WakeComputeError, retry: bool) { - use crate::console::errors::ApiError; - let retry = bool_to_str(retry); - let kind = match e { - WakeComputeError::BadComputeAddress(_) => "bad_compute_address", - WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error", - WakeComputeError::ApiError(ApiError::Console { - status: StatusCode::LOCKED, - ref text, - }) if text.contains("written data quota exceeded") - || text.contains("the limit for current plan reached") => - { - "quota_exceeded" - } - WakeComputeError::ApiError(ApiError::Console { - status: StatusCode::LOCKED, - .. - }) => "api_console_locked", - WakeComputeError::ApiError(ApiError::Console { - status: StatusCode::BAD_REQUEST, - .. - }) => "api_console_bad_request", - WakeComputeError::ApiError(ApiError::Console { status, .. }) - if status.is_server_error() => - { - "api_console_other_server_error" - } - WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error", - WakeComputeError::TimeoutError => "timeout_error", - }; - NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc(); -} - -/// Try to connect to the compute node, retrying if necessary. -/// This function might update `node_info`, so we take it by `&mut`. -#[tracing::instrument(skip_all)] -pub async fn connect_to_compute( - mechanism: &M, - mut node_info: console::CachedNodeInfo, - extra: &console::ConsoleReqExtra, - creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>, - mut latency_timer: LatencyTimer, -) -> Result -where - M::ConnectError: ShouldRetry + std::fmt::Debug, - M::Error: From, -{ - mechanism.update_connect_config(&mut node_info.config); - - // try once - let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { - Ok(res) => { - latency_timer.success(); - return Ok(res); - } - Err(e) => { - error!(error = ?e, "could not connect to compute node"); - (invalidate_cache(node_info), e) - } - }; - - latency_timer.cache_miss(); - - let mut num_retries = 1; - - // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node - info!("compute node's state has likely changed; requesting a wake-up"); - let node_info = loop { - let wake_res = match creds { - auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, - #[cfg(feature = "testing")] - auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, - // nothing to do? - auth::BackendType::Link(_) => return Err(err.into()), - // test backend - #[cfg(test)] - auth::BackendType::Test(x) => x.wake_compute(), - }; - - match handle_try_wake(wake_res, num_retries) { - Err(e) => { - error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); - report_error(&e, false); - return Err(e.into()); - } - // failed to wake up but we can continue to retry - Ok(ControlFlow::Continue(e)) => { - report_error(&e, true); - warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); - } - // successfully woke up a compute node and can break the wakeup loop - Ok(ControlFlow::Break(mut node_info)) => { - node_info.config.reuse_password(&config); - mechanism.update_connect_config(&mut node_info.config); - break node_info; - } - } - - let wait_duration = retry_after(num_retries); - num_retries += 1; - - time::sleep(wait_duration).await; - }; - - // now that we have a new node, try connect to it repeatedly. - // this can error for a few reasons, for instance: - // * DNS connection settings haven't quite propagated yet - info!("wake_compute success. attempting to connect"); - loop { - match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { - Ok(res) => { - latency_timer.success(); - return Ok(res); - } - Err(e) => { - let retriable = e.should_retry(num_retries); - if !retriable { - error!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); - return Err(e.into()); - } - warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); - } - } - - let wait_duration = retry_after(num_retries); - num_retries += 1; - - time::sleep(wait_duration).await; - } -} - -/// Attempts to wake up the compute node. -/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable -/// * Returns Ok(Break(node)) if the wakeup succeeded -/// * Returns Err(e) if there was an error -pub fn handle_try_wake( - result: Result, - num_retries: u32, -) -> Result, WakeComputeError> { - match result { - Err(err) => match &err { - WakeComputeError::ApiError(api) if api.should_retry(num_retries) => { - Ok(ControlFlow::Continue(err)) - } - _ => Err(err), - }, - // Ready to try again. - Ok(new) => Ok(ControlFlow::Break(new)), - } -} - -pub trait ShouldRetry { - fn could_retry(&self) -> bool; - fn should_retry(&self, num_retries: u32) -> bool { - match self { - _ if num_retries >= NUM_RETRIES_CONNECT => false, - err => err.could_retry(), - } - } -} - -impl ShouldRetry for io::Error { - fn could_retry(&self) -> bool { - use std::io::ErrorKind; - matches!( - self.kind(), - ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut - ) - } -} - -impl ShouldRetry for tokio_postgres::error::DbError { - fn could_retry(&self) -> bool { - use tokio_postgres::error::SqlState; - matches!( - self.code(), - &SqlState::CONNECTION_FAILURE - | &SqlState::CONNECTION_EXCEPTION - | &SqlState::CONNECTION_DOES_NOT_EXIST - | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, - ) - } -} - -impl ShouldRetry for tokio_postgres::Error { - fn could_retry(&self) -> bool { - if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) { - io::Error::could_retry(io_err) - } else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) { - tokio_postgres::error::DbError::could_retry(db_err) - } else { - false - } - } -} - -impl ShouldRetry for compute::ConnectionError { - fn could_retry(&self) -> bool { - match self { - compute::ConnectionError::Postgres(err) => err.could_retry(), - compute::ConnectionError::CouldNotConnect(err) => err.could_retry(), - _ => false, - } - } -} - -pub fn retry_after(num_retries: u32) -> time::Duration { - BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1)) -} - /// Finish client connection initialization: confirm auth success, send params, etc. #[tracing::instrument(skip_all)] async fn prepare_client_connection( diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs new file mode 100644 index 0000000000..88b0019c49 --- /dev/null +++ b/proxy/src/proxy/connect_compute.rs @@ -0,0 +1,238 @@ +use crate::{ + auth, + compute::{self, PostgresConnection}, + console::{self, errors::WakeComputeError, Api}, + metrics::{bool_to_str, LatencyTimer, NUM_CONNECTION_FAILURES, NUM_WAKEUP_FAILURES}, + proxy::retry::{retry_after, ShouldRetry}, +}; +use async_trait::async_trait; +use hyper::StatusCode; +use pq_proto::StartupMessageParams; +use std::ops::ControlFlow; +use tokio::time; +use tracing::{error, info, warn}; + +const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2); + +/// If we couldn't connect, a cached connection info might be to blame +/// (e.g. the compute node's address might've changed at the wrong time). +/// Invalidate the cache entry (if any) to prevent subsequent errors. +#[tracing::instrument(name = "invalidate_cache", skip_all)] +pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg { + let is_cached = node_info.cached(); + if is_cached { + warn!("invalidating stalled compute node info cache entry"); + } + let label = match is_cached { + true => "compute_cached", + false => "compute_uncached", + }; + NUM_CONNECTION_FAILURES.with_label_values(&[label]).inc(); + + node_info.invalidate().config +} + +/// Try to connect to the compute node once. +#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)] +async fn connect_to_compute_once( + node_info: &console::CachedNodeInfo, + timeout: time::Duration, + proto: &'static str, +) -> Result { + let allow_self_signed_compute = node_info.allow_self_signed_compute; + + node_info + .config + .connect(allow_self_signed_compute, timeout, proto) + .await +} + +#[async_trait] +pub trait ConnectMechanism { + type Connection; + type ConnectError; + type Error: From; + async fn connect_once( + &self, + node_info: &console::CachedNodeInfo, + timeout: time::Duration, + ) -> Result; + + fn update_connect_config(&self, conf: &mut compute::ConnCfg); +} + +pub struct TcpMechanism<'a> { + /// KV-dictionary with PostgreSQL connection params. + pub params: &'a StartupMessageParams, + pub proto: &'static str, +} + +#[async_trait] +impl ConnectMechanism for TcpMechanism<'_> { + type Connection = PostgresConnection; + type ConnectError = compute::ConnectionError; + type Error = compute::ConnectionError; + + async fn connect_once( + &self, + node_info: &console::CachedNodeInfo, + timeout: time::Duration, + ) -> Result { + connect_to_compute_once(node_info, timeout, self.proto).await + } + + fn update_connect_config(&self, config: &mut compute::ConnCfg) { + config.set_startup_params(self.params); + } +} + +fn report_error(e: &WakeComputeError, retry: bool) { + use crate::console::errors::ApiError; + let retry = bool_to_str(retry); + let kind = match e { + WakeComputeError::BadComputeAddress(_) => "bad_compute_address", + WakeComputeError::ApiError(ApiError::Transport(_)) => "api_transport_error", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + ref text, + }) if text.contains("written data quota exceeded") + || text.contains("the limit for current plan reached") => + { + "quota_exceeded" + } + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::LOCKED, + .. + }) => "api_console_locked", + WakeComputeError::ApiError(ApiError::Console { + status: StatusCode::BAD_REQUEST, + .. + }) => "api_console_bad_request", + WakeComputeError::ApiError(ApiError::Console { status, .. }) + if status.is_server_error() => + { + "api_console_other_server_error" + } + WakeComputeError::ApiError(ApiError::Console { .. }) => "api_console_other_error", + WakeComputeError::TimeoutError => "timeout_error", + }; + NUM_WAKEUP_FAILURES.with_label_values(&[retry, kind]).inc(); +} + +/// Try to connect to the compute node, retrying if necessary. +/// This function might update `node_info`, so we take it by `&mut`. +#[tracing::instrument(skip_all)] +pub async fn connect_to_compute( + mechanism: &M, + mut node_info: console::CachedNodeInfo, + extra: &console::ConsoleReqExtra, + creds: &auth::BackendType<'_, auth::backend::ComputeUserInfo>, + mut latency_timer: LatencyTimer, +) -> Result +where + M::ConnectError: ShouldRetry + std::fmt::Debug, + M::Error: From, +{ + mechanism.update_connect_config(&mut node_info.config); + + // try once + let (config, err) = match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { + Ok(res) => { + latency_timer.success(); + return Ok(res); + } + Err(e) => { + error!(error = ?e, "could not connect to compute node"); + (invalidate_cache(node_info), e) + } + }; + + latency_timer.cache_miss(); + + let mut num_retries = 1; + + // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node + info!("compute node's state has likely changed; requesting a wake-up"); + let node_info = loop { + let wake_res = match creds { + auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await, + #[cfg(feature = "testing")] + auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await, + // nothing to do? + auth::BackendType::Link(_) => return Err(err.into()), + // test backend + #[cfg(test)] + auth::BackendType::Test(x) => x.wake_compute(), + }; + + match handle_try_wake(wake_res, num_retries) { + Err(e) => { + error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node"); + report_error(&e, false); + return Err(e.into()); + } + // failed to wake up but we can continue to retry + Ok(ControlFlow::Continue(e)) => { + report_error(&e, true); + warn!(error = ?e, num_retries, retriable = true, "couldn't wake compute node"); + } + // successfully woke up a compute node and can break the wakeup loop + Ok(ControlFlow::Break(mut node_info)) => { + node_info.config.reuse_password(&config); + mechanism.update_connect_config(&mut node_info.config); + break node_info; + } + } + + let wait_duration = retry_after(num_retries); + num_retries += 1; + + time::sleep(wait_duration).await; + }; + + // now that we have a new node, try connect to it repeatedly. + // this can error for a few reasons, for instance: + // * DNS connection settings haven't quite propagated yet + info!("wake_compute success. attempting to connect"); + loop { + match mechanism.connect_once(&node_info, CONNECT_TIMEOUT).await { + Ok(res) => { + latency_timer.success(); + return Ok(res); + } + Err(e) => { + let retriable = e.should_retry(num_retries); + if !retriable { + error!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); + return Err(e.into()); + } + warn!(error = ?e, num_retries, retriable, "couldn't connect to compute node"); + } + } + + let wait_duration = retry_after(num_retries); + num_retries += 1; + + time::sleep(wait_duration).await; + } +} + +/// Attempts to wake up the compute node. +/// * Returns Ok(Continue(e)) if there was an error waking but retries are acceptable +/// * Returns Ok(Break(node)) if the wakeup succeeded +/// * Returns Err(e) if there was an error +pub fn handle_try_wake( + result: Result, + num_retries: u32, +) -> Result, WakeComputeError> { + match result { + Err(err) => match &err { + WakeComputeError::ApiError(api) if api.should_retry(num_retries) => { + Ok(ControlFlow::Continue(err)) + } + _ => Err(err), + }, + // Ready to try again. + Ok(new) => Ok(ControlFlow::Break(new)), + } +} diff --git a/proxy/src/proxy/retry.rs b/proxy/src/proxy/retry.rs new file mode 100644 index 0000000000..a85ed380b0 --- /dev/null +++ b/proxy/src/proxy/retry.rs @@ -0,0 +1,68 @@ +use crate::compute; +use std::{error::Error, io}; +use tokio::time; + +/// Number of times we should retry the `/proxy_wake_compute` http request. +/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0 +pub const NUM_RETRIES_CONNECT: u32 = 16; +const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25); +const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2; + +pub trait ShouldRetry { + fn could_retry(&self) -> bool; + fn should_retry(&self, num_retries: u32) -> bool { + match self { + _ if num_retries >= NUM_RETRIES_CONNECT => false, + err => err.could_retry(), + } + } +} + +impl ShouldRetry for io::Error { + fn could_retry(&self) -> bool { + use std::io::ErrorKind; + matches!( + self.kind(), + ErrorKind::ConnectionRefused | ErrorKind::AddrNotAvailable | ErrorKind::TimedOut + ) + } +} + +impl ShouldRetry for tokio_postgres::error::DbError { + fn could_retry(&self) -> bool { + use tokio_postgres::error::SqlState; + matches!( + self.code(), + &SqlState::CONNECTION_FAILURE + | &SqlState::CONNECTION_EXCEPTION + | &SqlState::CONNECTION_DOES_NOT_EXIST + | &SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, + ) + } +} + +impl ShouldRetry for tokio_postgres::Error { + fn could_retry(&self) -> bool { + if let Some(io_err) = self.source().and_then(|x| x.downcast_ref()) { + io::Error::could_retry(io_err) + } else if let Some(db_err) = self.source().and_then(|x| x.downcast_ref()) { + tokio_postgres::error::DbError::could_retry(db_err) + } else { + false + } + } +} + +impl ShouldRetry for compute::ConnectionError { + fn could_retry(&self) -> bool { + match self { + compute::ConnectionError::Postgres(err) => err.could_retry(), + compute::ConnectionError::CouldNotConnect(err) => err.could_retry(), + _ => false, + } + } +} + +pub fn retry_after(num_retries: u32) -> time::Duration { + BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1)) +} diff --git a/proxy/src/proxy/tests.rs b/proxy/src/proxy/tests.rs index 4691abbfb9..3c483c59ee 100644 --- a/proxy/src/proxy/tests.rs +++ b/proxy/src/proxy/tests.rs @@ -2,10 +2,13 @@ mod mitm; +use super::connect_compute::ConnectMechanism; +use super::retry::ShouldRetry; use super::*; use crate::auth::backend::{ComputeUserInfo, TestBackend}; use crate::config::CertResolver; use crate::console::{CachedNodeInfo, NodeInfo}; +use crate::proxy::retry::{retry_after, NUM_RETRIES_CONNECT}; use crate::{auth, http, sasl, scram}; use async_trait::async_trait; use rstest::rstest; @@ -423,7 +426,7 @@ impl ConnectMechanism for TestConnectMechanism { async fn connect_once( &self, _node_info: &console::CachedNodeInfo, - _timeout: time::Duration, + _timeout: std::time::Duration, ) -> Result { let mut counter = self.counter.lock().unwrap(); let action = self.sequence[*counter]; diff --git a/proxy/src/proxy/tests/mitm.rs b/proxy/src/proxy/tests/mitm.rs index 50b3034936..a0a84a1dc0 100644 --- a/proxy/src/proxy/tests/mitm.rs +++ b/proxy/src/proxy/tests/mitm.rs @@ -120,7 +120,7 @@ where struct PgFrame; impl Decoder for PgFrame { type Item = Bytes; - type Error = io::Error; + type Error = std::io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { if src.len() < 5 { @@ -136,7 +136,7 @@ impl Decoder for PgFrame { } } impl Encoder for PgFrame { - type Error = io::Error; + type Error = std::io::Error; fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { dst.extend_from_slice(&item); diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index 8dfdfcd3db..a190b2cf8f 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -393,10 +393,10 @@ impl Limiter { } new_limit }; - crate::proxy::RATE_LIMITER_LIMIT + crate::metrics::RATE_LIMITER_LIMIT .with_label_values(&["expected"]) .set(new_limit as i64); - crate::proxy::RATE_LIMITER_LIMIT + crate::metrics::RATE_LIMITER_LIMIT .with_label_values(&["actual"]) .set(actual_limit as i64); self.limits.store(new_limit, Ordering::Release); @@ -470,7 +470,7 @@ impl reqwest_middleware::Middleware for Limiter { ) })?; info!(duration = ?start.elapsed(), "waiting for token to connect to the control plane"); - crate::proxy::RATE_LIMITER_ACQUIRE_LATENCY.observe(start.elapsed().as_secs_f64()); + crate::metrics::RATE_LIMITER_ACQUIRE_LATENCY.observe(start.elapsed().as_secs_f64()); match next.run(req, extensions).await { Ok(response) => { self.release(token, Some(Outcome::from_reqwest_response(&response))) diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index 870e9c1103..e358a0712f 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -13,8 +13,8 @@ pub use reqwest_middleware::{ClientWithMiddleware, Error}; pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware}; use tokio_util::task::TaskTracker; +use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE; use crate::protocol2::{ProxyProtocolAccept, WithClientIp}; -use crate::proxy::NUM_CLIENT_CONNECTION_GAUGE; use crate::rate_limiter::EndpointRateLimiter; use crate::{cancellation::CancelMap, config::ProxyConfig}; use futures::StreamExt; diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 69198d79d3..ab8903418b 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -24,13 +24,12 @@ use tokio_postgres::{AsyncMessage, ReadyForQueryStatus}; use crate::{ auth::{self, backend::ComputeUserInfo, check_peer_addr_is_in_list}, console, - proxy::{neon_options, LatencyTimer, NUM_DB_CONNECTIONS_GAUGE}, + metrics::{LatencyTimer, NUM_DB_CONNECTIONS_GAUGE}, + proxy::{connect_compute::ConnectMechanism, neon_options}, usage_metrics::{Ids, MetricCounter, USAGE_METRICS}, }; use crate::{compute, config}; -use crate::proxy::ConnectMechanism; - use tracing::{error, warn, Span}; use tracing::{info, info_span, Instrument}; @@ -444,7 +443,7 @@ async fn connect_to_compute( .await? .context("missing cache entry from wake_compute")?; - crate::proxy::connect_to_compute( + crate::proxy::connect_compute::connect_to_compute( &TokioMechanism { conn_id, conn_info, diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 795ba819c1..307b085ce0 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -29,7 +29,7 @@ use utils::http::error::ApiError; use utils::http::json::json_response; use crate::config::HttpConfig; -use crate::proxy::NUM_CONNECTION_REQUESTS_GAUGE; +use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE; use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool;