Introduce new V2 spec file format.

This includes code that can still read the old V1 format and convert
it to the new ComputeSpecV2 struct.
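
For illustration only, a minimal sketch of how such an upgrade path could
look. The type names, field names, and the format_version discriminator
below are assumptions pieced together from identifiers visible in the diff,
not the actual definitions in compute_api:

// Hedged sketch of the V1 -> V2 upgrade; the real definitions live in
// compute_api::spec and may differ.
use anyhow::{bail, Context, Result};
use serde::Deserialize;

/// A spec document whose version is not known yet: just the raw JSON.
pub struct ComputeSpecAnyVersion(pub serde_json::Value);

/// Flattened V2 spec (only a couple of fields shown here).
#[derive(Debug, Deserialize)]
pub struct ComputeSpecV2 {
    pub pageserver_connstring: String,
    #[serde(default)]
    pub safekeeper_connstrings: Vec<String>,
}

impl TryFrom<ComputeSpecAnyVersion> for ComputeSpecV2 {
    type Error = anyhow::Error;

    fn try_from(any: ComputeSpecAnyVersion) -> Result<Self> {
        // Assumed discriminator: V2 documents carry a version marker,
        // V1 documents do not.
        match any.0.get("format_version").and_then(|v| v.as_u64()) {
            Some(2) => Ok(serde_json::from_value(any.0)?),
            // Old V1 layout: translate the fields we need into the new shape.
            None | Some(1) => {
                let pageserver_connstring = any.0["pageserver_connstring"]
                    .as_str()
                    .map(str::to_owned)
                    .context("pageserver connstr should be provided")?;
                Ok(ComputeSpecV2 {
                    pageserver_connstring,
                    safekeeper_connstrings: vec![],
                })
            }
            Some(other) => bail!("unsupported spec format version {other}"),
        }
    }
}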

For better test coverage of the upgrade function, update the sample V1
cluster spec file in the tests to match more closely what the control
plane actually generates.
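
A test of the upgrade function might then look roughly like this, using the
sketched types above; the JSON document here is hypothetical, while the real
fixture in the tests is modelled on what the control plane emits:

#[test]
fn upgrade_v1_spec_to_v2() {
    // Hypothetical V1-style document: no format_version field, storage
    // options still in the places the old code expected them.
    let v1_json = serde_json::json!({
        "pageserver_connstring": "postgresql://no_user@pageserver:6400",
        "cluster": { "roles": [], "databases": [] }
    });

    let spec = ComputeSpecV2::try_from(ComputeSpecAnyVersion(v1_json))
        .expect("V1 spec should upgrade cleanly");
    assert_eq!(
        spec.pageserver_connstring,
        "postgresql://no_user@pageserver:6400"
    );
}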

The intention is to change the real web console to also use the V2 format,
and then remove support for the V1 format altogether.
Heikki Linnakangas
2023-04-03 18:03:58 +03:00
parent 51f3128657
commit b1fb59ef6e
16 changed files with 529 additions and 502 deletions

View File

@@ -44,8 +44,9 @@ use tracing::{error, info};
use url::Url;
use compute_api::responses::ComputeStatus;
use compute_api::spec::{ComputeSpecAnyVersion, ComputeSpecV2};
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::compute::{ComputeNode, ComputeState};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -75,7 +76,7 @@ fn main() -> Result<()> {
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
let mut spec = None;
let mut spec: Option<ComputeSpecAnyVersion> = None;
let mut live_config_allowed = false;
match spec_json {
// First, try to get cluster spec from the cli argument
@@ -109,8 +110,9 @@ fn main() -> Result<()> {
let mut new_state = ComputeState::new();
let spec_set;
if let Some(spec) = spec {
let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
new_state.pspec = Some(pspec);
// Parse the spec file, upgrading it from older format if necessary
let spec: ComputeSpecV2 = ComputeSpecV2::try_from(spec)?;
new_state.spec = Some(spec);
spec_set = true;
} else {
spec_set = false;
@@ -148,8 +150,8 @@ fn main() -> Result<()> {
// We got all we need, update the state.
let mut state = compute.state.lock().unwrap();
let pspec = state.pspec.as_ref().expect("spec must be set");
let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
let spec = state.spec.as_ref().expect("spec must be set");
let startup_tracing_context = spec.startup_tracing_context.clone();
state.status = ComputeStatus::Init;
compute.state_changed.notify_all();
drop(state);

View File

@@ -26,11 +26,10 @@ use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tokio_postgres;
use tracing::{info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
use compute_api::spec::ComputeSpecV2;
use crate::checker::create_writability_check_data;
use crate::config;
@@ -71,7 +70,7 @@ pub struct ComputeState {
/// Timestamp of the last Postgres activity
pub last_active: DateTime<Utc>,
pub error: Option<String>,
pub pspec: Option<ParsedSpec>,
pub spec: Option<ComputeSpecV2>,
pub metrics: ComputeMetrics,
}
@@ -81,7 +80,7 @@ impl ComputeState {
status: ComputeStatus::Empty,
last_active: Utc::now(),
error: None,
pspec: None,
spec: None,
metrics: ComputeMetrics::default(),
}
}
@@ -93,64 +92,6 @@ impl Default for ComputeState {
}
}
#[derive(Clone, Debug)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
}
impl TryFrom<ComputeSpec> for ParsedSpec {
type Error = String;
fn try_from(spec: ComputeSpec) -> Result<Self, String> {
// Extract the options from the spec file that are needed to connect to
// the storage system.
//
// For backwards-compatibility, the top-level fields in the spec file
// may be empty. In that case, we need to dig them from the GUCs in the
// cluster.settings field.
let pageserver_connstr = spec
.pageserver_connstring
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
} else {
spec.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?
};
let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
timeline_id
} else {
spec.cluster
.settings
.find("neon.timeline_id")
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?
};
let lsn = spec.lsn;
Ok(ParsedSpec {
spec,
pageserver_connstr,
storage_auth_token,
tenant_id,
timeline_id,
lsn,
})
}
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
@@ -177,10 +118,10 @@ impl ComputeNode {
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.spec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
let mut config = postgres::Config::from_str(&spec.pageserver_connstring)?;
// Use the storage auth token from the config file, if given.
// Note: this overrides any password set in the connection string.
@@ -264,21 +205,21 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.spec.as_ref().expect("spec must be set");
let pgdata_path = Path::new(&self.pgdata);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
let lsn = if let Some(lsn) = pspec.lsn {
let lsn = if let Some(lsn) = spec.lsn {
// Read-only node, anchored at 'lsn'
lsn
} else {
// Primary that continues to write at end of the timeline
info!("starting safekeepers syncing");
let last_lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.sync_safekeepers(spec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", last_lsn);
last_lsn
@@ -286,12 +227,12 @@ impl ComputeNode {
info!(
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
lsn, &spec.pageserver_connstring
);
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
lsn, &spec.pageserver_connstring
)
})?;
@@ -359,7 +300,7 @@ impl ComputeNode {
};
// Proceed with post-startup configuration. Note, that order of operations is important.
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
let spec = &compute_state.spec.as_ref().expect("spec must be set");
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
@@ -370,10 +311,7 @@ impl ComputeNode {
// 'Close' connection
drop(client);
info!(
"finished configuration of compute for project {}",
spec.cluster.cluster_id
);
info!("finished configuration of compute");
Ok(())
}
@@ -381,11 +319,11 @@ impl ComputeNode {
#[instrument(skip(self))]
pub fn start_compute(&self) -> Result<std::process::Child> {
let compute_state = self.state.lock().unwrap().clone();
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = compute_state.spec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
spec.spec.cluster.cluster_id,
spec.spec.operation_uuid.as_deref().unwrap_or("None"),
spec.project_id.as_deref().unwrap_or("None"),
spec.operation_uuid.as_deref().unwrap_or("None"),
spec.tenant_id,
spec.timeline_id,
);

View File

@@ -6,8 +6,7 @@ use std::path::Path;
use anyhow::Result;
use crate::pg_helpers::escape_conf_value;
use crate::pg_helpers::PgOptionsSerialize;
use compute_api::spec::ComputeSpec;
use compute_api::spec::ComputeSpecV2;
/// Check that `line` is inside a text file and put it there if it is not.
/// Create file if it doesn't exist.
@@ -33,24 +32,37 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
}
/// Create or completely rewrite configuration file specified by `path`
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
pub fn write_postgres_conf(path: &Path, spec: &ComputeSpecV2) -> Result<()> {
// File::create() destroys the file content if it exists.
let mut file = File::create(path)?;
// Write the postgresql.conf content from the spec file as is.
if let Some(conf) = &spec.cluster.postgresql_conf {
if let Some(conf) = &spec.postgresql_conf {
writeln!(file, "{}", conf)?;
}
// Add options for connecting to storage
writeln!(file, "# Neon storage settings")?;
if let Some(s) = &spec.pageserver_connstring {
writeln!(
file,
"neon.pageserver_connstring='{}'",
escape_conf_value(s)
)?;
// Append any extra options from the spec file
if let Some(settings) = &spec.settings {
writeln!(file, "\n# Extra settings from spec document")?;
for setting in settings {
if let Some(value) = &setting.value {
let escaped_value: String = value.replace('\'', "''").replace('\\', "\\\\");
writeln!(file, "{} = '{}'", setting.name, escaped_value)?;
} else {
// If there is no value, then just append the line verbatim
writeln!(file, "{}", setting.name)?;
}
}
}
// Append options for connecting to storage
writeln!(file, "\n# Neon storage settings")?;
writeln!(
file,
"neon.pageserver_connstring='{}'",
escape_conf_value(&spec.pageserver_connstring)
)?;
if !spec.safekeeper_connstrings.is_empty() {
writeln!(
file,
@@ -58,27 +70,16 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
escape_conf_value(&spec.safekeeper_connstrings.join(","))
)?;
}
if let Some(s) = &spec.tenant_id {
writeln!(
file,
"neon.tenant_id='{}'",
escape_conf_value(&s.to_string())
)?;
}
if let Some(s) = &spec.timeline_id {
writeln!(
file,
"neon.timeline_id='{}'",
escape_conf_value(&s.to_string())
)?;
}
// If there are any extra options in the 'settings' field, append those
if spec.cluster.settings.is_some() {
writeln!(file, "# Managed by compute_ctl: begin")?;
writeln!(file, "{}", spec.cluster.settings.as_pg_settings())?;
writeln!(file, "# Managed by compute_ctl: end")?;
}
writeln!(
file,
"neon.tenant_id='{}'",
escape_conf_value(&spec.tenant_id.to_string())
)?;
writeln!(
file,
"neon.timeline_id='{}'",
escape_conf_value(&spec.timeline_id.to_string())
)?;
Ok(())
}

View File

@@ -3,9 +3,10 @@ use std::net::SocketAddr;
use std::sync::Arc;
use std::thread;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use crate::compute::{ComputeNode, ComputeState};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use compute_api::spec::ComputeSpecV2;
use anyhow::Result;
use hyper::service::{make_service_fn, service_fn};
@@ -18,14 +19,8 @@ use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state
.pspec
.as_ref()
.map(|pspec| pspec.tenant_id.to_string()),
timeline: state
.pspec
.as_ref()
.map(|pspec| pspec.timeline_id.to_string()),
tenant: state.spec.as_ref().map(|spec| spec.tenant_id.to_string()),
timeline: state.spec.as_ref().map(|spec| spec.timeline_id.to_string()),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),
@@ -140,11 +135,9 @@ async fn handle_configure_request(
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
let spec = request.spec;
let parsed_spec = match ParsedSpec::try_from(spec) {
let specv2 = match ComputeSpecV2::try_from(request.spec) {
Ok(ps) => ps,
Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)),
Err(err) => return Err((err.to_string(), StatusCode::PRECONDITION_FAILED)),
};
// XXX: wrap state update under lock in code blocks. Otherwise,
@@ -162,7 +155,7 @@ async fn handle_configure_request(
);
return Err((msg, StatusCode::PRECONDITION_FAILED));
}
state.pspec = Some(parsed_spec);
state.spec = Some(specv2);
state.status = ComputeStatus::ConfigurationPending;
compute.state_changed.notify_all();
drop(state);

View File

@@ -29,7 +29,6 @@ pub fn escape_conf_value(s: &str) -> String {
trait GenericOptionExt {
fn to_pg_option(&self) -> String;
fn to_pg_setting(&self) -> String;
}
impl GenericOptionExt for GenericOption {
@@ -44,23 +43,10 @@ impl GenericOptionExt for GenericOption {
self.name.to_owned()
}
}
/// Represent `GenericOption` as configuration option.
fn to_pg_setting(&self) -> String {
if let Some(val) = &self.value {
match self.vartype.as_ref() {
"string" => format!("{} = '{}'", self.name, escape_conf_value(val)),
_ => format!("{} = {}", self.name, val),
}
} else {
self.name.to_owned()
}
}
}
pub trait PgOptionsSerialize {
fn as_pg_options(&self) -> String;
fn as_pg_settings(&self) -> String;
}
impl PgOptionsSerialize for GenericOptions {
@@ -76,20 +62,6 @@ impl PgOptionsSerialize for GenericOptions {
"".to_string()
}
}
/// Serialize an optional collection of `GenericOption`'s to
/// `postgresql.conf` compatible format.
fn as_pg_settings(&self) -> String {
if let Some(ops) = &self {
ops.iter()
.map(|op| op.to_pg_setting())
.collect::<Vec<String>>()
.join("\n")
+ "\n" // newline after last setting
} else {
"".to_string()
}
}
}
pub trait GenericOptionsSearch {

View File

@@ -1,3 +1,4 @@
//! Functions to reconciliate Postgres cluster with the spec file
use std::path::Path;
use std::str::FromStr;
@@ -10,11 +11,14 @@ use crate::config;
use crate::params::PG_HBA_ALL_MD5;
use crate::pg_helpers::*;
use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
use compute_api::spec::{ComputeSpecAnyVersion, ComputeSpecV2, Database, PgIdent, Role};
/// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
/// env variable is set, it will be used for authorization.
pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
pub fn get_spec_from_control_plane(
base_uri: &str,
compute_id: &str,
) -> Result<ComputeSpecAnyVersion> {
let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
@@ -26,19 +30,18 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<C
// - network error, then retry
// - no spec for compute yet, then wait
// - compute id is unknown or any other error, then bail out
let spec = reqwest::blocking::Client::new()
let json = reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?;
Ok(spec)
Ok(ComputeSpecAnyVersion(json))
}
/// It takes cluster specification and does the following:
/// - Serialize cluster config and put it into `postgresql.conf` completely rewriting the file.
/// - Update `pg_hba.conf` to allow external connections.
pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
pub fn handle_configuration(spec: &ComputeSpecV2, pgdata_path: &Path) -> Result<()> {
// File `postgresql.conf` is no longer included into `basebackup`, so just
// always write all config into it creating new file.
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
@@ -66,7 +69,7 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]
pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
pub fn handle_roles(spec: &ComputeSpecV2, client: &mut Client) -> Result<()> {
let mut xact = client.transaction()?;
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
@@ -122,7 +125,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
info!("cluster spec roles:");
for role in &spec.cluster.roles {
for role in &spec.roles {
let name = &role.name;
// XXX: with a limited number of roles it is fine, but consider making it a HashMap
let pg_role = existing_roles.iter().find(|r| r.name == *name);
@@ -207,7 +210,11 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Reassign all dependent objects and delete requested roles.
#[instrument(skip_all)]
pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
pub fn handle_role_deletions(
spec: &ComputeSpecV2,
connstr: &str,
client: &mut Client,
) -> Result<()> {
if let Some(ops) = &spec.delta_operations {
// First, reassign all dependent objects to db owners.
info!("reassigning dependent objects of to-be-deleted roles");
@@ -249,8 +256,8 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
}
// Reassign all owned objects in all databases to the owner of the database.
fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.cluster.databases {
fn reassign_owned_objects(spec: &ComputeSpecV2, connstr: &str, role_name: &PgIdent) -> Result<()> {
for db in &spec.databases {
if db.owner != *role_name {
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
@@ -284,7 +291,7 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
/// atomicity should be enough here due to the order of operations and various checks,
/// which together provide us idempotency.
#[instrument(skip_all)]
pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
pub fn handle_databases(spec: &ComputeSpecV2, client: &mut Client) -> Result<()> {
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
// Print a list of existing Postgres databases (only in debug mode)
@@ -332,7 +339,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
info!("cluster spec databases:");
for db in &spec.cluster.databases {
for db in &spec.databases {
let name = &db.name;
// XXX: with a limited number of databases it is fine, but consider making it a HashMap
@@ -397,7 +404,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
pub fn handle_grants(spec: &ComputeSpecV2, connstr: &str, client: &mut Client) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
@@ -407,13 +414,12 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
// XXX: later we should stop messing with Postgres ACL in such horrible
// ways.
let roles = spec
.cluster
.roles
.iter()
.map(|r| r.name.pg_quote())
.collect::<Vec<_>>();
for db in &spec.cluster.databases {
for db in &spec.databases {
let dbname = &db.name;
let query: String = format!(
@@ -429,7 +435,7 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
// Do some per-database access adjustments. We'd better do this at db creation time,
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
// atomically.
for db in &spec.cluster.databases {
for db in &spec.databases {
let mut conf = Config::from_str(connstr)?;
conf.dbname(&db.name);
@@ -499,14 +505,11 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
/// Create required system extensions
#[instrument(skip_all)]
pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
if libs.contains("pg_stat_statements") {
// Create extension only if this compute really needs it
let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
info!("creating system extensions with query: {}", query);
client.simple_query(query)?;
}
pub fn handle_extensions(spec: &ComputeSpecV2, client: &mut Client) -> Result<()> {
for extension in &spec.extensions {
let query = format!("CREATE EXTENSION IF NOT EXISTS {}", extension.pg_quote());
info!("creating system extensions with query: {}", query);
client.simple_query(&query)?;
}
Ok(())