From f0359fdbc0ea0675fe6020b4504a6be33e0a5944 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Fri, 31 Mar 2023 19:38:49 +0200 Subject: [PATCH] Allow starting `compute_ctl` without spec With this commit one can start compute with something like ```shell cargo run --bin compute_ctl -- -i no-compute \ -p http://localhost:9095 \ -D compute_pgdata \ -C "postgresql://cloud_admin@127.0.0.1:5434/postgres" \ -b ./pg_install/v15/bin/postgres ``` and it will hang waiting for spec. Then send one spec ```shell curl -d "$(cat ./compute-spec.json)" http://localhost:3080/spec ``` Postgres will be started and configured. Then reconfigure it with ```shell curl -d "$(cat ./compute-spec-new.json)" http://localhost:3080/spec ``` Most of safeguards and comments are added. Some polishing especially around HTTP API is still needed. --- compute_tools/src/bin/compute_ctl.rs | 124 ++++++++++++++++----------- compute_tools/src/compute.rs | 106 ++++++++++++++--------- compute_tools/src/http/api.rs | 25 +++++- compute_tools/src/spec.rs | 27 +++++- 4 files changed, 187 insertions(+), 95 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 4c8f94afd0..bb73d8e086 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -64,7 +64,7 @@ fn main() -> Result<()> { let connstr = matches .get_one::("connstr") .expect("Postgres connection string is required"); - let spec = matches.get_one::("spec"); + let spec_json = matches.get_one::("spec"); let spec_path = matches.get_one::("spec-path"); let compute_id = matches.get_one::("compute-id"); @@ -73,40 +73,81 @@ fn main() -> Result<()> { // Try to use just 'postgres' if no path is provided let pgbin = matches.get_one::("pgbin").unwrap(); - let spec: ComputeSpec = match spec { + let mut spec = Default::default(); + let mut spec_set = false; + let mut live_config_allowed = false; + match spec_json { // First, try to get cluster spec from the cli argument - Some(json) => serde_json::from_str(json)?, + Some(json) => { + spec = serde_json::from_str(json)?; + spec_set = true; + } None => { // Second, try to read it from the file if path is provided if let Some(sp) = spec_path { let path = Path::new(sp); let file = File::open(path)?; - serde_json::from_reader(file)? + spec = serde_json::from_reader(file)?; + spec_set = true; } else if let Some(id) = compute_id { if let Some(cp_base) = control_plane_uri { - let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec"); - let jwt: String = match std::env::var("NEON_CONSOLE_JWT") { - Ok(v) => v, - Err(_) => "".to_string(), - }; - - reqwest::blocking::Client::new() - .get(cp_uri) - .header("Authorization", jwt) - .send()? - .json()? + live_config_allowed = true; + if let Ok(s) = get_spec_from_control_plane(cp_base, id) { + spec = s; + spec_set = true; + } } else { - panic!( - "must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"", - control_plane_uri, compute_id - ); + panic!("must specify both --control-plane-uri and --compute-id or none"); } } else { - panic!("compute spec should be provided via --spec or --spec-path argument"); + panic!( + "compute spec should be provided by one of the following ways: \ + --spec OR --spec-path OR --control-plane-uri and --compute-id" + ); } } }; + // We have one configurator thread and async http server, so generally we + // have single consumer - multiple producers pattern here. That's why we use + // `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of + // producers is limited to one due to code logic, but we still need to + // pass `Sender` to several threads. + // + // Next, we use async `hyper` + `tokio` http server, but all the other code + // is completely synchronous. So we need to send data from async to sync, + // that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`. + // It doesn't make much sense to rewrite all code to async now, but we can + // consider doing this in the future. Unbound should not sound scary here, + // as again, only one unprocessed spec could be sent to channel at any + // given moment. + let (spec_tx, mut spec_rx) = mpsc::unbounded_channel::(); + + let compute_state = ComputeNode { + start_time: Utc::now(), + connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, + pgdata: pgdata.to_string(), + pgbin: pgbin.to_string(), + live_config_allowed, + metrics: ComputeMetrics::default(), + state: RwLock::new(ComputeState::new()), + }; + let compute = Arc::new(compute_state); + + // Launch http service first, so we were able to serve control-plane + // requests, while configuration is still in progress. + let _http_handle = + launch_http_server(&compute, spec_tx).expect("cannot launch http endpoint thread"); + + if !spec_set { + info!("no compute spec provided, waiting"); + if let Some(s) = spec_rx.blocking_recv() { + spec = s; + } else { + panic!("no compute spec received"); + } + } + // Extract OpenTelemetry context for the startup actions from the spec, and // attach it to the current tracing context. // @@ -147,38 +188,17 @@ fn main() -> Result<()> { .find("neon.timeline_id") .expect("tenant id should be provided"); - let compute_state = ComputeNode { - start_time: Utc::now(), - connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, - pgdata: pgdata.to_string(), - pgbin: pgbin.to_string(), - spec, - tenant, - timeline, - pageserver_connstr, - storage_auth_token, - metrics: ComputeMetrics::default(), - state: RwLock::new(ComputeState::new()), - }; - let compute = Arc::new(compute_state); + // We got all we need, fill in the state. + let mut state = compute.state.write().unwrap(); + state.spec = spec; + state.pageserver_connstr = pageserver_connstr; + state.storage_auth_token = storage_auth_token; + state.tenant = tenant; + state.timeline = timeline; + state.status = ComputeStatus::Init; + drop(state); - // We have one configurator thread and async http server, so generally we - // single consumer - multiple producers pattern here. That's why we use - // `mpsc` channel, not `tokio::sync::watch`. Actually, concurrency of - // producers is limited to one due to code logic, but we still need to - // pass `Sender` to several threads. - // - // Next, we use async `hyper` + `tokio` http server, but all the other code - // is completely synchronous. So we need to send data from async to sync, - // that's why we use `mpsc::unbounded_channel` here, not `mpsc::channel`. - // It doesn't make much sense to rewrite all code to async now, but we can - // consider doing this in the future. - let (spec_tx, spec_rx) = mpsc::unbounded_channel::(); - - // Launch service threads first, so we were able to serve availability - // requests, while configuration is still in progress. - let _http_handle = - launch_http_server(&compute, spec_tx).expect("cannot launch http endpoint thread"); + // Launch remaining service threads let _monitor_handle = launch_monitor(&compute).expect("cannot launch compute monitor thread"); let _configurator_handle = launch_configurator(&compute, spec_rx).expect("cannot launch configurator thread"); @@ -280,7 +300,7 @@ fn cli() -> clap::Command { Arg::new("control-plane-uri") .short('p') .long("control-plane-uri") - .value_name("CONTROL_PLANE"), + .value_name("CONTROL_PLANE_API_BASE_URI"), ) } diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index dfb60e6aa0..4dd162821c 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -41,12 +41,15 @@ pub struct ComputeNode { pub connstr: url::Url, pub pgdata: String, pub pgbin: String, - pub spec: ComputeSpec, - pub tenant: String, - pub timeline: String, - pub pageserver_connstr: String, - pub storage_auth_token: Option, pub metrics: ComputeMetrics, + // We only allow live re- / configuration of the compute node if + // it uses 'pull model', i.e. it can go to control-plane and fetch + // the latest configuration. Otherwise, there could be a case: + // - we start compute with some spec provided as argument + // - we push new spec and it does reconfiguration + // - but then something happens and compute pod / VM is destroyed, + // so k8s controller starts it again with the **old** spec + pub live_config_allowed: bool, /// Volatile part of the `ComputeNode` so should be used under `RwLock` /// to allow HTTP API server to serve status requests, while configuration /// is in progress. @@ -60,7 +63,7 @@ where x.to_rfc3339().serialize(s) } -#[derive(Serialize)] +#[derive(Serialize, Clone)] #[serde(rename_all = "snake_case")] pub struct ComputeState { pub status: ComputeStatus, @@ -68,14 +71,27 @@ pub struct ComputeState { #[serde(serialize_with = "rfc3339_serialize")] pub last_active: DateTime, pub error: Option, + #[serde(skip_serializing)] + pub spec: ComputeSpec, + pub tenant: String, + pub timeline: String, + #[serde(skip_serializing)] + pub pageserver_connstr: String, + #[serde(skip_serializing)] + pub storage_auth_token: Option, } impl ComputeState { pub fn new() -> Self { Self { - status: ComputeStatus::Init, + status: ComputeStatus::WaitingSpec, last_active: Utc::now(), error: None, + spec: ComputeSpec::default(), + tenant: String::new(), + timeline: String::new(), + pageserver_connstr: String::new(), + storage_auth_token: None, } } } @@ -136,15 +152,15 @@ impl ComputeNode { // Get basebackup from the libpq connection to pageserver using `connstr` and // unarchive it to `pgdata` directory overriding all its previous content. - #[instrument(skip(self))] - fn get_basebackup(&self, lsn: &str) -> Result<()> { + #[instrument(skip(self, compute_state))] + fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> { let start_time = Utc::now(); - let mut config = postgres::Config::from_str(&self.pageserver_connstr)?; + let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?; // Use the storage auth token from the config file, if given. // Note: this overrides any password set in the connection string. - if let Some(storage_auth_token) = &self.storage_auth_token { + if let Some(storage_auth_token) = &compute_state.storage_auth_token { info!("Got storage auth token from spec file"); config.password(storage_auth_token); } else { @@ -153,8 +169,14 @@ impl ComputeNode { let mut client = config.connect(NoTls)?; let basebackup_cmd = match lsn { - "0/0" => format!("basebackup {} {}", &self.tenant, &self.timeline), // First start of the compute - _ => format!("basebackup {} {} {}", &self.tenant, &self.timeline, lsn), + "0/0" => format!( + "basebackup {} {}", + &compute_state.tenant, &compute_state.timeline + ), // First start of the compute + _ => format!( + "basebackup {} {} {}", + &compute_state.tenant, &compute_state.timeline, lsn + ), }; let copyreader = client.copy_out(basebackup_cmd.as_str())?; @@ -181,14 +203,14 @@ impl ComputeNode { // Run `postgres` in a special mode with `--sync-safekeepers` argument // and return the reported LSN back to the caller. - #[instrument(skip(self))] - fn sync_safekeepers(&self) -> Result { + #[instrument(skip(self, storage_auth_token))] + fn sync_safekeepers(&self, storage_auth_token: Option) -> Result { let start_time = Utc::now(); let sync_handle = Command::new(&self.pgbin) .args(["--sync-safekeepers"]) .env("PGDATA", &self.pgdata) // we cannot use -D in this mode - .envs(if let Some(storage_auth_token) = &self.storage_auth_token { + .envs(if let Some(storage_auth_token) = &storage_auth_token { vec![("NEON_AUTH_TOKEN", storage_auth_token)] } else { vec![] @@ -229,9 +251,9 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. - #[instrument(skip(self))] - pub fn prepare_pgdata(&self) -> Result<()> { - let spec = &self.spec; + #[instrument(skip(self, compute_state))] + pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { + let spec = &compute_state.spec; let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. @@ -240,18 +262,18 @@ impl ComputeNode { info!("starting safekeepers syncing"); let lsn = self - .sync_safekeepers() + .sync_safekeepers(compute_state.storage_auth_token.clone()) .with_context(|| "failed to sync safekeepers")?; info!("safekeepers synced at LSN {}", lsn); info!( "getting basebackup@{} from pageserver {}", - lsn, &self.pageserver_connstr + lsn, &compute_state.pageserver_connstr ); - self.get_basebackup(&lsn).with_context(|| { + self.get_basebackup(compute_state, &lsn).with_context(|| { format!( "failed to get basebackup@{} from pageserver {}", - lsn, &self.pageserver_connstr + lsn, &compute_state.pageserver_connstr ) })?; @@ -264,13 +286,16 @@ impl ComputeNode { /// Start Postgres as a child process and manage DBs/roles. /// After that this will hang waiting on the postmaster process to exit. #[instrument(skip(self))] - pub fn start_postgres(&self) -> Result { + pub fn start_postgres( + &self, + storage_auth_token: Option, + ) -> Result { let pgdata_path = Path::new(&self.pgdata); // Run postgres as a child process. let mut pg = Command::new(&self.pgbin) .args(["-D", &self.pgdata]) - .envs(if let Some(storage_auth_token) = &self.storage_auth_token { + .envs(if let Some(storage_auth_token) = &storage_auth_token { vec![("NEON_AUTH_TOKEN", storage_auth_token)] } else { vec![] @@ -284,8 +309,8 @@ impl ComputeNode { } /// Do initial configuration of the already started Postgres. - #[instrument(skip(self))] - pub fn apply_config(&self) -> Result<()> { + #[instrument(skip(self, compute_state))] + pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> { // If connection fails, // it may be the old node with `zenith_admin` superuser. // @@ -316,19 +341,19 @@ impl ComputeNode { }; // Proceed with post-startup configuration. Note, that order of operations is important. - handle_roles(&self.spec, &mut client)?; - handle_databases(&self.spec, &mut client)?; - handle_role_deletions(&self.spec, self.connstr.as_str(), &mut client)?; - handle_grants(&self.spec, self.connstr.as_str(), &mut client)?; + handle_roles(&compute_state.spec, &mut client)?; + handle_databases(&compute_state.spec, &mut client)?; + handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?; + handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?; create_writability_check_data(&mut client)?; - handle_extensions(&self.spec, &mut client)?; + handle_extensions(&compute_state.spec, &mut client)?; // 'Close' connection drop(client); info!( "finished configuration of compute for project {}", - self.spec.cluster.cluster_id + compute_state.spec.cluster.cluster_id ); Ok(()) @@ -376,21 +401,22 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self) -> Result { + let compute_state = self.state.read().unwrap().clone(); info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", - self.spec.cluster.cluster_id, - self.spec.operation_uuid.as_ref().unwrap(), - self.tenant, - self.timeline, + compute_state.spec.cluster.cluster_id, + compute_state.spec.operation_uuid.as_ref().unwrap(), + compute_state.tenant, + compute_state.timeline, ); - self.prepare_pgdata()?; + self.prepare_pgdata(&compute_state)?; let start_time = Utc::now(); - let pg = self.start_postgres()?; + let pg = self.start_postgres(compute_state.storage_auth_token.clone())?; - self.apply_config()?; + self.apply_config(&compute_state)?; let startup_end_time = Utc::now(); self.metrics.config_ms.store( diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 52da762cac..ca6e18ffde 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -44,12 +44,29 @@ async fn routes( // Collect Postgres current usage insights (&Method::GET, "/insights") => { info!("serving /insights GET request"); + let status = compute.get_status(); + if status != ComputeStatus::Running { + let msg = format!("compute is not running, current status: {:?}", status); + error!(msg); + return Response::new(Body::from(msg)); + } + let insights = compute.collect_insights().await; Response::new(Body::from(insights)) } (&Method::POST, "/check_writability") => { info!("serving /check_writability POST request"); + let status = compute.get_status(); + if status != ComputeStatus::Running { + let msg = format!( + "invalid compute status for check_writability request: {:?}", + status + ); + error!(msg); + return Response::new(Body::from(msg)); + } + let res = crate::checker::check_writability(compute).await; match res { Ok(_) => Response::new(Body::from("true")), @@ -76,9 +93,15 @@ async fn routes( // watch compute state after reconfiguration request and to clean // restart in case of errors. // - // TODO: Errors should be in JSON format + // TODO: Errors should be in JSON format with proper status codes. (&Method::POST, "/spec") => { info!("serving /spec POST request"); + if !compute.live_config_allowed { + let msg = "live reconfiguration is not allowed for this compute node"; + error!(msg); + return Response::new(Body::from(msg)); + } + let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); if let Ok(spec) = serde_json::from_str::(&spec_raw) { diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 1bbbe717fc..b7f15a99d1 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -14,7 +14,7 @@ use crate::pg_helpers::*; /// Cluster spec or configuration represented as an optional number of /// delta operations + final cluster state description. -#[derive(Clone, Deserialize, Debug)] +#[derive(Clone, Deserialize, Debug, Default)] pub struct ComputeSpec { pub format_version: f32, pub timestamp: String, @@ -30,7 +30,7 @@ pub struct ComputeSpec { /// Cluster state seen from the perspective of the external tools /// like Rails web console. -#[derive(Clone, Deserialize, Debug)] +#[derive(Clone, Deserialize, Debug, Default)] pub struct Cluster { pub cluster_id: String, pub name: String, @@ -53,6 +53,29 @@ pub struct DeltaOp { pub new_name: Option, } +/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT` +/// env variable is set, it will be used for authorization. +pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result { + let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec"); + let jwt: String = match std::env::var("NEON_CONSOLE_JWT") { + Ok(v) => v, + Err(_) => "".to_string(), + }; + info!("getting spec from control plane: {}", cp_uri); + + // TODO: check the response. We should distinguish cases when it's + // - network error, then retry + // - no spec for compute yet, then wait + // - compute id is unknown or any other error, then bail out + let spec = reqwest::blocking::Client::new() + .get(cp_uri) + .header("Authorization", jwt) + .send()? + .json()?; + + Ok(spec) +} + /// It takes cluster specification and does the following: /// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file. /// - Update `pg_hba.conf` to allow external connections.