From db8dd6f380c097ab03740ed40dccc9e8ab311b4c Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Thu, 13 Apr 2023 18:07:29 +0200 Subject: [PATCH] [compute_ctl] Implement live reconfiguration (#3980) With this commit one can request compute reconfiguration from the running `compute_ctl` with compute in `Running` state by sending a new spec: ```shell curl -d "{\"spec\": $(cat ./compute-spec-new.json)}" http://localhost:3080/configure ``` Internally, we start a separate configurator thread that is waiting on `Condvar` for `ConfigurationPending` compute state in a loop. Then it does reconfiguration, sets compute back to `Running` state and notifies other waiters. It will need some follow-ups, e.g. for retry logic for control-plane requests, but should be useful for testing in the current state. This shouldn't affect any existing environment, since computes are configured in a different way there. Resolves neondatabase/cloud#4433 --- compute_tools/src/bin/compute_ctl.rs | 3 ++ compute_tools/src/compute.rs | 42 ++++++++++++++++++++++ compute_tools/src/configurator.rs | 54 ++++++++++++++++++++++++++++ compute_tools/src/http/api.rs | 2 +- compute_tools/src/lib.rs | 1 + compute_tools/src/spec.rs | 17 ++++++--- libs/compute_api/src/responses.rs | 14 +++++++- 7 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 compute_tools/src/configurator.rs diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 633e603f6b..309310407d 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -46,6 +46,7 @@ use url::Url; use compute_api::responses::ComputeStatus; use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; +use compute_tools::configurator::launch_configurator; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -175,6 +176,8 @@ fn main() -> Result<()> { // Launch remaining service threads let 
_monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread"); + let _configurator_handle = + launch_configurator(&compute).expect("cannot launch configurator thread"); // Start Postgres let mut delay_exit = false; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 07ede44c9b..6ddfcf86c2 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -356,6 +356,48 @@ impl ComputeNode { Ok(()) } + // We could've wrapped this around `pg_ctl reload`, but right now we don't use + // `pg_ctl` for start / stop, so this just seems much easier to do as we already + // have opened connection to Postgres and superuser access. + #[instrument(skip(self, client))] + fn pg_reload_conf(&self, client: &mut Client) -> Result<()> { + client.simple_query("SELECT pg_reload_conf()")?; + Ok(()) + } + + /// Similar to `apply_config()`, but does a bit different sequence of operations, + /// as it's used to reconfigure a previously started and configured Postgres node. + #[instrument(skip(self))] + pub fn reconfigure(&self) -> Result<()> { + let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec; + + // Write new config + let pgdata_path = Path::new(&self.pgdata); + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?; + + let mut client = Client::connect(self.connstr.as_str(), NoTls)?; + self.pg_reload_conf(&mut client)?; + + // Proceed with post-startup configuration. Note, that order of operations is important. 
+ handle_roles(&spec, &mut client)?; + handle_databases(&spec, &mut client)?; + handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; + handle_grants(&spec, self.connstr.as_str(), &mut client)?; + handle_extensions(&spec, &mut client)?; + + // 'Close' connection + drop(client); + + let unknown_op = "unknown".to_string(); + let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op); + info!( + "finished reconfiguration of compute node for operation {}", + op_id + ); + + Ok(()) + } + #[instrument(skip(self))] pub fn start_compute(&self) -> Result { let compute_state = self.state.lock().unwrap().clone(); diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs new file mode 100644 index 0000000000..a07fd0b8cd --- /dev/null +++ b/compute_tools/src/configurator.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; +use std::thread; + +use anyhow::Result; +use tracing::{error, info, instrument}; + +use compute_api::responses::ComputeStatus; + +use crate::compute::ComputeNode; + +#[instrument(skip(compute))] +fn configurator_main_loop(compute: &Arc<ComputeNode>) { + info!("waiting for reconfiguration requests"); + loop { + let state = compute.state.lock().unwrap(); + let mut state = compute.state_changed.wait(state).unwrap(); + + if state.status == ComputeStatus::ConfigurationPending { + info!("got configuration request"); + state.status = ComputeStatus::Configuration; + compute.state_changed.notify_all(); + drop(state); + + let mut new_status = ComputeStatus::Failed; + if let Err(e) = compute.reconfigure() { + error!("could not configure compute node: {}", e); + } else { + new_status = ComputeStatus::Running; + info!("compute node configured"); + } + + // XXX: used to test that API is blocking + // std::thread::sleep(std::time::Duration::from_millis(10000)); + + compute.set_status(new_status); + } else if state.status == ComputeStatus::Failed { + info!("compute node is now in Failed state, exiting"); + break; + } else { + info!("woken up for compute 
status: {:?}, sleeping", state.status); + } + } +} + +pub fn launch_configurator(compute: &Arc<ComputeNode>) -> Result<thread::JoinHandle<()>> { + let compute = Arc::clone(compute); + + Ok(thread::Builder::new() + .name("compute-configurator".into()) + .spawn(move || { + configurator_main_loop(&compute); + info!("configurator thread is exited"); + })?) +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 81d4953345..92d058fbd1 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -155,7 +155,7 @@ async fn handle_configure_request( // ``` { let mut state = compute.state.lock().unwrap(); - if state.status != ComputeStatus::Empty { + if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running { let msg = format!( "invalid compute status for configuration request: {:?}", state.status.clone() diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index aee6b53e6a..24811f75ee 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -4,6 +4,7 @@ //! pub mod checker; pub mod config; +pub mod configurator; pub mod http; #[macro_use] pub mod logger; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 2350113c39..088f74335a 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,7 +1,7 @@ use std::path::Path; use std::str::FromStr; -use anyhow::Result; +use anyhow::{anyhow, bail, Result}; use postgres::config::Config; use postgres::{Client, NoTls}; use tracing::{info, info_span, instrument, span_enabled, warn, Level}; @@ -10,6 +10,7 @@ use crate::config; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; +use compute_api::responses::ControlPlaneSpecResponse; use compute_api::spec::{ComputeSpec, Database, PgIdent, Role}; /// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT` @@ -26,13 +27,19 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result, +}