From d7d3fb332f99e6e4b5c44049d3bbe03a12473125 Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Fri, 23 May 2025 13:23:53 -0500 Subject: [PATCH] Remove notion of ParsedSpec Signed-off-by: Tristan Partin --- Cargo.lock | 4 + compute_tools/src/compute.rs | 283 ++++++------------ compute_tools/src/compute_prewarm.rs | 37 ++- compute_tools/src/config.rs | 70 +++-- compute_tools/src/http/routes/configure.rs | 9 +- .../src/http/routes/extension_server.rs | 3 +- compute_tools/src/http/routes/mod.rs | 10 +- compute_tools/src/lsn_lease.rs | 23 +- compute_tools/src/spec_apply.rs | 4 +- compute_tools/src/sync_sk.rs | 2 +- control_plane/src/bin/neon_local.rs | 31 +- control_plane/src/endpoint.rs | 90 +++--- libs/compute_api/Cargo.toml | 3 + libs/compute_api/src/spec.rs | 102 +++++-- libs/utils/src/id.rs | 6 +- storage_controller/Cargo.toml | 3 +- storage_controller/src/compute_hook.rs | 5 +- 17 files changed, 336 insertions(+), 349 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3ee261e885..b4541b7e94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1278,10 +1278,13 @@ dependencies = [ "chrono", "indexmap 2.9.0", "jsonwebtoken", + "postgres", "regex", "remote_storage", "serde", "serde_json", + "tokio-postgres", + "url", "utils", ] @@ -6747,6 +6750,7 @@ dependencies = [ "chrono", "clap", "clashmap", + "compute_api", "control_plane", "cron", "diesel", diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 7b6f923f7d..fa41b32b0d 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -17,7 +17,6 @@ use nix::unistd::Pid; use once_cell::sync::Lazy; use remote_storage::{DownloadError, RemotePath}; use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; use std::os::unix::fs::{PermissionsExt, symlink}; use std::path::Path; use std::process::{Command, Stdio}; @@ -30,7 +29,6 @@ use tokio::spawn; use tokio_postgres::{NoTls, error::SqlState}; use tracing::{Instrument, debug, error, info, instrument, warn}; use url::Url; -use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use utils::measured_stream::MeasuredReader; @@ -142,7 +140,7 @@ pub struct ComputeState { /// Compute spec. This can be received from the CLI or - more likely - /// passed by the control plane with a /configure HTTP request. - pub pspec: Option, + pub spec: Option, /// If the spec is passed by a /configure request, 'startup_span' is the /// /configure request's tracing span. The main thread enters it when it @@ -169,7 +167,7 @@ impl ComputeState { status: ComputeStatus::Empty, last_active: None, error: None, - pspec: None, + spec: None, startup_span: None, metrics: ComputeMetrics::default(), lfc_prewarm_state: LfcPrewarmState::default(), @@ -201,94 +199,6 @@ impl Default for ComputeState { } } -#[derive(Clone, Debug)] -pub struct ParsedSpec { - pub spec: ComputeSpec, - pub tenant_id: TenantId, - pub timeline_id: TimelineId, - pub pageserver_connstr: String, - pub safekeeper_connstrings: Vec, - pub storage_auth_token: Option, - pub endpoint_storage_addr: Option, - pub endpoint_storage_token: Option, -} - -impl TryFrom for ParsedSpec { - type Error = String; - fn try_from(spec: ComputeSpec) -> Result { - // Extract the options from the spec file that are needed to connect to - // the storage system. - // - // For backwards-compatibility, the top-level fields in the spec file - // may be empty. In that case, we need to dig them from the GUCs in the - // cluster.settings field. - let pageserver_connstr = spec - .pageserver_connstring - .clone() - .or_else(|| spec.cluster.settings.find("neon.pageserver_connstring")) - .ok_or("pageserver connstr should be provided")?; - let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() { - if matches!(spec.mode, ComputeMode::Primary) { - spec.cluster - .settings - .find("neon.safekeepers") - .ok_or("safekeeper connstrings should be provided")? - .split(',') - .map(|str| str.to_string()) - .collect() - } else { - vec![] - } - } else { - spec.safekeeper_connstrings.clone() - }; - let storage_auth_token = spec.storage_auth_token.clone(); - let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id { - tenant_id - } else { - spec.cluster - .settings - .find("neon.tenant_id") - .ok_or("tenant id should be provided") - .map(|s| TenantId::from_str(&s))? - .or(Err("invalid tenant id"))? - }; - let timeline_id: TimelineId = if let Some(timeline_id) = spec.timeline_id { - timeline_id - } else { - spec.cluster - .settings - .find("neon.timeline_id") - .ok_or("timeline id should be provided") - .map(|s| TimelineId::from_str(&s))? - .or(Err("invalid timeline id"))? - }; - - let endpoint_storage_addr: Option = spec - .endpoint_storage_addr - .clone() - .or_else(|| spec.cluster.settings.find("neon.endpoint_storage_addr")) - .unwrap_or_default() - .parse() - .ok(); - let endpoint_storage_token = spec - .endpoint_storage_token - .clone() - .or_else(|| spec.cluster.settings.find("neon.endpoint_storage_token")); - - Ok(ParsedSpec { - spec, - pageserver_connstr, - safekeeper_connstrings, - storage_auth_token, - tenant_id, - timeline_id, - endpoint_storage_addr, - endpoint_storage_token, - }) - } -} - /// If we are a VM, returns a [`Command`] that will run in the `neon-postgres` /// cgroup. Otherwise returns the default `Command::new(cmd)` /// @@ -366,10 +276,7 @@ impl ComputeNode { tokio_conn_conf.options(&options); let mut new_state = ComputeState::new(); - if let Some(spec) = config.spec { - let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - new_state.pspec = Some(pspec); - } + new_state.spec = config.spec; Ok(ComputeNode { params, @@ -387,7 +294,7 @@ impl ComputeNode { pub async fn run(self) -> Result> { let this = Arc::new(self); - let cli_spec = this.state.lock().unwrap().pspec.clone(); + let cli_spec = this.state.lock().unwrap().spec.clone(); // If this is a pooled VM, prewarm before starting HTTP server and becoming // available for binding. Prewarming helps Postgres start quicker later, @@ -423,7 +330,7 @@ impl ComputeNode { // If we got a spec from the CLI already, use that. Otherwise wait for the // control plane to pass it to us with a /configure HTTP request - let pspec = if let Some(cli_spec) = cli_spec { + let spec = if let Some(cli_spec) = cli_spec { cli_spec } else { this.wait_spec()? @@ -440,7 +347,7 @@ impl ComputeNode { Ok(()) => { // Success! Launch remaining services (just vm-monitor currently) vm_monitor = - Some(this.start_vm_monitor(pspec.spec.disable_lfc_resizing.unwrap_or(false))); + Some(this.start_vm_monitor(spec.disable_lfc_resizing.unwrap_or(false))); } Err(err) => { // Something went wrong with the startup. Log it and expose the error to @@ -496,7 +403,7 @@ impl ComputeNode { Ok(exit_code) } - pub fn wait_spec(&self) -> Result { + pub fn wait_spec(&self) -> Result { info!("no compute spec provided, waiting"); let mut state = self.state.lock().unwrap(); while state.status != ComputeStatus::ConfigurationPending { @@ -504,7 +411,7 @@ impl ComputeNode { } info!("got spec, continue configuration"); - let spec = state.pspec.as_ref().unwrap().clone(); + let spec = state.spec.as_ref().unwrap().clone(); // Record for how long we slept waiting for the spec. let now = Utc::now(); @@ -572,18 +479,17 @@ impl ComputeNode { compute_state = state_guard.clone() } - let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + let spec = compute_state.spec.as_ref().expect("spec must be set"); info!( - "starting compute for project {}, operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}", - pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"), - pspec.spec.operation_uuid.as_deref().unwrap_or("None"), - pspec.tenant_id, - pspec.timeline_id, - pspec.spec.project_id.as_deref().unwrap_or("None"), - pspec.spec.branch_id.as_deref().unwrap_or("None"), - pspec.spec.endpoint_id.as_deref().unwrap_or("None"), - pspec.spec.features, - pspec.spec.remote_extensions, + "starting compute for operation {}, tenant {}, timeline {}, project {}, branch {}, endpoint {}, features {:?}, spec.remote_extensions {:?}", + spec.operation_uuid.as_deref().unwrap_or("None"), + spec.tenant_id, + spec.timeline_id, + spec.project_id, + spec.branch_id, + spec.endpoint_id, + spec.features, + spec.remote_extensions, ); ////// PRE-STARTUP PHASE: things that need to be finished before we start the Postgres process @@ -604,8 +510,8 @@ impl ComputeNode { let tls_config = self.tls_config(&pspec.spec); // If there are any remote extensions in shared_preload_libraries, start downloading them - if pspec.spec.remote_extensions.is_some() { - let (this, spec) = (self.clone(), pspec.spec.clone()); + if spec.remote_extensions.is_some() { + let (this, spec) = (self.clone(), spec.clone()); pre_tasks.spawn(async move { this.download_preload_extensions(&spec) .in_current_span() @@ -620,9 +526,7 @@ impl ComputeNode { } // Resize swap to the desired size if the compute spec says so - if let (Some(size_bytes), true) = - (pspec.spec.swap_size_bytes, self.params.resize_swap_on_bind) - { + if let (Some(size_bytes), true) = (spec.swap_size_bytes, self.params.resize_swap_on_bind) { pre_tasks.spawn_blocking_child(move || { // To avoid 'swapoff' hitting postgres startup, we need to run resize-swap to completion // *before* starting postgres. @@ -640,7 +544,7 @@ impl ComputeNode { // Set disk quota if the compute spec says so if let (Some(disk_quota_bytes), Some(disk_quota_fs_mountpoint)) = ( - pspec.spec.disk_quota_bytes, + spec.disk_quota_bytes, self.params.set_disk_quota_for_fs.as_ref(), ) { let disk_quota_fs_mountpoint = disk_quota_fs_mountpoint.clone(); @@ -655,7 +559,7 @@ impl ComputeNode { } // tune pgbouncer - if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings { + if let Some(pgbouncer_settings) = &spec.pgbouncer_settings { info!("tuning pgbouncer"); let pgbouncer_settings = pgbouncer_settings.clone(); @@ -673,7 +577,7 @@ impl ComputeNode { } // configure local_proxy - if let Some(local_proxy) = &pspec.spec.local_proxy_config { + if let Some(local_proxy) = &spec.local_proxy_config { info!("configuring local_proxy"); // Spawn a background task to do the configuration, @@ -691,7 +595,7 @@ impl ComputeNode { } // Configure and start rsyslog for compliance audit logging - match pspec.spec.audit_log_level { + match spec.audit_log_level { ComputeAudit::Hipaa | ComputeAudit::Extended | ComputeAudit::Full => { let remote_endpoint = std::env::var("AUDIT_LOGGING_ENDPOINT").unwrap_or("".to_string()); @@ -702,16 +606,10 @@ impl ComputeNode { let log_directory_path = Path::new(&self.params.pgdata).join("log"); let log_directory_path = log_directory_path.to_string_lossy().to_string(); - // Add project_id,endpoint_id to identify the logs. - // - // These ids are passed from cplane, - let endpoint_id = pspec.spec.endpoint_id.as_deref().unwrap_or(""); - let project_id = pspec.spec.project_id.as_deref().unwrap_or(""); - configure_audit_rsyslog( log_directory_path.clone(), - endpoint_id, - project_id, + &spec.endpoint_id, + &spec.project_id, &remote_endpoint, )?; @@ -722,7 +620,7 @@ impl ComputeNode { } // Configure and start rsyslog for Postgres logs export - let conf = PostgresLogsRsyslogConfig::new(pspec.spec.logs_export_host.as_deref()); + let conf = PostgresLogsRsyslogConfig::new(spec.logs_export_host.as_deref()); configure_postgres_logs_export(conf)?; // Launch remaining service threads @@ -736,14 +634,14 @@ impl ComputeNode { ////// START POSTGRES let start_time = Utc::now(); - let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?; + let pg_process = self.start_postgres(spec.storage_auth_token.clone())?; let postmaster_pid = pg_process.pid(); *pg_handle = Some(pg_process); // If this is a primary endpoint, perform some post-startup configuration before // opening it up for the world. let config_time = Utc::now(); - if pspec.spec.mode == ComputeMode::Primary { + if spec.mode == ComputeMode::Primary { self.configure_as_primary(&compute_state)?; let conf = self.get_tokio_conn_conf(None); @@ -784,6 +682,7 @@ impl ComputeNode { if pspec.spec.autoprewarm { self.prewarm_lfc(); } + Ok(()) } @@ -862,10 +761,10 @@ impl ComputeNode { async fn cleanup_after_postgres_exit(&self) -> Result { // Maybe sync safekeepers again, to speed up next startup let compute_state = self.state.lock().unwrap().clone(); - let pspec = compute_state.pspec.as_ref().expect("spec must be set"); - if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) { + let spec = compute_state.spec.as_ref().expect("spec must be set"); + if matches!(spec.mode, compute_api::spec::ComputeMode::Primary) { info!("syncing safekeepers on shutdown"); - let storage_auth_token = pspec.storage_auth_token.clone(); + let storage_auth_token = spec.storage_auth_token.clone(); let lsn = self.sync_safekeepers(storage_auth_token).await?; info!("synced safekeepers at lsn {lsn}"); } @@ -892,9 +791,9 @@ impl ComputeNode { self.state .lock() .unwrap() - .pspec + .spec .as_ref() - .is_some_and(|s| s.spec.features.contains(&feature)) + .is_some_and(|spec| spec.features.contains(&feature)) } pub fn set_status(&self, status: ComputeStatus) { @@ -911,13 +810,15 @@ impl ComputeNode { self.state.lock().unwrap().status } - pub fn get_timeline_id(&self) -> Option { + pub fn get_timeline_id(&self) -> TimelineId { self.state .lock() .unwrap() - .pspec + .spec .as_ref() - .map(|s| s.timeline_id) + .unwrap() + .timeline_id + .clone() } // Remove `pgdata` directory and create it again with right permissions. @@ -936,11 +837,10 @@ impl ComputeNode { // unarchive it to `pgdata` directory overriding all its previous content. #[instrument(skip_all, fields(%lsn))] fn try_get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> { - let spec = compute_state.pspec.as_ref().expect("spec must be set"); + let spec = compute_state.spec.as_ref().expect("spec must be set"); let start_time = Instant::now(); - let shard0_connstr = spec.pageserver_connstr.split(',').next().unwrap(); - let mut config = postgres::Config::from_str(shard0_connstr)?; + let mut config = postgres::Config::from(&spec.pageservers[0]); // Use the storage auth token from the config file, if given. // Note: this overrides any password set in the connection string. @@ -952,11 +852,8 @@ impl ComputeNode { } config.application_name("compute_ctl"); - if let Some(spec) = &compute_state.pspec { - config.options(&format!( - "-c neon.compute_mode={}", - spec.spec.mode.to_type_str() - )); + if let Some(spec) = &compute_state.spec { + config.options(&format!("-c neon.compute_mode={}", spec.mode.to_type_str())); } // Connect to pageserver @@ -965,7 +862,7 @@ impl ComputeNode { let basebackup_cmd = match lsn { Lsn(0) => { - if spec.spec.mode != ComputeMode::Primary { + if spec.mode != ComputeMode::Primary { format!( "basebackup {} {} --gzip --replica", spec.tenant_id, spec.timeline_id @@ -975,7 +872,7 @@ impl ComputeNode { } } _ => { - if spec.spec.mode != ComputeMode::Primary { + if spec.mode != ComputeMode::Primary { format!( "basebackup {} {} {} --gzip --replica", spec.tenant_id, spec.timeline_id, lsn @@ -1051,35 +948,34 @@ impl ComputeNode { compute_state: &ComputeState, ) -> Result> { // Construct a connection config for each safekeeper - let pspec: ParsedSpec = compute_state - .pspec + let spec = compute_state + .spec .as_ref() .expect("spec must be set") .clone(); - let sk_connstrs: Vec = pspec.safekeeper_connstrings.clone(); - let sk_configs = sk_connstrs.into_iter().map(|connstr| { - // Format connstr - let id = connstr.clone(); - let connstr = format!("postgresql://no_user@{}", connstr); - let options = format!( - "-c timeline_id={} tenant_id={}", - pspec.timeline_id, pspec.tenant_id - ); + let safekeepers = spec + .safekeepers + .iter() + .map(|s| { + let mut config = tokio_postgres::Config::from(s); - // Construct client - let mut config = tokio_postgres::Config::from_str(&connstr).unwrap(); - config.options(&options); - if let Some(storage_auth_token) = pspec.storage_auth_token.clone() { - config.password(storage_auth_token); - } + config.user("no_user"); + config.options(&format!( + "-c timeline_id={} tenant_id={}", + spec.timeline_id, spec.tenant_id + )); + if let Some(storage_auth_token) = &spec.storage_auth_token { + config.password(storage_auth_token); + } - (id, config) - }); + (format!("{}:{}", s.host, s.port), config) + }) + .collect::>(); // Create task set to query all safekeepers let mut tasks = FuturesUnordered::new(); - let quorum = sk_configs.len() / 2 + 1; - for (id, config) in sk_configs { + let quorum = safekeepers.len() / 2 + 1; + for (id, config) in safekeepers { let timeout = tokio::time::Duration::from_millis(100); let task = tokio::time::timeout(timeout, ping_safekeeper(id, config)); tasks.push(tokio::spawn(task)); @@ -1204,8 +1100,7 @@ impl ComputeNode { /// safekeepers sync, basebackup, etc. #[instrument(skip_all)] pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { - let pspec = compute_state.pspec.as_ref().expect("spec must be set"); - let spec = &pspec.spec; + let spec = compute_state.spec.as_ref().expect("spec must be set"); let pgdata_path = Path::new(&self.params.pgdata); let tls_config = self.tls_config(&pspec.spec); @@ -1214,7 +1109,7 @@ impl ComputeNode { self.create_pgdata()?; config::write_postgres_conf( pgdata_path, - &pspec.spec, + spec, self.params.internal_http_port, tls_config, )?; @@ -1230,7 +1125,7 @@ impl ComputeNode { lsn } else { info!("starting safekeepers syncing"); - self.sync_safekeepers(pspec.storage_auth_token.clone()) + self.sync_safekeepers(spec.storage_auth_token.clone()) .await .with_context(|| "failed to sync safekeepers")? }; @@ -1248,13 +1143,13 @@ impl ComputeNode { }; info!( - "getting basebackup@{} from pageserver {}", - lsn, &pspec.pageserver_connstr + "getting basebackup@{} from pageserver {}:{}", + lsn, spec.pageservers[0].host, spec.pageservers[0].port ); self.get_basebackup(compute_state, lsn).with_context(|| { format!( - "failed to get basebackup@{} from pageserver {}", - lsn, &pspec.pageserver_connstr + "failed to get basebackup@{} from pageserver {}:{}", + lsn, spec.pageservers[0].host, spec.pageservers[0].port ) })?; @@ -1536,10 +1431,9 @@ impl ComputeNode { let conf = Arc::new(conf); let spec = Arc::new( compute_state - .pspec + .spec .as_ref() .expect("spec must be set") - .spec .clone(), ); @@ -1608,7 +1502,7 @@ impl ComputeNode { /// as it's used to reconfigure a previously started and configured Postgres node. #[instrument(skip_all)] pub fn reconfigure(&self) -> Result<()> { - let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec; + let spec = self.state.lock().unwrap().spec.as_ref().unwrap().clone(); let tls_config = self.tls_config(&spec); @@ -1690,10 +1584,10 @@ impl ComputeNode { #[instrument(skip_all)] pub fn configure_as_primary(&self, compute_state: &ComputeState) -> Result<()> { - let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + let spec = compute_state.spec.as_ref().expect("spec must be set"); - assert!(pspec.spec.mode == ComputeMode::Primary); - if !pspec.spec.skip_pg_catalog_updates { + assert!(spec.mode == ComputeMode::Primary); + if !spec.skip_pg_catalog_updates { let pgdata_path = Path::new(&self.params.pgdata); // temporarily reset max_cluster_size in config // to avoid the possibility of hitting the limit, while we are applying config: @@ -2189,24 +2083,23 @@ LIMIT 100", /// the pageserver connection strings has changed. /// /// The operation will time out after a specified duration. - pub fn wait_timeout_while_pageserver_connstr_unchanged(&self, duration: Duration) { + pub fn wait_timeout_while_pageservers_unchanged(&self, duration: Duration) { let state = self.state.lock().unwrap(); - let old_pageserver_connstr = state - .pspec + let old_pageservers = state + .spec .as_ref() .expect("spec must be set") - .pageserver_connstr + .pageservers .clone(); let mut unchanged = true; let _ = self .state_changed .wait_timeout_while(state, duration, |s| { - let pageserver_connstr = &s - .pspec - .as_ref() - .expect("spec must be set") - .pageserver_connstr; - unchanged = pageserver_connstr == &old_pageserver_connstr; + let current_pageservers = &s.spec.as_ref().expect("spec must be set").pageservers; + unchanged = current_pageservers + .iter() + .zip(&old_pageservers) + .all(|(c, o)| c == o); unchanged }) .unwrap(); diff --git a/compute_tools/src/compute_prewarm.rs b/compute_tools/src/compute_prewarm.rs index a6a84b3f1f..866232975e 100644 --- a/compute_tools/src/compute_prewarm.rs +++ b/compute_tools/src/compute_prewarm.rs @@ -3,6 +3,7 @@ use anyhow::{Context, Result, bail}; use async_compression::tokio::bufread::{ZstdDecoder, ZstdEncoder}; use compute_api::responses::LfcOffloadState; use compute_api::responses::LfcPrewarmState; +use compute_api::spec::ComputeSpec; use http::StatusCode; use reqwest::Client; use std::sync::Arc; @@ -25,24 +26,30 @@ struct EndpointStoragePair { } const KEY: &str = "lfc_state"; -impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair { +impl TryFrom<&ComputeSpec> for EndpointStoragePair { type Error = anyhow::Error; - fn try_from(pspec: &crate::compute::ParsedSpec) -> Result { - let Some(ref endpoint_id) = pspec.spec.endpoint_id else { - bail!("pspec.endpoint_id missing") + fn try_from(spec: &ComputeSpec) -> Result { + let Some(ref addr) = spec.endpoint_storage_addr else { + bail!("spec.endpoint_storage_addr missing") }; - let Some(ref base_uri) = pspec.endpoint_storage_addr else { - bail!("pspec.endpoint_storage_addr missing") - }; - let tenant_id = pspec.tenant_id; - let timeline_id = pspec.timeline_id; - let url = format!("http://{base_uri}/{tenant_id}/{timeline_id}/{endpoint_id}/{KEY}"); - let Some(ref token) = pspec.endpoint_storage_token else { - bail!("pspec.endpoint_storage_token missing") + let url = format!( + "http://{addr}/{tenant_id}/{timeline_id}/{endpoint_id}/{key}", + addr = addr, + tenant_id = spec.tenant_id, + timeline_id = spec.timeline_id, + endpoint_id = spec.endpoint_id, + key = KEY + ); + + let Some(ref token) = spec.endpoint_storage_token else { + bail!("spec.endpoint_storage_token missing") }; - let token = token.clone(); - Ok(EndpointStoragePair { url, token }) + + Ok(EndpointStoragePair { + url, + token: token.clone(), + }) } } @@ -111,7 +118,7 @@ impl ComputeNode { fn endpoint_storage_pair(&self) -> Result { let state = self.state.lock().unwrap(); - state.pspec.as_ref().unwrap().try_into() + state.spec.as_ref().unwrap().try_into() } async fn prewarm_impl(&self) -> Result<()> { diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 933b30134f..4c8e55d5b3 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -56,13 +56,24 @@ pub fn write_postgres_conf( // Add options for connecting to storage writeln!(file, "# Neon storage settings")?; - if let Some(s) = &spec.pageserver_connstring { - writeln!(file, "neon.pageserver_connstring={}", escape_conf_value(s))?; + if !spec.pageservers.is_empty() { + writeln!( + file, + "neon.pageserver_connstring={}", + escape_conf_value( + &spec + .pageservers + .iter() + .map(|p| format!("host={} port={}", p.host, p.port)) + .collect::>() + .join(",") + ) + )?; } if let Some(stripe_size) = spec.shard_stripe_size { writeln!(file, "neon.stripe_size={stripe_size}")?; } - if !spec.safekeeper_connstrings.is_empty() { + if !spec.safekeepers.is_empty() { let mut neon_safekeepers_value = String::new(); tracing::info!( "safekeepers_connstrings is not zero, gen: {:?}", @@ -72,32 +83,45 @@ pub fn write_postgres_conf( if let Some(generation) = spec.safekeepers_generation { write!(neon_safekeepers_value, "g#{}:", generation)?; } - neon_safekeepers_value.push_str(&spec.safekeeper_connstrings.join(",")); + neon_safekeepers_value.push_str( + &spec + .safekeepers + .iter() + .map(|s| format!("{}:{}", s.host.to_string(), s.port)) + .collect::>() + .join(","), + ); writeln!( file, "neon.safekeepers={}", escape_conf_value(&neon_safekeepers_value) )?; } - if let Some(s) = &spec.tenant_id { - writeln!(file, "neon.tenant_id={}", escape_conf_value(&s.to_string()))?; - } - if let Some(s) = &spec.timeline_id { - writeln!( - file, - "neon.timeline_id={}", - escape_conf_value(&s.to_string()) - )?; - } - if let Some(s) = &spec.project_id { - writeln!(file, "neon.project_id={}", escape_conf_value(s))?; - } - if let Some(s) = &spec.branch_id { - writeln!(file, "neon.branch_id={}", escape_conf_value(s))?; - } - if let Some(s) = &spec.endpoint_id { - writeln!(file, "neon.endpoint_id={}", escape_conf_value(s))?; - } + writeln!( + file, + "neon.tenant_id={}", + escape_conf_value(&spec.tenant_id.to_string()) + )?; + writeln!( + file, + "neon.timeline_id={}", + escape_conf_value(&spec.timeline_id.to_string()) + )?; + writeln!( + file, + "neon.project_id={}", + escape_conf_value(&spec.project_id) + )?; + writeln!( + file, + "neon.branch_id={}", + escape_conf_value(&spec.branch_id) + )?; + writeln!( + file, + "neon.endpoint_id={}", + escape_conf_value(&spec.endpoint_id) + )?; // tls if let Some(tls_config) = tls_config { diff --git a/compute_tools/src/http/routes/configure.rs b/compute_tools/src/http/routes/configure.rs index c29e3a97da..b0e33e4bec 100644 --- a/compute_tools/src/http/routes/configure.rs +++ b/compute_tools/src/http/routes/configure.rs @@ -8,7 +8,7 @@ use http::StatusCode; use tokio::task; use tracing::info; -use crate::compute::{ComputeNode, ParsedSpec}; +use crate::compute::ComputeNode; use crate::http::JsonResponse; use crate::http::extract::Json; @@ -22,11 +22,6 @@ pub(in crate::http) async fn configure( State(compute): State>, request: Json, ) -> Response { - let pspec = match ParsedSpec::try_from(request.0.spec) { - Ok(p) => p, - Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e), - }; - // XXX: wrap state update under lock in a code block. Otherwise, we will try // to `Send` `mut state` into the spawned thread bellow, which will cause // the following rustc error: @@ -43,7 +38,7 @@ pub(in crate::http) async fn configure( // configure request for tracing purposes. state.startup_span = Some(tracing::Span::current()); - state.pspec = Some(pspec); + state.spec = Some(request.spec.clone()); state.set_status(ComputeStatus::ConfigurationPending, &compute.state_changed); drop(state); } diff --git a/compute_tools/src/http/routes/extension_server.rs b/compute_tools/src/http/routes/extension_server.rs index e141a48b7f..60339a22e3 100644 --- a/compute_tools/src/http/routes/extension_server.rs +++ b/compute_tools/src/http/routes/extension_server.rs @@ -31,8 +31,7 @@ pub(in crate::http) async fn download_extension( let ext = { let state = compute.state.lock().unwrap(); - let pspec = state.pspec.as_ref().unwrap(); - let spec = &pspec.spec; + let spec = &state.spec.as_ref().unwrap(); let remote_extensions = match spec.remote_extensions.as_ref() { Some(r) => r, diff --git a/compute_tools/src/http/routes/mod.rs b/compute_tools/src/http/routes/mod.rs index 432e66a830..803a88d41d 100644 --- a/compute_tools/src/http/routes/mod.rs +++ b/compute_tools/src/http/routes/mod.rs @@ -21,14 +21,8 @@ impl From<&ComputeState> for ComputeStatusResponse { fn from(state: &ComputeState) -> Self { ComputeStatusResponse { start_time: state.start_time, - tenant: state - .pspec - .as_ref() - .map(|pspec| pspec.tenant_id.to_string()), - timeline: state - .pspec - .as_ref() - .map(|pspec| pspec.timeline_id.to_string()), + tenant: state.spec.as_ref().map(|spec| spec.tenant_id.to_string()), + timeline: state.spec.as_ref().map(|spec| spec.timeline_id.to_string()), status: state.status, last_active: state.last_active, error: state.error.clone(), diff --git a/compute_tools/src/lsn_lease.rs b/compute_tools/src/lsn_lease.rs index b4ec675ff4..d8f112a2ae 100644 --- a/compute_tools/src/lsn_lease.rs +++ b/compute_tools/src/lsn_lease.rs @@ -18,8 +18,8 @@ use crate::compute::ComputeNode; pub fn launch_lsn_lease_bg_task_for_static(compute: &Arc) { let (tenant_id, timeline_id, lsn) = { let state = compute.state.lock().unwrap(); - let spec = state.pspec.as_ref().expect("Spec must be set"); - match spec.spec.mode { + let spec = state.spec.as_ref().expect("Spec must be set"); + match spec.mode { ComputeMode::Static(lsn) => (spec.tenant_id, spec.timeline_id, lsn), _ => return, } @@ -58,7 +58,7 @@ fn lsn_lease_bg_task( "Request succeeded, sleeping for {} seconds", sleep_duration.as_secs() ); - compute.wait_timeout_while_pageserver_connstr_unchanged(sleep_duration); + compute.wait_timeout_while_pageservers_unchanged(sleep_duration); } } @@ -79,18 +79,11 @@ fn acquire_lsn_lease_with_retry( let configs = { let state = compute.state.lock().unwrap(); - let spec = state.pspec.as_ref().expect("spec must be set"); + let spec = state.spec.as_ref().expect("spec must be set"); - let conn_strings = spec.pageserver_connstr.split(','); - - conn_strings - .map(|connstr| { - let mut config = postgres::Config::from_str(connstr).expect("Invalid connstr"); - if let Some(storage_auth_token) = &spec.storage_auth_token { - config.password(storage_auth_token.clone()); - } - config - }) + spec.pageservers + .iter() + .map(|p| postgres::Config::from(p)) .collect::>() }; @@ -105,7 +98,7 @@ fn acquire_lsn_lease_with_retry( Err(e) => { warn!("Failed to acquire lsn lease: {e} (attempt {attempts})"); - compute.wait_timeout_while_pageserver_connstr_unchanged(Duration::from_millis( + compute.wait_timeout_while_pageservers_unchanged(Duration::from_millis( retry_period_ms as u64, )); retry_period_ms *= 1.5; diff --git a/compute_tools/src/spec_apply.rs b/compute_tools/src/spec_apply.rs index 0d1389dbad..76973ffdc6 100644 --- a/compute_tools/src/spec_apply.rs +++ b/compute_tools/src/spec_apply.rs @@ -4,7 +4,7 @@ use std::future::Future; use std::iter::{empty, once}; use std::sync::Arc; -use anyhow::{Context, Result}; +use anyhow::Result; use compute_api::responses::ComputeStatus; use compute_api::spec::{ComputeAudit, ComputeSpec, Database, PgIdent, Role}; use futures::future::join_all; @@ -74,7 +74,7 @@ impl ComputeNode { let mut drop_subscriptions_done = false; if spec.drop_subscriptions_before_start { - let timeline_id = self.get_timeline_id().context("timeline_id must be set")?; + let timeline_id = self.get_timeline_id(); info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id); diff --git a/compute_tools/src/sync_sk.rs b/compute_tools/src/sync_sk.rs index 22b7027b93..1951bcaed6 100644 --- a/compute_tools/src/sync_sk.rs +++ b/compute_tools/src/sync_sk.rs @@ -37,7 +37,7 @@ pub async fn ping_safekeeper( // Parse result info!("done with {}", id); - if let postgres::SimpleQueryMessage::Row(row) = &result[0] { + if let tokio_postgres::SimpleQueryMessage::Row(row) = &result[0] { use std::str::FromStr; let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse { flush_lsn: Lsn::from_str(row.get("flush_lsn").unwrap())?, diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 76e33e4bff..2593788ef8 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -1493,7 +1493,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res let conf = env.get_pageserver_conf(pageserver_id).unwrap(); let parsed = parse_host_port(&conf.listen_pg_addr).expect("Bad config"); ( - vec![(parsed.0, parsed.1.unwrap_or(5432))], + vec![compute_api::spec::Pageserver { + host: parsed.0, + port: parsed.1.unwrap_or(5432), + }], // If caller is telling us what pageserver to use, this is not a tenant which is // full managed by storage controller, therefore not sharded. DEFAULT_STRIPE_SIZE, @@ -1516,11 +1519,11 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .await?; } - anyhow::Ok(( - Host::parse(&shard.listen_pg_addr) + anyhow::Ok(compute_api::spec::Pageserver { + host: Host::parse(&shard.listen_pg_addr) .expect("Storage controller reported bad hostname"), - shard.listen_pg_port, - )) + port: shard.listen_pg_port, + }) }), ) .await?; @@ -1576,10 +1579,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?; let pageservers = if let Some(ps_id) = args.endpoint_pageserver_id { let pageserver = PageServerNode::from_env(env, env.get_pageserver_conf(ps_id)?); - vec![( - pageserver.pg_connection_config.host().clone(), - pageserver.pg_connection_config.port(), - )] + vec![compute_api::spec::Pageserver { + host: pageserver.pg_connection_config.host().clone(), + port: pageserver.pg_connection_config.port(), + }] } else { let storage_controller = StorageController::from_env(env); storage_controller @@ -1587,12 +1590,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res .await? .shards .into_iter() - .map(|shard| { - ( - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported malformed host"), - shard.listen_pg_port, - ) + .map(|shard| compute_api::spec::Pageserver { + host: Host::parse(&shard.listen_pg_addr) + .expect("Storage controller reported malformed host"), + port: shard.listen_pg_port, }) .collect::>() }; diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 774a0053f8..0ac12fe499 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -52,8 +52,8 @@ use compute_api::responses::{ ComputeConfig, ComputeCtlConfig, ComputeStatus, ComputeStatusResponse, TlsConfig, }; use compute_api::spec::{ - Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, - RemoteExtSpec, Role, + Cluster, ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, Database, Pageserver, PgIdent, + RemoteExtSpec, Role, Safekeeper, }; use jsonwebtoken::jwk::{ AlgorithmParameters, CommonParameters, EllipticCurve, Jwk, JwkSet, KeyAlgorithm, KeyOperations, @@ -606,29 +606,25 @@ impl Endpoint { } } - fn build_pageserver_connstr(pageservers: &[(Host, u16)]) -> String { - pageservers - .iter() - .map(|(host, port)| format!("postgresql://no_user@{host}:{port}")) - .collect::>() - .join(",") - } + fn safekeepers_from_nodes(&self, ids: Vec) -> Result> { + let mut s = Vec::new(); - /// Map safekeepers ids to the actual connection strings. - fn build_safekeepers_connstrs(&self, sk_ids: Vec) -> Result> { - let mut safekeeper_connstrings = Vec::new(); if self.mode == ComputeMode::Primary { - for sk_id in sk_ids { + for id in ids { let sk = self .env .safekeepers .iter() - .find(|node| node.id == sk_id) - .ok_or_else(|| anyhow!("safekeeper {sk_id} does not exist"))?; - safekeeper_connstrings.push(format!("127.0.0.1:{}", sk.get_compute_port())); + .find(|node| node.id == id) + .ok_or_else(|| anyhow!("safekeeper {id} does not exist"))?; + s.push(Safekeeper { + host: Host::parse("127.0.0.1")?, + port: sk.get_compute_port(), + }); } } - Ok(safekeeper_connstrings) + + Ok(s) } /// Generate a JWT with the correct claims. @@ -654,7 +650,7 @@ impl Endpoint { endpoint_storage_addr: String, safekeepers_generation: Option, safekeepers: Vec, - pageservers: Vec<(Host, u16)>, + pageservers: Vec, remote_ext_base_url: Option<&String>, shard_stripe_size: usize, create_test_user: bool, @@ -672,11 +668,6 @@ impl Endpoint { std::fs::remove_dir_all(self.pgdata())?; } - let pageserver_connstring = Self::build_pageserver_connstr(&pageservers); - assert!(!pageserver_connstring.is_empty()); - - let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?; - // check for file remote_extensions_spec.json // if it is present, read it and pass to compute_ctl let remote_extensions_spec_path = self.endpoint_path().join("remote_extensions_spec.json"); @@ -727,15 +718,34 @@ impl Endpoint { postgresql_conf: Some(postgresql_conf.clone()), }, delta_operations: None, - tenant_id: Some(self.tenant_id), - timeline_id: Some(self.timeline_id), - project_id: None, - branch_id: None, - endpoint_id: Some(self.endpoint_id.clone()), + tenant_id: self.tenant_id.clone(), + timeline_id: self.timeline_id.clone(), + project_id: self.tenant_id.to_string(), + branch_id: self.timeline_id.to_string(), + endpoint_id: self.endpoint_id.clone(), mode: self.mode, - pageserver_connstring: Some(pageserver_connstring), + pageservers, + safekeepers: { + let mut s = Vec::new(); + + if self.mode == ComputeMode::Primary { + for id in safekeepers { + let sk = self + .env + .safekeepers + .iter() + .find(|node| node.id == id) + .ok_or_else(|| anyhow!("safekeeper {id} does not exist"))?; + s.push(Safekeeper { + host: Host::parse("127.0.0.1")?, + port: sk.get_compute_port(), + }); + } + } + + s + }, safekeepers_generation: safekeepers_generation.map(|g| g.into_inner()), - safekeeper_connstrings, storage_auth_token: auth_token.clone(), remote_extensions, pgbouncer_settings: None, @@ -939,7 +949,7 @@ impl Endpoint { pub async fn reconfigure( &self, - mut pageservers: Vec<(Host, u16)>, + pageservers: Vec, stripe_size: Option, safekeepers: Option>, ) -> Result<()> { @@ -958,30 +968,24 @@ impl Endpoint { if pageservers.is_empty() { let storage_controller = StorageController::from_env(&self.env); let locate_result = storage_controller.tenant_locate(self.tenant_id).await?; - pageservers = locate_result + spec.pageservers = locate_result .shards .into_iter() - .map(|shard| { - ( - Host::parse(&shard.listen_pg_addr) - .expect("Storage controller reported bad hostname"), - shard.listen_pg_port, - ) + .map(|shard| Pageserver { + host: Host::parse(&shard.listen_pg_addr) + .expect("Storage controller reported bad hostname"), + port: shard.listen_pg_port, }) .collect::>(); } - let pageserver_connstr = Self::build_pageserver_connstr(&pageservers); - assert!(!pageserver_connstr.is_empty()); - spec.pageserver_connstring = Some(pageserver_connstr); if stripe_size.is_some() { spec.shard_stripe_size = stripe_size.map(|s| s.0 as usize); } // If safekeepers are not specified, don't change them. if let Some(safekeepers) = safekeepers { - let safekeeper_connstrings = self.build_safekeepers_connstrs(safekeepers)?; - spec.safekeeper_connstrings = safekeeper_connstrings; + spec.safekeepers = self.safekeepers_from_nodes(safekeepers)?; } let client = reqwest::Client::builder() diff --git a/libs/compute_api/Cargo.toml b/libs/compute_api/Cargo.toml index 81b0cd19a1..5216c05ced 100644 --- a/libs/compute_api/Cargo.toml +++ b/libs/compute_api/Cargo.toml @@ -9,8 +9,11 @@ anyhow.workspace = true chrono.workspace = true indexmap.workspace = true jsonwebtoken.workspace = true +postgres.workspace = true serde.workspace = true serde_json.workspace = true +tokio-postgres.workspace = true +url.workspace = true regex.workspace = true utils = { path = "../utils" } diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 0e23b70265..cacc5bc907 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -9,7 +9,8 @@ use indexmap::IndexMap; use regex::Regex; use remote_storage::RemotePath; use serde::{Deserialize, Serialize}; -use utils::id::{TenantId, TimelineId}; +use url::Host; +use utils::id::{BranchId, EndpointId, ProjectId, TenantId, TimelineId}; use utils::lsn::Lsn; use crate::responses::TlsConfig; @@ -21,13 +22,77 @@ pub type PgIdent = String; /// String type alias representing Postgres extension version pub type ExtVersion = String; +/// Pageserver settings. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct Pageserver { + /// Hostname of the pageserver. + pub host: Host, + + /// Port that the safekeeper listens on. + pub port: u16, +} + +impl From<&Pageserver> for postgres::Config { + fn from(ps: &Pageserver) -> Self { + let mut config = postgres::Config::new(); + + config.host(&ps.host.to_string()); + config.port(ps.port); + + config + } +} + +impl From<&Pageserver> for tokio_postgres::Config { + fn from(ps: &Pageserver) -> Self { + let mut config = tokio_postgres::Config::new(); + + config.host(&ps.host.to_string()); + config.port(ps.port); + + config + } +} + +/// Safekeeper settings. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct Safekeeper { + /// Hostname of the safekeeper. + pub host: Host, + + /// Port that the safekeeper listens on. + pub port: u16, +} + +impl From<&Safekeeper> for postgres::Config { + fn from(sk: &Safekeeper) -> Self { + let mut config = postgres::Config::new(); + + config.host(&sk.host.to_string()); + config.port(sk.port); + + config + } +} + +impl From<&Safekeeper> for tokio_postgres::Config { + fn from(sk: &Safekeeper) -> Self { + let mut config = tokio_postgres::Config::new(); + + config.host(&sk.host.to_string()); + config.port(sk.port); + + config + } +} + fn default_reconfigure_concurrency() -> usize { 1 } /// Cluster spec or configuration represented as an optional number of /// delta operations + final cluster state description. -#[derive(Clone, Debug, Default, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct ComputeSpec { pub format_version: f32, @@ -90,25 +155,13 @@ pub struct ComputeSpec { // Information needed to connect to the storage layer. // - // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed. - // // Depending on `mode`, this can be a primary read-write node, a read-only // replica, or a read-only node pinned at an older LSN. // `safekeeper_connstrings` must be set for a primary. - // - // For backwards compatibility, the control plane may leave out all of - // these, and instead set the "neon.tenant_id", "neon.timeline_id", - // etc. GUCs in cluster.settings. TODO: Once the control plane has been - // updated to fill these fields, we can make these non optional. - pub tenant_id: Option, - pub timeline_id: Option, - pub pageserver_connstring: Option, + pub pageservers: Vec, - // More neon ids that we expose to the compute_ctl - // and to postgres as neon extension GUCs. - pub project_id: Option, - pub branch_id: Option, - pub endpoint_id: Option, + #[serde(default)] + pub safekeepers_generation: Option, /// Safekeeper membership config generation. It is put in /// neon.safekeepers GUC and serves two purposes: @@ -120,9 +173,18 @@ pub struct ComputeSpec { /// Note: it could be SafekeeperGeneration, but this needs linking /// compute_ctl with postgres_ffi. #[serde(default)] - pub safekeepers_generation: Option, - #[serde(default)] - pub safekeeper_connstrings: Vec, + pub safekeepers: Vec, + + /// The Neon tenant ID. Exposed to Postgres as `neon.tenant_id`. + pub tenant_id: TenantId, + /// The Neon timeline ID. Exposed to Postgres as `neon.timeline_id`. + pub timeline_id: TimelineId, + /// The Neon project ID. Exposed to Postgres as `neon.project_id`. + pub project_id: ProjectId, + /// The Neon branch ID. Exposed to Postgres as `neon.branch_id`. + pub branch_id: BranchId, + /// The Neon endpoint ID. Exposed to Postgres as `neon.endpoint_id`. + pub endpoint_id: EndpointId, #[serde(default)] pub mode: ComputeMode, diff --git a/libs/utils/src/id.rs b/libs/utils/src/id.rs index 68cb1f0209..8fa9a4e006 100644 --- a/libs/utils/src/id.rs +++ b/libs/utils/src/id.rs @@ -295,7 +295,11 @@ pub struct TenantId(Id); id_newtype!(TenantId); -/// If needed, reuse small string from proxy/src/types.rc +/// Type representing a project ID. +pub type ProjectId = String; +/// Type representing a branch ID. +pub type BranchId = String; +/// Type representing an endpoint ID. pub type EndpointId = String; // A pair uniquely identifying Neon instance. diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index c41e174d9d..c5b117daa7 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -65,8 +65,9 @@ diesel-async = { version = "0.5.2", features = ["postgres", "bb8", "async-connec diesel_migrations = { version = "2.2.0" } scoped-futures = "0.1.4" +compute_api = { path = "../libs/compute_api/" } http-utils = { path = "../libs/http-utils/" } utils = { path = "../libs/utils/" } metrics = { path = "../libs/metrics/" } control_plane = { path = "../control_plane" } -workspace_hack = { version = "0.1", path = "../workspace_hack" } \ No newline at end of file +workspace_hack = { version = "0.1", path = "../workspace_hack" } diff --git a/storage_controller/src/compute_hook.rs b/storage_controller/src/compute_hook.rs index 57709302e1..6c7e974ed3 100644 --- a/storage_controller/src/compute_hook.rs +++ b/storage_controller/src/compute_hook.rs @@ -428,7 +428,10 @@ impl ComputeHook { .expect("Unknown pageserver"); let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr) .expect("Unable to parse listen_pg_addr"); - (pg_host, pg_port.unwrap_or(5432)) + compute_api::spec::Pageserver { + host: pg_host, + port: pg_port.unwrap_or(5432), + } }) .collect::>();