From 66dd3f8ca5a1cdf6adf158b5ed5eff556c4e9b1e Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Fri, 31 Mar 2023 15:52:58 +0200 Subject: [PATCH] Implement live reconfiguration in the `compute_ctl` Accept spec in JSON format and request compute reconfiguration from the configurator thread. If anything goes wrong after we set the compute state to `ConfigurationPending` and / or sent spec to the configurator thread, we basically leave compute in the potentially wrong state. That said, it's control-plane's responsibility to watch compute state after reconfiguration request and to clean restart it in case of errors. It still lacks ability of starting up without spec and some validations, i.e. that live reconfiguration should be only available with `--compute-id` and `--control-plane-uri` options. Otherwise, it works fine and could be tested by running `compute_ctl` locally, then sending it a new spec: ```shell curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec ``` We have one configurator thread and async http server, so generally we have single consumer - multiple producers pattern here. That's why we use `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of producers is limited to one due to code logic, but we still need an ability to potentially pass `Sender` to several threads. Next, we use async `hyper` + `tokio` http server, but all the other code is completely synchronous. So we need to send data from async to sync, that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`. It doesn't make much sense to rewrite all code to async now, but we can consider doing this in the future. I think that a combination of `Mutex` and `CondVar` would work just fine too, but as we already have `tokio`, I decided to try something from it. --- compute_tools/src/bin/compute_ctl.rs | 20 +++++++- compute_tools/src/compute.rs | 45 +++++++++++++++++- compute_tools/src/configurator.rs | 56 ++++++++++++++++++++++ compute_tools/src/http/api.rs | 69 +++++++++++++++++++++++++--- compute_tools/src/lib.rs | 1 + compute_tools/src/spec.rs | 21 ++++----- libs/compute_api/src/models.rs | 14 +++++- libs/compute_api/src/spec.rs | 12 ++--- 8 files changed, 210 insertions(+), 28 deletions(-) create mode 100644 compute_tools/src/configurator.rs diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 3d95d1cd3e..a265484fa6 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -40,6 +40,7 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; +use tokio::sync::mpsc; use tracing::{error, info}; use url::Url; @@ -47,6 +48,7 @@ use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus}; use compute_api::spec::ComputeSpec; use compute_tools::compute::ComputeNode; +use compute_tools::configurator::launch_configurator; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -168,10 +170,26 @@ fn main() -> Result<()> { }; let compute = Arc::new(compute_state); + // We have one configurator thread and async http server, so generally we + // single consumer - multiple producers pattern here. That's why we use + // `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of + // producers is limited to one due to code logic, but we still need to + // pass `Sender` to several threads. + // + // Next, we use async `hyper` + `tokio` http server, but all the other code + // is completely synchronous. So we need to send data from async to sync, + // that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`. + // It doesn't make much sense to rewrite all code to async now, but we can + // consider doing this in the future. + let (spec_tx, spec_rx) = mpsc::unbounded_channel::(); + // Launch service threads first, so we were able to serve availability // requests, while configuration is still in progress. - let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread"); + let _http_handle = + launch_http_server(&compute, spec_tx).expect("cannot launch http endpoint thread"); let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread"); + let _configurator_handle = + launch_configurator(&compute, spec_rx).expect("cannot launch configurator thread"); // Start Postgres let mut delay_exit = false; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 80a6aac568..866310d215 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -216,6 +216,7 @@ impl ComputeNode { Ok(pg) } + /// Do initial configuration of the already started Postgres. #[instrument(skip(self))] pub fn apply_config(&self) -> Result<()> { // If connection fails, @@ -250,8 +251,8 @@ impl ComputeNode { // Proceed with post-startup configuration. Note, that order of operations is important. handle_roles(&self.spec, &mut client)?; handle_databases(&self.spec, &mut client)?; - handle_role_deletions(self, &mut client)?; - handle_grants(self, &mut client)?; + handle_role_deletions(&self.spec, self.connstr.as_str(), &mut client)?; + handle_grants(&self.spec, self.connstr.as_str(), &mut client)?; create_writability_check_data(&mut client)?; handle_extensions(&self.spec, &mut client)?; @@ -266,6 +267,46 @@ impl ComputeNode { Ok(()) } + // We could've wrapped this around `pg_ctl reload`, but right now we don't use + // `pg_ctl` for start / stop, so this just seems much easier to do as we already + // have opened connection to Postgres and superuser access. + #[instrument(skip(self, client))] + fn pg_reload_conf(&self, client: &mut Client) -> Result<()> { + client.simple_query("SELECT pg_reload_conf()")?; + Ok(()) + } + + /// Similar to `apply_config()`, but does a bit different sequence of operations, + /// as it's used to reconfigure a previously started and configured Postgres node. + #[instrument(skip(self, spec))] + pub fn reconfigure(&self, spec: ComputeSpec) -> Result<()> { + // Write new config + let pgdata_path = Path::new(&self.pgdata); + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?; + + let mut client = Client::connect(self.connstr.as_str(), NoTls)?; + self.pg_reload_conf(&mut client)?; + + // Proceed with post-startup configuration. Note, that order of operations is important. + handle_roles(&spec, &mut client)?; + handle_databases(&spec, &mut client)?; + handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; + handle_grants(&spec, self.connstr.as_str(), &mut client)?; + handle_extensions(&spec, &mut client)?; + + // 'Close' connection + drop(client); + + let unknown_op = "unknown".to_string(); + let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op); + info!( + "finished reconfiguration of compute node for operation {}", + op_id + ); + + Ok(()) + } + #[instrument(skip(self))] pub fn start_compute(&self) -> Result { info!( diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs new file mode 100644 index 0000000000..cec109a6de --- /dev/null +++ b/compute_tools/src/configurator.rs @@ -0,0 +1,56 @@ +use std::sync::Arc; +use std::thread; + +use anyhow::Result; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::{error, info, instrument}; + +use crate::compute::ComputeNode; +use compute_api::models::ComputeStatus; +use compute_api::spec::ComputeSpec; + +#[instrument(skip(compute, rx))] +fn configurator_main_loop(compute: &Arc, mut rx: UnboundedReceiver) { + info!("waiting for reconfiguration requests"); + while let Some(spec) = rx.blocking_recv() { + info!("got spec = {:?}", &spec); + + let status = compute.get_status(); + // Sanity check, should never happen. + if status != ComputeStatus::ConfigurationPending { + error!( + "unexpected compute status: {:?}, expected {:?}", + status, + ComputeStatus::ConfigurationPending + ); + compute.set_status(ComputeStatus::Failed); + continue; + } else { + compute.set_status(ComputeStatus::Reconfiguration); + } + + let mut new_status = ComputeStatus::Failed; + if let Err(e) = compute.reconfigure(spec) { + error!("could not reconfigure compute node: {}", e); + } else { + new_status = ComputeStatus::Running; + info!("compute node reconfigured"); + } + + compute.set_status(new_status); + } + info!("configurator thread is exiting"); +} + +pub fn launch_configurator( + compute: &Arc, + rx: UnboundedReceiver, +) -> Result> { + let compute = Arc::clone(compute); + + Ok(thread::Builder::new() + .name("compute-configurator".into()) + .spawn(move || { + configurator_main_loop(&compute, rx); + })?) +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 9d5b0d7a53..9d5694879d 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -4,16 +4,24 @@ use std::sync::Arc; use std::thread; use crate::compute::ComputeNode; +use compute_api::models::ComputeStatus; +use compute_api::spec::ComputeSpec; + use anyhow::Result; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use num_cpus; use serde_json; +use tokio::sync::mpsc::UnboundedSender; use tracing::{error, info}; use tracing_utils::http::OtelName; // Service function to handle all available routes. -async fn routes(req: Request, compute: &Arc) -> Response { +async fn routes( + req: Request, + compute: &Arc, + tx: &UnboundedSender, +) -> Response { // // NOTE: The URI path is currently included in traces. That's OK because // it doesn't contain any variable parts or sensitive information. But @@ -62,6 +70,51 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /spec POST request"); + let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); + if let Ok(spec) = serde_json::from_str::(&spec_raw) { + let mut state = compute.state.write().unwrap(); + if !(state.status == ComputeStatus::WaitingSpec + || state.status == ComputeStatus::Running) + { + let msg = format!( + "invalid compute status for reconfiguration request: {}", + serde_json::to_string(&*state).unwrap() + ); + error!(msg); + return Response::new(Body::from(msg)); + } + state.status = ComputeStatus::ConfigurationPending; + drop(state); + + if let Err(e) = tx.send(spec) { + error!("failed to send spec request to configurator thread: {}", e); + Response::new(Body::from(format!( + "could not request reconfiguration: {}", + e + ))) + } else { + info!("sent spec request to configurator"); + Response::new(Body::from("ok")) + } + } else { + let msg = "invalid spec"; + error!(msg); + Response::new(Body::from(msg)) + } + } + // Return the `404 Not Found` for any other routes. _ => { let mut not_found = Response::new(Body::from("404 Not Found")); @@ -73,14 +126,16 @@ async fn routes(req: Request, compute: &Arc) -> Response) { +async fn serve(state: Arc, tx: UnboundedSender) { let addr = SocketAddr::from(([0, 0, 0, 0], 3080)); let make_service = make_service_fn(move |_conn| { let state = state.clone(); + let tx = tx.clone(); async move { Ok::<_, Infallible>(service_fn(move |req: Request| { let state = state.clone(); + let tx = tx.clone(); async move { Ok::<_, Infallible>( // NOTE: We include the URI path in the string. It @@ -88,7 +143,7 @@ async fn serve(state: Arc) { // information in this API. tracing_utils::http::tracing_handler( req, - |req| routes(req, &state), + |req| routes(req, &state, &tx), OtelName::UriPath, ) .await, @@ -109,10 +164,12 @@ async fn serve(state: Arc) { } /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`. -pub fn launch_http_server(state: &Arc) -> Result> { +pub fn launch_http_server( + state: &Arc, + tx: UnboundedSender, +) -> Result> { let state = Arc::clone(state); - Ok(thread::Builder::new() .name("http-endpoint".into()) - .spawn(move || serve(state))?) + .spawn(move || serve(state, tx))?) } diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index aee6b53e6a..24811f75ee 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -4,6 +4,7 @@ //! pub mod checker; pub mod config; +pub mod configurator; pub mod http; #[macro_use] pub mod logger; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 9c6825994d..a02956c745 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -6,7 +6,6 @@ use postgres::config::Config; use postgres::{Client, NoTls}; use tracing::{info, info_span, instrument, span_enabled, warn, Level}; -use crate::compute::ComputeNode; use crate::config; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; @@ -185,8 +184,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Reassign all dependent objects and delete requested roles. #[instrument(skip_all)] -pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> { - if let Some(ops) = &node.spec.delta_operations { +pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { + if let Some(ops) = &spec.delta_operations { // First, reassign all dependent objects to db owners. info!("reassigning dependent objects of to-be-deleted roles"); @@ -203,7 +202,7 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result< // Check that role is still present in Postgres, as this could be a // restart with the same spec after role deletion. if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) { - reassign_owned_objects(node, &op.name)?; + reassign_owned_objects(spec, connstr, &op.name)?; } } @@ -227,10 +226,10 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result< } // Reassign all owned objects in all databases to the owner of the database. -fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> { - for db in &node.spec.cluster.databases { +fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> { + for db in &spec.cluster.databases { if db.owner != *role_name { - let mut conf = Config::from_str(node.connstr.as_str())?; + let mut conf = Config::from_str(connstr)?; conf.dbname(&db.name); let mut client = conf.connect(NoTls)?; @@ -375,9 +374,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants /// to allow users creating trusted extensions and re-creating `public` schema, for example. #[instrument(skip_all)] -pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { - let spec = &node.spec; - +pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { info!("cluster spec grants:"); // We now have a separate `web_access` role to connect to the database @@ -409,8 +406,8 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { // Do some per-database access adjustments. We'd better do this at db creation time, // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants // atomically. - for db in &node.spec.cluster.databases { - let mut conf = Config::from_str(node.connstr.as_str())?; + for db in &spec.cluster.databases { + let mut conf = Config::from_str(connstr)?; conf.dbname(&db.name); let mut db_client = conf.connect(NoTls)?; diff --git a/libs/compute_api/src/models.rs b/libs/compute_api/src/models.rs index 9d2575e7fd..620090eae2 100644 --- a/libs/compute_api/src/models.rs +++ b/libs/compute_api/src/models.rs @@ -14,12 +14,24 @@ pub struct ComputeState { pub error: Option, } -#[derive(Serialize, Clone, Copy, PartialEq, Eq)] +#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)] #[serde(rename_all = "snake_case")] pub enum ComputeStatus { + // Spec wasn't provided as start, waiting for it to be + // provided by control-plane. + WaitingSpec, + // Compute node has initial spec and is starting up. Init, + // Compute is configured and running. Running, + // Either startup or configuration failed, + // compute will exit soon or is waiting for + // control-plane to terminate it. Failed, + // Control-plane requested reconfiguration. + ConfigurationPending, + // New spec is being applied. + Reconfiguration, } fn rfc3339_serialize(x: &DateTime, s: S) -> Result diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 7b67806dd8..5c18caa9e9 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -12,7 +12,7 @@ pub type PgIdent = String; /// Cluster spec or configuration represented as an optional number of /// delta operations + final cluster state description. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct ComputeSpec { pub format_version: f32, pub timestamp: String, @@ -26,7 +26,7 @@ pub struct ComputeSpec { pub startup_tracing_context: Option>, } -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Cluster { pub cluster_id: String, pub name: String, @@ -42,7 +42,7 @@ pub struct Cluster { /// - DROP ROLE /// - ALTER ROLE name RENAME TO new_name /// - ALTER DATABASE name RENAME TO new_name -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct DeltaOp { pub action: String, pub name: PgIdent, @@ -51,7 +51,7 @@ pub struct DeltaOp { /// Rust representation of Postgres role info with only those fields /// that matter for us. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Role { pub name: PgIdent, pub encrypted_password: Option, @@ -60,7 +60,7 @@ pub struct Role { /// Rust representation of Postgres database info with only those fields /// that matter for us. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Database { pub name: PgIdent, pub owner: PgIdent, @@ -70,7 +70,7 @@ pub struct Database { /// Common type representing both SQL statement params with or without value, /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config /// options like `wal_level = logical`. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct GenericOption { pub name: String, pub value: Option,