Keep the conn info cache on max_client_conn from pgbouncer (#11986)

## Problem
Hitting max_client_conn from pgbouncer would lead to invalidation of the
conn info cache.
Customers would hit the limit on wake_compute.

## Summary of changes
`should_retry_wake_compute` detects this specific error from pgbouncer
as non-retriable,
meaning we won't try to wake up the compute again.
This commit is contained in:
Konstantin Merenkov
2025-05-21 17:27:30 +02:00
committed by GitHub
parent 136cf1979b
commit 5db20af8a7
6 changed files with 240 additions and 4 deletions

40
Cargo.lock generated
View File

@@ -3898,6 +3898,16 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]] [[package]]
name = "num" name = "num"
version = "0.4.1" version = "0.4.1"
@@ -4182,6 +4192,12 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]] [[package]]
name = "p256" name = "p256"
version = "0.11.1" version = "0.11.1"
@@ -5239,6 +5255,7 @@ dependencies = [
"tracing-log", "tracing-log",
"tracing-opentelemetry", "tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
"tracing-test",
"tracing-utils", "tracing-utils",
"try-lock", "try-lock",
"typed-json", "typed-json",
@@ -7689,6 +7706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [ dependencies = [
"matchers", "matchers",
"nu-ansi-term",
"once_cell", "once_cell",
"regex", "regex",
"serde", "serde",
@@ -7702,6 +7720,27 @@ dependencies = [
"tracing-serde", "tracing-serde",
] ]
[[package]]
name = "tracing-test"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
dependencies = [
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "tracing-utils" name = "tracing-utils"
version = "0.1.0" version = "0.1.0"
@@ -8554,6 +8593,7 @@ dependencies = [
"tracing", "tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
"tracing-subscriber",
"url", "url",
"uuid", "uuid",
"zeroize", "zeroize",

View File

@@ -86,6 +86,27 @@ pub struct DbError {
} }
impl DbError { impl DbError {
pub fn new_test_error(code: SqlState, message: String) -> Self {
DbError {
severity: "ERROR".to_string(),
parsed_severity: Some(Severity::Error),
code,
message,
detail: None,
hint: None,
position: None,
where_: None,
schema: None,
table: None,
column: None,
datatype: None,
constraint: None,
file: None,
line: None,
routine: None,
}
}
pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result<DbError> { pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result<DbError> {
let mut severity = None; let mut severity = None;
let mut parsed_severity = None; let mut parsed_severity = None;

View File

@@ -127,3 +127,4 @@ rstest.workspace = true
walkdir.workspace = true walkdir.workspace = true
rand_distr = "0.4" rand_distr = "0.4"
tokio-postgres.workspace = true tokio-postgres.workspace = true
tracing-test = "0.2"

View File

@@ -48,7 +48,7 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
use postgres_client::error::SqlState; use postgres_client::error::SqlState;
// Here are errors that happens after the user successfully authenticated to the database. // Here are errors that happens after the user successfully authenticated to the database.
// TODO: there are pgbouncer errors that should be retried, but they are not listed here. // TODO: there are pgbouncer errors that should be retried, but they are not listed here.
!matches!( let non_retriable_pg_errors = matches!(
self.code(), self.code(),
&SqlState::TOO_MANY_CONNECTIONS &SqlState::TOO_MANY_CONNECTIONS
| &SqlState::OUT_OF_MEMORY | &SqlState::OUT_OF_MEMORY
@@ -56,8 +56,20 @@ impl ShouldRetryWakeCompute for postgres_client::error::DbError {
| &SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::T_R_SERIALIZATION_FAILURE
| &SqlState::INVALID_CATALOG_NAME | &SqlState::INVALID_CATALOG_NAME
| &SqlState::INVALID_SCHEMA_NAME | &SqlState::INVALID_SCHEMA_NAME
| &SqlState::INVALID_PARAMETER_VALUE | &SqlState::INVALID_PARAMETER_VALUE,
) );
if non_retriable_pg_errors {
return false;
}
// PGBouncer errors that should not trigger a wake_compute retry.
if self.code() == &SqlState::PROTOCOL_VIOLATION {
// Source for the error message:
// https://github.com/pgbouncer/pgbouncer/blob/f15997fe3effe3a94ba8bcc1ea562e6117d1a131/src/client.c#L1070
return !self
.message()
.contains("no more connections allowed (max_client_conn)");
}
true
} }
} }
@@ -110,3 +122,55 @@ pub(crate) fn retry_after(num_retries: u32, config: RetryConfig) -> time::Durati
.base_delay .base_delay
.mul_f64(config.backoff_factor.powi((num_retries as i32) - 1)) .mul_f64(config.backoff_factor.powi((num_retries as i32) - 1))
} }
#[cfg(test)]
mod tests {
use super::ShouldRetryWakeCompute;
use postgres_client::error::{DbError, SqlState};
#[test]
fn should_retry_wake_compute_for_db_error() {
// These SQLStates should NOT trigger a wake_compute retry.
let non_retry_states = [
SqlState::TOO_MANY_CONNECTIONS,
SqlState::OUT_OF_MEMORY,
SqlState::SYNTAX_ERROR,
SqlState::T_R_SERIALIZATION_FAILURE,
SqlState::INVALID_CATALOG_NAME,
SqlState::INVALID_SCHEMA_NAME,
SqlState::INVALID_PARAMETER_VALUE,
];
for state in non_retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
!err.should_retry_wake_compute(),
"State {state:?} unexpectedly retried"
);
}
// Errors coming from pgbouncer should not trigger a wake_compute retry
let non_retry_pgbouncer_errors = ["no more connections allowed (max_client_conn)"];
for error in non_retry_pgbouncer_errors {
let err = DbError::new_test_error(SqlState::PROTOCOL_VIOLATION, error.to_string());
assert!(
!err.should_retry_wake_compute(),
"PGBouncer error {error:?} unexpectedly retried"
);
}
// These SQLStates should trigger a wake_compute retry.
let retry_states = [
SqlState::CONNECTION_FAILURE,
SqlState::CONNECTION_EXCEPTION,
SqlState::CONNECTION_DOES_NOT_EXIST,
SqlState::SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION,
];
for state in retry_states {
let err = DbError::new_test_error(state.clone(), "oops".to_string());
assert!(
err.should_retry_wake_compute(),
"State {state:?} unexpectedly skipped retry"
);
}
}
}

