diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index bce860b56b..633e603f6b 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -45,12 +45,11 @@ use url::Url; use compute_api::responses::ComputeStatus; -use compute_tools::compute::{ComputeNode, ComputeState}; +use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; use compute_tools::params::*; -use compute_tools::pg_helpers::*; use compute_tools::spec::*; fn main() -> Result<()> { @@ -73,28 +72,24 @@ fn main() -> Result<()> { // Try to use just 'postgres' if no path is provided let pgbin = matches.get_one::("pgbin").unwrap(); - let mut spec = Default::default(); - let mut spec_set = false; + let mut spec = None; let mut live_config_allowed = false; match spec_json { // First, try to get cluster spec from the cli argument Some(json) => { - spec = serde_json::from_str(json)?; - spec_set = true; + spec = Some(serde_json::from_str(json)?); } None => { // Second, try to read it from the file if path is provided if let Some(sp) = spec_path { let path = Path::new(sp); let file = File::open(path)?; - spec = serde_json::from_reader(file)?; - spec_set = true; + spec = Some(serde_json::from_reader(file)?); } else if let Some(id) = compute_id { if let Some(cp_base) = control_plane_uri { live_config_allowed = true; if let Ok(s) = get_spec_from_control_plane(cp_base, id) { - spec = s; - spec_set = true; + spec = Some(s); } } else { panic!("must specify both --control-plane-uri and --compute-id or none"); @@ -109,8 +104,13 @@ fn main() -> Result<()> { }; let mut new_state = ComputeState::new(); - if spec_set { - new_state.spec = spec; + let spec_set; + if let Some(spec) = spec { + let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; + new_state.pspec = Some(pspec); + spec_set = true; + } else { + spec_set = false; } let compute_node = ComputeNode { start_time: Utc::now(), @@ -142,33 +142,10 @@ fn main() -> Result<()> { } } - // We got all we need, fill in the state. + // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); - let pageserver_connstr = state - .spec - .cluster - .settings - .find("neon.pageserver_connstring") - .expect("pageserver connstr should be provided"); - let storage_auth_token = state.spec.storage_auth_token.clone(); - let tenant = state - .spec - .cluster - .settings - .find("neon.tenant_id") - .expect("tenant id should be provided"); - let timeline = state - .spec - .cluster - .settings - .find("neon.timeline_id") - .expect("tenant id should be provided"); - let startup_tracing_context = state.spec.startup_tracing_context.clone(); - - state.pageserver_connstr = pageserver_connstr; - state.storage_auth_token = storage_auth_token; - state.tenant = tenant; - state.timeline = timeline; + let pspec = state.pspec.as_ref().expect("spec must be set"); + let startup_tracing_context = pspec.spec.startup_tracing_context.clone(); state.status = ComputeStatus::Init; compute.state_changed.notify_all(); drop(state); diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 689aa6ef43..94ec2a785c 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -69,12 +69,7 @@ pub struct ComputeState { /// Timestamp of the last Postgres activity pub last_active: DateTime, pub error: Option, - pub spec: ComputeSpec, - pub tenant: String, - pub timeline: String, - pub pageserver_connstr: String, - pub storage_auth_token: Option, - + pub pspec: Option, pub metrics: ComputeMetrics, } @@ -84,11 +79,7 @@ impl ComputeState { status: ComputeStatus::Empty, last_active: Utc::now(), error: None, - spec: ComputeSpec::default(), - tenant: String::new(), - timeline: String::new(), - pageserver_connstr: String::new(), - storage_auth_token: None, + pspec: None, metrics: ComputeMetrics::default(), } } @@ -100,6 +91,45 @@ impl Default for ComputeState { } } +#[derive(Clone, Debug)] +pub struct ParsedSpec { + pub spec: ComputeSpec, + pub tenant: String, + pub timeline: String, + pub pageserver_connstr: String, + pub storage_auth_token: Option, +} + +impl TryFrom for ParsedSpec { + type Error = String; + fn try_from(spec: ComputeSpec) -> Result { + let pageserver_connstr = spec + .cluster + .settings + .find("neon.pageserver_connstring") + .ok_or("pageserver connstr should be provided")?; + let storage_auth_token = spec.storage_auth_token.clone(); + let tenant = spec + .cluster + .settings + .find("neon.tenant_id") + .ok_or("tenant id should be provided")?; + let timeline = spec + .cluster + .settings + .find("neon.timeline_id") + .ok_or("tenant id should be provided")?; + + Ok(ParsedSpec { + spec, + pageserver_connstr, + storage_auth_token, + tenant, + timeline, + }) + } +} + impl ComputeNode { pub fn set_status(&self, status: ComputeStatus) { let mut state = self.state.lock().unwrap(); @@ -126,13 +156,14 @@ impl ComputeNode { // unarchive it to `pgdata` directory overriding all its previous content. #[instrument(skip(self, compute_state))] fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> { + let spec = compute_state.pspec.as_ref().expect("spec must be set"); let start_time = Utc::now(); - let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?; + let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?; // Use the storage auth token from the config file, if given. // Note: this overrides any password set in the connection string. - if let Some(storage_auth_token) = &compute_state.storage_auth_token { + if let Some(storage_auth_token) = &spec.storage_auth_token { info!("Got storage auth token from spec file"); config.password(storage_auth_token); } else { @@ -141,14 +172,8 @@ impl ComputeNode { let mut client = config.connect(NoTls)?; let basebackup_cmd = match lsn { - "0/0" => format!( - "basebackup {} {}", - &compute_state.tenant, &compute_state.timeline - ), // First start of the compute - _ => format!( - "basebackup {} {} {}", - &compute_state.tenant, &compute_state.timeline, lsn - ), + "0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute + _ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn), }; let copyreader = client.copy_out(basebackup_cmd.as_str())?; @@ -218,27 +243,27 @@ impl ComputeNode { /// safekeepers sync, basebackup, etc. #[instrument(skip(self, compute_state))] pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { - let spec = &compute_state.spec; + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let pgdata_path = Path::new(&self.pgdata); // Remove/create an empty pgdata directory and put configuration there. self.create_pgdata()?; - config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?; + config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?; info!("starting safekeepers syncing"); let lsn = self - .sync_safekeepers(compute_state.storage_auth_token.clone()) + .sync_safekeepers(pspec.storage_auth_token.clone()) .with_context(|| "failed to sync safekeepers")?; info!("safekeepers synced at LSN {}", lsn); info!( "getting basebackup@{} from pageserver {}", - lsn, &compute_state.pageserver_connstr + lsn, &pspec.pageserver_connstr ); self.get_basebackup(compute_state, &lsn).with_context(|| { format!( "failed to get basebackup@{} from pageserver {}", - lsn, &compute_state.pageserver_connstr + lsn, &pspec.pageserver_connstr ) })?; @@ -306,19 +331,20 @@ impl ComputeNode { }; // Proceed with post-startup configuration. Note, that order of operations is important. - handle_roles(&compute_state.spec, &mut client)?; - handle_databases(&compute_state.spec, &mut client)?; - handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?; - handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?; + let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec; + handle_roles(spec, &mut client)?; + handle_databases(spec, &mut client)?; + handle_role_deletions(spec, self.connstr.as_str(), &mut client)?; + handle_grants(spec, self.connstr.as_str(), &mut client)?; create_writability_check_data(&mut client)?; - handle_extensions(&compute_state.spec, &mut client)?; + handle_extensions(spec, &mut client)?; // 'Close' connection drop(client); info!( "finished configuration of compute for project {}", - compute_state.spec.cluster.cluster_id + spec.cluster.cluster_id ); Ok(()) @@ -327,19 +353,20 @@ impl ComputeNode { #[instrument(skip(self))] pub fn start_compute(&self) -> Result { let compute_state = self.state.lock().unwrap().clone(); + let spec = compute_state.pspec.as_ref().expect("spec must be set"); info!( "starting compute for project {}, operation {}, tenant {}, timeline {}", - compute_state.spec.cluster.cluster_id, - compute_state.spec.operation_uuid.as_ref().unwrap(), - compute_state.tenant, - compute_state.timeline, + spec.spec.cluster.cluster_id, + spec.spec.operation_uuid.as_ref().unwrap(), + spec.tenant, + spec.timeline, ); self.prepare_pgdata(&compute_state)?; let start_time = Utc::now(); - let pg = self.start_postgres(compute_state.storage_auth_token.clone())?; + let pg = self.start_postgres(spec.storage_auth_token.clone())?; self.apply_config(&compute_state)?; diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index cea45dc596..2ef2d898e1 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -3,7 +3,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::thread; -use crate::compute::{ComputeNode, ComputeState}; +use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_api::requests::ConfigurationRequest; use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; @@ -18,8 +18,8 @@ use tracing_utils::http::OtelName; fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { ComputeStatusResponse { - tenant: state.tenant.clone(), - timeline: state.timeline.clone(), + tenant: state.pspec.as_ref().map(|pspec| pspec.tenant.clone()), + timeline: state.pspec.as_ref().map(|pspec| pspec.timeline.clone()), status: state.status, last_active: state.last_active, error: state.error.clone(), @@ -135,6 +135,12 @@ async fn handle_configure_request( let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap(); if let Ok(request) = serde_json::from_str::(&spec_raw) { let spec = request.spec; + + let parsed_spec = match ParsedSpec::try_from(spec) { + Ok(ps) => ps, + Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)), + }; + // XXX: wrap state update under lock in code blocks. Otherwise, // we will try to `Send` `mut state` into the spawned thread // bellow, which will cause error: @@ -150,7 +156,7 @@ async fn handle_configure_request( ); return Err((msg, StatusCode::PRECONDITION_FAILED)); } - state.spec = spec; + state.pspec = Some(parsed_spec); state.status = ComputeStatus::ConfigurationPending; compute.state_changed.notify_all(); drop(state); diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index 43289a5e3e..a28c6e8996 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -12,8 +12,8 @@ pub struct GenericAPIError { #[derive(Serialize, Debug)] #[serde(rename_all = "snake_case")] pub struct ComputeStatusResponse { - pub tenant: String, - pub timeline: String, + pub tenant: Option, + pub timeline: Option, pub status: ComputeStatus, #[serde(serialize_with = "rfc3339_serialize")] pub last_active: DateTime,