diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 3d95d1cd3e..a265484fa6 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -40,6 +40,7 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; +use tokio::sync::mpsc; use tracing::{error, info}; use url::Url; @@ -47,6 +48,7 @@ use compute_api::models::{ComputeMetrics, ComputeState, ComputeStatus}; use compute_api::spec::ComputeSpec; use compute_tools::compute::ComputeNode; +use compute_tools::configurator::launch_configurator; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -168,10 +170,26 @@ fn main() -> Result<()> { }; let compute = Arc::new(compute_state); + // We have one configurator thread and async http server, so generally we + // have a single consumer - multiple producers pattern here. That's why we use + // `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of + // producers is limited to one due to code logic, but we still need to + // pass `Sender` to several threads. + // + // Next, we use async `hyper` + `tokio` http server, but all the other code + // is completely synchronous. So we need to send data from async to sync, + // that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`. + // It doesn't make much sense to rewrite all code to async now, but we can + // consider doing this in the future. + let (spec_tx, spec_rx) = mpsc::unbounded_channel::<ComputeSpec>(); + // Launch service threads first, so we were able to serve availability // requests, while configuration is still in progress.
- let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread"); + let _http_handle = + launch_http_server(&compute, spec_tx).expect("cannot launch http endpoint thread"); let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread"); + let _configurator_handle = + launch_configurator(&compute, spec_rx).expect("cannot launch configurator thread"); // Start Postgres let mut delay_exit = false; diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 80a6aac568..866310d215 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -216,6 +216,7 @@ impl ComputeNode { Ok(pg) } + /// Do initial configuration of the already started Postgres. #[instrument(skip(self))] pub fn apply_config(&self) -> Result<()> { // If connection fails, @@ -250,8 +251,8 @@ impl ComputeNode { // Proceed with post-startup configuration. Note, that order of operations is important. handle_roles(&self.spec, &mut client)?; handle_databases(&self.spec, &mut client)?; - handle_role_deletions(self, &mut client)?; - handle_grants(self, &mut client)?; + handle_role_deletions(&self.spec, self.connstr.as_str(), &mut client)?; + handle_grants(&self.spec, self.connstr.as_str(), &mut client)?; create_writability_check_data(&mut client)?; handle_extensions(&self.spec, &mut client)?; @@ -266,6 +267,46 @@ impl ComputeNode { Ok(()) } + // We could've wrapped this around `pg_ctl reload`, but right now we don't use + // `pg_ctl` for start / stop, so this just seems much easier to do as we already + // have opened connection to Postgres and superuser access. + #[instrument(skip(self, client))] + fn pg_reload_conf(&self, client: &mut Client) -> Result<()> { + client.simple_query("SELECT pg_reload_conf()")?; + Ok(()) + } + + /// Similar to `apply_config()`, but does a bit different sequence of operations, + /// as it's used to reconfigure a previously started and configured Postgres node. 
+ #[instrument(skip(self, spec))] + pub fn reconfigure(&self, spec: ComputeSpec) -> Result<()> { + // Write new config + let pgdata_path = Path::new(&self.pgdata); + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?; + + let mut client = Client::connect(self.connstr.as_str(), NoTls)?; + self.pg_reload_conf(&mut client)?; + + // Proceed with post-startup configuration. Note, that order of operations is important. + handle_roles(&spec, &mut client)?; + handle_databases(&spec, &mut client)?; + handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?; + handle_grants(&spec, self.connstr.as_str(), &mut client)?; + handle_extensions(&spec, &mut client)?; + + // 'Close' connection + drop(client); + + let unknown_op = "unknown".to_string(); + let op_id = spec.operation_uuid.as_ref().unwrap_or(&unknown_op); + info!( + "finished reconfiguration of compute node for operation {}", + op_id + ); + + Ok(()) + } + #[instrument(skip(self))] pub fn start_compute(&self) -> Result { info!( diff --git a/compute_tools/src/configurator.rs b/compute_tools/src/configurator.rs new file mode 100644 index 0000000000..cec109a6de --- /dev/null +++ b/compute_tools/src/configurator.rs @@ -0,0 +1,56 @@ +use std::sync::Arc; +use std::thread; + +use anyhow::Result; +use tokio::sync::mpsc::UnboundedReceiver; +use tracing::{error, info, instrument}; + +use crate::compute::ComputeNode; +use compute_api::models::ComputeStatus; +use compute_api::spec::ComputeSpec; + +#[instrument(skip(compute, rx))] +fn configurator_main_loop(compute: &Arc, mut rx: UnboundedReceiver) { + info!("waiting for reconfiguration requests"); + while let Some(spec) = rx.blocking_recv() { + info!("got spec = {:?}", &spec); + + let status = compute.get_status(); + // Sanity check, should never happen. 
+ if status != ComputeStatus::ConfigurationPending { + error!( + "unexpected compute status: {:?}, expected {:?}", + status, + ComputeStatus::ConfigurationPending + ); + compute.set_status(ComputeStatus::Failed); + continue; + } else { + compute.set_status(ComputeStatus::Reconfiguration); + } + + let mut new_status = ComputeStatus::Failed; + if let Err(e) = compute.reconfigure(spec) { + error!("could not reconfigure compute node: {}", e); + } else { + new_status = ComputeStatus::Running; + info!("compute node reconfigured"); + } + + compute.set_status(new_status); + } + info!("configurator thread is exiting"); +} + +pub fn launch_configurator( + compute: &Arc, + rx: UnboundedReceiver, +) -> Result> { + let compute = Arc::clone(compute); + + Ok(thread::Builder::new() + .name("compute-configurator".into()) + .spawn(move || { + configurator_main_loop(&compute, rx); + })?) +} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 9d5b0d7a53..9d5694879d 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -4,16 +4,24 @@ use std::sync::Arc; use std::thread; use crate::compute::ComputeNode; +use compute_api::models::ComputeStatus; +use compute_api::spec::ComputeSpec; + use anyhow::Result; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use num_cpus; use serde_json; +use tokio::sync::mpsc::UnboundedSender; use tracing::{error, info}; use tracing_utils::http::OtelName; // Service function to handle all available routes. -async fn routes(req: Request, compute: &Arc) -> Response { +async fn routes( + req: Request, + compute: &Arc, + tx: &UnboundedSender, +) -> Response { // // NOTE: The URI path is currently included in traces. That's OK because // it doesn't contain any variable parts or sensitive information. 
But @@ -62,6 +70,51 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /spec POST request"); + let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); + if let Ok(spec) = serde_json::from_str::(&spec_raw) { + let mut state = compute.state.write().unwrap(); + if !(state.status == ComputeStatus::WaitingSpec + || state.status == ComputeStatus::Running) + { + let msg = format!( + "invalid compute status for reconfiguration request: {}", + serde_json::to_string(&*state).unwrap() + ); + error!(msg); + return Response::new(Body::from(msg)); + } + state.status = ComputeStatus::ConfigurationPending; + drop(state); + + if let Err(e) = tx.send(spec) { + error!("failed to send spec request to configurator thread: {}", e); + Response::new(Body::from(format!( + "could not request reconfiguration: {}", + e + ))) + } else { + info!("sent spec request to configurator"); + Response::new(Body::from("ok")) + } + } else { + let msg = "invalid spec"; + error!(msg); + Response::new(Body::from(msg)) + } + } + // Return the `404 Not Found` for any other routes. _ => { let mut not_found = Response::new(Body::from("404 Not Found")); @@ -73,14 +126,16 @@ async fn routes(req: Request, compute: &Arc) -> Response) { +async fn serve(state: Arc, tx: UnboundedSender) { let addr = SocketAddr::from(([0, 0, 0, 0], 3080)); let make_service = make_service_fn(move |_conn| { let state = state.clone(); + let tx = tx.clone(); async move { Ok::<_, Infallible>(service_fn(move |req: Request| { let state = state.clone(); + let tx = tx.clone(); async move { Ok::<_, Infallible>( // NOTE: We include the URI path in the string. It @@ -88,7 +143,7 @@ async fn serve(state: Arc) { // information in this API. 
tracing_utils::http::tracing_handler( req, - |req| routes(req, &state), + |req| routes(req, &state, &tx), OtelName::UriPath, ) .await, @@ -109,10 +164,12 @@ async fn serve(state: Arc) { } /// Launch a separate Hyper HTTP API server thread and return its `JoinHandle`. -pub fn launch_http_server(state: &Arc) -> Result> { +pub fn launch_http_server( + state: &Arc, + tx: UnboundedSender, +) -> Result> { let state = Arc::clone(state); - Ok(thread::Builder::new() .name("http-endpoint".into()) - .spawn(move || serve(state))?) + .spawn(move || serve(state, tx))?) } diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index aee6b53e6a..24811f75ee 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -4,6 +4,7 @@ //! pub mod checker; pub mod config; +pub mod configurator; pub mod http; #[macro_use] pub mod logger; diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 9c6825994d..a02956c745 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -6,7 +6,6 @@ use postgres::config::Config; use postgres::{Client, NoTls}; use tracing::{info, info_span, instrument, span_enabled, warn, Level}; -use crate::compute::ComputeNode; use crate::config; use crate::params::PG_HBA_ALL_MD5; use crate::pg_helpers::*; @@ -185,8 +184,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Reassign all dependent objects and delete requested roles. #[instrument(skip_all)] -pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> { - if let Some(ops) = &node.spec.delta_operations { +pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { + if let Some(ops) = &spec.delta_operations { // First, reassign all dependent objects to db owners. 
info!("reassigning dependent objects of to-be-deleted roles"); @@ -203,7 +202,7 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result< // Check that role is still present in Postgres, as this could be a // restart with the same spec after role deletion. if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) { - reassign_owned_objects(node, &op.name)?; + reassign_owned_objects(spec, connstr, &op.name)?; } } @@ -227,10 +226,10 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result< } // Reassign all owned objects in all databases to the owner of the database. -fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> { - for db in &node.spec.cluster.databases { +fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> { + for db in &spec.cluster.databases { if db.owner != *role_name { - let mut conf = Config::from_str(node.connstr.as_str())?; + let mut conf = Config::from_str(connstr)?; conf.dbname(&db.name); let mut client = conf.connect(NoTls)?; @@ -375,9 +374,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> { /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants /// to allow users creating trusted extensions and re-creating `public` schema, for example. #[instrument(skip_all)] -pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { - let spec = &node.spec; - +pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> { info!("cluster spec grants:"); // We now have a separate `web_access` role to connect to the database @@ -409,8 +406,8 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> { // Do some per-database access adjustments. We'd better do this at db creation time, // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants // atomically. 
- for db in &node.spec.cluster.databases { - let mut conf = Config::from_str(node.connstr.as_str())?; + for db in &spec.cluster.databases { + let mut conf = Config::from_str(connstr)?; conf.dbname(&db.name); let mut db_client = conf.connect(NoTls)?; diff --git a/libs/compute_api/src/models.rs b/libs/compute_api/src/models.rs index 9d2575e7fd..620090eae2 100644 --- a/libs/compute_api/src/models.rs +++ b/libs/compute_api/src/models.rs @@ -14,12 +14,24 @@ pub struct ComputeState { pub error: Option, } -#[derive(Serialize, Clone, Copy, PartialEq, Eq)] +#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)] #[serde(rename_all = "snake_case")] pub enum ComputeStatus { + // Spec wasn't provided at start, waiting for it to be + // provided by control-plane. + WaitingSpec, + // Compute node has initial spec and is starting up. Init, + // Compute is configured and running. Running, + // Either startup or configuration failed, + // compute will exit soon or is waiting for + // control-plane to terminate it. Failed, + // Control-plane requested reconfiguration. + ConfigurationPending, + // New spec is being applied. + Reconfiguration, } fn rfc3339_serialize(x: &DateTime, s: S) -> Result diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 7b67806dd8..5c18caa9e9 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -12,7 +12,7 @@ pub type PgIdent = String; /// Cluster spec or configuration represented as an optional number of /// delta operations + final cluster state description.
-#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct ComputeSpec { pub format_version: f32, pub timestamp: String, @@ -26,7 +26,7 @@ pub struct ComputeSpec { pub startup_tracing_context: Option>, } -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Cluster { pub cluster_id: String, pub name: String, @@ -42,7 +42,7 @@ pub struct Cluster { /// - DROP ROLE /// - ALTER ROLE name RENAME TO new_name /// - ALTER DATABASE name RENAME TO new_name -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct DeltaOp { pub action: String, pub name: PgIdent, @@ -51,7 +51,7 @@ pub struct DeltaOp { /// Rust representation of Postgres role info with only those fields /// that matter for us. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Role { pub name: PgIdent, pub encrypted_password: Option, @@ -60,7 +60,7 @@ pub struct Role { /// Rust representation of Postgres database info with only those fields /// that matter for us. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct Database { pub name: PgIdent, pub owner: PgIdent, @@ -70,7 +70,7 @@ pub struct Database { /// Common type representing both SQL statement params with or without value, /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config /// options like `wal_level = logical`. -#[derive(Clone, Deserialize)] +#[derive(Clone, Debug, Deserialize)] pub struct GenericOption { pub name: String, pub value: Option,