From bbd646325ccb8b75311f99be4a69ea31e86ee04e Mon Sep 17 00:00:00 2001 From: Tristan Partin Date: Wed, 3 Apr 2024 16:15:41 -0500 Subject: [PATCH] Add match arm for unused builtin resource managers Although we don't currently handle these, they are much different from an unrecognized resource manager, which the comment in the last match arm refers to. --- compute_tools/src/bin/compute_ctl.rs | 176 ++++++++++++-------------- compute_tools/src/catalog.rs | 2 +- compute_tools/src/compute.rs | 4 +- control_plane/src/endpoint.rs | 2 +- libs/postgres_ffi/src/pg_constants.rs | 9 ++ pageserver/src/walingest.rs | 1 - pgxn/neon/neon.c | 2 +- safekeeper/src/safekeeper.rs | 18 +-- test.sh | 12 ++ vendor/postgres-v15 | 2 +- vendor/postgres-v16 | 2 +- 11 files changed, 119 insertions(+), 111 deletions(-) create mode 100755 test.sh diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index d784941690..41a575ac15 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -44,7 +44,6 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; use clap::{Arg, ArgAction}; -use compute_api::spec::ComputeMode; use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; @@ -52,7 +51,7 @@ use tracing::{error, info, warn}; use url::Url; use compute_api::responses::ComputeStatus; -use compute_api::spec::ComputeSpec; +use compute_api::spec::{ComputeMode, ComputeSpec}; use compute_tools::compute::{ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, @@ -77,7 +76,7 @@ async fn main() -> Result<()> { // enable core dumping for all child processes setrlimit(Resource::CORE, rlimit::INFINITY, rlimit::INFINITY)?; - let (pg_handle, start_pg_result) = { + let (http_handle, (pg_handle, start_pg_result)) = { // Enter startup tracing context let _startup_context_guard = startup_context_from_env(); @@ -85,13 +84,61 @@ async fn main() -> Result<()> { let cli_spec = try_spec_from_cli(&clap_args, &cli_args)?; - let wait_spec_result = wait_spec(build_tag, cli_args, cli_spec)?; + let compute = Arc::new(ComputeNode { + connstr: Url::parse(cli_args.connstr).context("cannot parse connstr as a URL")?, + pgdata: cli_args.pgdata.to_string(), + pgroot: cli_args.pgroot.to_string(), + pgversion: cli_args.pgversion.to_string(), + http_port: cli_args.http_port, + live_config_allowed: cli_spec.live_config_allowed, + state: Mutex::new({ + let mut state = ComputeState::new(); - start_postgres(&clap_args, wait_spec_result).await? + if let Some(spec) = cli_spec.spec { + let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; + info!("new pspec.spec: {:?}", pspec.spec); + state.pspec = Some(pspec); + } + + state + }), + state_changed: Condvar::new(), + ext_remote_storage: cli_args.ext_remote_storage.map(|s| s.to_string()), + ext_download_progress: RwLock::new(HashMap::new()), + build_tag: build_tag.clone(), + }); + + // If this is a pooled VM, prewarm before starting HTTP server and becoming + // available for binding. Prewarming helps Postgres start quicker later, + // because QEMU will already have its memory allocated from the host, and + // the necessary binaries will already be cached. + if compute.state.lock().unwrap().pspec.is_none() { + compute.prewarm_postgres()?; + } + + // Launch http service first, so that we can serve control-plane requests + // while configuration is still in progress. + let http_handle = launch_http_server(cli_args.http_port, &compute) + .expect("cannot launch http endpoint thread"); + + wait_spec(&compute)?; + + ( + http_handle, + start_postgres( + compute, + #[cfg(target_os = "linux")] + &clap_args, + cli_args.resize_swap_on_bind, + ) + .await?, + ) // Startup is finished, exit the startup tracing span }; + let _ = http_handle.join(); + // PostgreSQL is now running, if startup was successful. Wait until it exits. let wait_pg_result = wait_postgres(pg_handle)?; @@ -291,63 +338,8 @@ struct CliSpecParams { live_config_allowed: bool, } -fn wait_spec( - build_tag: String, - ProcessCliResult { - connstr, - pgroot, - pgversion, - pgdata, - ext_remote_storage, - resize_swap_on_bind, - http_port, - .. - }: ProcessCliResult, - CliSpecParams { - spec, - live_config_allowed, - }: CliSpecParams, -) -> Result { - let mut new_state = ComputeState::new(); - let spec_set; - - if let Some(spec) = spec { - let pspec = ParsedSpec::try_from(spec).map_err(|msg| anyhow::anyhow!(msg))?; - info!("new pspec.spec: {:?}", pspec.spec); - new_state.pspec = Some(pspec); - spec_set = true; - } else { - spec_set = false; - } - let compute_node = ComputeNode { - connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, - pgdata: pgdata.to_string(), - pgroot: pgroot.to_string(), - pgversion: pgversion.to_string(), - http_port, - live_config_allowed, - state: Mutex::new(new_state), - state_changed: Condvar::new(), - ext_remote_storage: ext_remote_storage.map(|s| s.to_string()), - ext_download_progress: RwLock::new(HashMap::new()), - build_tag, - }; - let compute = Arc::new(compute_node); - - // If this is a pooled VM, prewarm before starting HTTP server and becoming - // available for binding. Prewarming helps Postgres start quicker later, - // because QEMU will already have its memory allocated from the host, and - // the necessary binaries will already be cached. - if !spec_set { - compute.prewarm_postgres()?; - } - - // Launch http service first, so that we can serve control-plane requests - // while configuration is still in progress. - let http_handle = - launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); - - if !spec_set { +fn wait_spec(compute: &Arc) -> Result<()> { + if compute.state.lock().unwrap().pspec.is_none() { // No spec provided, hang waiting for it. info!("no compute spec provided, waiting"); @@ -377,24 +369,13 @@ fn wait_spec( launch_lsn_lease_bg_task_for_static(&compute); - Ok(WaitSpecResult { - compute, - resize_swap_on_bind, - }) -} - -struct WaitSpecResult { - compute: Arc, - resize_swap_on_bind: bool, + Ok(()) } async fn start_postgres( - // need to allow unused because `matches` is only used if target_os = "linux" - #[allow(unused_variables)] matches: &clap::ArgMatches, - WaitSpecResult { - compute, - resize_swap_on_bind, - }: WaitSpecResult, + compute: Arc, + #[cfg(target_os = "linux")] matches: &clap::ArgMatches, + resize_swap_on_bind: bool, ) -> Result<(Option, StartPostgresResult)> { // We got all we need, update the state. let mut state = compute.state.lock().unwrap(); @@ -445,25 +426,32 @@ async fn start_postgres( } } - // Start Postgres + compute.prepare_compute().await?; + let mut pg = None; if !prestartup_failed { - pg = match compute.start_compute() { - Ok(pg) => Some(pg), - Err(err) => { - error!("could not start the compute node: {:#}", err); - let mut state = compute.state.lock().unwrap(); - state.error = Some(format!("{:?}", err)); - state.status = ComputeStatus::Failed; - // Notify others that Postgres failed to start. In case of configuring the - // empty compute, it's likely that API handler is still waiting for compute - // state change. With this we will notify it that compute is in Failed state, - // so control plane will know about it earlier and record proper error instead - // of timeout. - compute.state_changed.notify_all(); - drop(state); // unlock - delay_exit = true; - None + match compute.get_mode() { + ComputeMode::Upgrade => {} + _ => { + // Start Postgres + pg = match compute.start_compute() { + Ok(pg) => Some(pg), + Err(err) => { + error!("could not start the compute node: {:#}", err); + let mut state = compute.state.lock().unwrap(); + state.error = Some(format!("{:?}", err)); + state.status = ComputeStatus::Failed; + // Notify others that Postgres failed to start. In case of configuring the + // empty compute, it's likely that API handler is still waiting for compute + // state change. With this we will notify it that compute is in Failed state, + // so control plane will know about it earlier and record proper error instead + // of timeout. + compute.state_changed.notify_all(); + drop(state); // unlock + delay_exit = true; + None + } + } } }; } else { diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 95d37b1392..1040b7682d 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -4,7 +4,7 @@ use compute_api::{ }; use futures::Stream; use postgres::{Client, NoTls}; -use std::{path::Path, process::Stdio, result::Result, sync::Arc}; +use std::{process::Stdio, result::Result, sync::Arc}; use tokio::{ io::{AsyncBufReadExt, BufReader}, process::Command, diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index c5a01ba489..86960b5db2 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -943,7 +943,7 @@ impl ComputeNode { )?; // Step 7: Write the tarball into the Postgres WAL - info!("Writing initdb.tar.zst to WAL"); + info!("Writing {} to WAL", initdb_tar_path.as_str()); let postgres_bin = self.get_my_pg_binary("postgres"); let mut wal_log_cmd = Command::new(&postgres_bin); @@ -959,7 +959,7 @@ impl ComputeNode { match child.wait() { Ok(s) => { if !s.success() { - return Err(anyhow::anyhow!("Could not wal log upgrade tarball")); + return Err(anyhow::anyhow!("Could not WAL log upgrade tarball")); } } Err(e) => return Err(e.into()), diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 249f2e2831..604b466c93 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -504,7 +504,7 @@ impl Endpoint { /// Map safekeepers ids to the actual connection strings. fn build_safekeepers_connstrs(&self, sk_ids: Vec) -> Result> { let mut safekeeper_connstrings = Vec::new(); - if self.mode == ComputeMode::Primary { + if matches!(self.mode, ComputeMode::Primary | ComputeMode::Upgrade) { for sk_id in sk_ids { let sk = self .env diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index a4859cecb1..9384845176 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -167,7 +167,16 @@ pub const RM_RELMAP_ID: u8 = 7; pub const RM_STANDBY_ID: u8 = 8; pub const RM_HEAP2_ID: u8 = 9; pub const RM_HEAP_ID: u8 = 10; +pub const RM_BTREE_ID: u8 = 11; +pub const RM_HASH_ID: u8 = 12; +pub const RM_GIN_ID: u8 = 13; +pub const RM_GIST_ID: u8 = 14; +pub const RM_SEQ_ID: u8 = 15; +pub const RM_SPGIST_ID: u8 = 16; +pub const RM_BRIN_ID: u8 = 17; +pub const RM_COMMIT_TS_ID: u8 = 18; pub const RM_REPLORIGIN_ID: u8 = 19; +pub const RM_GENERIC_ID: u8 = 20; pub const RM_LOGICALMSG_ID: u8 = 21; // from neon_rmgr.h diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 49e9518b8a..b1314c2f52 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -473,7 +473,6 @@ impl<'t> WalIngest<'t> { | pg_constants::RM_SPGIST_ID | pg_constants::RM_BRIN_ID | pg_constants::RM_COMMIT_TS_ID - | pg_constants::RM_REPLORIGIN_ID | pg_constants::RM_GENERIC_ID => { // No special handling currently for these resource managers } diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index a93d921f45..1eb0e02dbe 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -838,7 +838,7 @@ wal_log_file(PG_FUNCTION_ARGS) * Entry point for `postgres --wal-log`. */ PGDLLEXPORT void -WalLogMain(int argc, char *argv[]) +WalLog(int argc, char *argv[]) { int rc; int fd; diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index b3e006ab05..53c8fba081 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -580,15 +580,15 @@ where * because safekeepers parse WAL headers and the format * may change between versions. */ - if msg.pg_version / 10000 != self.state.server.pg_version / 10000 - && self.state.server.pg_version != UNKNOWN_SERVER_VERSION - { - bail!( - "incompatible server version {}, expected {}", - msg.pg_version, - self.state.server.pg_version - ); - } + // if msg.pg_version / 10000 != self.state.server.pg_version / 10000 + // && self.state.server.pg_version != UNKNOWN_SERVER_VERSION + // { + // bail!( + // "incompatible server version {}, expected {}", + // msg.pg_version, + // self.state.server.pg_version + // ); + // } if msg.tenant_id != self.state.tenant_id { bail!( diff --git a/test.sh b/test.sh new file mode 100755 index 0000000000..c62c6bcf2d --- /dev/null +++ b/test.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +cargo neon endpoint stop ep-main 2>/dev/null +kill $(pgrep compute_ctl) +cargo neon stop 2>/dev/null +rm -rf .neon +cargo neon init +cargo neon start +cargo neon tenant create --pg-version 15 --set-default +cargo neon endpoint create --pg-version 15 --upgrade-only +cargo neon endpoint start ep-main +curl -i -X POST http://localhost:55433/upgrade -d '{"pg_version": "16"}' diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 6840d2616e..732b361914 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 6840d2616ef202e733dd9b5b260abab146f8ec36 +Subproject commit 732b36191454de0d522db47f67c694778875029b diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 737cd9c696..3f8405a0d1 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 737cd9c696cdb92ff59600ab8eddff0c3d38da0c +Subproject commit 3f8405a0d1b8c23b65ae04cd7809e4a4dfaad2d0