From 6064a26963ec7811989da0f944904622997d964f Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 12 Apr 2023 01:13:17 +0300 Subject: [PATCH] Refactor 'spec' in ComputeState. Sometimes, it contained real values, sometimes just defaults if the spec was not received yet. Make the state more clear by making it an Option instead. One consequence is that if some of the required settings like neon.tenant_id are missing from the spec file sent to the /configure endpoint, it is spotted earlier and you get an immediate HTTP error response. Not that it matters very much, but it's nicer nevertheless. --- compute_tools/src/bin/compute_ctl.rs | 53 ++++---------- compute_tools/src/compute.rs | 101 +++++++++++++++++---------- compute_tools/src/http/api.rs | 14 ++-- libs/compute_api/src/responses.rs | 4 +- 4 files changed, 91 insertions(+), 81 deletions(-) diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index bce860b56b..633e603f6b 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -45,12 +45,11 @@ use url::Url; use compute_api::responses::ComputeStatus; -use compute_tools::compute::{ComputeNode, ComputeState}; +use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; -use compute_tools::pg_helpers::*; use compute_tools::spec::*; fn main() -> Result<()> { @@ -73,28 +72,24 @@ fn main() -> Result<()> { // Try to use just 'postgres' if no path is provided let pgbin = matches.get_one::("pgbin").unwrap(); - let mut spec = Default::default(); - let mut spec_set = false; + let mut spec = None; let mut live_config_allowed = false; match spec_json { // First, try to get cluster spec from the cli argument Some(json) => { - spec = serde_json::from_str(json)?; - spec_set = true; + spec = Some(serde_json::from_str(json)?); } None => { // 
Second, try to read it from the file if path is provided if let Some(sp) = spec_path { let path = Path::new(sp); let file = File::open(path)?; - spec = serde_json::from_reader(file)?; - spec_set = true; + spec = Some(serde_json::from_reader(file)?); } else if let Some(id) = compute_id { if let Some(cp_base) = control_plane_uri { live_config_allowed = true; if let Ok(s) = get_spec_from_control_plane(cp_base, id) { - spec = s; - spec_set = true; + spec = Some(s); } } else { panic!("must specify both --control-plane-uri and --compute-id or none"); @@ -109,8 +104,13 @@ fn main() -> Result<()> { }; let mut new_state = ComputeState::new(); - if spec_set { - new_state.spec = spec; + let spec_set; + if let Some(spec) = spec { + let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; + new_state.pspec = Some(pspec); + spec_set = true; + } else { + spec_set = false; } let compute_node = ComputeNode { start_time: Utc::now(), @@ -142,33 +142,10 @@ fn main() -> Result<()> { } } - // We got all we need, fill in the state. + // We got all we need, update the state. 
let mut state = compute.state.lock().unwrap(); - let pageserver_connstr = state - .spec - .cluster - .settings - .find("neon.pageserver_connstring") - .expect("pageserver connstr should be provided"); - let storage_auth_token = state.spec.storage_auth_token.clone(); - let tenant = state - .spec - .cluster - .settings - .find("neon.tenant_id") - .expect("tenant id should be provided"); - let timeline = state - .spec - .cluster - .settings - .find("neon.timeline_id") - .expect("tenant id should be provided"); - let startup_tracing_context = state.spec.startup_tracing_context.clone(); - - state.pageserver_connstr = pageserver_connstr; - state.storage_auth_token = storage_auth_token; - state.tenant = tenant; - state.timeline = timeline; + let pspec = state.pspec.as_ref().expect("spec must be set"); + let startup_tracing_context = pspec.spec.startup_tracing_context.clone(); state.status = ComputeStatus::Init; compute.state_changed.notify_all(); drop(state); diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 689aa6ef43..94ec2a785c 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -69,12 +69,7 @@ pub struct ComputeState { /// Timestamp of the last Postgres activity pub last_active: DateTime, pub error: Option, - pub spec: ComputeSpec, - pub tenant: String, - pub timeline: String, - pub pageserver_connstr: String, - pub storage_auth_token: Option, - + pub pspec: Option, pub metrics: ComputeMetrics, } @@ -84,11 +79,7 @@ impl ComputeState { status: ComputeStatus::Empty, last_active: Utc::now(), error: None, - spec: ComputeSpec::default(), - tenant: String::new(), - timeline: String::new(), - pageserver_connstr: String::new(), - storage_auth_token: None, + pspec: None, metrics: ComputeMetrics::default(), } } @@ -100,6 +91,45 @@ impl Default for ComputeState { } } +#[derive(Clone, Debug)] +pub struct ParsedSpec { + pub spec: ComputeSpec, + pub tenant: String, + pub timeline: String, + pub pageserver_connstr: String, + 
pub storage_auth_token: Option, +} + +impl TryFrom for ParsedSpec { + type Error = String; + fn try_from(spec: ComputeSpec) -> Result { + let pageserver_connstr = spec + .cluster + .settings + .find("neon.pageserver_connstring") + .ok_or("pageserver connstr should be provided")?; + let storage_auth_token = spec.storage_auth_token.clone(); + let tenant = spec + .cluster + .settings + .find("neon.tenant_id") + .ok_or("tenant id should be provided")?; + let timeline = spec + .cluster + .settings + .find("neon.timeline_id") + .ok_or("timeline id should be provided")?; + + Ok(ParsedSpec { + spec, + pageserver_connstr, + storage_auth_token, + tenant, + timeline, + }) + } +} + impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { let mut state = self.state.lock().unwrap(); @@ -126,13 +156,14 @@ impl ComputeNode { // unarchive it to `pgdata` directory overriding all its previous content. #[instrument(skip(self, compute_state))] fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> { + let spec = compute_state.pspec.as_ref().expect("spec must be set"); let start_time = Utc::now(); - let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?; + let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?; // Use the storage auth token from the config file, if given. // Note: this overrides any password set in the connection string. 
- if let Some(storage_auth_token) = &compute_state.storage_auth_token { + if let Some(storage_auth_token) = &spec.storage_auth_token { info!("Got storage auth token from spec file"); config.password(storage_auth_token); } else { @@ -141,14 +172,8 @@ impl ComputeNode { let mut client = config.connect(NoTls)?; let basebackup_cmd = match lsn { - "0/0" => format!( - "basebackup {} {}", - &compute_state.tenant, &compute_state.timeline - ), // First start of the compute - _ => format!( - "basebackup {} {} {}", - &compute_state.tenant, &compute_state.timeline, lsn - ), + "0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute + _ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn), }; let copyreader = client.copy_out(basebackup_cmd.as_str())?; @@ -218,27 +243,27 @@ impl ComputeNode { /// safekeepers sync, basebackup, etc. #[instrument(skip(self, compute_state))] pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { - let spec = &compute_state.spec; + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. 
self.create_pgdata()?; - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?; info!("starting safekeepers syncing"); let lsn = self - .sync_safekeepers(compute_state.storage_auth_token.clone()) + .sync_safekeepers(pspec.storage_auth_token.clone()) .with_context(|| "failed to sync safekeepers")?; info!("safekeepers synced at LSN {}", lsn); info!( "getting basebackup@{} from pageserver {}", - lsn, &compute_state.pageserver_connstr + lsn, &pspec.pageserver_connstr ); self.get_basebackup(compute_state, &lsn).with_context(|| { format!( "failed to get basebackup@{} from pageserver {}", - lsn, &compute_state.pageserver_connstr + lsn, &pspec.pageserver_connstr ) })?; @@ -306,19 +331,20 @@ impl ComputeNode { }; // Proceed with post-startup configuration. Note, that order of operations is important. - handle_roles(&compute_state.spec, &mut client)?; - handle_databases(&compute_state.spec, &mut client)?; - handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?; - handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?; + let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; + handle_roles(spec, &mut client)?; + handle_databases(spec, &mut client)?; + handle_role_deletions(spec, self.connstr.as_str(), &mut client)?; + handle_grants(spec, self.connstr.as_str(), &mut client)?; create_writability_check_data(&mut client)?; - handle_extensions(&compute_state.spec, &mut client)?; + handle_extensions(spec, &mut client)?; // 'Close' connection drop(client); info!( "finished configuration of compute for project {}", - compute_state.spec.cluster.cluster_id + spec.cluster.cluster_id ); Ok(()) @@ -327,19 +353,20 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self) -> Result { let compute_state = self.state.lock().unwrap().clone(); + let spec = compute_state.pspec.as_ref().expect("spec must be set"); 
info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", - compute_state.spec.cluster.cluster_id, - compute_state.spec.operation_uuid.as_ref().unwrap(), - compute_state.tenant, - compute_state.timeline, + spec.spec.cluster.cluster_id, + spec.spec.operation_uuid.as_ref().unwrap(), + spec.tenant, + spec.timeline, ); self.prepare_pgdata(&compute_state)?; let start_time = Utc::now(); - let pg = self.start_postgres(compute_state.storage_auth_token.clone())?; + let pg = self.start_postgres(spec.storage_auth_token.clone())?; self.apply_config(&compute_state)?; diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index cea45dc596..2ef2d898e1 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::thread; -use crate::compute::{ComputeNode, ComputeState}; +use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_api::requests::ConfigurationRequest; use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; @@ -18,8 +18,8 @@ use tracing_utils::http::OtelName; fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { ComputeStatusResponse { - tenant: state.tenant.clone(), - timeline: state.timeline.clone(), + tenant: state.pspec.as_ref().map(|pspec| pspec.tenant.clone()), + timeline: state.pspec.as_ref().map(|pspec| pspec.timeline.clone()), status: state.status, last_active: state.last_active, error: state.error.clone(), @@ -135,6 +135,12 @@ async fn handle_configure_request( let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); if let Ok(request) = serde_json::from_str::(&spec_raw) { let spec = request.spec; + + let parsed_spec = match ParsedSpec::try_from(spec) { + Ok(ps) => ps, + Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)), + }; + // XXX: wrap state update under lock in code blocks. 
Otherwise, // we will try to `Send` `mut state` into the spawned thread // bellow, which will cause error: @@ -150,7 +156,7 @@ async fn handle_configure_request( ); return Err((msg, StatusCode::PRECONDITION_FAILED)); } - state.spec = spec; + state.pspec = Some(parsed_spec); state.status = ComputeStatus::ConfigurationPending; compute.state_changed.notify_all(); drop(state); diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 43289a5e3e..a28c6e8996 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -12,8 +12,8 @@ pub struct GenericAPIError { #[derive(Serialize, Debug)] #[serde(rename_all = "snake_case")] pub struct ComputeStatusResponse { - pub tenant: String, - pub timeline: String, + pub tenant: Option, + pub timeline: Option, pub status: ComputeStatus, #[serde(serialize_with = "rfc3339_serialize")] pub last_active: DateTime,