View File

@@ -15,6 +15,7 @@ use rstest::rstest;
use rustls::crypto::ring; use rustls::crypto::ring;
use rustls::pki_types; use rustls::pki_types;
use tokio::io::DuplexStream; use tokio::io::DuplexStream;
use tracing_test::traced_test;
use super::connect_compute::ConnectMechanism; use super::connect_compute::ConnectMechanism;
use super::retry::CouldRetry; use super::retry::CouldRetry;
@@ -381,8 +382,14 @@ enum ConnectAction {
WakeFail, WakeFail,
WakeRetry, WakeRetry,
Connect, Connect,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = true
Retry, Retry,
// connect_once -> Err, could_retry = true, should_retry_wake_compute = false
RetryNoWake,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = true
Fail, Fail,
// connect_once -> Err, could_retry = false, should_retry_wake_compute = false
FailNoWake,
} }
#[derive(Clone)] #[derive(Clone)]
@@ -424,6 +431,7 @@ struct TestConnection;
#[derive(Debug)] #[derive(Debug)]
struct TestConnectError { struct TestConnectError {
retryable: bool, retryable: bool,
wakeable: bool,
kind: crate::error::ErrorKind, kind: crate::error::ErrorKind,
} }
@@ -448,7 +456,7 @@ impl CouldRetry for TestConnectError {
} }
impl ShouldRetryWakeCompute for TestConnectError { impl ShouldRetryWakeCompute for TestConnectError {
fn should_retry_wake_compute(&self) -> bool { fn should_retry_wake_compute(&self) -> bool {
true self.wakeable
} }
} }
@@ -471,10 +479,22 @@ impl ConnectMechanism for TestConnectMechanism {
ConnectAction::Connect => Ok(TestConnection), ConnectAction::Connect => Ok(TestConnection),
ConnectAction::Retry => Err(TestConnectError { ConnectAction::Retry => Err(TestConnectError {
retryable: true, retryable: true,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::RetryNoWake => Err(TestConnectError {
retryable: true,
wakeable: false,
kind: ErrorKind::Compute, kind: ErrorKind::Compute,
}), }),
ConnectAction::Fail => Err(TestConnectError { ConnectAction::Fail => Err(TestConnectError {
retryable: false, retryable: false,
wakeable: true,
kind: ErrorKind::Compute,
}),
ConnectAction::FailNoWake => Err(TestConnectError {
retryable: false,
wakeable: false,
kind: ErrorKind::Compute, kind: ErrorKind::Compute,
}), }),
x => panic!("expecting action {x:?}, connect is called instead"), x => panic!("expecting action {x:?}, connect is called instead"),
@@ -709,3 +729,92 @@ async fn wake_non_retry() {
.unwrap_err(); .unwrap_err();
mechanism.verify(); mechanism.verify();
} }
#[tokio::test]
#[traced_test]
async fn fail_but_wake_invalidates_cache() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::Fail,
ConnectAction::Wake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn fail_no_wake_skips_cache_invalidation() {
let ctx = RequestContext::test();
let mech = TestConnectMechanism::new(vec![
ConnectAction::Wake,
ConnectAction::FailNoWake,
ConnectAction::Connect,
]);
let user = helper_create_connect_info(&mech);
let cfg = config();
connect_to_compute(&ctx, &mech, &user, cfg.retry, &cfg)
.await
.unwrap();
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_but_wake_invalidates_cache() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → Retry (retryable + wakeable) → Wake → Connect
let mechanism = TestConnectMechanism::new(vec![Wake, Retry, Wake, Connect]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap();
mechanism.verify();
// Because Retry has wakeable=true, we should see invalidate_cache
assert!(logs_contain(
"invalidating stalled compute node info cache entry"
));
}
#[tokio::test]
#[traced_test]
async fn retry_no_wake_skips_invalidation() {
let _ = env_logger::try_init();
use ConnectAction::*;
let ctx = RequestContext::test();
// Wake → RetryNoWake (retryable + NOT wakeable)
let mechanism = TestConnectMechanism::new(vec![Wake, RetryNoWake]);
let user_info = helper_create_connect_info(&mechanism);
let cfg = config();
connect_to_compute(&ctx, &mechanism, &user_info, cfg.retry, &cfg)
.await
.unwrap_err();
mechanism.verify();
// Because RetryNoWake has wakeable=false, we must NOT see invalidate_cache
assert!(!logs_contain(
"invalidating stalled compute node info cache entry"
));
}

View File

@@ -107,6 +107,7 @@ tower = { version = "0.4", default-features = false, features = ["balance", "buf
tracing = { version = "0.1", features = ["log"] } tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" } tracing-core = { version = "0.1" }
tracing-log = { version = "0.2" } tracing-log = { version = "0.2" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
url = { version = "2", features = ["serde"] } url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4", "v7"] } uuid = { version = "1", features = ["serde", "v4", "v7"] }
zeroize = { version = "1", features = ["derive", "serde"] } zeroize = { version = "1", features = ["derive", "serde"] }