diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index d784941690..41a575ac15 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -44,7 +44,6 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::{Arg, ArgAction}; -use compute_api::spec::ComputeMode; use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; @@ -52,7 +51,7 @@ use tracing::{error, info, warn}; use url::Url; use compute_api::responses::ComputeStatus; -use compute_api::spec::ComputeSpec; +use compute_api::spec::{ComputeMode, ComputeSpec}; use compute_tools::compute::{ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, @@ -77,7 +76,7 @@ async fn main() -> Result<()> { // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; - let (pg_handle, start_pg_result) = { + let (http_handle, (pg_handle, start_pg_result)) = { // Enter startup tracing context let _startup_context_guard = startup_context_from_env(); @@ -85,13 +84,61 @@ async fn main() -> Result<()> { let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?; - let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?; + let compute = Arc::new(ComputeNode { + connstr: Url::parse(cli_args.connstr).context("cannot parse connstr as a URL")?, + pgdata: cli_args.pgdata.to_string(), + pgroot: cli_args.pgroot.to_string(), + pgversion: cli_args.pgversion.to_string(), + http_port: cli_args.http_port, + live_config_allowed: cli_spec.live_config_allowed, + state: Mutex::new({ + let mut state = ComputeState::new(); - start_postgres(&clap_args, wait_spec_result).await? + if let Some(spec) = cli_spec.spec { + let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; + info!("new pspec.spec: {:?}", pspec.spec); + state.pspec = Some(pspec); + } + + state + }), + state_changed: Condvar::new(), + ext_remote_storage: cli_args.ext_remote_storage.map(|s| s.to_string()), + ext_download_progress: RwLock::new(HashMap::new()), + build_tag: build_tag.clone(), + }); + + // If this is a pooled VM, prewarm before starting HTTP server and becoming + // available for binding. Prewarming helps Postgres start quicker later, + // because QEMU will already have its memory allocated from the host, and + // the necessary binaries will already be cached. + if compute.state.lock().unwrap().pspec.is_none() { + compute.prewarm_postgres()?; + } + + // Launch http service first, so that we can serve control-plane requests + // while configuration is still in progress. + let http_handle = launch_http_server(cli_args.http_port, &compute) + .expect("cannot launch http endpoint thread"); + + wait_spec(&compute)?; + + ( + http_handle, + start_postgres( + compute, + #[cfg(target_os = "linux")] + &clap_args, + cli_args.resize_swap_on_bind, + ) + .await?, + ) // Startup is finished, exit the startup tracing span }; + let _ = http_handle.join(); + // PostgreSQL is now running, if startup was successful. Wait until it exits. let wait_pg_result = wait_postgres(pg_handle)?; @@ -291,63 +338,8 @@ struct CliSpecParams { live_config_allowed: bool, } -fn wait_spec( - build_tag: String, - ProcessCliResult { - connstr, - pgroot, - pgversion, - pgdata, - ext_remote_storage, - resize_swap_on_bind, - http_port, - .. - }: ProcessCliResult, - CliSpecParams { - spec, - live_config_allowed, - }: CliSpecParams, -) -> Result { - let mut new_state = ComputeState::new(); - let spec_set; - - if let Some(spec) = spec { - let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - info!("new pspec.spec: {:?}", pspec.spec); - new_state.pspec = Some(pspec); - spec_set = true; - } else { - spec_set = false; - } - let compute_node = ComputeNode { - connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, - pgdata: pgdata.to_string(), - pgroot: pgroot.to_string(), - pgversion: pgversion.to_string(), - http_port, - live_config_allowed, - state: Mutex::new(new_state), - state_changed: Condvar::new(), - ext_remote_storage: ext_remote_storage.map(|s| s.to_string()), - ext_download_progress: RwLock::new(HashMap::new()), - build_tag, - }; - let compute = Arc::new(compute_node); - - // If this is a pooled VM, prewarm before starting HTTP server and becoming - // available for binding. Prewarming helps Postgres start quicker later, - // because QEMU will already have its memory allocated from the host, and - // the necessary binaries will already be cached. - if !spec_set { - compute.prewarm_postgres()?; - } - - // Launch http service first, so that we can serve control-plane requests - // while configuration is still in progress. - let http_handle = - launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); - - if !spec_set { +fn wait_spec(compute: &Arc) -> Result<()> { + if compute.state.lock().unwrap().pspec.is_none() { // No spec provided, hang waiting for it. info!("no compute spec provided, waiting"); @@ -377,24 +369,13 @@ fn wait_spec( launch_lsn_lease_bg_task_for_static(&compute); - Ok(WaitSpecResult { - compute, - resize_swap_on_bind, - }) -} - -struct WaitSpecResult { - compute: Arc, - resize_swap_on_bind: bool, + Ok(()) } async fn start_postgres( - // need to allow unused because `matches` is only used if target_os = "linux" - #[allow(unused_variables)] matches: &clap::ArgMatches, - WaitSpecResult { - compute, - resize_swap_on_bind, - }: WaitSpecResult, + compute: Arc, + #[cfg(target_os = "linux")] matches: &clap::ArgMatches, + resize_swap_on_bind: bool, ) -> Result<(Option, StartPostgresResult)> { // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); @@ -445,25 +426,32 @@ async fn start_postgres( } } - // Start Postgres + compute.prepare_compute().await?; + let mut pg = None; if !prestartup_failed { - pg = match compute.start_compute() { - Ok(pg) => Some(pg), - Err(err) => { - error!("could not start the compute node: {:#}", err); - let mut state = compute.state.lock().unwrap(); - state.error = Some(format!("{:?}", err)); - state.status = ComputeStatus::Failed; - // Notify others that Postgres failed to start. In case of configuring the - // empty compute, it's likely that API handler is still waiting for compute - // state change. With this we will notify it that compute is in Failed state, - // so control plane will know about it earlier and record proper error instead - // of timeout. - compute.state_changed.notify_all(); - drop(state); // unlock - delay_exit = true; - None + match compute.get_mode() { + ComputeMode::Upgrade => {} + _ => { + // Start Postgres + pg = match compute.start_compute() { + Ok(pg) => Some(pg), + Err(err) => { + error!("could not start the compute node: {:#}", err); + let mut state = compute.state.lock().unwrap(); + state.error = Some(format!("{:?}", err)); + state.status = ComputeStatus::Failed; + // Notify others that Postgres failed to start. In case of configuring the + // empty compute, it's likely that API handler is still waiting for compute + // state change. With this we will notify it that compute is in Failed state, + // so control plane will know about it earlier and record proper error instead + // of timeout. + compute.state_changed.notify_all(); + drop(state); // unlock + delay_exit = true; + None + } + } } }; } else { diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 95d37b1392..1040b7682d 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -4,7 +4,7 @@ use compute_api::{ }; use futures::Stream; use postgres::{Client, NoTls}; -use std::{path::Path, process::Stdio, result::Result, sync::Arc}; +use std::{process::Stdio, result::Result, sync::Arc}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c5a01ba489..86960b5db2 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -943,7 +943,7 @@ impl ComputeNode { )?; // Step 7: Write the tarball into the Postgres WAL - info!("Writing initdb.tar.zst to WAL"); + info!("Writing {} to WAL", initdb_tar_path.as_str()); let postgres_bin = self.get_my_pg_binary("postgres"); let mut wal_log_cmd = Command::new(&postgres_bin); @@ -959,7 +959,7 @@ impl ComputeNode { match child.wait() { Ok(s) => { if !s.success() { - return Err(anyhow::anyhow!("Could not wal log upgrade tarball")); + return Err(anyhow::anyhow!("Could not WAL log upgrade tarball")); } } Err(e) => return Err(e.into()), diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 249f2e2831..604b466c93 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -504,7 +504,7 @@ impl Endpoint { /// Map safekeepers ids to the actual connection strings. fn build_safekeepers_connstrs(&self, sk_ids: Vec) -> Result> { let mut safekeeper_connstrings = Vec::new(); - if self.mode == ComputeMode::Primary { + if matches!(self.mode, ComputeMode::Primary | ComputeMode::Upgrade) { for sk_id in sk_ids { let sk = self .env diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index a4859cecb1..9384845176 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -167,7 +167,16 @@ pub const RM_RELMAP_ID: u8 = 7; pub const RM_STANDBY_ID: u8 = 8; pub const RM_HEAP2_ID: u8 = 9; pub const RM_HEAP_ID: u8 = 10; +pub const RM_BTREE_ID: u8 = 11; +pub const RM_HASH_ID: u8 = 12; +pub const RM_GIN_ID: u8 = 13; +pub const RM_GIST_ID: u8 = 14; +pub const RM_SEQ_ID: u8 = 15; +pub const RM_SPGIST_ID: u8 = 16; +pub const RM_BRIN_ID: u8 = 17; +pub const RM_COMMIT_TS_ID: u8 = 18; pub const RM_REPLORIGIN_ID: u8 = 19; +pub const RM_GENERIC_ID: u8 = 20; pub const RM_LOGICALMSG_ID: u8 = 21; // from neon_rmgr.h diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 49e9518b8a..b1314c2f52 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -473,7 +473,6 @@ impl<'t> WalIngest<'t> { | pg_constants::RM_SPGIST_ID | pg_constants::RM_BRIN_ID | pg_constants::RM_COMMIT_TS_ID - | pg_constants::RM_REPLORIGIN_ID | pg_constants::RM_GENERIC_ID => { // No special handling currently for these resource managers } diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index a93d921f45..1eb0e02dbe 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -838,7 +838,7 @@ wal_log_file(PG_FUNCTION_ARGS) * Entry point for `postgres --wal-log`. */ PGDLLEXPORT void -WalLogMain(int argc, char *argv[]) +WalLog(int argc, char *argv[]) { int rc; int fd; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index b3e006ab05..53c8fba081 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -580,15 +580,15 @@ where * because safekeepers parse WAL headers and the format * may change between versions. */ - if msg.pg_version / 10000 != self.state.server.pg_version / 10000 - && self.state.server.pg_version != UNKNOWN_SERVER_VERSION - { - bail!( - "incompatible server version {}, expected {}", - msg.pg_version, - self.state.server.pg_version - ); - } + // if msg.pg_version / 10000 != self.state.server.pg_version / 10000 + // && self.state.server.pg_version != UNKNOWN_SERVER_VERSION + // { + // bail!( + // "incompatible server version {}, expected {}", + // msg.pg_version, + // self.state.server.pg_version + // ); + // } if msg.tenant_id != self.state.tenant_id { bail!( diff --git a/test.sh b/test.sh new file mode 100755 index 0000000000..c62c6bcf2d --- /dev/null +++ b/test.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +cargo neon endpoint stop ep-main 2>/dev/null +kill $(pgrep compute_ctl) +cargo neon stop 2>/dev/null +rm -rf .neon +cargo neon init +cargo neon start +cargo neon tenant create --pg-version 15 --set-default +cargo neon endpoint create --pg-version 15 --upgrade-only +cargo neon endpoint start ep-main +curl -i -X POST http://localhost:55433/upgrade -d '{"pg_version": "16"}' diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 6840d2616e..732b361914 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 6840d2616ef202e733dd9b5b260abab146f8ec36 +Subproject commit 732b36191454de0d522db47f67c694778875029b diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 737cd9c696..3f8405a0d1 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 737cd9c696cdb92ff59600ab8eddff0c3d38da0c +Subproject commit 3f8405a0d1b8c23b65ae04cd7809e4a4dfaad2d0