add fast path for TLS renewal configuration

This commit is contained in:
Conrad Ludgate
2025-06-16 23:04:38 +01:00
committed by Conrad Ludgate
parent a24a0032ad
commit a3f2a2cae5
5 changed files with 102 additions and 33 deletions

View File

@@ -28,7 +28,7 @@ use std::path::Path;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::sync::{Arc, Condvar, Mutex, MutexGuard, RwLock};
use std::time::{Duration, Instant};
use std::{env, fs};
use tokio::{spawn, sync::watch, task::JoinHandle, time};
@@ -1951,10 +1951,7 @@ impl ComputeNode {
.clone(),
);
let mut tls_config = None::<TlsConfig>;
if spec.features.contains(&ComputeFeature::TlsExperimental) {
tls_config = self.compute_ctl_config.tls.clone();
}
let tls_config = self.tls_config(&spec);
self.update_installed_extensions_collection_interval(&spec);
@@ -2155,6 +2152,41 @@ impl ComputeNode {
Ok(())
}
/// Run `f` while the compute is in the [`ComputeStatus::Reloading`] state.
///
/// Holding the "reloading" state ensures that this thread is the only thread
/// that can issue signals to postgres.
///
/// The caller passes in the already-acquired `state` guard. The status is
/// switched to `Reloading`, the current spec is cloned, and the guard is
/// released before `f` runs so that other tasks are not blocked on the state
/// mutex for the duration of the reload. Once `f` returns, the lock is
/// re-acquired and the status is set back to what it was on entry if `f`
/// succeeded, or to [`ComputeStatus::Failed`] if `f` returned an error.
///
/// # Errors
///
/// Returns whatever error `f` returns, unchanged.
///
/// # Panics
///
/// Panics if no spec has been set, if the state mutex is poisoned, or if the
/// status is no longer `Reloading` when `f` returns. If `f` itself panics,
/// the status is not restored and remains `Reloading`.
pub fn lock_while_reloading<T>(
    &self,
    mut state: MutexGuard<'_, ComputeState>,
    f: impl FnOnce(ComputeSpec) -> Result<T>,
) -> Result<T> {
    // Remember where we came from so a successful reload can restore it.
    let old_status = state.status;

    // Transition to the reloading state.
    // NOTE(review): `set_status` presumably notifies waiters on
    // `state_changed` — confirm against its definition.
    state.set_status(ComputeStatus::Reloading, &self.state_changed);

    let spec = state
        .pspec
        .as_ref()
        .expect("spec must be set")
        .spec
        .clone();

    // Unlock while reloading, so we don't block other tasks.
    drop(state);

    let res = f(spec);

    let new_status = match &res {
        Ok(_) => old_status,
        Err(_) => ComputeStatus::Failed,
    };

    let mut state = self.state.lock().unwrap();
    // Make sure our invariants are upheld: nothing else may have moved the
    // status away from `Reloading` while `f` was running.
    assert_eq!(state.status, ComputeStatus::Reloading);
    state.set_status(new_status, &self.state_changed);

    res
}
#[instrument(skip_all)]
pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
@@ -2210,16 +2242,10 @@ impl ComputeNode {
tokio::task::spawn_blocking(move || {
'cert_update: loop {
// wait for a new certificate update
digest = crate::tls::wait_until_cert_changed(digest, &tls_config.cert_path);
let new_digest = crate::tls::wait_until_cert_changed(digest, &tls_config.cert_path);
// ensure the keys are saved before continuing.
// load the corresponding keys
let key_pair = crate::tls::load_certs_blocking(&tls_config);
while let Err(e) =
crate::tls::update_key_path_blocking(Path::new(&self.params.pgdata), &key_pair)
{
error!("could not save TLS certificates: {e}");
std::thread::sleep(Duration::from_millis(20));
}
// let postgres/pgbouncer/local_proxy know the new cert/key exists.
// we need to wait until it's configurable first.
@@ -2228,12 +2254,8 @@ impl ComputeNode {
'status_update: loop {
match state.status {
// let's update the state to config pending
ComputeStatus::ConfigurationPending | ComputeStatus::Running => {
info!("reconfiguring compute due to TLS certificate renewal");
state.set_status(
ComputeStatus::ConfigurationPending,
&self.state_changed,
);
ComputeStatus::Running => {
info!("reloading compute due to TLS certificate renewal");
break 'status_update;
}
@@ -2246,20 +2268,54 @@ impl ComputeNode {
// wait
ComputeStatus::Init
| ComputeStatus::Configuration
| ComputeStatus::ConfigurationPending
| ComputeStatus::RefreshConfiguration
| ComputeStatus::RefreshConfigurationPending
| ComputeStatus::Reloading
| ComputeStatus::Empty => {
state = self.state_changed.wait(state).unwrap();
}
}
}
drop(state);
info!(
cert_path = tls_config.cert_path,
key_path = tls_config.key_path,
"TLS certificates renewed",
);
let result = self.lock_while_reloading(state, |spec| {
// ensure the keys are saved before continuing.
// we do this while holding the 'reloading' state so that we know we're not interfering with any
// active configuration stages.
if let Err(e) = crate::tls::update_key_path_blocking(
Path::new(&self.params.pgdata),
&key_pair,
) {
return Ok(Err(e));
}
// reload postgres/pgbouncer/local_proxy to pick up our new certificates.
self.reload(spec)?;
Ok(Ok(()))
});
match result {
// Reload failed. Compute is in a bad state.
Err(e) => {
error!("could not reload compute node: {}", e);
return;
}
// Updating the certificates failed. Retry
Ok(Err(e)) => {
error!("could not save TLS certificates: {e}");
std::thread::sleep(Duration::from_millis(20));
}
// Successful. Acknowledge that we've saved these certificates.
Ok(Ok(())) => {
digest = new_digest;
info!(
cert_path = tls_config.cert_path,
key_path = tls_config.key_path,
"TLS certificates renewed",
);
}
}
}
});
}

View File

@@ -12,8 +12,10 @@ use crate::http::JsonResponse;
/// Check that the compute is running (or reloading its config) and is currently writable.
pub(in crate::http) async fn is_writable(State(compute): State<Arc<ComputeNode>>) -> Response {
let status = compute.get_status();
if status != ComputeStatus::Running {
return JsonResponse::invalid_status(status);
match status {
// If we are running, or just reloading the config, we are ok to write a new config.
ComputeStatus::Running | ComputeStatus::Reloading => {}
_ => return JsonResponse::invalid_status(status),
}
match check_writability(&compute).await {

View File

@@ -34,9 +34,17 @@ pub(in crate::http) async fn configure(
let c = compute.clone();
let completed = task::spawn_blocking(move || {
let mut state = c.state.lock().unwrap();
while !matches!(state.status, ComputeStatus::Empty | ComputeStatus::Running) {
// wait until we are not concurrently configuring
state = c.state_changed.wait(state).unwrap();
loop {
match state.status {
// ideal state.
ComputeStatus::Empty | ComputeStatus::Running => break,
// we need to wait until reloaded
ComputeStatus::Reloading => {
state = c.state_changed.wait(state).unwrap();
}
// All other cases are unexpected.
_ => return Err(JsonResponse::invalid_status(state.status)),
}
}
// Pass the tracing span to the main thread that performs the startup,
@@ -63,7 +71,7 @@ pub(in crate::http) async fn configure(
if state.status == ComputeStatus::Failed {
let err = state.error.as_ref().map_or("unknown error", |x| x);
let msg = format!("compute configuration failed: {err:?}");
return Err(msg);
return Err(JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, msg));
}
}
@@ -73,7 +81,7 @@ pub(in crate::http) async fn configure(
.unwrap();
if let Err(e) = completed {
return JsonResponse::error(StatusCode::INTERNAL_SERVER_ERROR, e);
return e;
}
// Return current compute state if everything went well.

View File

@@ -953,7 +953,7 @@ impl Endpoint {
}
// keep retrying
}
ComputeStatus::Running => {
ComputeStatus::Reloading | ComputeStatus::Running => {
// All good!
break;
}

View File

@@ -155,6 +155,8 @@ pub enum ComputeStatus {
Empty,
// Compute configuration was requested.
ConfigurationPending,
// Postgres, pgbouncer, and local_proxy are currently being reloaded.
Reloading,
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
@@ -189,6 +191,7 @@ impl Display for ComputeStatus {
match self {
ComputeStatus::Empty => f.write_str("empty"),
ComputeStatus::ConfigurationPending => f.write_str("configuration-pending"),
ComputeStatus::Reloading => f.write_str("reloading"),
ComputeStatus::RefreshConfiguration => f.write_str("refresh-configuration"),
ComputeStatus::RefreshConfigurationPending => {
f.write_str("refresh-configuration-pending")