Refactor 'spec' in ComputeState.

Sometimes, it contained real values, sometimes just defaults if the
spec was not received yet. Make the state more clear by making it an
Option instead.

One consequence is that if some of the required settings like
neon.tenant_id are missing from the spec file sent to the /configure
endpoint, it is spotted earlier and you get an immediate HTTP error
response. Not that it matters very much, but it's nicer nevertheless.
This commit is contained in:
Heikki Linnakangas
2023-04-12 01:13:17 +03:00
committed by Heikki Linnakangas
parent 3c9f42a2e2
commit 6064a26963
4 changed files with 91 additions and 81 deletions

View File

@@ -45,12 +45,11 @@ use url::Url;
use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState};
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::pg_helpers::*;
use compute_tools::spec::*;
fn main() -> Result<()> {
@@ -73,28 +72,24 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let mut spec = Default::default();
let mut spec_set = false;
let mut spec = None;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
Some(json) => {
spec = serde_json::from_str(json)?;
spec_set = true;
spec = Some(serde_json::from_str(json)?);
}
None => {
// Second, try to read it from the file if path is provided
if let Some(sp) = spec_path {
let path = Path::new(sp);
let file = File::open(path)?;
spec = serde_json::from_reader(file)?;
spec_set = true;
spec = Some(serde_json::from_reader(file)?);
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
live_config_allowed = true;
if let Ok(s) = get_spec_from_control_plane(cp_base, id) {
spec = s;
spec_set = true;
spec = Some(s);
}
} else {
panic!("must specify both --control-plane-uri and --compute-id or none");
@@ -109,8 +104,13 @@ fn main() -> Result<()> {
};
let mut new_state = ComputeState::new();
if spec_set {
new_state.spec = spec;
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
spec_set = true;
} else {
spec_set = false;
}
let compute_node = ComputeNode {
start_time: Utc::now(),
@@ -142,33 +142,10 @@ fn main() -> Result<()> {
}
}
// We got all we need, fill in the state.
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
let pageserver_connstr = state
.spec
.cluster
.settings
.find("neon.pageserver_connstring")
.expect("pageserver connstr should be provided");
let storage_auth_token = state.spec.storage_auth_token.clone();
let tenant = state
.spec
.cluster
.settings
.find("neon.tenant_id")
.expect("tenant id should be provided");
let timeline = state
.spec
.cluster
.settings
.find("neon.timeline_id")
.expect("tenant id should be provided");
let startup_tracing_context = state.spec.startup_tracing_context.clone();
state.pageserver_connstr = pageserver_connstr;
state.storage_auth_token = storage_auth_token;
state.tenant = tenant;
state.timeline = timeline;
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);

View File

@@ -69,12 +69,7 @@ pub struct ComputeState {
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
pub error: Option<String>,
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
pub pspec: Option<ParsedSpec>,
pub metrics: ComputeMetrics,
}
@@ -84,11 +79,7 @@ impl ComputeState {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
spec: ComputeSpec::default(),
tenant: String::new(),
timeline: String::new(),
pageserver_connstr: String::new(),
storage_auth_token: None,
pspec: None,
metrics: ComputeMetrics::default(),
}
}
@@ -100,6 +91,45 @@ impl Default for ComputeState {
}
}
#[derive(Clone, Debug)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
let pageserver_connstr = spec
.cluster
.settings
.find("neon.pageserver_connstring")
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")?;
let timeline = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("tenant id should be provided")?;
Ok(ParsedSpec {
spec,
pageserver_connstr,
storage_auth_token,
tenant,
timeline,
})
}
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
@@ -126,13 +156,14 @@ impl ComputeNode {
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&compute_state.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
if let Some(storage_auth_token) = &compute_state.storage_auth_token {
if let Some(storage_auth_token) = &spec.storage_auth_token {
info!("Got storage auth token from spec file");
config.password(storage_auth_token);
} else {
@@ -141,14 +172,8 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!(
"basebackup {} {}",
&compute_state.tenant, &compute_state.timeline
), // First start of the compute
_ => format!(
"basebackup {} {} {}",
&compute_state.tenant, &compute_state.timeline, lsn
),
"0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -218,27 +243,27 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let spec = &compute_state.spec;
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(compute_state.storage_auth_token.clone())
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
info!(
"getting basebackup@{} from pageserver {}",
lsn, &compute_state.pageserver_connstr
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, &lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &compute_state.pageserver_connstr
lsn, &pspec.pageserver_connstr
)
})?;
@@ -306,19 +331,20 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
handle_roles(&compute_state.spec, &mut client)?;
handle_databases(&compute_state.spec, &mut client)?;
handle_role_deletions(&compute_state.spec, self.connstr.as_str(), &mut client)?;
handle_grants(&compute_state.spec, self.connstr.as_str(), &mut client)?;
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
create_writability_check_data(&mut client)?;
handle_extensions(&compute_state.spec, &mut client)?;
handle_extensions(spec, &mut client)?;
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
compute_state.spec.cluster.cluster_id
spec.cluster.cluster_id
);
Ok(())
@@ -327,19 +353,20 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let spec = compute_state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
compute_state.spec.cluster.cluster_id,
compute_state.spec.operation_uuid.as_ref().unwrap(),
compute_state.tenant,
compute_state.timeline,
spec.spec.cluster.cluster_id,
spec.spec.operation_uuid.as_ref().unwrap(),
spec.tenant,
spec.timeline,
);
self.prepare_pgdata(&compute_state)?;
let start_time = Utc::now();
let pg = self.start_postgres(compute_state.storage_auth_token.clone())?;
let pg = self.start_postgres(spec.storage_auth_token.clone())?;
self.apply_config(&compute_state)?;

View File

@@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ComputeState};
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
@@ -18,8 +18,8 @@ use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state.tenant.clone(),
timeline: state.timeline.clone(),
tenant: state.pspec.as_ref().map(|pspec| pspec.tenant.clone()),
timeline: state.pspec.as_ref().map(|pspec| pspec.timeline.clone()),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),
@@ -135,6 +135,12 @@ async fn handle_configure_request(
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
let spec = request.spec;
let parsed_spec = match ParsedSpec::try_from(spec) {
Ok(ps) => ps,
Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)),
};
// XXX: wrap state update under lock in code blocks. Otherwise,
// we will try to `Send` `mut state` into the spawned thread
// bellow, which will cause error:
@@ -150,7 +156,7 @@ async fn handle_configure_request(
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.spec = spec;
state.pspec = Some(parsed_spec);
state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
drop(state);

View File

@@ -12,8 +12,8 @@ pub struct GenericAPIError {
#[derive(Serialize, Debug)]
#[serde(rename_all = "snake_case")]
pub struct ComputeStatusResponse {
pub tenant: String,
pub timeline: String,
pub tenant: Option<String>,
pub timeline: Option<String>,
pub status: ComputeStatus,
#[serde(serialize_with = "rfc3339_serialize")]
pub last_active: DateTime<Utc>,