diff --git a/Cargo.lock b/Cargo.lock
index c78d3e75f5..cedc62faf7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -922,6 +922,7 @@ name = "control_plane"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "chrono",
  "clap 4.1.4",
  "comfy-table",
  "compute_api",
diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 198aa94661..58ae2ff1d5 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -44,8 +44,9 @@ use tracing::{error, info};
 use url::Url;
 
 use compute_api::responses::ComputeStatus;
+use compute_api::spec::{ComputeSpecAnyVersion, ComputeSpecV2};
 
-use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
+use compute_tools::compute::{ComputeNode, ComputeState};
 use compute_tools::http::api::launch_http_server;
 use compute_tools::logger::*;
 use compute_tools::monitor::launch_monitor;
@@ -75,7 +76,7 @@ fn main() -> Result<()> {
     // Try to use just 'postgres' if no path is provided
     let pgbin = matches.get_one::<String>("pgbin").unwrap();
 
-    let mut spec = None;
+    let mut spec: Option<ComputeSpecAnyVersion> = None;
     let mut live_config_allowed = false;
     match spec_json {
         // First, try to get cluster spec from the cli argument
@@ -109,8 +110,9 @@ fn main() -> Result<()> {
     let mut new_state = ComputeState::new();
     let spec_set;
     if let Some(spec) = spec {
-        let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?;
-        new_state.pspec = Some(pspec);
+        // Parse the spec file, upgrading it from an older format if necessary
+        let spec: ComputeSpecV2 = ComputeSpecV2::try_from(spec)?;
+        new_state.spec = Some(spec);
         spec_set = true;
     } else {
         spec_set = false;
@@ -148,8 +150,8 @@ fn main() -> Result<()> {
     // We got all we need, update the state.
     let mut state = compute.state.lock().unwrap();
 
-    let pspec = state.pspec.as_ref().expect("spec must be set");
-    let startup_tracing_context = pspec.spec.startup_tracing_context.clone();
+    let spec = state.spec.as_ref().expect("spec must be set");
+    let startup_tracing_context = spec.startup_tracing_context.clone();
     state.status = ComputeStatus::Init;
     compute.state_changed.notify_all();
     drop(state);
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 9a171da4e5..41e835ef8c 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -26,11 +26,10 @@ use chrono::{DateTime, Utc};
 use postgres::{Client, NoTls};
 use tokio_postgres;
 use tracing::{info, instrument, warn};
-use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
-use compute_api::spec::ComputeSpec;
+use compute_api::spec::ComputeSpecV2;
 
 use crate::checker::create_writability_check_data;
 use crate::config;
@@ -71,7 +70,7 @@ pub struct ComputeState {
     /// Timestamp of the last Postgres activity
     pub last_active: DateTime<Utc>,
     pub error: Option<String>,
-    pub pspec: Option<ParsedSpec>,
+    pub spec: Option<ComputeSpecV2>,
     pub metrics: ComputeMetrics,
 }
 
@@ -81,7 +80,7 @@ impl ComputeState {
             status: ComputeStatus::Empty,
             last_active: Utc::now(),
             error: None,
-            pspec: None,
+            spec: None,
             metrics: ComputeMetrics::default(),
         }
     }
@@ -93,64 +92,6 @@ impl Default for ComputeState {
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct ParsedSpec {
-    pub spec: ComputeSpec,
-    pub tenant_id: TenantId,
-    pub timeline_id: TimelineId,
-    pub lsn: Option<Lsn>,
-    pub pageserver_connstr: String,
-    pub storage_auth_token: Option<String>,
-}
-
-impl TryFrom<ComputeSpec> for ParsedSpec {
-    type Error = String;
-    fn try_from(spec: ComputeSpec) -> Result<Self, String> {
-        // Extract the options from the spec file that are needed to connect to
-        // the storage system.
-        //
-        // For backwards-compatibility, the top-level fields in the spec file
-        // may be empty. In that case, we need to dig them from the GUCs in the
-        // cluster.settings field.
-        let pageserver_connstr = spec
-            .pageserver_connstring
-            .clone()
-            .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
-            .ok_or("pageserver connstr should be provided")?;
-        let storage_auth_token = spec.storage_auth_token.clone();
-        let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
-            tenant_id
-        } else {
-            spec.cluster
-                .settings
-                .find("neon.tenant_id")
-                .ok_or("tenant id should be provided")
-                .map(|s| TenantId::from_str(&s))?
-                .or(Err("invalid tenant id"))?
-        };
-        let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id {
-            timeline_id
-        } else {
-            spec.cluster
-                .settings
-                .find("neon.timeline_id")
-                .ok_or("timeline id should be provided")
-                .map(|s| TimelineId::from_str(&s))?
-                .or(Err("invalid timeline id"))?
-        };
-        let lsn = spec.lsn;
-
-        Ok(ParsedSpec {
-            spec,
-            pageserver_connstr,
-            storage_auth_token,
-            tenant_id,
-            timeline_id,
-            lsn,
-        })
-    }
-}
-
 impl ComputeNode {
     pub fn set_status(&self, status: ComputeStatus) {
         let mut state = self.state.lock().unwrap();
@@ -177,10 +118,10 @@ impl ComputeNode {
     // unarchive it to `pgdata` directory overriding all its previous content.
     #[instrument(skip(self, compute_state))]
     fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
-        let spec = compute_state.pspec.as_ref().expect("spec must be set");
+        let spec = compute_state.spec.as_ref().expect("spec must be set");
         let start_time = Utc::now();
 
-        let mut config = postgres::Config::from_str(&spec.pageserver_connstr)?;
+        let mut config = postgres::Config::from_str(&spec.pageserver_connstring)?;
 
         // Use the storage auth token from the config file, if given.
         // Note: this overrides any password set in the connection string.
@@ -264,21 +205,21 @@ impl ComputeNode {
     /// safekeepers sync, basebackup, etc.
     #[instrument(skip(self, compute_state))]
     pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
-        let pspec = compute_state.pspec.as_ref().expect("spec must be set");
+        let spec = compute_state.spec.as_ref().expect("spec must be set");
         let pgdata_path = Path::new(&self.pgdata);
 
         // Remove/create an empty pgdata directory and put configuration there.
         self.create_pgdata()?;
-        config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
+        config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &spec)?;
 
-        let lsn = if let Some(lsn) = pspec.lsn {
+        let lsn = if let Some(lsn) = spec.lsn {
             // Read-only node, anchored at 'lsn'
             lsn
         } else {
             // Primary that continues to write at end of the timeline
             info!("starting safekeepers syncing");
             let last_lsn = self
-                .sync_safekeepers(pspec.storage_auth_token.clone())
+                .sync_safekeepers(spec.storage_auth_token.clone())
                 .with_context(|| "failed to sync safekeepers")?;
             info!("safekeepers synced at LSN {}", last_lsn);
             last_lsn
@@ -286,12 +227,12 @@ impl ComputeNode {
 
         info!(
             "getting basebackup@{} from pageserver {}",
-            lsn, &pspec.pageserver_connstr
+            lsn, &spec.pageserver_connstring
         );
         self.get_basebackup(compute_state, lsn).with_context(|| {
             format!(
                 "failed to get basebackup@{} from pageserver {}",
-                lsn, &pspec.pageserver_connstr
+                lsn, &spec.pageserver_connstring
             )
         })?;
@@ -359,7 +300,7 @@ impl ComputeNode {
         };
 
         // Proceed with post-startup configuration. Note, that order of operations is important.
-        let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
+        let spec = &compute_state.spec.as_ref().expect("spec must be set");
         handle_roles(spec, &mut client)?;
         handle_databases(spec, &mut client)?;
         handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
@@ -370,10 +311,7 @@ impl ComputeNode {
         // 'Close' connection
         drop(client);
 
-        info!(
-            "finished configuration of compute for project {}",
-            spec.cluster.cluster_id
-        );
+        info!("finished configuration of compute");
 
         Ok(())
     }
@@ -381,11 +319,11 @@ impl ComputeNode {
     #[instrument(skip(self))]
     pub fn start_compute(&self) -> Result<std::process::Child> {
         let compute_state = self.state.lock().unwrap().clone();
-        let spec = compute_state.pspec.as_ref().expect("spec must be set");
+        let spec = compute_state.spec.as_ref().expect("spec must be set");
         info!(
             "starting compute for project {}, operation {}, tenant {}, timeline {}",
-            spec.spec.cluster.cluster_id,
-            spec.spec.operation_uuid.as_deref().unwrap_or("None"),
+            spec.project_id.as_deref().unwrap_or("None"),
+            spec.operation_uuid.as_deref().unwrap_or("None"),
             spec.tenant_id,
             spec.timeline_id,
         );
diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs
index fcd41287fd..67fe38fb6b 100644
--- a/compute_tools/src/config.rs
+++ b/compute_tools/src/config.rs
@@ -6,8 +6,7 @@ use std::path::Path;
 use anyhow::Result;
 
 use crate::pg_helpers::escape_conf_value;
-use crate::pg_helpers::PgOptionsSerialize;
-use compute_api::spec::ComputeSpec;
+use compute_api::spec::ComputeSpecV2;
 
 /// Check that `line` is inside a text file and put it there if it is not.
 /// Create file if it doesn't exist.
@@ -33,24 +32,37 @@ pub fn line_in_file(path: &Path, line: &str) -> Result<bool> {
 }
 
 /// Create or completely rewrite configuration file specified by `path`
-pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
+pub fn write_postgres_conf(path: &Path, spec: &ComputeSpecV2) -> Result<()> {
     // File::create() destroys the file content if it exists.
     let mut file = File::create(path)?;
 
     // Write the postgresql.conf content from the spec file as is.
-    if let Some(conf) = &spec.cluster.postgresql_conf {
+    if let Some(conf) = &spec.postgresql_conf {
         writeln!(file, "{}", conf)?;
     }
 
-    // Add options for connecting to storage
-    writeln!(file, "# Neon storage settings")?;
-    if let Some(s) = &spec.pageserver_connstring {
-        writeln!(
-            file,
-            "neon.pageserver_connstring='{}'",
-            escape_conf_value(s)
-        )?;
+    // Append any extra options from the spec file
+    if let Some(settings) = &spec.settings {
+        writeln!(file, "\n# Extra settings from spec document")?;
+
+        for setting in settings {
+            if let Some(value) = &setting.value {
+                let escaped_value: String = value.replace('\'', "''").replace('\\', "\\\\");
+                writeln!(file, "{} = '{}'", setting.name, escaped_value)?;
+            } else {
+                // If there is no value, then just append the line verbatim
+                writeln!(file, "{}", setting.name)?;
+            }
+        }
     }
+
+    // Append options for connecting to storage
+    writeln!(file, "\n# Neon storage settings")?;
+    writeln!(
+        file,
+        "neon.pageserver_connstring='{}'",
+        escape_conf_value(&spec.pageserver_connstring)
+    )?;
     if !spec.safekeeper_connstrings.is_empty() {
         writeln!(
             file,
@@ -58,27 +70,16 @@ pub fn write_postgres_conf(path: &Path, spec: &ComputeSpec) -> Result<()> {
             escape_conf_value(&spec.safekeeper_connstrings.join(","))
         )?;
     }
-    if let Some(s) = &spec.tenant_id {
-        writeln!(
-            file,
-            "neon.tenant_id='{}'",
-            escape_conf_value(&s.to_string())
-        )?;
-    }
-    if let Some(s) = &spec.timeline_id {
-        writeln!(
-            file,
-            "neon.timeline_id='{}'",
-            escape_conf_value(&s.to_string())
-        )?;
-    }
-
-    // If there are any extra options in the 'settings' field, append those
-    if spec.cluster.settings.is_some() {
-        writeln!(file, "# Managed by compute_ctl: begin")?;
-        writeln!(file, "{}", spec.cluster.settings.as_pg_settings())?;
-        writeln!(file, "# Managed by compute_ctl: end")?;
-    }
+    writeln!(
+        file,
+        "neon.tenant_id='{}'",
+        escape_conf_value(&spec.tenant_id.to_string())
+    )?;
+    writeln!(
+        file,
+        "neon.timeline_id='{}'",
+        escape_conf_value(&spec.timeline_id.to_string())
+    )?;
 
     Ok(())
 }
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index ba1eccec18..c958b8dfb6 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -3,9 +3,10 @@ use std::net::SocketAddr;
 use std::sync::Arc;
 use std::thread;
 
-use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
+use crate::compute::{ComputeNode, ComputeState};
 use compute_api::requests::ConfigurationRequest;
 use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
+use compute_api::spec::ComputeSpecV2;
 
 use anyhow::Result;
 use hyper::service::{make_service_fn, service_fn};
@@ -18,14 +19,8 @@ use tracing_utils::http::OtelName;
 
 fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
     ComputeStatusResponse {
-        tenant: state
-            .pspec
-            .as_ref()
-            .map(|pspec| pspec.tenant_id.to_string()),
-        timeline: state
-            .pspec
-            .as_ref()
-            .map(|pspec| pspec.timeline_id.to_string()),
+        tenant: state.spec.as_ref().map(|spec| spec.tenant_id.to_string()),
+        timeline: state.spec.as_ref().map(|spec| spec.timeline_id.to_string()),
         status: state.status,
         last_active: state.last_active,
         error: state.error.clone(),
@@ -140,11 +135,9 @@ async fn handle_configure_request(
     let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
     let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
     if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
-        let spec = request.spec;
-
-        let parsed_spec = match ParsedSpec::try_from(spec) {
+        let specv2 = match ComputeSpecV2::try_from(request.spec) {
             Ok(ps) => ps,
-            Err(msg) => return Err((msg, StatusCode::PRECONDITION_FAILED)),
+            Err(err) => return Err((err.to_string(), StatusCode::PRECONDITION_FAILED)),
         };
 
         // XXX: wrap state update under lock in code blocks. Otherwise,
@@ -162,7 +155,7 @@ async fn handle_configure_request(
             );
             return Err((msg, StatusCode::PRECONDITION_FAILED));
         }
-        state.pspec = Some(parsed_spec);
+        state.spec = Some(specv2);
         state.status = ComputeStatus::ConfigurationPending;
         compute.state_changed.notify_all();
         drop(state);
diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs
index b93e887f10..95c89603ea 100644
--- a/compute_tools/src/pg_helpers.rs
+++ b/compute_tools/src/pg_helpers.rs
@@ -29,7 +29,6 @@ pub fn escape_conf_value(s: &str) -> String {
 
 trait GenericOptionExt {
     fn to_pg_option(&self) -> String;
-    fn to_pg_setting(&self) -> String;
 }
 
 impl GenericOptionExt for GenericOption {
@@ -44,23 +43,10 @@ impl GenericOptionExt for GenericOption {
             self.name.to_owned()
         }
     }
-
-    /// Represent `GenericOption` as configuration option.
-    fn to_pg_setting(&self) -> String {
-        if let Some(val) = &self.value {
-            match self.vartype.as_ref() {
-                "string" => format!("{} = '{}'", self.name, escape_conf_value(val)),
-                _ => format!("{} = {}", self.name, val),
-            }
-        } else {
-            self.name.to_owned()
-        }
-    }
 }
 
 pub trait PgOptionsSerialize {
     fn as_pg_options(&self) -> String;
-    fn as_pg_settings(&self) -> String;
 }
 
 impl PgOptionsSerialize for GenericOptions {
@@ -76,20 +62,6 @@ impl PgOptionsSerialize for GenericOptions {
             "".to_string()
         }
     }
-
-    /// Serialize an optional collection of `GenericOption`'s to
-    /// `postgresql.conf` compatible format.
-    fn as_pg_settings(&self) -> String {
-        if let Some(ops) = &self {
-            ops.iter()
-                .map(|op| op.to_pg_setting())
-                .collect::<Vec<String>>()
-                .join("\n")
-                + "\n" // newline after last setting
-        } else {
-            "".to_string()
-        }
-    }
 }
 
 pub trait GenericOptionsSearch {
diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs
index 2350113c39..d7907bc938 100644
--- a/compute_tools/src/spec.rs
+++ b/compute_tools/src/spec.rs
@@ -1,3 +1,4 @@
+//! Functions to reconcile the Postgres cluster with the spec file
 use std::path::Path;
 use std::str::FromStr;
 
@@ -10,11 +11,14 @@ use crate::config;
 use crate::params::PG_HBA_ALL_MD5;
 use crate::pg_helpers::*;
 
-use compute_api::spec::{ComputeSpec, Database, PgIdent, Role};
+use compute_api::spec::{ComputeSpecAnyVersion, ComputeSpecV2, Database, PgIdent, Role};
 
 /// Request spec from the control-plane by compute_id. If `NEON_CONSOLE_JWT`
 /// env variable is set, it will be used for authorization.
-pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
+pub fn get_spec_from_control_plane(
+    base_uri: &str,
+    compute_id: &str,
+) -> Result<ComputeSpecAnyVersion> {
     let cp_uri = format!("{base_uri}/management/api/v2/computes/{compute_id}/spec");
     let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
         Ok(v) => v,
@@ -26,19 +30,18 @@ pub fn get_spec_from_control_plane(base_uri: &str, compute_id: &str) -> Result<ComputeSpec> {
 }
 
-pub fn handle_configuration(spec: &ComputeSpec, pgdata_path: &Path) -> Result<()> {
+pub fn handle_configuration(spec: &ComputeSpecV2, pgdata_path: &Path) -> Result<()> {
     // File `postgresql.conf` is no longer included into `basebackup`, so just
     // always write all config into it creating new file.
     config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), spec)?;
@@ -66,7 +69,7 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
 
 /// Given a cluster spec json and open transaction it handles roles creation,
 /// deletion and update.
 #[instrument(skip_all)]
-pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
+pub fn handle_roles(spec: &ComputeSpecV2, client: &mut Client) -> Result<()> {
     let mut xact = client.transaction()?;
     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
@@ -122,7 +125,7 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     let existing_roles: Vec<Role> = get_existing_roles(&mut xact)?;
 
     info!("cluster spec roles:");
-    for role in &spec.cluster.roles {
+    for role in &spec.roles {
         let name = &role.name;
         // XXX: with a limited number of roles it is fine, but consider making it a HashMap
         let pg_role = existing_roles.iter().find(|r| r.name == *name);
@@ -207,7 +210,11 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
 
 /// Reassign all dependent objects and delete requested roles.
 #[instrument(skip_all)]
-pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
+pub fn handle_role_deletions(
+    spec: &ComputeSpecV2,
+    connstr: &str,
+    client: &mut Client,
+) -> Result<()> {
     if let Some(ops) = &spec.delta_operations {
         // First, reassign all dependent objects to db owners.
         info!("reassigning dependent objects of to-be-deleted roles");
@@ -249,8 +256,8 @@ pub fn handle_role_deletions(spec: &ComputeSpec, connstr: &str, client: &mut Cli
 }
 
 // Reassign all owned objects in all databases to the owner of the database.
-fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent) -> Result<()> {
-    for db in &spec.cluster.databases {
+fn reassign_owned_objects(spec: &ComputeSpecV2, connstr: &str, role_name: &PgIdent) -> Result<()> {
+    for db in &spec.databases {
         if db.owner != *role_name {
             let mut conf = Config::from_str(connstr)?;
             conf.dbname(&db.name);
@@ -284,7 +291,7 @@ fn reassign_owned_objects(spec: &ComputeSpec, connstr: &str, role_name: &PgIdent
 /// atomicity should be enough here due to the order of operations and various checks,
 /// which together provide us idempotency.
 #[instrument(skip_all)]
-pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
+pub fn handle_databases(spec: &ComputeSpecV2, client: &mut Client) -> Result<()> {
     let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
 
     // Print a list of existing Postgres databases (only in debug mode)
@@ -332,7 +339,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
     let existing_dbs: Vec<Database> = get_existing_dbs(client)?;
 
     info!("cluster spec databases:");
-    for db in &spec.cluster.databases {
+    for db in &spec.databases {
         let name = &db.name;
 
         // XXX: with a limited number of databases it is fine, but consider making it a HashMap
@@ -397,7 +404,7 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
 /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
 /// to allow users creating trusted extensions and re-creating `public` schema, for example.
 #[instrument(skip_all)]
-pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
+pub fn handle_grants(spec: &ComputeSpecV2, connstr: &str, client: &mut Client) -> Result<()> {
     info!("cluster spec grants:");
 
     // We now have a separate `web_access` role to connect to the database
@@ -407,13 +414,12 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
     // XXX: later we should stop messing with Postgres ACL in such horrible
     // ways.
     let roles = spec
-        .cluster
         .roles
         .iter()
         .map(|r| r.name.pg_quote())
         .collect::<Vec<String>>();
 
-    for db in &spec.cluster.databases {
+    for db in &spec.databases {
         let dbname = &db.name;
 
         let query: String = format!(
@@ -429,7 +435,7 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
     // Do some per-database access adjustments. We'd better do this at db creation time,
     // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
     // atomically.
-    for db in &spec.cluster.databases {
+    for db in &spec.databases {
         let mut conf = Config::from_str(connstr)?;
         conf.dbname(&db.name);
@@ -499,14 +505,11 @@ pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) ->
 
 /// Create required system extensions
 #[instrument(skip_all)]
-pub fn handle_extensions(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
-    if let Some(libs) = spec.cluster.settings.find("shared_preload_libraries") {
-        if libs.contains("pg_stat_statements") {
-            // Create extension only if this compute really needs it
-            let query = "CREATE EXTENSION IF NOT EXISTS pg_stat_statements";
-            info!("creating system extensions with query: {}", query);
-            client.simple_query(query)?;
-        }
+pub fn handle_extensions(spec: &ComputeSpecV2, client: &mut Client) -> Result<()> {
+    for extension in &spec.extensions {
+        let query = format!("CREATE EXTENSION IF NOT EXISTS {}", extension.pg_quote());
+        info!("creating system extensions with query: {}", query);
+        client.simple_query(&query)?;
     }
 
     Ok(())
 }
diff --git a/compute_tools/tests/pg_helpers_tests.rs b/compute_tools/tests/pg_helpers_tests.rs
index a63ee038c7..428f8211d9 100644
--- a/compute_tools/tests/pg_helpers_tests.rs
+++ b/compute_tools/tests/pg_helpers_tests.rs
@@ -1,57 +1,24 @@
 #[cfg(test)]
 mod pg_helpers_tests {
-    use std::fs::File;
-
-    use compute_api::spec::{ComputeSpec, GenericOption, GenericOptions, PgIdent};
+    use anyhow::Result;
+    use compute_api::spec::{ComputeSpecV2, GenericOption, GenericOptions, PgIdent};
     use compute_tools::pg_helpers::*;
 
     #[test]
-    fn params_serialize() {
-        let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
-        let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
+    fn params_serialize() -> Result<()> {
+        let spec_v1_str =
+            std::fs::read_to_string("../libs/compute_api/tests/spec-v1.json").unwrap();
+        let spec = ComputeSpecV2::parse_and_upgrade(&spec_v1_str)?;
 
         assert_eq!(
-            spec.cluster.databases.first().unwrap().to_pg_options(),
+            spec.databases.first().unwrap().to_pg_options(),
             "LC_COLLATE 'C' LC_CTYPE 'C' TEMPLATE template0 OWNER \"alexk\""
         );
         assert_eq!(
-            spec.cluster.roles.first().unwrap().to_pg_options(),
+            spec.roles.first().unwrap().to_pg_options(),
            "LOGIN PASSWORD 'md56b1d16b78004bbd51fa06af9eda75972'"
         );
-    }
-
-    #[test]
-    fn settings_serialize() {
-        let file = File::open("../libs/compute_api/tests/cluster_spec.json").unwrap();
-        let spec: ComputeSpec = serde_json::from_reader(file).unwrap();
-
-        assert_eq!(
-            spec.cluster.settings.as_pg_settings(),
-            r#"fsync = off
-wal_level = replica
-hot_standby = on
-neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
-wal_log_hints = on
-log_connections = on
-shared_buffers = 32768
-port = 55432
-max_connections = 100
-max_wal_senders = 10
-listen_addresses = '0.0.0.0'
-wal_sender_timeout = 0
-password_encryption = md5
-maintenance_work_mem = 65536
-max_parallel_workers = 8
-max_worker_processes = 8
-neon.tenant_id = 'b0554b632bd4d547a63b86c3630317e8'
-max_replication_slots = 10
-neon.timeline_id = '2414a61ffc94e428f14b5758fe308e13'
-shared_preload_libraries = 'neon'
-synchronous_standby_names = 'walproposer'
-neon.pageserver_connstring = 'host=127.0.0.1 port=6400'
-test.escaping = 'here''s a backslash \\ and a quote '' and a double-quote " hooray'
-"#
-        );
+        Ok(())
     }
 
     #[test]
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
index 8cf5ee5c08..1d6504d9c6 100644
--- a/control_plane/Cargo.toml
+++ b/control_plane/Cargo.toml
@@ -6,6 +6,7 @@ license.workspace = true
 
 [dependencies]
 anyhow.workspace = true
+chrono.workspace = true
 clap.workspace = true
 comfy-table.workspace = true
 git-version.workspace = true
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 1c95d339c4..fb914bfed0 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -55,7 +55,7 @@ use crate::pageserver::PageServerNode;
 use crate::postgresql_conf::PostgresConf;
 
 use compute_api::responses::{ComputeState, ComputeStatus};
-use compute_api::spec::{Cluster, ComputeSpec};
+use compute_api::spec::ComputeSpecV2;
 
 // contents of a endpoint.json file
 #[serde_as]
@@ -402,26 +402,29 @@ impl Endpoint {
         }
 
         // Create spec file
-        let spec = ComputeSpec {
-            format_version: 1.0,
+        let spec = ComputeSpecV2 {
+            format_version: 2,
+
+            project_id: None,
+            endpoint_id: Some(self.endpoint_id.clone()),
             operation_uuid: None,
-            cluster: Cluster {
-                cluster_id: "FIXME".to_string(),
-                name: "FIXME".to_string(),
-                state: None,
-                roles: vec![],
-                databases: vec![],
-                settings: None,
-                postgresql_conf: Some(postgresql_conf),
-            },
-            delta_operations: None,
-            tenant_id: Some(self.tenant_id),
-            timeline_id: Some(self.timeline_id),
+
+            startup_tracing_context: None,
+
+            tenant_id: self.tenant_id,
+            timeline_id: self.timeline_id,
             lsn: self.lsn,
-            pageserver_connstring: Some(pageserver_connstring),
+            pageserver_connstring,
             safekeeper_connstrings,
             storage_auth_token: auth_token.clone(),
-            startup_tracing_context: None,
+
+            postgresql_conf: Some(postgresql_conf),
+            settings: None,
+
+            roles: vec![],
+            databases: vec![],
+            extensions: vec![],
+            delta_operations: None,
         };
         let spec_path = self.endpoint_path().join("spec.json");
         std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
diff --git a/libs/compute_api/src/models.rs b/libs/compute_api/src/models.rs
index f6fa1f3396..06749e8018 100644
--- a/libs/compute_api/src/models.rs
+++ b/libs/compute_api/src/models.rs
@@ -1,6 +1,7 @@
 //! Structs representing the JSON formats used in the compute_ctl's HTTP API.
+use crate::rfc3339_serialize;
 use chrono::{DateTime, Utc};
-use serde::{Deserialize, Serialize, Serializer};
+use serde::{Deserialize, Serialize};
 
 /// Response of the /status API
 #[derive(Deserialize, Serialize)]
@@ -21,13 +22,6 @@ pub enum ComputeStatus {
     Failed,
 }
 
-fn rfc3339_serialize<S>(x: &DateTime<Utc>, s: S) -> Result<S::Ok, S::Error>
-where
-    S: Serializer,
-{
-    x.to_rfc3339().serialize(s)
-}
-
 /// Response of the /metrics.json API
 #[derive(Clone, Default, Serialize)]
 pub struct ComputeMetrics {
diff --git a/libs/compute_api/src/requests.rs b/libs/compute_api/src/requests.rs
index 5896c7dc65..c8cf8f727f 100644
--- a/libs/compute_api/src/requests.rs
+++ b/libs/compute_api/src/requests.rs
@@ -1,6 +1,6 @@
 //! Structs representing the JSON formats used in the compute_ctl's HTTP API.
 
-use crate::spec::ComputeSpec;
+use crate::spec::ComputeSpecAnyVersion;
 use serde::Deserialize;
 
 /// Request of the /configure API
@@ -10,5 +10,5 @@
 /// `spec` into a struct initially to be more flexible in the future.
 #[derive(Deserialize, Debug)]
 pub struct ConfigurationRequest {
-    pub spec: ComputeSpec,
+    pub spec: ComputeSpecAnyVersion,
 }
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs
index a8006e52cb..1a90d453b2 100644
--- a/libs/compute_api/src/spec.rs
+++ b/libs/compute_api/src/spec.rs
@@ -3,9 +3,11 @@
 //! The spec.json file is used to pass information to 'compute_ctl'. It contains
 //! all the information needed to start up the right version of PostgreSQL,
 //! and connect it to the storage nodes.
+use anyhow::anyhow;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DisplayFromStr};
 use std::collections::HashMap;
+use std::str::FromStr;
 use utils::id::{TenantId, TimelineId};
 use utils::lsn::Lsn;
 
@@ -16,17 +18,17 @@ pub type PgIdent = String;
 
 /// Cluster spec or configuration represented as an optional number of
 /// delta operations + final cluster state description.
 #[serde_as]
-#[derive(Clone, Debug, Default, Deserialize, Serialize)]
-pub struct ComputeSpec {
-    pub format_version: f32,
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct ComputeSpecV2 {
+    pub format_version: u64,
 
-    // The control plane also includes a 'timestamp' field in the JSON document,
-    // but we don't use it for anything. Serde will ignore missing fields when
-    // deserializing it.
+    // For debugging purposes only
+    pub project_id: Option<String>,
+    pub endpoint_id: Option<String>,
     pub operation_uuid: Option<String>,
 
-    /// Expected cluster state at the end of transition process.
-    pub cluster: Cluster,
-    pub delta_operations: Option<Vec<DeltaOp>>,
+
+    /// W3C trace context of the launch operation, for OpenTelemetry tracing
+    pub startup_tracing_context: Option<HashMap<String, String>>,
 
     // Information needed to connect to the storage layer.
     //
     // 'tenant_id' and 'timeline_id' specify the timeline to use. If 'lsn' is
     // unset, this is a primary that continues to write WAL at
     // the end of the timeline. If 'lsn' is set, this is a read-only node
     // "anchored" at that LSN. 'safekeeper_connstrings' must be non-empty for a
     // primary.
-    //
-    // For backwards compatibility, the control plane may leave out all of
-    // these, and instead set the "neon.tenant_id", "neon.timeline_id",
-    // etc. GUCs in cluster.settings. TODO: This is deprecated; once the control
-    // plane has been updated to fill these fields, we can make these non
-    // optional.
-    #[serde_as(as = "Option<DisplayFromStr>")]
-    pub tenant_id: Option<TenantId>,
-    #[serde_as(as = "Option<DisplayFromStr>")]
-    pub timeline_id: Option<TimelineId>,
+    #[serde_as(as = "DisplayFromStr")]
+    pub tenant_id: TenantId,
+    #[serde_as(as = "DisplayFromStr")]
+    pub timeline_id: TimelineId,
     #[serde_as(as = "Option<DisplayFromStr>")]
     pub lsn: Option<Lsn>,
-    pub pageserver_connstring: Option<String>,
+    pub pageserver_connstring: String,
     pub safekeeper_connstrings: Vec<String>,
 
     /// If set, 'storage_auth_token' is used as the password to authenticate to
     /// the pageserver and safekeepers.
     pub storage_auth_token: Option<String>,
 
-    /// W3C trace context of the launch operation, for OpenTelemetry tracing
+    /// Contents of postgresql.conf file
+    pub postgresql_conf: Option<String>,
+
+    /// Extra settings to append to the postgresql.conf
+    pub settings: GenericOptions,
+
+    // Expected cluster state at the end of transition process.
+    pub roles: Vec<Role>,
+    pub databases: Vec<Database>,
+    pub extensions: Vec<String>,
+    pub delta_operations: Option<Vec<DeltaOp>>,
+}
+
+#[derive(Deserialize)]
+struct FormatVersionOnly {
+    format_version: u64,
+}
+
+impl TryFrom<ComputeSpecAnyVersion> for ComputeSpecV2 {
+    type Error = anyhow::Error;
+
+    fn try_from(input: ComputeSpecAnyVersion) -> Result<Self, Self::Error> {
+        // First check the 'format_version' field
+        match serde_json::from_value::<FormatVersionOnly>(input.0.clone())?.format_version {
+            1 => {
+                let v1: ComputeSpecV1 = serde_json::from_value(input.0)?;
+
+                ComputeSpecV2::upgrade_from_v1(v1)
+            }
+            2 => {
+                let v2: ComputeSpecV2 = serde_json::from_value(input.0)?;
+                Ok(v2)
+            }
+            other => Err(anyhow::anyhow!(
+                "unexpected format version {other} in spec file"
+            )),
+        }
+    }
+}
+
+impl ComputeSpecV2 {
+    pub fn parse_and_upgrade(input: &str) -> anyhow::Result<ComputeSpecV2> {
+        ComputeSpecV2::try_from(ComputeSpecAnyVersion(serde_json::from_str::<
+            serde_json::Value,
+        >(input)?))
+    }
+
+    pub fn upgrade_from_v1(spec_v1: ComputeSpecV1) -> anyhow::Result<ComputeSpecV2> {
+        let mut tenant_id = None;
+        let mut timeline_id = None;
+        let mut pageserver_connstring = None;
+        let mut safekeeper_connstrings: Vec<String> = Vec::new();
+
+        let mut extensions: Vec<String> = Vec::new();
+
+        let mut settings: Vec<GenericOption> = Vec::new();
+        for setting in &spec_v1.cluster.settings {
+            if let Some(value) = &setting.value {
+                match setting.name.as_str() {
+                    "neon.tenant_id" => {
+                        tenant_id = Some(TenantId::from_str(value)?);
+                    }
+                    "neon.timeline_id" => {
+                        timeline_id = Some(TimelineId::from_str(value)?);
+                    }
+                    "neon.pageserver_connstring" => {
+                        pageserver_connstring = Some(value.clone());
+                    }
+                    "neon.safekeepers" => {
+                        // neon.safekeepers is a comma-separated list of postgres connection URLs
+                        safekeeper_connstrings =
+                            value.split(',').map(|s| s.trim().to_string()).collect();
+                    }
+                    "shared_preload_libraries" => {
+                        if value.contains("pg_stat_statements") {
+                            extensions.push("pg_stat_statements".to_string());
+                        }
+                        settings.push(setting.clone())
+                    }
+                    _ => settings.push(setting.clone()),
+                }
+            } else {
+                settings.push(setting.clone())
+            }
+        }
+        let tenant_id =
+            tenant_id.ok_or_else(|| anyhow!("neon.tenant_id missing from spec file"))?;
+        let timeline_id =
+            timeline_id.ok_or_else(|| anyhow!("neon.timeline_id missing from spec file"))?;
+        let pageserver_connstring = pageserver_connstring
+            .ok_or_else(|| anyhow!("neon.pageserver_connstring missing from spec file"))?;
+
+        Ok(ComputeSpecV2 {
+            format_version: 2,
+
+            project_id: Some(spec_v1.cluster.cluster_id),
+            endpoint_id: Some(spec_v1.cluster.name),
+            operation_uuid: spec_v1.operation_uuid,
+
+            startup_tracing_context: spec_v1.startup_tracing_context,
+
+            tenant_id,
+            timeline_id,
+            lsn: None, // Not supported in V1
+            pageserver_connstring,
+            safekeeper_connstrings,
+
+            storage_auth_token: spec_v1.storage_auth_token,
+
+            postgresql_conf: None,
+            settings: Some(settings),
+
+            roles: spec_v1.cluster.roles,
+            databases: spec_v1.cluster.databases,
+            extensions,
+            delta_operations: spec_v1.delta_operations,
+        })
+    }
+}
+
+#[serde_as]
+#[derive(Deserialize, Debug)]
+pub struct ComputeSpecAnyVersion(pub serde_json::Value);
+
+// Old format that didn't have explicit 'tenant_id', 'timeline_id', 'pageserver_connstring'
+// and 'safekeeper_connstrings' fields. They were stored as GUCs in the 'cluster.settings'
+// list.
+#[serde_as]
+#[derive(Clone, Deserialize, Serialize)]
+pub struct ComputeSpecV1 {
+    pub format_version: u64,
+
+    // The control plane also includes a 'timestamp' field in the JSON document,
+    // but we don't use it for anything. Serde will ignore missing fields when
+    // deserializing it.
+    pub operation_uuid: Option<String>,
+    pub cluster: ClusterV1,
+    pub delta_operations: Option<Vec<DeltaOp>>,
+    pub storage_auth_token: Option<String>,
+    pub startup_tracing_context: Option<HashMap<String, String>>,
 }
 
-#[derive(Clone, Debug, Default, Deserialize, Serialize)]
-pub struct Cluster {
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub struct ClusterV1 {
     pub cluster_id: String,
     pub name: String,
     pub state: Option<String>,
     pub roles: Vec<Role>,
     pub databases: Vec<Database>,
-
-    /// Desired contents of 'postgresql.conf' file. (The 'compute_ctl'
-    /// tool may add additional settings to the final file.)
-    pub postgresql_conf: Option<String>,
-
-    /// Additional settings that will be appended to the 'postgresql.conf' file.
-    ///
-    /// TODO: This is deprecated. The control plane should append all the settings
-    /// directly in postgresql_conf. Remove this once the control plane has been
-    /// updated.
-    pub settings: GenericOptions,
+    pub settings: Vec<GenericOption>,
 }
 
 /// Single cluster state changing operation that could not be represented as
@@ -113,7 +238,7 @@ pub struct Database {
 /// Common type representing both SQL statement params with or without value,
 /// like `LOGIN` or `OWNER username` in the `CREATE/ALTER ROLE`, and config
 /// options like `wal_level = logical`.
-#[derive(Clone, Debug, Deserialize, Serialize)]
+#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
 pub struct GenericOption {
     pub name: String,
     pub value: Option<String>,
     pub vartype: String,
 }
@@ -127,11 +252,70 @@ pub type GenericOptions = Option<Vec<GenericOption>>;
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::fs::File;
 
     #[test]
-    fn parse_spec_file() {
-        let file = File::open("tests/cluster_spec.json").unwrap();
-        let _spec: ComputeSpec = serde_json::from_reader(file).unwrap();
+    fn test_upgrade_v1_to_v2() -> anyhow::Result<()> {
+        let spec_v1_str = std::fs::read_to_string("tests/spec-v1.json").unwrap();
+        let spec_v2 = ComputeSpecV2::parse_and_upgrade(&spec_v1_str)?;
+
+        // The original V1 file also contains neon.tenant_id, neon.timeline_id,
+        // neon.pageserver_connstring and neon.safekeepers. They are moved to explicit
+        // fields at the top level in V2.
+        assert_eq!(
+            spec_v2.tenant_id,
+            TenantId::from_str("3d1f7595b468230304e0b73cecbcb081")?
+        );
+        assert_eq!(
+            spec_v2.timeline_id,
+            TimelineId::from_str("7f2aff2a1042b93a2617f44851638422")?
+        );
+        assert_eq!(spec_v2.pageserver_connstring, "host=172.30.42.12 port=6400");
+        assert_eq!(
+            spec_v2.safekeeper_connstrings,
+            vec![
+                "172.30.42.23:6500",
+                "172.30.42.22:6500",
+                "172.30.42.21:6500"
+            ]
+        );
+
+        fn opt(name: &str, value: &str, vartype: &str) -> GenericOption {
+            GenericOption {
+                name: name.to_string(),
+                value: Some(value.to_string()),
+                vartype: vartype.to_string(),
+            }
+        }
+
+        assert_eq!(spec_v2.postgresql_conf, None);
+        assert_eq!(
+            spec_v2.settings.as_ref().unwrap(),
+            &vec![
+                opt("max_replication_write_lag", "500", "integer"),
+                opt("restart_after_crash", "off", "bool"),
+                opt("password_encryption", "md5", "enum"),
+                opt(
+                    "shared_preload_libraries",
+                    "neon, pg_stat_statements",
+                    "string"
+                ),
+                opt("synchronous_standby_names", "walproposer", "string"),
+                opt("wal_level", "replica", "enum"),
+                opt("listen_addresses", "0.0.0.0", "string"),
+                opt("neon.max_cluster_size", "10240", "integer"),
+                opt("shared_buffers", "65536", "integer"),
+                opt(
+                    "test.escaping",
+                    r#"here's a backslash \ and a quote ' and a double-quote " hooray"#,
+                    "string"
+                ),
+            ]
+        );
+
+        assert_eq!(spec_v2.extensions, vec!["pg_stat_statements"]);
+
+        eprintln!("SPEC: {}", serde_json::to_string_pretty(&spec_v2)?);
+
+        Ok(())
     }
 }
diff --git a/libs/compute_api/tests/cluster_spec.json b/libs/compute_api/tests/cluster_spec.json
deleted file mode 100644
index 8f81e7b3bd..0000000000
--- a/libs/compute_api/tests/cluster_spec.json
+++ /dev/null
@@ -1,209 +0,0 @@
-{
-  "format_version": 1.0,
-
-  "timestamp": "2021-05-23T18:25:43.511Z",
-  "operation_uuid": "0f657b36-4b0f-4a2d-9c2e-1dcd615e7d8b",
-
-  "cluster": {
-    "cluster_id": "test-cluster-42",
-    "name": "Zenith Test",
-    "state": "restarted",
-    "roles": [
-      {
-        "name": "postgres",
-        "encrypted_password": "6b1d16b78004bbd51fa06af9eda75972",
-        "options": null
-      },
-      {
-        "name": "alexk",
-        "encrypted_password": null,
-        "options": null
-      },
-      {
-        "name": "zenith \"new\"",
-        "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972",
-        "options": null
-      },
-      {
-        "name": "zen",
-        "encrypted_password": "9b1d16b78004bbd51fa06af9eda75972"
-      },
-      {
-        "name": "\"name\";\\n select 1;",
-        "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
-      },
-      {
-        "name": "MyRole",
-        "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972"
-      }
-    ],
-    "databases": [
-      {
-        "name": "DB2",
-        "owner": "alexk",
-        "options": [
-          {
-            "name": "LC_COLLATE",
-            "value": "C",
-            "vartype": "string"
-          },
-          {
-            "name": "LC_CTYPE",
-            "value": "C",
-            "vartype": "string"
-          },
-          {
-            "name": "TEMPLATE",
-            "value": "template0",
-            "vartype": "enum"
-          }
-        ]
-      },
-      {
-        "name": "zenith",
-        "owner": "MyRole"
-      },
-      {
-        "name": "zen",
-        "owner": "zen"
-      }
-    ],
-    "settings": [
-      {
-        "name": "fsync",
-        "value": "off",
-        "vartype": "bool"
-      },
-      {
-        "name": "wal_level",
-        "value": "replica",
-        "vartype": "enum"
-      },
-      {
-        "name": "hot_standby",
-        "value": "on",
-        "vartype": "bool"
-      },
-      {
-        "name": "neon.safekeepers",
-        "value": "127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501",
-        "vartype": "string"
-      },
-      {
-        "name": "wal_log_hints",
-        "value": "on",
-        "vartype": "bool"
-      },
-      {
-        "name": "log_connections",
-        "value": "on",
-        "vartype": "bool"
-      },
-      {
-        "name": "shared_buffers",
-        "value": "32768",
-        "vartype": "integer"
-      },
-      {
-        "name": "port",
-        "value": "55432",
-        "vartype": "integer"
-      },
-      {
-        "name": "max_connections",
-        "value": "100",
-        "vartype": "integer"
-      },
-      {
-        "name": "max_wal_senders",
-        "value": "10",
-        "vartype": "integer"
-      },
-      {
-        "name": "listen_addresses",
-        "value": "0.0.0.0",
"0.0.0.0", - "vartype": "string" - }, - { - "name": "wal_sender_timeout", - "value": "0", - "vartype": "integer" - }, - { - "name": "password_encryption", - "value": "md5", - "vartype": "enum" - }, - { - "name": "maintenance_work_mem", - "value": "65536", - "vartype": "integer" - }, - { - "name": "max_parallel_workers", - "value": "8", - "vartype": "integer" - }, - { - "name": "max_worker_processes", - "value": "8", - "vartype": "integer" - }, - { - "name": "neon.tenant_id", - "value": "b0554b632bd4d547a63b86c3630317e8", - "vartype": "string" - }, - { - "name": "max_replication_slots", - "value": "10", - "vartype": "integer" - }, - { - "name": "neon.timeline_id", - "value": "2414a61ffc94e428f14b5758fe308e13", - "vartype": "string" - }, - { - "name": "shared_preload_libraries", - "value": "neon", - "vartype": "string" - }, - { - "name": "synchronous_standby_names", - "value": "walproposer", - "vartype": "string" - }, - { - "name": "neon.pageserver_connstring", - "value": "host=127.0.0.1 port=6400", - "vartype": "string" - }, - { - "name": "test.escaping", - "value": "here's a backslash \\ and a quote ' and a double-quote \" hooray", - "vartype": "string" - } - ] - }, - "delta_operations": [ - { - "action": "delete_db", - "name": "zenith_test" - }, - { - "action": "rename_db", - "name": "DB", - "new_name": "DB2" - }, - { - "action": "delete_role", - "name": "zenith2" - }, - { - "action": "rename_role", - "name": "zenith new", - "new_name": "zenith \"new\"" - } - ] -} diff --git a/libs/compute_api/tests/spec-v1.json b/libs/compute_api/tests/spec-v1.json new file mode 100644 index 0000000000..d4f525c55f --- /dev/null +++ b/libs/compute_api/tests/spec-v1.json @@ -0,0 +1,175 @@ +{ + "cluster": { + "cluster_id": "young-snowflake-871338", + "name": "young-snowflake-871338", + "settings": [ + { + "name": "max_replication_write_lag", + "value": "500", + "vartype": "integer" + }, + { + "name": "neon.pageserver_connstring", + "value": "host=172.30.42.12 port=6400", + "vartype": "string" + }, + { + "name": "restart_after_crash", + "value": "off", + "vartype": "bool" + }, + { + "name": "password_encryption", + "value": "md5", + "vartype": "enum" + }, + { + "name": "shared_preload_libraries", + "value": "neon, pg_stat_statements", + "vartype": "string" + }, + { + "name": "synchronous_standby_names", + "value": "walproposer", + "vartype": "string" + }, + { + "name": "neon.tenant_id", + "value": "3d1f7595b468230304e0b73cecbcb081", + "vartype": "string" + }, + { + "name": "neon.timeline_id", + "value": "7f2aff2a1042b93a2617f44851638422", + "vartype": "string" + }, + { + "name": "wal_level", + "value": "replica", + "vartype": "enum" + }, + { + "name": "listen_addresses", + "value": "0.0.0.0", + "vartype": "string" + }, + { + "name": "neon.safekeepers", + "value": "172.30.42.23:6500,172.30.42.22:6500,172.30.42.21:6500", + "vartype": "string" + }, + { + "name": "neon.max_cluster_size", + "value": "10240", + "vartype": "integer" + }, + { + "name": "shared_buffers", + "value": "65536", + "vartype": "integer" + }, + { + "name": "test.escaping", + "value": "here's a backslash \\ and a quote ' and a double-quote \" hooray", + "vartype": "string" + } + ], + "roles": [ + { + "name": "postgres", + "encrypted_password": "6b1d16b78004bbd51fa06af9eda75972", + "options": null + }, + { + "name": "testuser", + "encrypted_password": "SCRAM-SHA-256$4096:R4V8wIc+aH8T7vy3weC5qg==$aXXM6IQKnEWsRgeyjbxydif6f29LZOGvAWe/oOnuXSM=:5IE7U/woZLZbYSYOJ3v4x3qlLOXS6xcsdJYnMdVkzQY=", + "options": null + }, + { + "name": "alexk", + 
"encrypted_password": null, + "options": null + }, + { + "name": "neon \"new\"", + "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972", + "options": null + }, + { + "name": "bar", + "encrypted_password": "9b1d16b78004bbd51fa06af9eda75972" + }, + { + "name": "\"name\";\\n select 1;", + "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972" + }, + { + "name": "MyRole", + "encrypted_password": "5b1d16b78004bbd51fa06af9eda75972" + } + ], + "databases": [ + { + "name": "DB2", + "owner": "alexk", + "options": [ + { + "name": "LC_COLLATE", + "value": "C", + "vartype": "string" + }, + { + "name": "LC_CTYPE", + "value": "C", + "vartype": "string" + }, + { + "name": "TEMPLATE", + "value": "template0", + "vartype": "enum" + } + ] + }, + { + "name": "neondb", + "owner": "testuser", + "options": null + }, + { + "name": "mydb", + "owner": "MyRole" + }, + { + "name": "foo", + "owner": "bar" + } + ] + }, + "delta_operations": [ + { + "action": "delete_db", + "name": "neon_test" + }, + { + "action": "rename_db", + "name": "DB", + "new_name": "DB2" + }, + { + "action": "delete_role", + "name": "neon2" + }, + { + "action": "rename_role", + "name": "neon new", + "new_name": "neon \"new\"" + } + ], + "format_version": 1, + "operation_uuid": "73c843c3-46dd-496f-b819-e6c5a190f584", + "timestamp": "2023-03-25T21:36:16.729366596Z", + "storage_auth_token": "dummy", + "startup_tracing_context": { + "traceparent": "00-1b79dca0e798ee42961cd13990326551-5e0222e8d7314785-01" + } +} diff --git a/test_runner/regress/test_neon_local_cli.py b/test_runner/regress/test_neon_local_cli.py index f60ac546d2..3314e7fbf6 100644 --- a/test_runner/regress/test_neon_local_cli.py +++ b/test_runner/regress/test_neon_local_cli.py @@ -12,7 +12,9 @@ def test_neon_cli_basics(neon_env_builder: NeonEnvBuilder, port_distributor: Por pg_port = port_distributor.get_port() http_port = port_distributor.get_port() - env.neon_cli.endpoint_start(endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port) + env.neon_cli.endpoint_start( + endpoint_id="ep-basic-main", pg_port=pg_port, http_port=http_port + ) env.neon_cli.create_branch(new_branch_name="migration_check") pg_port = port_distributor.get_port()