diff --git a/Cargo.lock b/Cargo.lock index 9f4d537b33..b52ecec128 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3898,6 +3898,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num" version = "0.4.1" @@ -4182,6 +4192,12 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "p256" version = "0.11.1" @@ -5239,6 +5255,7 @@ dependencies = [ "tracing-log", "tracing-opentelemetry", "tracing-subscriber", + "tracing-test", "tracing-utils", "try-lock", "typed-json", @@ -7689,6 +7706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "serde", @@ -7702,6 +7720,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.100", +] + [[package]] name = "tracing-utils" version = "0.1.0" @@ -8554,6 +8593,7 @@ dependencies = [ "tracing", "tracing-core", "tracing-log", + "tracing-subscriber", "url", "uuid", "zeroize", diff --git a/libs/proxy/tokio-postgres2/src/error/mod.rs b/libs/proxy/tokio-postgres2/src/error/mod.rs index b12e76e5bf..8149bceeb9 100644 --- a/libs/proxy/tokio-postgres2/src/error/mod.rs +++ b/libs/proxy/tokio-postgres2/src/error/mod.rs @@ -86,6 +86,27 @@ pub struct DbError { } impl DbError { + pub fn new_test_error(code: SqlState, message: String) -> Self { + DbError { + severity: "ERROR".to_string(), + parsed_severity: Some(Severity::Error), + code, + message, + detail: None, + hint: None, + position: None, + where_: None, + schema: None, + table: None, + column: None, + datatype: None, + constraint: None, + file: None, + line: None, + routine: None, + } + } + pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result { let mut severity = None; let mut parsed_severity = None; diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 2cec510d82..ce8610be24 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -127,3 +127,4 @@ rstest.workspace = true walkdir.workspace = true rand_distr = "0.4" tokio-postgres.workspace = true +tracing-test = "0.2" \ No newline at end of file diff --git a/proxy/src/proxy/retry.rs b/proxy/src/proxy/retry.rs index 42d1491782..0879564ced 100644 --- a/proxy/src/proxy/retry.rs +++ b/proxy/src/proxy/retry.rs @@ -48,7 +48,7 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError { use postgres_client::error::SqlState; // Here are errors that happens after the user successfully authenticated to the database. // TODO: there are pgbouncer errors that should be retried, but they are not listed here. - !matches!( + let non_retriable_pg_errors = matches!( self.code(), &SqlState::TOO_MANY_CONNECTIONS | &SqlState::OUT_OF_MEMORY @@ -56,8 +56,20 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError { | &SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::INVALID_CATALOG_NAME | &SqlState::INVALID_SCHEMA_NAME - | &SqlState::INVALID_PARAMETER_VALUE - ) + | &SqlState::INVALID_PARAMETER_VALUE, + ); + if non_retriable_pg_errors { + return false; + } + // PGBouncer errors that should not trigger a wake_compute retry. + if self.code() == &SqlState::PROTOCOL_VIOLATION { + // Source for the error message: + // https://github.com/pgbouncer/pgbouncer/blob/f15997fe3effe3a94ba8bcc1ea562e6117d1a131/src/client.c#L1070 + return !self + .message() + .contains("no more connections allowed (max_client_conn)"); + } + true } } @@ -110,3 +122,55 @@ pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Durati .base_delay .mul_f64(config.backoff_factor.powi((num_retries as i32) - 1)) } + +#[cfg(test)] +mod tests { + use super::ShouldRetryWakeCompute; + use postgres_client::error::{DbError, SqlState}; + + #[test] + fn should_retry_wake_compute_for_db_error() { + // These SQLStates should NOT trigger a wake_compute retry. + let non_retry_states = [ + SqlState::TOO_MANY_CONNECTIONS, + SqlState::OUT_OF_MEMORY, + SqlState::SYNTAX_ERROR, + SqlState::T_R_SERIALIZATION_FAILURE, + SqlState::INVALID_CATALOG_NAME, + SqlState::INVALID_SCHEMA_NAME, + SqlState::INVALID_PARAMETER_VALUE, + ]; + for state in non_retry_states { + let err = DbError::new_test_error(state.clone(), "oops".to_string()); + assert!( + !err.should_retry_wake_compute(), + "State {state:?} unexpectedly retried" + ); + } + + // Errors coming from pgbouncer should not trigger a wake_compute retry + let non_retry_pgbouncer_errors = ["no more connections allowed (max_client_conn)"]; + for error in non_retry_pgbouncer_errors { + let err = DbError::new_test_error(SqlState::PROTOCOL_VIOLATION, error.to_string()); + assert!( + !err.should_retry_wake_compute(), + "PGBouncer error {error:?} unexpectedly retried" + ); + } + + // These SQLStates should trigger a wake_compute retry. + let retry_states = [ + SqlState::CONNECTION_FAILURE, + SqlState::CONNECTION_EXCEPTION, + SqlState::CONNECTION_DOES_NOT_EXIST, + SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION, + ]; + for state in retry_states { + let err = DbError::new_test_error(state.clone(), "oops".to_string()); + assert!( + err.should_retry_wake_compute(), + "State {state:?} unexpectedly skipped retry" + ); + } + } +} diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index f47636cd71..be6426a63c 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -15,6 +15,7 @@ use rstest::rstest; use rustls::crypto::ring; use rustls::pki_types; use tokio::io::DuplexStream; +use tracing_test::traced_test; use super::connect_compute::ConnectMechanism; use super::retry::CouldRetry; @@ -381,8 +382,14 @@ enum ConnectAction { WakeFail, WakeRetry, Connect, + // connect_once -> Err, could_retry = true, should_retry_wake_compute = true Retry, + // connect_once -> Err, could_retry = true, should_retry_wake_compute = false + RetryNoWake, + // connect_once -> Err, could_retry = false, should_retry_wake_compute = true Fail, + // connect_once -> Err, could_retry = false, should_retry_wake_compute = false + FailNoWake, } #[derive(Clone)] @@ -424,6 +431,7 @@ struct TestConnection; #[derive(Debug)] struct TestConnectError { retryable: bool, + wakeable: bool, kind: crate::error::ErrorKind, } @@ -448,7 +456,7 @@ impl CouldRetry for TestConnectError { } impl ShouldRetryWakeCompute for TestConnectError { fn should_retry_wake_compute(&self) -> bool { - true + self.wakeable } } @@ -471,10 +479,22 @@ impl ConnectMechanism for TestConnectMechanism { ConnectAction::Connect => Ok(TestConnection), ConnectAction::Retry => Err(TestConnectError { retryable: true, + wakeable: true, + kind: ErrorKind::Compute, + }), + ConnectAction::RetryNoWake => Err(TestConnectError { + retryable: true, + wakeable: false, kind: ErrorKind::Compute, }), ConnectAction::Fail => Err(TestConnectError { retryable: false, + wakeable: true, + kind: ErrorKind::Compute, + }), + ConnectAction::FailNoWake => Err(TestConnectError { + retryable: false, + wakeable: false, kind: ErrorKind::Compute, }), x => panic!("expecting action {x:?}, connect is called instead"), @@ -709,3 +729,92 @@ async fn wake_non_retry() { .unwrap_err(); mechanism.verify(); } + +#[tokio::test] +#[traced_test] +async fn fail_but_wake_invalidates_cache() { + let ctx = RequestContext::test(); + let mech = TestConnectMechanism::new(vec![ + ConnectAction::Wake, + ConnectAction::Fail, + ConnectAction::Wake, + ConnectAction::Connect, + ]); + let user = helper_create_connect_info(&mech); + let cfg = config(); + + connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg) + .await + .unwrap(); + + assert!(logs_contain( + "invalidating stalled compute node info cache entry" + )); +} + +#[tokio::test] +#[traced_test] +async fn fail_no_wake_skips_cache_invalidation() { + let ctx = RequestContext::test(); + let mech = TestConnectMechanism::new(vec![ + ConnectAction::Wake, + ConnectAction::FailNoWake, + ConnectAction::Connect, + ]); + let user = helper_create_connect_info(&mech); + let cfg = config(); + + connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg) + .await + .unwrap(); + + assert!(!logs_contain( + "invalidating stalled compute node info cache entry" + )); +} + +#[tokio::test] +#[traced_test] +async fn retry_but_wake_invalidates_cache() { + let _ = env_logger::try_init(); + use ConnectAction::*; + + let ctx = RequestContext::test(); + // Wake → Retry (retryable + wakeable) → Wake → Connect + let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]); + let user_info = helper_create_connect_info(&mechanism); + let cfg = config(); + + connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg) + .await + .unwrap(); + mechanism.verify(); + + // Because Retry has wakeable=true, we should see invalidate_cache + assert!(logs_contain( + "invalidating stalled compute node info cache entry" + )); +} + +#[tokio::test] +#[traced_test] +async fn retry_no_wake_skips_invalidation() { + let _ = env_logger::try_init(); + use ConnectAction::*; + + let ctx = RequestContext::test(); + // Wake → RetryNoWake (retryable + NOT wakeable) + let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]); + let user_info = helper_create_connect_info(&mechanism); + let cfg = config(); + + connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg) + .await + .unwrap_err(); + mechanism.verify(); + + // Because RetryNoWake has wakeable=false, we must NOT see invalidate_cache + assert!(!logs_contain( + "invalidating stalled compute node info cache entry" + )); +} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 69d44b82ea..87d0092fb2 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -107,6 +107,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf tracing = { version = "0.1", features = ["log"] } tracing-core = { version = "0.1" } tracing-log = { version = "0.2" } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["serde", "v4", "v7"] } zeroize = { version = "1", features = ["derive", "serde"] }