From e42982fb1e0178f5efe338a20e5fd8e593aa4ffb Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Thu, 6 Apr 2023 21:21:58 +0200 Subject: [PATCH] [compute_ctl] Empty computes and /configure API (#3963) This commit adds an option to start compute without spec and then pass it a valid spec via `POST /configure` API endpoint. This is a main prerequisite for maintaining the pool of compute nodes in the control-plane. For example: 1. Start compute with ```shell cargo run --bin compute_ctl -- -i no-compute \ -p http://localhost:9095 \ -D compute_pgdata \ -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \ -b ./pg_install/v15/bin/postgres ``` 2. Configure it with ```shell curl -d "{\"spec\": $(cat ./compute-spec.json)}" http://localhost:3080/configure ``` Internally, it's implemented using a `Condvar` + `Mutex`. Compute spec is moved under Mutex, as it's now could be updated in the http handler. Also `RwLock` was replaced with `Mutex` because the latter works well with `Condvar`. First part of the neondatabase/cloud#4433 --- compute_tools/src/bin/compute_ctl.rs | 153 +++++++++++++--------- compute_tools/src/compute.rs | 157 ++++++++++++++--------- compute_tools/src/http/api.rs | 128 +++++++++++++++++- compute_tools/src/http/mod.rs | 2 + compute_tools/src/http/openapi_spec.yaml | 105 +++++++++++++-- compute_tools/src/http/requests.rs | 11 ++ compute_tools/src/http/responses.rs | 40 ++++++ compute_tools/src/monitor.rs | 4 +- compute_tools/src/pg_helpers.rs | 6 +- compute_tools/src/spec.rs | 50 +++++--- 10 files changed, 498 insertions(+), 158 deletions(-) create mode 100644 compute_tools/src/http/requests.rs create mode 100644 compute_tools/src/http/responses.rs diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index f29a576413..1a3ac77af4 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -34,13 +34,14 @@ use std::fs::File; use std::panic; use std::path::Path; use std::process::exit; 
-use std::sync::{Arc, RwLock}; +use std::sync::{Arc, Condvar, Mutex}; use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::Arg; use tracing::{error, info}; +use url::Url; use compute_tools::compute::{ComputeMetrics, ComputeNode, ComputeState, ComputeStatus}; use compute_tools::http::api::launch_http_server; @@ -49,7 +50,6 @@ use compute_tools::monitor::launch_monitor; use compute_tools::params::*; use compute_tools::pg_helpers::*; use compute_tools::spec::*; -use url::Url; fn main() -> Result<()> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; @@ -62,7 +62,7 @@ fn main() -> Result<()> { let connstr = matches .get_one::("connstr") .expect("Postgres connection string is required"); - let spec = matches.get_one::("spec"); + let spec_json = matches.get_one::("spec"); let spec_path = matches.get_one::("spec-path"); let compute_id = matches.get_one::("compute-id"); @@ -71,40 +71,107 @@ fn main() -> Result<()> { // Try to use just 'postgres' if no path is provided let pgbin = matches.get_one::("pgbin").unwrap(); - let spec: ComputeSpec = match spec { + let mut spec = Default::default(); + let mut spec_set = false; + let mut live_config_allowed = false; + match spec_json { // First, try to get cluster spec from the cli argument - Some(json) => serde_json::from_str(json)?, + Some(json) => { + spec = serde_json::from_str(json)?; + spec_set = true; + } None => { // Second, try to read it from the file if path is provided if let Some(sp) = spec_path { let path = Path::new(sp); let file = File::open(path)?; - serde_json::from_reader(file)? 
+ spec = serde_json::from_reader(file)?; + spec_set = true; } else if let Some(id) = compute_id { if let Some(cp_base) = control_plane_uri { - let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec"); - let jwt: String = match std::env::var("NEON_CONSOLE_JWT") { - Ok(v) => v, - Err(_) => "".to_string(), - }; - - reqwest::blocking::Client::new() - .get(cp_uri) - .header("Authorization", jwt) - .send()? - .json()? + live_config_allowed = true; + if let Ok(s) = get_spec_from_control_plane(cp_base, id) { + spec = s; + spec_set = true; + } } else { - panic!( - "must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"", - control_plane_uri, compute_id - ); + panic!("must specify both --control-plane-uri and --compute-id or none"); } } else { - panic!("compute spec should be provided via --spec or --spec-path argument"); + panic!( + "compute spec should be provided by one of the following ways: \ + --spec OR --spec-path OR --control-plane-uri and --compute-id" + ); } } }; + let mut new_state = ComputeState::new(); + if spec_set { + new_state.spec = spec; + } + let compute_node = ComputeNode { + start_time: Utc::now(), + connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, + pgdata: pgdata.to_string(), + pgbin: pgbin.to_string(), + live_config_allowed, + metrics: ComputeMetrics::default(), + state: Mutex::new(new_state), + state_changed: Condvar::new(), + }; + let compute = Arc::new(compute_node); + + // Launch http service first, so we were able to serve control-plane + // requests, while configuration is still in progress. + let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread"); + + if !spec_set { + // No spec provided, hang waiting for it. 
+ info!("no compute spec provided, waiting"); + let mut state = compute.state.lock().unwrap(); + while state.status != ComputeStatus::ConfigurationPending { + state = compute.state_changed.wait(state).unwrap(); + + if state.status == ComputeStatus::ConfigurationPending { + info!("got spec, continue configuration"); + // Spec is already set by the http server handler. + break; + } + } + } + + // We got all we need, fill in the state. + let mut state = compute.state.lock().unwrap(); + let pageserver_connstr = state + .spec + .cluster + .settings + .find("neon.pageserver_connstring") + .expect("pageserver connstr should be provided"); + let storage_auth_token = state.spec.storage_auth_token.clone(); + let tenant = state + .spec + .cluster + .settings + .find("neon.tenant_id") + .expect("tenant id should be provided"); + let timeline = state + .spec + .cluster + .settings + .find("neon.timeline_id") + .expect("tenant id should be provided"); + let startup_tracing_context = state.spec.startup_tracing_context.clone(); + + state.pageserver_connstr = pageserver_connstr; + state.storage_auth_token = storage_auth_token; + state.tenant = tenant; + state.timeline = timeline; + state.status = ComputeStatus::Init; + compute.state_changed.notify_all(); + drop(state); + // Extract OpenTelemetry context for the startup actions from the spec, and // attach it to the current tracing context. // @@ -120,7 +187,7 @@ fn main() -> Result<()> { // postgres is configured and up-and-running, we exit this span. Any other // actions that are performed on incoming HTTP requests, for example, are // performed in separate spans. 
- let startup_context_guard = if let Some(ref carrier) = spec.startup_tracing_context { + let startup_context_guard = if let Some(ref carrier) = startup_tracing_context { use opentelemetry::propagation::TextMapPropagator; use opentelemetry::sdk::propagation::TraceContextPropagator; Some(TraceContextPropagator::new().extract(carrier).attach()) @@ -128,41 +195,7 @@ fn main() -> Result<()> { None }; - let pageserver_connstr = spec - .cluster - .settings - .find("neon.pageserver_connstring") - .expect("pageserver connstr should be provided"); - let storage_auth_token = spec.storage_auth_token.clone(); - let tenant = spec - .cluster - .settings - .find("neon.tenant_id") - .expect("tenant id should be provided"); - let timeline = spec - .cluster - .settings - .find("neon.timeline_id") - .expect("tenant id should be provided"); - - let compute_state = ComputeNode { - start_time: Utc::now(), - connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, - pgdata: pgdata.to_string(), - pgbin: pgbin.to_string(), - spec, - tenant, - timeline, - pageserver_connstr, - storage_auth_token, - metrics: ComputeMetrics::default(), - state: RwLock::new(ComputeState::new()), - }; - let compute = Arc::new(compute_state); - - // Launch service threads first, so we were able to serve availability - // requests, while configuration is still in progress. 
- let _http_handle = launch_http_server(&compute).expect("cannot launch http endpoint thread"); + // Launch remaining service threads let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread"); // Start Postgres @@ -172,7 +205,7 @@ fn main() -> Result<()> { Ok(pg) => Some(pg), Err(err) => { error!("could not start the compute node: {:?}", err); - let mut state = compute.state.write().unwrap(); + let mut state = compute.state.lock().unwrap(); state.error = Some(format!("{:?}", err)); state.status = ComputeStatus::Failed; drop(state); @@ -262,7 +295,7 @@ fn cli() -> clap::Command { Arg::new("control-plane-uri") .short('p') .long("control-plane-uri") - .value_name("CONTROL_PLANE"), + .value_name("CONTROL_PLANE_API_BASE_URI"), ) } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 00d1e234ab..3e92ec57dc 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -20,12 +20,12 @@ use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::RwLock; +use std::sync::{Condvar, Mutex}; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use postgres::{Client, NoTls}; -use serde::{Serialize, Serializer}; +use serde::Serialize; use tokio_postgres; use tracing::{info, instrument, warn}; @@ -41,41 +41,52 @@ pub struct ComputeNode { pub connstr: url::Url, pub pgdata: String, pub pgbin: String, + pub metrics: ComputeMetrics, + /// We should only allow live re- / configuration of the compute node if + /// it uses 'pull model', i.e. it can go to control-plane and fetch + /// the latest configuration. 
Otherwise, there could be a case: + /// - we start compute with some spec provided as argument + /// - we push new spec and it does reconfiguration + /// - but then something happens and compute pod / VM is destroyed, + /// so k8s controller starts it again with the **old** spec + /// and the same for empty computes: + /// - we started compute without any spec + /// - we push spec and it does configuration + /// - but then it is restarted without any spec again + pub live_config_allowed: bool, + /// Volatile part of the `ComputeNode`, which should be used under `Mutex`. + /// To allow the HTTP API server to serve status requests, while configuration + /// is in progress, lock should be held only for short periods of time to do + /// read/write, not the whole configuration process. + pub state: Mutex, + /// `Condvar` to allow notifying waiters about state changes. + pub state_changed: Condvar, +} + +#[derive(Clone, Debug)] +pub struct ComputeState { + pub status: ComputeStatus, + /// Timestamp of the last Postgres activity + pub last_active: DateTime, + pub error: Option, + pub spec: ComputeSpec, + pub tenant: String, + pub timeline: String, + pub pageserver_connstr: String, + pub storage_auth_token: Option, - pub metrics: ComputeMetrics, - /// Volatile part of the `ComputeNode` so should be used under `RwLock` - /// to allow HTTP API server to serve status requests, while configuration - /// is in progress. 
- pub state: RwLock, -} - -fn rfc3339_serialize(x: &DateTime, s: S) -> Result -where - S: Serializer, -{ - x.to_rfc3339().serialize(s) -} - -#[derive(Serialize)] -#[serde(rename_all = "snake_case")] -pub struct ComputeState { - pub status: ComputeStatus, - /// Timestamp of the last Postgres activity - #[serde(serialize_with = "rfc3339_serialize")] - pub last_active: DateTime, - pub error: Option, } impl ComputeState { pub fn new() -> Self { Self { - status: ComputeStatus::Init, + status: ComputeStatus::Empty, last_active: Utc::now(), error: None, + spec: ComputeSpec::default(), + tenant: String::new(), + timeline: String::new(), + pageserver_connstr: String::new(), + storage_auth_token: None, } } } @@ -86,11 +97,22 @@ impl Default for ComputeState { } } -#[derive(Serialize, Clone, Copy, PartialEq, Eq)] +#[derive(Serialize, Clone, Copy, PartialEq, Eq, Debug)] #[serde(rename_all = "snake_case")] pub enum ComputeStatus { + // Spec wasn't provided at start, waiting for it to be + // provided by control-plane. + Empty, + // Compute configuration was requested. + ConfigurationPending, + // Compute node has spec and initial startup and + // configuration is in progress. Init, + // Compute is configured and running. Running, + // Either startup or configuration failed, + // compute will exit soon or is waiting for + // control-plane to terminate it. Failed, } @@ -104,11 +126,13 @@ pub struct ComputeMetrics { impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { - self.state.write().unwrap().status = status; + let mut state = self.state.lock().unwrap(); + state.status = status; + self.state_changed.notify_all(); } pub fn get_status(&self) -> ComputeStatus { - self.state.read().unwrap().status + self.state.lock().unwrap().status } // Remove `pgdata` directory and create it again with right permissions. 
@@ -124,15 +148,15 @@ impl ComputeNode { // Get basebackup from the libpq connection to pageserver using `connstr` and // unarchive it to `pgdata` directory overriding all its previous content. - #[instrument(skip(self))] - fn get_basebackup(&self, lsn: &str) -> Result<()> { + #[instrument(skip(self, compute_state))] + fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> { let start_time = Utc::now(); - let mut config = postgres::Config::from_str(&self.pageserver_connstr)?; + let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?; // Use the storage auth token from the config file, if given. // Note: this overrides any password set in the connection string. - if let Some(storage_auth_token) = &self.storage_auth_token { + if let Some(storage_auth_token) = &compute_state.storage_auth_token { info!("Got storage auth token from spec file"); config.password(storage_auth_token); } else { @@ -141,8 +165,14 @@ impl ComputeNode { let mut client = config.connect(NoTls)?; let basebackup_cmd = match lsn { - "0/0" => format!("basebackup {} {}", &self.tenant, &self.timeline), // First start of the compute - _ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn), + "0/0" => format!( + "basebackup {} {}", + &compute_state.tenant, &compute_state.timeline + ), // First start of the compute + _ => format!( + "basebackup {} {} {}", + &compute_state.tenant, &compute_state.timeline, lsn + ), }; let copyreader = client.copy_out(basebackup_cmd.as_str())?; @@ -169,14 +199,14 @@ impl ComputeNode { // Run `postgres` in a special mode with `--sync-safekeepers` argument // and return the reported LSN back to the caller. 
- #[instrument(skip(self))] - fn sync_safekeepers(&self) -> Result { + #[instrument(skip(self, storage_auth_token))] + fn sync_safekeepers(&self, storage_auth_token: Option) -> Result { let start_time = Utc::now(); let sync_handle = Command::new(&self.pgbin) .args(["--sync-safekeepers"]) .env("PGDATA", &self.pgdata) // we cannot use -D in this mode - .envs(if let Some(storage_auth_token) = &self.storage_auth_token { + .envs(if let Some(storage_auth_token) = &storage_auth_token { vec![("NEON_AUTH_TOKEN", storage_auth_token)] } else { vec![] @@ -217,9 +247,9 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. - #[instrument(skip(self))] - pub fn prepare_pgdata(&self) -> Result<()> { - let spec = &self.spec; + #[instrument(skip(self, compute_state))] + pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { + let spec = &compute_state.spec; let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. @@ -228,18 +258,18 @@ impl ComputeNode { info!("starting safekeepers syncing"); let lsn = self - .sync_safekeepers() + .sync_safekeepers(compute_state.storage_auth_token.clone()) .with_context(|| "failed to sync safekeepers")?; info!("safekeepers synced at LSN {}", lsn); info!( "getting basebackup@{} from pageserver {}", - lsn, &self.pageserver_connstr + lsn, &compute_state.pageserver_connstr ); - self.get_basebackup(&lsn).with_context(|| { + self.get_basebackup(compute_state, &lsn).with_context(|| { format!( "failed to get basebackup@{} from pageserver {}", - lsn, &self.pageserver_connstr + lsn, &compute_state.pageserver_connstr ) })?; @@ -252,13 +282,16 @@ impl ComputeNode { /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. 
#[instrument(skip(self))] - pub fn start_postgres(&self) -> Result { + pub fn start_postgres( + &self, + storage_auth_token: Option, + ) -> Result { let pgdata_path = Path::new(&self.pgdata); // Run postgres as a child process. let mut pg = Command::new(&self.pgbin) .args(["-D", &self.pgdata]) - .envs(if let Some(storage_auth_token) = &self.storage_auth_token { + .envs(if let Some(storage_auth_token) = &storage_auth_token { vec![("NEON_AUTH_TOKEN", storage_auth_token)] } else { vec![] @@ -271,8 +304,9 @@ impl ComputeNode { Ok(pg) } - #[instrument(skip(self))] - pub fn apply_config(&self) -> Result<()> { + /// Do initial configuration of the already started Postgres. + #[instrument(skip(self, compute_state))] + pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { // If connection fails, // it may be the old node with `zenith_admin` superuser. // @@ -303,19 +337,19 @@ impl ComputeNode { }; // Proceed with post-startup configuration. Note, that order of operations is important. 
- handle_roles(&self.spec, &mut client)?; - handle_databases(&self.spec, &mut client)?; - handle_role_deletions(self, &mut client)?; - handle_grants(self, &mut client)?; + handle_roles(&compute_state.spec, &mut client)?; + handle_databases(&compute_state.spec, &mut client)?; + handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?; + handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?; create_writability_check_data(&mut client)?; - handle_extensions(&self.spec, &mut client)?; + handle_extensions(&compute_state.spec, &mut client)?; // 'Close' connection drop(client); info!( "finished configuration of compute for project {}", - self.spec.cluster.cluster_id + compute_state.spec.cluster.cluster_id ); Ok(()) @@ -323,21 +357,22 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self) -> Result { + let compute_state = self.state.lock().unwrap().clone(); info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", - self.spec.cluster.cluster_id, - self.spec.operation_uuid.as_ref().unwrap(), - self.tenant, - self.timeline, + compute_state.spec.cluster.cluster_id, + compute_state.spec.operation_uuid.as_ref().unwrap(), + compute_state.tenant, + compute_state.timeline, ); - self.prepare_pgdata()?; + self.prepare_pgdata(&compute_state)?; let start_time = Utc::now(); - let pg = self.start_postgres()?; + let pg = self.start_postgres(compute_state.storage_auth_token.clone())?; - self.apply_config()?; + self.apply_config(&compute_state)?; let startup_end_time = Utc::now(); self.metrics.config_ms.store( diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 199e0f3bd0..8620b10636 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -3,12 +3,16 @@ use std::net::SocketAddr; use std::sync::Arc; use std::thread; -use crate::compute::ComputeNode; +use crate::compute::{ComputeNode, ComputeStatus}; +use crate::http::requests::ConfigurationRequest; +use 
crate::http::responses::{ComputeStatusResponse, GenericAPIError}; + use anyhow::Result; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use num_cpus; use serde_json; +use tokio::task; use tracing::{error, info}; use tracing_utils::http::OtelName; @@ -23,8 +27,10 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving /status GET request"); - let state = compute.state.read().unwrap(); - Response::new(Body::from(serde_json::to_string(&*state).unwrap())) + let state = compute.state.lock().unwrap(); + let status_response = ComputeStatusResponse::from(state.clone()); + + Response::new(Body::from(serde_json::to_string(&status_response).unwrap())) } // Startup metrics in JSON format. Keep /metrics reserved for a possible @@ -37,12 +43,29 @@ async fn routes(req: Request, compute: &Arc) -> Response { info!("serving /insights GET request"); + let status = compute.get_status(); + if status != ComputeStatus::Running { + let msg = format!("compute is not running, current status: {:?}", status); + error!(msg); + return Response::new(Body::from(msg)); + } + let insights = compute.collect_insights().await; Response::new(Body::from(insights)) } (&Method::POST, "/check_writability") => { info!("serving /check_writability POST request"); + let status = compute.get_status(); + if status != ComputeStatus::Running { + let msg = format!( + "invalid compute status for check_writability request: {:?}", + status + ); + error!(msg); + return Response::new(Body::from(msg)); + } + let res = crate::checker::check_writability(compute).await; match res { Ok(_) => Response::new(Body::from("true")), @@ -61,6 +84,23 @@ async fn routes(req: Request, compute: &Arc) -> Response { + info!("serving /configure POST request"); + match handle_configure_request(req, compute).await { + Ok(msg) => Response::new(Body::from(msg)), + Err((msg, code)) => { + error!("error handling /configure request: {msg}"); + 
render_json_error(&msg, code) + } + } + } + // Return the `404 Not Found` for any other routes. _ => { let mut not_found = Response::new(Body::from("404 Not Found")); @@ -70,6 +110,88 @@ async fn routes(req: Request, compute: &Arc) -> Response, + compute: &Arc, +) -> Result { + if !compute.live_config_allowed { + return Err(( + "live configuration is not allowed for this compute node".to_string(), + StatusCode::PRECONDITION_FAILED, + )); + } + + let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); + if let Ok(request) = serde_json::from_str::(&spec_raw) { + let spec = request.spec; + // XXX: wrap state update under lock in code blocks. Otherwise, + // we will try to `Send` `mut state` into the spawned thread + // below, which will cause error: + // ``` + // error: future cannot be sent between threads safely + // ``` + { + let mut state = compute.state.lock().unwrap(); + if state.status != ComputeStatus::Empty { + let msg = format!( + "invalid compute status for configuration request: {:?}", + state.status.clone() + ); + return Err((msg, StatusCode::PRECONDITION_FAILED)); + } + state.spec = spec; + state.status = ComputeStatus::ConfigurationPending; + compute.state_changed.notify_all(); + drop(state); + info!("set new spec and notified waiters"); + } + + // Spawn a blocking thread to wait for compute to become Running. + // This is needed to not block the main pool of workers and + // be able to serve other requests while some particular request + // is waiting for compute to finish configuration. 
+ let c = compute.clone(); + task::spawn_blocking(move || { + let mut state = c.state.lock().unwrap(); + while state.status != ComputeStatus::Running { + state = c.state_changed.wait(state).unwrap(); + info!( + "waiting for compute to become Running, current status: {:?}", + state.status + ); + + if state.status == ComputeStatus::Failed { + let err = state.error.clone().unwrap_or("unknown error".to_string()); + let msg = format!("compute configuration failed: {:?}", err); + return Err((msg, StatusCode::INTERNAL_SERVER_ERROR)); + } + } + + Ok(()) + }) + .await + .unwrap()?; + + // Return current compute state if everything went well. + let state = compute.state.lock().unwrap().clone(); + let status_response = ComputeStatusResponse::from(state); + Ok(serde_json::to_string(&status_response).unwrap()) + } else { + Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST)) + } +} + +fn render_json_error(e: &str, status: StatusCode) -> Response { + let error = GenericAPIError { + error: e.to_string(), + }; + Response::builder() + .status(status) + .body(Body::from(serde_json::to_string(&error).unwrap())) + .unwrap() +} + // Main Hyper HTTP server function that runs it and blocks waiting on it forever. #[tokio::main] async fn serve(state: Arc) { diff --git a/compute_tools/src/http/mod.rs b/compute_tools/src/http/mod.rs index e5fdf85eed..e54b4e3341 100644 --- a/compute_tools/src/http/mod.rs +++ b/compute_tools/src/http/mod.rs @@ -1 +1,3 @@ pub mod api; +pub mod requests; +pub mod responses; diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index 5c74dfd2d2..bdb09d4a6b 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -11,7 +11,7 @@ paths: get: tags: - Info - summary: Get compute node internal status + summary: Get compute node internal status. 
description: "" operationId: getComputeStatus responses: @@ -26,7 +26,7 @@ paths: get: tags: - Info - summary: Get compute node startup metrics in JSON format + summary: Get compute node startup metrics in JSON format. description: "" operationId: getComputeMetricsJSON responses: @@ -41,9 +41,9 @@ paths: get: tags: - Info - summary: Get current compute insights in JSON format + summary: Get current compute insights in JSON format. description: | - Note, that this doesn't include any historical data + Note, that this doesn't include any historical data. operationId: getComputeInsights responses: 200: @@ -56,12 +56,12 @@ paths: /info: get: tags: - - "info" - summary: Get info about the compute Pod/VM + - Info + summary: Get info about the compute pod / VM. description: "" operationId: getInfo responses: - "200": + 200: description: Info content: application/json: @@ -72,7 +72,7 @@ paths: post: tags: - Check - summary: Check that we can write new data on this compute + summary: Check that we can write new data on this compute. description: "" operationId: checkComputeWritability responses: @@ -82,9 +82,64 @@ paths: text/plain: schema: type: string - description: Error text or 'true' if check passed + description: Error text or 'true' if check passed. example: "true" + /configure: + post: + tags: + - Configure + summary: Perform compute node configuration. + description: | + This is a blocking API endpoint, i.e. it blocks waiting until + compute is finished configuration and is in `Running` state. + Optional non-blocking mode could be added later. + operationId: configureCompute + requestBody: + description: Configuration request. + required: true + content: + application/json: + schema: + type: object + required: + - spec + properties: + spec: + # XXX: I don't want to explain current spec in the OpenAPI format, + # as it could be changed really soon. Consider doing it later. + type: object + responses: + 200: + description: Compute configuration finished. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/ComputeState" + 400: + description: Provided spec is invalid. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + 412: + description: | + It's not possible to do live-configuration of the compute. + It's either in the wrong state, or compute doesn't use pull + mode of configuration. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + 500: + description: | + Compute configuration request was processed, but error + occurred. Compute will likely shutdown soon. + content: + application/json: + schema: + $ref: "#/components/schemas/GenericError" + components: securitySchemes: JWT: @@ -95,7 +150,7 @@ components: schemas: ComputeMetrics: type: object - description: Compute startup metrics + description: Compute startup metrics. required: - sync_safekeepers_ms - basebackup_ms @@ -113,7 +168,7 @@ components: Info: type: object - description: Information about VM/Pod + description: Information about VM/Pod. required: - num_cpus properties: @@ -130,17 +185,26 @@ components: $ref: '#/components/schemas/ComputeStatus' last_active: type: string - description: The last detected compute activity timestamp in UTC and RFC3339 format + description: The last detected compute activity timestamp in UTC and RFC3339 format. example: "2022-10-12T07:20:50.52Z" error: type: string - description: Text of the error during compute startup, if any + description: Text of the error during compute startup, if any. + example: "" + tenant: + type: string + description: Identifier of the current tenant served by compute node, if any. + example: c9269c359e9a199fad1ea0981246a78f + timeline: + type: string + description: Identifier of the current timeline served by compute node, if any. 
+ example: ece7de74d4b8cbe5433a68ce4d1b97b4 ComputeInsights: type: object properties: pg_stat_statements: - description: Contains raw output from pg_stat_statements in JSON format + description: Contains raw output from pg_stat_statements in JSON format. type: array items: type: object @@ -151,6 +215,19 @@ components: - init - failed - running + example: running + + # + # Errors + # + + GenericError: + type: object + required: + - error + properties: + error: + type: string security: - JWT: [] diff --git a/compute_tools/src/http/requests.rs b/compute_tools/src/http/requests.rs new file mode 100644 index 0000000000..2e41c7aea4 --- /dev/null +++ b/compute_tools/src/http/requests.rs @@ -0,0 +1,11 @@ +use serde::Deserialize; + +use crate::spec::ComputeSpec; + +/// We now pass only `spec` in the configuration request, but later we can +/// extend it and something like `restart: bool` or something else. So put +/// `spec` into a struct initially to be more flexible in the future. +#[derive(Deserialize, Debug)] +pub struct ConfigurationRequest { + pub spec: ComputeSpec, +} diff --git a/compute_tools/src/http/responses.rs b/compute_tools/src/http/responses.rs new file mode 100644 index 0000000000..1ef4b380a9 --- /dev/null +++ b/compute_tools/src/http/responses.rs @@ -0,0 +1,40 @@ +use serde::{Serialize, Serializer}; + +use chrono::{DateTime, Utc}; + +use crate::compute::{ComputeState, ComputeStatus}; + +#[derive(Serialize, Debug)] +pub struct GenericAPIError { + pub error: String, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "snake_case")] +pub struct ComputeStatusResponse { + pub tenant: String, + pub timeline: String, + pub status: ComputeStatus, + #[serde(serialize_with = "rfc3339_serialize")] + pub last_active: DateTime, + pub error: Option, +} + +impl From for ComputeStatusResponse { + fn from(state: ComputeState) -> Self { + ComputeStatusResponse { + tenant: state.tenant, + timeline: state.timeline, + status: state.status, + last_active: state.last_active, 
+ error: state.error, + } + } +} + +fn rfc3339_serialize(x: &DateTime, s: S) -> Result +where + S: Serializer, +{ + x.to_rfc3339().serialize(s) +} diff --git a/compute_tools/src/monitor.rs b/compute_tools/src/monitor.rs index 7c9878ffcf..a30b52aed4 100644 --- a/compute_tools/src/monitor.rs +++ b/compute_tools/src/monitor.rs @@ -46,7 +46,7 @@ fn watch_compute_activity(compute: &ComputeNode) { AND usename != 'cloud_admin';", // XXX: find a better way to filter other monitors? &[], ); - let mut last_active = compute.state.read().unwrap().last_active; + let mut last_active = compute.state.lock().unwrap().last_active; if let Ok(backs) = backends { let mut idle_backs: Vec> = vec![]; @@ -87,7 +87,7 @@ fn watch_compute_activity(compute: &ComputeNode) { } // Update the last activity in the shared state if we got a more recent one. - let mut state = compute.state.write().unwrap(); + let mut state = compute.state.lock().unwrap(); if last_active > state.last_active { state.last_active = last_active; debug!("set the last compute activity time to: {}", last_active); diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index 01b192b2de..38d1a6d777 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -17,7 +17,7 @@ const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // mil /// Rust representation of Postgres role info with only those fields /// that matter for us. -#[derive(Clone, Deserialize)] +#[derive(Clone, Deserialize, Debug)] pub struct Role { pub name: PgIdent, pub encrypted_password: Option, @@ -26,7 +26,7 @@ pub struct Role { /// Rust representation of Postgres database info with only those fields /// that matter for us. 
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Debug)]
 pub struct Database {
     pub name: PgIdent,
     pub owner: PgIdent,
@@ -36,7 +36,7 @@ pub struct Database {
 /// Common type representing both SQL statement params with or without value,
 /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
 /// options like `wal_level = logical`.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Debug)]
 pub struct GenericOption {
     pub name: String,
     pub value: Option<String>,
diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs
index 9694ba9a88..b7f15a99d1 100644
--- a/compute_tools/src/spec.rs
+++ b/compute_tools/src/spec.rs
@@ -8,14 +8,13 @@ use postgres::{Client, NoTls};
 use serde::Deserialize;
 use tracing::{info, info_span, instrument, span_enabled, warn, Level};
 
-use crate::compute::ComputeNode;
 use crate::config;
 use crate::params::PG_HBA_ALL_MD5;
 use crate::pg_helpers::*;
 
 /// Cluster spec or configuration represented as an optional number of
 /// delta operations + final cluster state description.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Debug, Default)]
 pub struct ComputeSpec {
     pub format_version: f32,
     pub timestamp: String,
@@ -31,7 +30,7 @@ pub struct ComputeSpec {
 
 /// Cluster state seen from the perspective of the external tools
 /// like Rails web console.
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Debug, Default)]
 pub struct Cluster {
     pub cluster_id: String,
     pub name: String,
@@ -47,13 +46,36 @@ pub struct Cluster {
 /// - DROP ROLE
 /// - ALTER ROLE name RENAME TO new_name
 /// - ALTER DATABASE name RENAME TO new_name
-#[derive(Clone, Deserialize)]
+#[derive(Clone, Deserialize, Debug)]
 pub struct DeltaOp {
     pub action: String,
     pub name: PgIdent,
     pub new_name: Option<PgIdent>,
 }
 
+/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
+/// env variable is set, it will be used for authorization.
+pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
+    let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
+    let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
+        Ok(v) => v,
+        Err(_) => "".to_string(),
+    };
+    info!("getting spec from control plane: {}", cp_uri);
+
+    // TODO: check the response. We should distinguish cases when it's
+    // - network error, then retry
+    // - no spec for compute yet, then wait
+    // - compute id is unknown or any other error, then bail out
+    let spec = reqwest::blocking::Client::new()
+        .get(cp_uri)
+        .header("Authorization", jwt)
+        .send()?
+        .json()?;
+
+    Ok(spec)
+}
+
 /// It takes cluster specification and does the following:
 /// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
 /// - Update `pg_hba.conf` to allow external connections.
@@ -226,8 +248,8 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
 
 /// Reassign all dependent objects and delete requested roles.
 #[instrument(skip_all)]
-pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<()> {
-    if let Some(ops) = &node.spec.delta_operations {
+pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
+    if let Some(ops) = &spec.delta_operations {
         // First, reassign all dependent objects to db owners.
         info!("reassigning dependent objects of to-be-deleted roles");
 
@@ -244,7 +266,7 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
 
             // Check that role is still present in Postgres, as this could be a
             // restart with the same spec after role deletion.
             if op.action == "delete_role" && existing_roles.iter().any(|r| r.name == op.name) {
-                reassign_owned_objects(node, &op.name)?;
+                reassign_owned_objects(spec, connstr, &op.name)?;
             }
         }
 
@@ -268,10 +290,10 @@ pub fn handle_role_deletions(node: &ComputeNode, client: &mut Client) -> Result<
 }
 
 // Reassign all owned objects in all databases to the owner of the database.
-fn reassign_owned_objects(node: &ComputeNode, role_name: &PgIdent) -> Result<()> {
-    for db in &node.spec.cluster.databases {
+fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
+    for db in &spec.cluster.databases {
         if db.owner != *role_name {
-            let mut conf = Config::from_str(node.connstr.as_str())?;
+            let mut conf = Config::from_str(connstr)?;
             conf.dbname(&db.name);
 
             let mut client = conf.connect(NoTls)?;
@@ -416,9 +438,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
 /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
 /// to allow users creating trusted extensions and re-creating `public` schema, for example.
 #[instrument(skip_all)]
-pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
-    let spec = &node.spec;
-
+pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
     info!("cluster spec grants:");
 
     // We now have a separate `web_access` role to connect to the database
@@ -450,8 +470,8 @@ pub fn handle_grants(node: &ComputeNode, client: &mut Client) -> Result<()> {
     // Do some per-database access adjustments. We'd better do this at db creation time,
     // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
     // atomically.
-    for db in &node.spec.cluster.databases {
-        let mut conf = Config::from_str(node.connstr.as_str())?;
+    for db in &spec.cluster.databases {
+        let mut conf = Config::from_str(connstr)?;
         conf.dbname(&db.name);
 
         let mut db_client = conf.connect(NoTls)?;