From a3f2a2cae54ac6903160d9283f594d23ecc58783 Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 16 Jun 2025 23:04:38 +0100 Subject: [PATCH] add fast path for TLS renewal configuration --- compute_tools/src/compute.rs | 106 +++++++++++++----- .../src/http/routes/check_writability.rs | 6 +- compute_tools/src/http/routes/configure.rs | 18 ++- control_plane/src/endpoint.rs | 2 +- libs/compute_api/src/responses.rs | 3 + 5 files changed, 102 insertions(+), 33 deletions(-) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 26df98b5d3..e6e0c357f1 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -28,7 +28,7 @@ use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; -use std::sync::{Arc, Condvar, Mutex, RwLock}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock}; use std::time::{Duration, Instant}; use std::{env, fs}; use tokio::{spawn, sync::watch, task::JoinHandle, time}; @@ -1951,10 +1951,7 @@ impl ComputeNode { .clone(), ); - let mut tls_config = None::; - if spec.features.contains(&ComputeFeature::TlsExperimental) { - tls_config = self.compute_ctl_config.tls.clone(); - } + let tls_config = self.tls_config(&spec); self.update_installed_extensions_collection_interval(&spec); @@ -2155,6 +2152,41 @@ impl ComputeNode { Ok(()) } + /// Acquire the "reloading" lock while running the supplied function. + /// + /// This ensures that this thread is the only thread that + /// can issue signals to postgres. + /// + /// If the supplied function errors, the compute status is marked as failed. + pub fn lock_while_reloading( + &self, + mut state: MutexGuard<'_, ComputeState>, + f: impl FnOnce(ComputeSpec) -> Result, + ) -> Result { + let old_status = state.status; + + // transition to the reloading state. + state.set_status(ComputeStatus::Reloading, &self.state_changed); + let spec = state.pspec.as_ref().unwrap().spec.clone(); + // unlock while reloading, so we don't block other tasks. + drop(state); + + let res = f(spec); + + let new_status = if res.is_ok() { + old_status + } else { + ComputeStatus::Failed + }; + + let mut state = self.state.lock().unwrap(); + // make sure our invariants are upheld + assert_eq!(state.status, ComputeStatus::Reloading); + state.set_status(new_status, &self.state_changed); + + res + } + #[instrument(skip_all)] pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> { let pspec = compute_state.pspec.as_ref().expect("spec must be set"); @@ -2210,16 +2242,10 @@ impl ComputeNode { tokio::task::spawn_blocking(move || { 'cert_update: loop { // wait for a new certificate update - digest = crate::tls::wait_until_cert_changed(digest, &tls_config.cert_path); + let new_digest = crate::tls::wait_until_cert_changed(digest, &tls_config.cert_path); - // ensure the keys are saved before continuing. + // load the corresponding keys let key_pair = crate::tls::load_certs_blocking(&tls_config); - while let Err(e) = - crate::tls::update_key_path_blocking(Path::new(&self.params.pgdata), &key_pair) - { - error!("could not save TLS certificates: {e}"); - std::thread::sleep(Duration::from_millis(20)); - } // let postgres/pgbouncer/local_proxy know the new cert/key exists. // we need to wait until it's configurable first. @@ -2228,12 +2254,8 @@ impl ComputeNode { 'status_update: loop { match state.status { // let's update the state to config pending - ComputeStatus::ConfigurationPending | ComputeStatus::Running => { - info!("reconfiguring compute due to TLS certificate renewal"); - state.set_status( - ComputeStatus::ConfigurationPending, - &self.state_changed, - ); + ComputeStatus::Running => { + info!("reloading compute due to TLS certificate renewal"); break 'status_update; } @@ -2246,20 +2268,54 @@ impl ComputeNode { // wait ComputeStatus::Init | ComputeStatus::Configuration + | ComputeStatus::ConfigurationPending | ComputeStatus::RefreshConfiguration | ComputeStatus::RefreshConfigurationPending + | ComputeStatus::Reloading | ComputeStatus::Empty => { state = self.state_changed.wait(state).unwrap(); } } } - drop(state); - info!( - cert_path = tls_config.cert_path, - key_path = tls_config.key_path, - "TLS certificates renewed", - ); + let result = self.lock_while_reloading(state, |spec| { + // ensure the keys are saved before continuing. + // we do this while holding the 'reloading' state so that we know we're not interfering with any + // active configuration stages. + if let Err(e) = crate::tls::update_key_path_blocking( + Path::new(&self.params.pgdata), + &key_pair, + ) { + return Ok(Err(e)); + } + + // reload postgres/pgbouncer/local_proxy to pick up our new certificates. + self.reload(spec)?; + + Ok(Ok(())) + }); + + match result { + // Reload failed. Compute is in a bad state. + Err(e) => { + error!("could not reload compute node: {}", e); + return; + } + // Updating the certificates failed. Retry + Ok(Err(e)) => { + error!("could not save TLS certificates: {e}"); + std::thread::sleep(Duration::from_millis(20)); + } + // Successful. Acknowledge that we've saved these certificates. + Ok(Ok(())) => { + digest = new_digest; + info!( + cert_path = tls_config.cert_path, + key_path = tls_config.key_path, + "TLS certificates renewed", + ); + } + } } }); } diff --git a/compute_tools/src/http/routes/check_writability.rs b/compute_tools/src/http/routes/check_writability.rs index 5a12686fa8..0d08638d64 100644 --- a/compute_tools/src/http/routes/check_writability.rs +++ b/compute_tools/src/http/routes/check_writability.rs @@ -12,8 +12,10 @@ use crate::http::JsonResponse; /// Check that the compute is currently running. pub(in crate::http) async fn is_writable(State(compute): State>) -> Response { let status = compute.get_status(); - if status != ComputeStatus::Running { - return JsonResponse::invalid_status(status); + match status { + // If we are running, or just reloading the config, we are ok to write a new config. + ComputeStatus::Running | ComputeStatus::Reloading => {} + _ => return JsonResponse::invalid_status(status), } match check_writability(&compute).await { diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs index 89db0e8364..a70745c56b 100644 --- a/compute_tools/src/http/routes/configure.rs +++ b/compute_tools/src/http/routes/configure.rs @@ -34,9 +34,17 @@ pub(in crate::http) async fn configure( let c = compute.clone(); let completed = task::spawn_blocking(move || { let mut state = c.state.lock().unwrap(); - while !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) { - // wait until we are not concurrently configuring - state = c.state_changed.wait(state).unwrap(); + loop { + match state.status { + // ideal state. + ComputeStatus::Empty | ComputeStatus::Running => break, + // we need to wait until reloaded + ComputeStatus::Reloading => { + state = c.state_changed.wait(state).unwrap(); + } + // All other cases are unexpected. + _ => return Err(JsonResponse::invalid_status(state.status)), + } } // Pass the tracing span to the main thread that performs the startup, @@ -63,7 +71,7 @@ pub(in crate::http) async fn configure( if state.status == ComputeStatus::Failed { let err = state.error.as_ref().map_or("unknown error", |x| x); let msg = format!("compute configuration failed: {err:?}"); - return Err(msg); + return Err(JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, msg)); } } @@ -73,7 +81,7 @@ pub(in crate::http) async fn configure( .unwrap(); if let Err(e) = completed { - return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e); + return e; } // Return current compute state if everything went well. diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 814ee2a52f..3065e202ab 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -953,7 +953,7 @@ impl Endpoint { } // keep retrying } - ComputeStatus::Running => { + ComputeStatus::Reloading | ComputeStatus::Running => { // All good! break; } diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index a27301e45e..202cf4cb27 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -155,6 +155,8 @@ pub enum ComputeStatus { Empty, // Compute configuration was requested. ConfigurationPending, + // Postgres, pgbouncer, and local_proxy is currently being reloaded. + Reloading, // Compute node has spec and initial startup and // configuration is in progress. Init, @@ -189,6 +191,7 @@ impl Display for ComputeStatus { match self { ComputeStatus::Empty => f.write_str("empty"), ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"), + ComputeStatus::Reloading => f.write_str("reloading"), ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"), ComputeStatus::RefreshConfigurationPending => { f.write_str("refresh-configuration-pending")