diff --git a/.gitmodules b/.gitmodules index 1d925674a1..055c9e8079 100644 --- a/.gitmodules +++ b/.gitmodules @@ -5,8 +5,8 @@ [submodule "vendor/postgres-v15"] path = vendor/postgres-v15 url = https://github.com/neondatabase/postgres.git - branch = REL_15_STABLE_neon + branch = tristan957/15/pg_upgrade [submodule "vendor/postgres-v16"] path = vendor/postgres-v16 url = https://github.com/neondatabase/postgres.git - branch = REL_16_STABLE_neon + branch = tristan957/pg_upgrade diff --git a/Cargo.lock b/Cargo.lock index 3ca6acbc3e..28b2a8a08d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1220,6 +1220,7 @@ dependencies = [ "anyhow", "async-compression", "bytes", + "camino", "cfg-if", "chrono", "clap", @@ -1237,6 +1238,7 @@ dependencies = [ "reqwest 0.12.4", "rlimit", "rust-ini", + "scopeguard", "serde", "serde_json", "signal-hook", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 8af0ed43ce..ec4785efeb 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -12,6 +12,7 @@ testing = [] [dependencies] anyhow.workspace = true async-compression.workspace = true +camino.workspace = true chrono.workspace = true cfg-if.workspace = true clap.workspace = true @@ -24,6 +25,7 @@ num_cpus.workspace = true opentelemetry.workspace = true postgres.workspace = true regex.workspace = true +scopeguard.workspace = true serde.workspace = true serde_json.workspace = true signal-hook.workspace = true diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 639381f2e1..d784941690 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -43,7 +43,8 @@ use std::{thread, time::Duration}; use anyhow::{Context, Result}; use chrono::Utc; -use clap::Arg; +use clap::{Arg, ArgAction}; +use compute_api::spec::ComputeMode; use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static; use signal_hook::consts::{SIGQUIT, SIGTERM}; use signal_hook::{consts::SIGINT, iterator::Signals}; @@ -57,7 +58,6 @@ use compute_tools::compute::{ forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID, }; use compute_tools::configurator::launch_configurator; -use compute_tools::extension_server::get_pg_version; use compute_tools::http::api::launch_http_server; use compute_tools::logger::*; use compute_tools::monitor::launch_monitor; @@ -121,11 +121,14 @@ fn init() -> Result<(String, clap::ArgMatches)> { } fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> { - let pgbin_default = "postgres"; - let pgbin = matches - .get_one::<String>("pgbin") + let pgroot = matches + .get_one::<String>("pgroot") .map(|s| s.as_str()) - .unwrap_or(pgbin_default); + .expect("pgroot is required"); + let pgversion = matches + .get_one::<String>("pgversion") + .map(|s| s.as_str()) + .expect("pgversion is required"); let ext_remote_storage = matches .get_one::<String>("remote-ext-config") @@ -156,7 +159,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> { Ok(ProcessCliResult { connstr, pgdata, - pgbin, + pgroot, + pgversion, ext_remote_storage, http_port, spec_json, @@ -168,7 +172,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> { struct ProcessCliResult<'clap> { connstr: &'clap str, pgdata: &'clap str, - pgbin: &'clap str, + pgroot: &'clap str, + pgversion: &'clap str, ext_remote_storage: Option<&'clap str>, http_port: u16, spec_json: Option<&'clap String>, @@ -290,8 +295,9 @@ fn wait_spec( build_tag: String, ProcessCliResult { connstr, + pgroot, + pgversion, pgdata, - pgbin, ext_remote_storage, resize_swap_on_bind, http_port, @@ -316,8 +322,9 @@ 
fn wait_spec( let compute_node = ComputeNode { connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?, pgdata: pgdata.to_string(), - pgbin: pgbin.to_string(), - pgversion: get_pg_version(pgbin), + pgroot: pgroot.to_string(), + pgversion: pgversion.to_string(), + http_port, live_config_allowed, state: Mutex::new(new_state), state_changed: Condvar::new(), @@ -337,7 +344,7 @@ fn wait_spec( // Launch http service first, so that we can serve control-plane requests // while configuration is still in progress. - let _http_handle = + let http_handle = launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread"); if !spec_set { @@ -372,15 +379,12 @@ fn wait_spec( Ok(WaitSpecResult { compute, - http_port, resize_swap_on_bind, }) } struct WaitSpecResult { compute: Arc<ComputeNode>, - // passed through from ProcessCliResult - http_port: u16, resize_swap_on_bind: bool, } @@ -389,7 +393,6 @@ async fn start_postgres( #[allow(unused_variables)] matches: &clap::ArgMatches, WaitSpecResult { compute, - http_port, resize_swap_on_bind, }: WaitSpecResult, ) -> Result<(Option, StartPostgresResult)> { @@ -442,12 +445,10 @@ async fn start_postgres( } } - let extension_server_port: u16 = http_port; - // Start Postgres let mut pg = None; if !prestartup_failed { - pg = match compute.start_compute(extension_server_port).await { + pg = match compute.start_compute() { Ok(pg) => Some(pg), Err(err) => { error!("could not start the compute node: {:#}", err); @@ -687,11 +688,17 @@ fn cli() -> clap::Command { .required(true), ) .arg( - Arg::new("pgbin") - .short('b') - .long("pgbin") - .default_value("postgres") - .value_name("POSTGRES_PATH"), + Arg::new("pgroot") + .short('R') + .long("pgroot") + .value_name("POSTGRES_ROOT") + .required(true), + ) + .arg( + Arg::new("pgversion") + .long("pgversion") + .value_name("POSTGRES_VERSION") + .required(true), ) .arg( Arg::new("spec") @@ -751,6 +758,11 @@ fn cli() -> clap::Command { .long("resize-swap-on-bind") .action(clap::ArgAction::SetTrue), ) + .arg( + Arg::new("no-postgres") + .long("no-postgres") + .action(ArgAction::SetTrue), + ) } /// When compute_ctl is killed, send also termination signal to sync-safekeepers diff --git a/compute_tools/src/catalog.rs b/compute_tools/src/catalog.rs index 4fefa831e0..95d37b1392 100644 --- a/compute_tools/src/catalog.rs +++ b/compute_tools/src/catalog.rs @@ -55,9 +55,7 @@ pub async fn get_database_schema( compute: &Arc<ComputeNode>, dbname: &str, ) -> Result<impl Stream<Item = Result<Bytes, std::io::Error>>, SchemaDumpError> { - let pgbin = &compute.pgbin; - let basepath = Path::new(pgbin).parent().unwrap(); - let pgdump = basepath.join("pg_dump"); + let pgdump = compute.get_my_pg_binary("pg_dump"); let mut connstr = compute.connstr.clone(); connstr.set_path(dbname); let mut cmd = Command::new(pgdump) diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index b595316960..c5a01ba489 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -3,6 +3,7 @@ use std::env; use std::fs; use std::os::unix::fs::{symlink, PermissionsExt}; use std::path::Path; +use std::path::PathBuf; use std::process::{Command, Stdio}; use std::str::FromStr; use std::sync::atomic::AtomicU32; @@ -13,6 +14,7 @@ use std::time::Instant; use anyhow::{Context, Result}; use bytes::{Buf, BufMut}; +use camino::Utf8Path; use chrono::{DateTime, Utc}; use futures::future::join_all; use futures::stream::FuturesUnordered; @@ -20,9 +22,12 @@ use futures::StreamExt; use nix::unistd::Pid; use postgres::error::SqlState; use postgres::{Client, NoTls}; +use tokio; +use 
tokio_postgres; use tracing::{debug, error, info, instrument, warn}; use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; +use utils::zstd::create_zst_tarball; use compute_api::responses::{ComputeMetrics, ComputeStatus}; use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec}; @@ -46,8 +51,9 @@ pub struct ComputeNode { // Url type maintains proper escaping pub connstr: url::Url, pub pgdata: String, - pub pgbin: String, + pub pgroot: String, pub pgversion: String, + pub http_port: u16, /// We should only allow live re- / configuration of the compute node if /// it uses 'pull model', i.e. it can go to control-plane and fetch /// the latest configuration. Otherwise, there could be a case: @@ -308,6 +314,13 @@ impl ComputeNode { self.state.lock().unwrap().status } + /// Get the mode of this compute. + pub fn get_mode(&self) -> ComputeMode { + let state = self.state.lock().unwrap(); + + state.pspec.as_ref().unwrap().spec.mode + } + // Remove `pgdata` directory and create it again with right permissions. fn create_pgdata(&self) -> Result<()> { // Ignore removal error, likely it is a 'No such file or directory (os error 2)'. @@ -319,6 +332,35 @@ impl ComputeNode { Ok(()) } + /// Get path pointing to requested binary directory. + pub fn get_pg_bindir(&self, version: &str) -> PathBuf { + Path::new(&self.pgroot) + .join(format!("v{}", version)) + .join("bin") + } + + /// Get path to requested Postgres binary. + pub fn get_pg_binary(&self, version: &str, binary: &str) -> String { + self.get_pg_bindir(version) + .join(binary) + .into_os_string() + .into_string() + .expect(&format!( + "path to {}-{} cannot be represented as UTF-8", + binary, version + )) + } + + /// Get path to Postgres binary directory of this compute. + pub fn get_my_pg_bindir(&self) -> PathBuf { + self.get_pg_bindir(&self.pgversion) + } + + /// Get path to specified Postgres binary of this compute. + pub fn get_my_pg_binary(&self, binary: &str) -> String { + self.get_pg_binary(&self.pgversion, binary) + } + // Get basebackup from the libpq connection to pageserver using `connstr` and // unarchive it to `pgdata` directory overriding all its previous content. #[instrument(skip_all, fields(%lsn))] @@ -537,7 +579,8 @@ impl ComputeNode { pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> { let start_time = Utc::now(); - let mut sync_handle = maybe_cgexec(&self.pgbin) + let postgres = self.get_my_pg_binary("postgres"); + let mut sync_handle = maybe_cgexec(&postgres) .args(["--sync-safekeepers"]) .env("PGDATA", &self.pgdata) // we cannot use -D in this mode .envs(if let Some(storage_auth_token) = &storage_auth_token { @@ -593,11 +636,7 @@ impl ComputeNode { /// Do all the preparations like PGDATA directory creation, configuration, /// safekeepers sync, basebackup, etc. 
#[instrument(skip_all)] - pub async fn prepare_pgdata( - &self, - compute_state: &ComputeState, - extension_server_port: u16, - ) -> Result<()> { + pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let spec = &pspec.spec; let pgdata_path = Path::new(&self.pgdata); @@ -607,7 +646,7 @@ impl ComputeNode { config::write_postgres_conf( &pgdata_path.join("postgresql.conf"), &pspec.spec, - Some(extension_server_port), + Some(self.http_port), )?; // Syncing safekeepers is only safe with primary nodes: if a primary @@ -635,6 +674,10 @@ impl ComputeNode { info!("Initializing standby from latest Pageserver LSN"); Lsn(0) } + ComputeMode::Upgrade => { + info!("Starting upgrade node at latest Pageserver LSN"); + Lsn(0) + } }; info!( @@ -696,7 +739,7 @@ impl ComputeNode { symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?; match spec.mode { - ComputeMode::Primary => {} + ComputeMode::Primary | ComputeMode::Upgrade => {} ComputeMode::Replica | ComputeMode::Static(..) => { add_standby_signal(pgdata_path)?; } @@ -715,7 +758,7 @@ impl ComputeNode { // Run initdb to completion info!("running initdb"); - let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb"); + let initdb_bin = self.get_my_pg_binary("initdb"); Command::new(initdb_bin) .args(["-D", pgdata]) .output() @@ -731,7 +774,8 @@ impl ComputeNode { // Start postgres info!("starting postgres"); - let mut pg = maybe_cgexec(&self.pgbin) + let postgres = self.get_my_pg_binary("postgres"); + let mut pg = maybe_cgexec(&postgres) .args(["-D", pgdata]) .spawn() .expect("cannot start postgres process"); @@ -764,7 +808,8 @@ impl ComputeNode { let pgdata_path = Path::new(&self.pgdata); // Run postgres as a child process. 
- let mut pg = maybe_cgexec(&self.pgbin) + let postgres = self.get_my_pg_binary("postgres"); + let mut pg = maybe_cgexec(&postgres) .args(["-D", &self.pgdata]) .envs(if let Some(storage_auth_token) = &storage_auth_token { vec![("NEON_AUTH_TOKEN", storage_auth_token)] @@ -785,6 +830,152 @@ impl ComputeNode { Ok((pg, logs_handle)) } + pub async fn upgrade(&self, pg_version: &str) -> Result<()> { + let old_bindir = self.get_my_pg_bindir(); + let new_bindir = self.get_pg_bindir(pg_version); + + let old_datadir = Utf8Path::new(&self.pgdata); + let parent_dir = old_datadir.parent().unwrap(); + let new_datadir = parent_dir.join("new-pgdata"); + + // Delete the new data directory before attempting, just in case it exists + let _ = std::fs::remove_dir_all(&new_datadir); + + // Step 1: Create new cluster + info!( + "Running initdb to start a cluster upgrade from v{} to v{}", + self.pgversion, pg_version + ); + + let initdb_bin = self.get_pg_binary(pg_version, "initdb"); + let mut initdb_cmd = Command::new(&initdb_bin); + initdb_cmd + .args(["--pgdata", new_datadir.as_str()]) + .args(["--username", "cloud_admin"]) + .args(["--encoding", "utf8"]) + .args(["--auth-local", "trust"]) + .env_clear() + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()); + + match initdb_cmd.status() { + Ok(status) => { + if !status.success() { + return Err(anyhow::anyhow!("failed to initialize the new database")); + } + + info!("Initialized v{} database", pg_version); + } + Err(_) => { + return Err(anyhow::anyhow!( + "failed to spawn initdb for the new database" + )) + } + }; + + // Step 2: Run pg_upgrade + info!( + "Running pg_upgrade to upgrade from v{} to v{}", + self.pgversion, pg_version + ); + + let pg_upgrade_bin = self.get_pg_binary(pg_version, "pg_upgrade"); + let mut pg_upgrade_cmd = Command::new(&pg_upgrade_bin); + let mut child = pg_upgrade_cmd + .args([ + "--old-bindir", + &old_bindir.into_os_string().into_string().unwrap(), + ]) + .args(["--old-datadir", old_datadir.as_str()]) + .args(["--old-options", "-c neon.safekeepers=''"]) + .args([ + "--new-bindir", + &new_bindir.into_os_string().into_string().unwrap(), + ]) + .args(["--new-datadir", new_datadir.as_str()]) + .args(["--new-options", "-c neon.safekeepers=''"]) + .args(["--username", "cloud_admin"]) + .arg("--no-transfer") + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn()?; + + let status = child.wait()?; + if status.success() { + info!("pg_upgrade was successful"); + } else { + return Err(anyhow::anyhow!( + "pg_upgrade failed with exit code {}", + status.code().unwrap() + )); + } + + /* Step 3: Delete the script that pg_upgrade generates, which is created in the current + * working directory + */ + // TODO: We should write a patch for upstream to not generate this file + if cfg!(windows) { + let _ = std::fs::remove_file("delete_old_cluster.bat"); + } else { + let _ = std::fs::remove_file("delete_old_cluster.sh"); + } + + /* Step 4: Re-prepare the pgdata directory to work with the latest basebackup from the + * pageserver + */ + { + let state = self.state.lock().unwrap().clone(); + + self.prepare_pgdata(&state).await?; + } + + // Step 5: Create tarball minus things like pg_dynshm, etc. 
+ info!("Creating tarball of upgraded initdb directory, minus some files"); + + let initdb_tar_path = parent_dir.join("initdb.tar.zst"); + let _ = std::fs::remove_file(&initdb_tar_path); + create_zst_tarball(&new_datadir, &initdb_tar_path).await?; + + // Step 6: Write new postgresql.conf file for upgraded initdb + std::fs::copy( + old_datadir.join("postgresql.conf"), + new_datadir.join("postgresql.conf"), + )?; + + // Step 7: Write the tarball into the Postgres WAL + info!("Writing initdb.tar.zst to WAL"); + + let postgres_bin = self.get_my_pg_binary("postgres"); + let mut wal_log_cmd = Command::new(&postgres_bin); + child = wal_log_cmd + .args(["--wal-log", initdb_tar_path.as_str()]) + .env_clear() + .env("PGDATA", old_datadir.as_str()) + .env("NEON_PURPOSE", "upgrade") + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .spawn() + .expect("postgres --wal-log failed to start"); + match child.wait() { + Ok(s) => { + if !s.success() { + return Err(anyhow::anyhow!("Could not wal log upgrade tarball")); + } + } + Err(e) => return Err(e.into()), + }; + + // Step 8: Sync the safekeepers to push WAL record to Neon + self.sync_safekeepers(None)?; + + /* Note that whether any errors occur after this are unimportant. ALWAYS return success + * after this point. The compute will be terminated immediately after the upgrade. Remember + * that this is an upgrade-only compute, and it will not accept connections from users. + */ + + Ok(()) + } + /// Do post configuration of the already started Postgres. This function spawns a background thread to /// configure the database after applying the compute spec. Currently, it upgrades the neon extension /// version. In the future, it may upgrade all 3rd-party extensions. @@ -902,8 +1093,8 @@ impl ComputeNode { // `pg_ctl` for start / stop. #[instrument(skip_all)] fn pg_reload_conf(&self) -> Result<()> { - let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl"); - Command::new(pgctl_bin) + let pgctl_bin = self.get_my_pg_binary("pg_ctl"); + Command::new(&pgctl_bin) .args(["reload", "-D", &self.pgdata]) .output() .expect("cannot run pg_ctl process"); @@ -987,43 +1178,25 @@ impl ComputeNode { Ok(()) } + /// Prepares the compute for Postgres operations, including downloading + /// remote extensions and preparing the pgdata directory. + /// + /// The caller must hold the state mutex. #[instrument(skip_all)] - pub async fn start_compute( - &self, - extension_server_port: u16, - ) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> { - let compute_state = self.state.lock().unwrap().clone(); - let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + pub async fn prepare_compute(&self) -> Result<()> { + let state = self.state.lock().unwrap().clone(); + let pspec = state.pspec.as_ref().expect("spec must be set"); + info!( - "starting compute for project {}, operation {}, tenant {}, timeline {}", + "preparing compute for project {}, operation {}, tenant {}, timeline {}", pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"), pspec.spec.operation_uuid.as_deref().unwrap_or("None"), pspec.tenant_id, pspec.timeline_id, ); - // tune pgbouncer - if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings { - info!("tuning pgbouncer"); - - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("failed to create rt"); - - // Spawn a thread to do the tuning, - // so that we don't block the main thread that starts Postgres. 
- let pgbouncer_settings = pgbouncer_settings.clone(); - let _handle = thread::spawn(move || { - let res = rt.block_on(tune_pgbouncer(pgbouncer_settings)); - if let Err(err) = res { - error!("error while tuning pgbouncer: {err:?}"); - } - }); - } - info!( - "start_compute spec.remote_extensions {:?}", + "prepare_compute spec.remote_extensions {:?}", pspec.spec.remote_extensions ); @@ -1031,7 +1204,8 @@ impl ComputeNode { // remote shared_preload_libraries before postgres start (if any) if let Some(remote_extensions) = &pspec.spec.remote_extensions { // First, create control files for all availale extensions - extension_server::create_control_files(remote_extensions, &self.pgbin); + let postgres_bin = self.get_my_pg_binary("postgres"); + extension_server::create_control_files(remote_extensions, &postgres_bin); let library_load_start_time = Utc::now(); let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?; @@ -1053,8 +1227,17 @@ impl ComputeNode { info!("{:?}", remote_ext_metrics); } - self.prepare_pgdata(&compute_state, extension_server_port) - .await?; + self.prepare_pgdata(&state).await?; + + self.set_status(ComputeStatus::Prepared); + + Ok(()) + } + + #[instrument(skip_all)] + pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> { + let compute_state = self.state.lock().unwrap().clone(); + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); let start_time = Utc::now(); let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?; @@ -1160,9 +1343,11 @@ impl ComputeNode { core_path.display() ); + let postgres_bin = self.get_my_pg_binary("postgres"); + // Try first with gdb let backtrace = Command::new("gdb") - .args(["--batch", "-q", "-ex", "bt", &self.pgbin]) + .args(["--batch", "-q", "-ex", "bt", &postgres_bin]) .arg(&core_path) .output(); @@ -1295,11 +1480,12 @@ LIMIT 100", // then we try to download it here info!("downloading new extension {ext_archive_name}"); + let postgres_bin = self.get_my_pg_binary("postgres"); let download_size = extension_server::download_extension( &real_ext_name, &ext_path, ext_remote_storage, - &self.pgbin, + &postgres_bin, ) .await .map_err(DownloadError::Other); diff --git a/compute_tools/src/config.rs b/compute_tools/src/config.rs index 479100eb89..ab2b981872 100644 --- a/compute_tools/src/config.rs +++ b/compute_tools/src/config.rs @@ -74,7 +74,7 @@ pub fn write_postgres_conf( } match spec.mode { - ComputeMode::Primary => {} + ComputeMode::Primary | ComputeMode::Upgrade => {} ComputeMode::Static(lsn) => { // hot_standby is 'on' by default, but let's be explicit writeln!(file, "hot_standby=on")?; diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index ef1db73982..2fee6d6596 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -75,7 +75,6 @@ use anyhow::Result; use anyhow::{bail, Context}; use bytes::Bytes; use compute_api::spec::RemoteExtSpec; -use regex::Regex; use remote_storage::*; use reqwest::StatusCode; use std::path::Path; @@ -103,34 +102,6 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String { .to_string() } -pub fn get_pg_version(pgbin: &str) -> String { - // pg_config --version returns a (platform specific) human readable string - // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc. 
- let human_version = get_pg_config("--version", pgbin); - return parse_pg_version(&human_version).to_string(); -} - -fn parse_pg_version(human_version: &str) -> &str { - // Normal releases have version strings like "PostgreSQL 15.4". But there - // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL - // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version - // configure option, you can tack any string to the version number, - // e.g. "PostgreSQL 15.4foobar". - match Regex::new(r"^PostgreSQL (?<major>\d+).+") - .unwrap() - .captures(human_version) - { - Some(captures) if captures.len() == 2 => match &captures["major"] { - "14" => return "v14", - "15" => return "v15", - "16" => return "v16", - _ => {} - }, - _ => {} - } - panic!("Unsuported postgres version {human_version}"); -} - // download the archive for a given extension, // unzip it, and place files in the appropriate locations (share/lib) pub async fn download_extension( @@ -255,42 +226,3 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res ), } } - -#[cfg(test)] -mod tests { - use super::parse_pg_version; - - #[test] - fn test_parse_pg_version() { - assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15"); - assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15"); - assert_eq!( - parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"), - "v15" - ); - - assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14"); - assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14"); - assert_eq!( - parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"), - "v14" - ); - - assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16"); - assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16"); - assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16"); - assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16"); - } - - #[test] - #[should_panic] - fn test_parse_pg_unsupported_version() { - parse_pg_version("PostgreSQL 13.14"); - } - - #[test] - #[should_panic] - fn test_parse_pg_incorrect_version_format() { - parse_pg_version("PostgreSQL 14"); - } -} diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index 43d29402bc..bda8102490 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -10,6 +10,7 @@ use crate::catalog::{get_database_schema, get_dbs_and_roles}; use crate::compute::forward_termination_signal; use crate::compute::{ComputeNode, ComputeState, ParsedSpec}; use compute_api::requests::ConfigurationRequest; +use compute_api::requests::UpgradeRequest; use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError}; use anyhow::Result; @@ -137,6 +138,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body> { + (&Method::POST, "/upgrade") => { + info!("serving /upgrade POST request"); + match handle_upgrade_request(req, compute).await { + Ok(_) => Response::builder() + .status(StatusCode::ACCEPTED) + .body(Body::from("Starting upgrade")) + .unwrap(), + Err((e, status)) => { + error!("error handling /upgrade request: {e}"); + render_json_error(&format!("{}", e), status) + } + } + } + (&Method::GET, "/dbs_and_roles") => { info!("serving /dbs_and_roles GET request",); match get_dbs_and_roles(compute).await { @@ -397,6 +412,59 @@ async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (Str Ok(()) } +async fn handle_upgrade_request( + req: Request<Body>, + compute: &Arc<ComputeNode>, +) -> Result<(), (anyhow::Error, StatusCode)> { + let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap(); + let body_str = 
String::from_utf8(body_bytes.to_vec()).unwrap(); + + let body = match serde_json::from_str::<UpgradeRequest>(&body_str) { + Ok(r) => r, + Err(e) => return Err((Into::into(e), StatusCode::BAD_REQUEST)), + }; + + // No sense in trying to upgrade to the same version. + let curr_version = compute.pgversion.clone(); + let new_version = body.pg_version; + if curr_version == new_version { + return Err(( + anyhow::anyhow!("cannot upgrade endpoint to the same version"), + StatusCode::UNPROCESSABLE_ENTITY, + )); + } + + // Check that we are in the prepared state before trying to upgrade. + match compute.get_status() { + ComputeStatus::Prepared => (), + ComputeStatus::Upgrading => { + return Err(( + anyhow::anyhow!("upgrade already in progress"), + StatusCode::CONFLICT, + )); + } + _ => { + return Err(( + anyhow::anyhow!("expected compute to be in the prepared state"), + StatusCode::CONFLICT, + )); + } + } + + compute.set_status(ComputeStatus::Upgrading); + + let c = compute.clone(); + tokio::spawn(async move { + if let Err(e) = c.upgrade(&new_version).await { + error!("Failed to upgrade database: {}", e); + } + + c.set_status(ComputeStatus::Running); + }); + + Ok(()) +} + // Main Hyper HTTP server function that runs it and blocks waiting on it forever. #[tokio::main] async fn serve(port: u16, state: Arc<ComputeNode>) { diff --git a/compute_tools/src/http/openapi_spec.yaml b/compute_tools/src/http/openapi_spec.yaml index b0ddaeae2b..ae5eaed24e 100644 --- a/compute_tools/src/http/openapi_spec.yaml +++ b/compute_tools/src/http/openapi_spec.yaml @@ -212,6 +212,21 @@ paths: application/json: schema: $ref: "#/components/schemas/GenericError" + /upgrade: + post: + tags: + - Upgrade + summary: Upgrade project to another Postgres major version. + operationId: upgradePostgres + responses: + 202: + description: Upgrade request in progress. + 409: + description: Upgrade already in progress. + 422: + description: Upgrade request could not be processed. + 500: + description: Upgrade request failed. /terminate: post: diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 144cd647c9..d07dba865a 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -798,14 +798,19 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re .get_one::<bool>("hot-standby") .copied() .unwrap_or(false); - + let upgrade_only = sub_args + .get_one::<bool>("upgrade-only") + .copied() + .unwrap_or(false); let allow_multiple = sub_args.get_flag("allow-multiple"); - let mode = match (lsn, hot_standby) { - (Some(lsn), false) => ComputeMode::Static(lsn), - (None, true) => ComputeMode::Replica, - (None, false) => ComputeMode::Primary, - (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), + let mode = match (lsn, hot_standby, upgrade_only) { + (Some(lsn), false, false) => ComputeMode::Static(lsn), + (None, true, false) => ComputeMode::Replica, + (None, false, false) => ComputeMode::Primary, + (None, false, true) => ComputeMode::Upgrade, + // Seeing this message means we aren't setting conflicts_with on clap arguments. + _ => anyhow::bail!("Invalid command line invocation"), }; match (mode, hot_standby) { @@ -1501,7 +1506,8 @@ fn cli() -> Command { let lsn_arg = Arg::new("lsn") .long("lsn") .help("Specify Lsn on the timeline to start from. 
By default, end of the timeline would be used.") - .required(false); + .required(false) + .conflicts_with("hot-standby"); let hot_standby_arg = Arg::new("hot-standby") .value_parser(value_parser!(bool)) @@ -1718,6 +1724,18 @@ fn cli() -> Command { .arg(hot_standby_arg.clone()) .arg(update_catalog) .arg(allow_multiple.clone()) + .arg( + Arg::new("upgrade-only") + .help("Mark this compute as an upgrade compute") + .long("upgrade-only") + .action(ArgAction::SetTrue) + .conflicts_with_all(&[ + "config-only", + "hot-standby", + // Perhaps we could offer upgrades at a specific LSN in the future. + "lsn", + ]) + ) ) .subcommand(Command::new("start") .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 9f879c4b08..249f2e2831 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -182,6 +182,7 @@ impl ComputeControlPlane { tenant_id: TenantId, timeline_id: TimelineId, ) -> Result<()> { + // TODO: It really feels like I need to do some protection here if matches!(mode, ComputeMode::Primary) { // this check is not complete, as you could have a concurrent attempt at // creating another primary, both reading the state before checking it here, @@ -393,6 +394,7 @@ impl Endpoint { conf.append("recovery_prefetch", "off"); } } + ComputeMode::Upgrade => {} } Ok(conf) @@ -624,13 +626,16 @@ impl Endpoint { self.endpoint_path().join("spec.json").to_str().unwrap(), ]) .args([ - "--pgbin", - self.env - .pg_bin_dir(self.pg_version)? - .join("postgres") - .to_str() + "--pgroot", + &self + .env + .pg_distrib_dir + .clone() + .into_os_string() + .into_string() .unwrap(), ]) + .args(["--pgversion", &self.pg_version.to_string()]) .stdin(std::process::Stdio::null()) .stderr(logfile.try_clone()?) .stdout(logfile); @@ -674,7 +679,7 @@ impl Endpoint { } // keep retrying } - ComputeStatus::Running => { + ComputeStatus::Running | ComputeStatus::Prepared => { // All good! break; } @@ -688,6 +693,7 @@ impl Endpoint { ); } ComputeStatus::Empty + | ComputeStatus::Upgrading | ComputeStatus::ConfigurationPending | ComputeStatus::Configuration | ComputeStatus::TerminationPending diff --git a/libs/compute_api/src/requests.rs b/libs/compute_api/src/requests.rs index 5896c7dc65..989d8520f4 100644 --- a/libs/compute_api/src/requests.rs +++ b/libs/compute_api/src/requests.rs @@ -12,3 +12,9 @@ use serde::Deserialize; pub struct ConfigurationRequest { pub spec: ComputeSpec, } + +/// Request body of the /upgrade API +#[derive(Deserialize, Debug)] +pub struct UpgradeRequest { + pub pg_version: String, +} diff --git a/libs/compute_api/src/responses.rs b/libs/compute_api/src/responses.rs index d05d625b0a..a0d6747f58 100644 --- a/libs/compute_api/src/responses.rs +++ b/libs/compute_api/src/responses.rs @@ -44,10 +44,15 @@ pub enum ComputeStatus { // Compute node has spec and initial startup and // configuration is in progress. Init, + // Compute has been prepared, meaning that remote extensions have been + // downloaded and the data directory has been prepared. + Prepared, // Compute is configured and running. Running, // New spec is being applied. Configuration, + // Compute is upgrading Postgres. + Upgrading, // Either startup or configuration failed, // compute will exit soon or is waiting for // control-plane to terminate it. 
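The two new ComputeStatus variants above define the lifecycle that the /upgrade endpoint relies on: an upgrade-only compute prepares its data directory and parks in Prepared, a POST to /upgrade moves it to Upgrading, and the background task returns it to Running when it finishes. A minimal client sketch of that flow (Python with requests, mirroring test_upgrade.py below; the localhost URL is an illustrative assumption):

    import time

    import requests

    def run_upgrade(http_port: int, pg_version: str) -> None:
        base = f"http://localhost:{http_port}"  # compute_ctl's HTTP server

        # POST /upgrade returns 202 Accepted and runs the upgrade in the background.
        resp = requests.post(f"{base}/upgrade", json={"pg_version": pg_version})
        assert resp.status_code == 202, resp.text

        # Poll /status until the compute leaves the "upgrading" state.
        while True:
            status = requests.get(f"{base}/status").json()["status"]
            if status == "upgrading":
                time.sleep(1)
            elif status == "running":
                return
            else:
                raise RuntimeError(f"unexpected compute status: {status}")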
diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 883c624f71..c7f2cbcb05 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -199,6 +199,11 @@ pub enum ComputeMode { /// Future versions may want to distinguish between replicas with hot standby /// feedback and other kinds of replication configurations. Replica, + /// An upgrade-only node + /// + /// This node will not accept remote Postgres connections. Its only + /// purpose is to upgrade a timeline. + Upgrade, } #[derive(Clone, Debug, Default, Deserialize, Serialize)] diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 6ce855c78e..a4859cecb1 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -181,9 +181,27 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20; pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30; pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40; pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50; +pub const XLOG_NEON_FILE: u8 = 0x60; pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40; +#[repr(C)] +#[derive(Debug)] +pub enum XlNeonFileFiletype { + UPGRADE_TARBALL, +} + +impl TryFrom<u8> for XlNeonFileFiletype { + type Error = (); + + fn try_from(value: u8) -> Result<Self, Self::Error> { + match value { + 0 => Ok(XlNeonFileFiletype::UPGRADE_TARBALL), + _ => Err(()), + } + } +} + // from xlogreader.h pub const XLR_INFO_MASK: u8 = 0x0F; pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 5a0894cd1b..c575c5145b 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -506,6 +506,10 @@ async fn import_file( return Ok(None); } + if file_name == "pg_internal.init" { + return Ok(None); + } + if file_path.starts_with("global") { let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; let dbnode = 0; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 262dccac7d..a2c301d885 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -14,7 +14,7 @@ mod walreceiver; use anyhow::{anyhow, bail, ensure, Context, Result}; use arc_swap::ArcSwap; use bytes::Bytes; -use camino::Utf8Path; +use camino::{Utf8Path, Utf8PathBuf}; use chrono::{DateTime, Utc}; use enumset::EnumSet; use fail::fail_point; @@ -4357,6 +4357,10 @@ impl Timeline { ) } + pub fn get_path(&self) -> Utf8PathBuf { + self.conf.timelines_path(&self.tenant_shard_id) + } + /// Detach this timeline from its ancestor by copying all of ancestors layers as this /// Timelines layers up to the ancestor_lsn. 
/// diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 6e15ad81c3..49e9518b8a 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -24,6 +24,7 @@ use std::time::Duration; use std::time::SystemTime; +use camino::Utf8PathBuf; use pageserver_api::shard::ShardIdentity; use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz}; use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn}; @@ -33,8 +34,10 @@ use bytes::{Buf, Bytes, BytesMut}; use tracing::*; use utils::failpoint_support; use utils::rate_limit::RateLimit; +use utils::zstd::extract_zst_tarball; use crate::context::RequestContext; +use crate::import_datadir; use crate::metrics::WAL_INGEST; use crate::pgdatadir_mapping::{DatadirModification, Version}; use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; @@ -69,7 +72,9 @@ impl CheckPoint { } } -pub struct WalIngest { +pub struct WalIngest<'t> { + timeline: &'t Timeline, + timeline_path: Utf8PathBuf, shard: ShardIdentity, checkpoint: CheckPoint, checkpoint_modified: bool, @@ -82,12 +87,12 @@ struct WarnIngestLag { timestamp_invalid_msg_ratelimit: RateLimit, } -impl WalIngest { +impl<'t> WalIngest<'t> { pub async fn new( - timeline: &Timeline, + timeline: &'t Timeline, startpoint: Lsn, ctx: &RequestContext, - ) -> anyhow::Result<WalIngest> { + ) -> anyhow::Result<WalIngest<'t>> { // Fetch the latest checkpoint into memory, so that we can compare with it // quickly in `ingest_record` and update it when it changes. let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?; @@ -100,6 +105,8 @@ impl WalIngest { }); Ok(WalIngest { + timeline, + timeline_path: timeline.get_path(), shard: *timeline.get_shard_identity(), checkpoint, checkpoint_modified: false, @@ -458,6 +465,18 @@ impl WalIngest { modification.drop_replorigin(xlrec.node_id).await? 
} } + pg_constants::RM_BTREE_ID + | pg_constants::RM_HASH_ID + | pg_constants::RM_GIN_ID + | pg_constants::RM_GIST_ID + | pg_constants::RM_SEQ_ID + | pg_constants::RM_SPGIST_ID + | pg_constants::RM_BRIN_ID + | pg_constants::RM_COMMIT_TS_ID + | pg_constants::RM_REPLORIGIN_ID + | pg_constants::RM_GENERIC_ID => { + // No special handling currently for these resource managers + } _x => { // TODO: should probably log & fail here instead of blindly // doing something without understanding the protocol @@ -923,6 +942,33 @@ impl WalIngest { assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID); match pg_version { + 15 => { + let info = decoded.xl_info; + + match info { + pg_constants::XLOG_NEON_FILE => { + info!( + "tristan: last_record_lsn={}", + self.timeline.get_last_record_lsn() + ); + let xlrec = v16::rm_neon::XlNeonFile::decode(buf); + let pgdata_path = self.timeline_path.join("new-pgdata"); + + extract_zst_tarball(&pgdata_path, &*xlrec.data).await?; + + let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?; + info!("LSN from pg_upgraded controlfile: {lsn}"); + Box::pin(import_datadir::import_timeline_from_postgres_datadir( + self.timeline, + &pgdata_path, + lsn, + ctx, + )) + .await?; + } + _ => return Err(anyhow::anyhow!("Unknown XLOG xl_info field: {}", info)), + } + } 16 => { let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK; diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 0c4d575de8..2d2f4e6817 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -526,6 +526,29 @@ pub mod v16 { } } } + + #[repr(C)] + #[derive(Debug)] + pub struct XlNeonFile { + pub filetype: u8, + pub size: u32, + pub data: Bytes, + } + + impl XlNeonFile { + pub fn decode(buf: &mut Bytes) -> Self { + let filetype = buf.get_u8(); + // Skip the padding + buf.advance(std::mem::size_of::<u8>() * 3); + let size = buf.get_u32_le(); + + Self { + filetype, + size, + data: buf.copy_to_bytes(buf.remaining()), + } + } + } } } diff --git a/pgxn/neon/Makefile b/pgxn/neon/Makefile index 3b755bb042..b7cc430884 100644 --- a/pgxn/neon/Makefile +++ b/pgxn/neon/Makefile @@ -23,7 +23,18 @@ SHLIB_LINK_INTERNAL = $(libpq) SHLIB_LINK = -lcurl EXTENSION = neon -DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql +DATA = \ + neon--1.0.sql \ + neon--1.0--1.1.sql \ + neon--1.1--1.2.sql \ + neon--1.2--1.3.sql \ + neon--1.3--1.4.sql \ + neon--1.4--1.5.sql \ + neon--1.5--1.4.sql \ + neon--1.4--1.3.sql \ + neon--1.3--1.2.sql \ + neon--1.2--1.1.sql \ + neon--1.1--1.0.sql PGFILEDESC = "neon - cloud storage for PostgreSQL" EXTRA_CLEAN = \ diff --git a/pgxn/neon/neon--1.3--1.4.sql b/pgxn/neon/neon--1.3--1.4.sql index 042effe346..aa500651b1 100644 --- a/pgxn/neon/neon--1.3--1.4.sql +++ b/pgxn/neon/neon--1.3--1.4.sql @@ -7,3 +7,7 @@ LANGUAGE C PARALLEL SAFE; GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor; +CREATE FUNCTION wal_log_file(path text) +RETURNS pg_lsn +AS 'MODULE_PATHNAME', 'wal_log_file' +LANGUAGE C STRICT PARALLEL UNSAFE; diff --git a/pgxn/neon/neon--1.4--1.5.sql b/pgxn/neon/neon--1.4--1.5.sql new file mode 100644 index 0000000000..f644a8939c --- /dev/null +++ b/pgxn/neon/neon--1.4--1.5.sql @@ -0,0 +1,8 @@ +\echo Use "ALTER EXTENSION neon UPDATE TO '1.5'" to load this file. 
\quit + +CREATE FUNCTION wal_log_file(path text) +RETURNS pg_lsn +AS 'MODULE_PATHNAME', 'wal_log_file' +LANGUAGE C STRICT PARALLEL UNSAFE; + +GRANT EXECUTE ON FUNCTION wal_log_file TO pg_monitor; diff --git a/pgxn/neon/neon--1.5--1.4.sql b/pgxn/neon/neon--1.5--1.4.sql new file mode 100644 index 0000000000..27af4df39f --- /dev/null +++ b/pgxn/neon/neon--1.5--1.4.sql @@ -0,0 +1 @@ +DROP FUNCTION IF EXISTS wal_log_file(text) CASCADE; diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index fe8e276d1c..a93d921f45 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -11,6 +11,8 @@ #include "postgres.h" #include "fmgr.h" +#include <sys/stat.h> + #include "miscadmin.h" #include "access/subtrans.h" #include "access/twophase.h" @@ -29,11 +31,19 @@ #include "tcop/tcopprot.h" #include "funcapi.h" #include "access/htup_details.h" +#include "access/xloginsert.h" #include "utils/builtins.h" #include "utils/pg_lsn.h" #include "utils/guc.h" #include "utils/guc_tables.h" +#include "utils/timeout.h" #include "utils/wait_event.h" +#include "storage/ipc.h" +#include "storage/proc.h" + +#if PG_MAJORVERSION_NUM >= 15 +#include "access/neon_xlog.h" +#endif #include "extension_server.h" #include "neon.h" @@ -629,10 +639,17 @@ ReportSearchPath(void) void _PG_init(void) { + const char *purpose; + + purpose = getenv("NEON_PURPOSE"); + if (purpose && strcmp(purpose, "upgrade") == 0) + return; + /* * Also load 'neon_rmgr'. This makes it unnecessary to list both 'neon' * and 'neon_rmgr' in shared_preload_libraries. */ + #if PG_VERSION_NUM >= 160000 load_file("$libdir/neon_rmgr", false); #endif @@ -676,6 +693,9 @@ _PG_init(void) PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); PG_FUNCTION_INFO_V1(backpressure_throttling_time); +#if PG_MAJORVERSION_NUM >= 15 +PG_FUNCTION_INFO_V1(wal_log_file); +#endif Datum pg_cluster_size(PG_FUNCTION_ARGS) @@ -721,3 +741,200 @@ backpressure_throttling_time(PG_FUNCTION_ARGS) { PG_RETURN_UINT64(BackpressureThrottlingTime()); } + +#if PG_MAJORVERSION_NUM >= 15 + +Datum +wal_log_file(PG_FUNCTION_ARGS) +{ + int rc; + int fd; + ssize_t n; + text *path; + size_t off; + char *data; + short nargs; + struct stat st; + XLogRecPtr lsn; + size_t path_len; + xl_neon_file xlrec; + char file[MAXPGPATH]; + +#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16 + bool wal_debug; +#endif + + path = PG_GETARG_TEXT_PP(0); + path_len = VARSIZE(path) - VARHDRSZ; + + memcpy(file, VARDATA(path), path_len); + file[path_len] = '\0'; + + /* Get the size of the file. Note that stat(2) follows symlinks. */ + rc = stat(file, &st); + if (rc != 0) + ereport(ERROR, + (errmsg("failed to get size of file (%s): %m", file))); + + xlrec.size = (size_t) st.st_size; + + fd = open(file, O_RDONLY); + if (fd == -1) + ereport(ERROR, + (errmsg("could not open %s: %m", file))); + + /* If the file is too large, error out. */ + + data = palloc(xlrec.size); + + /* Copy the file contents */ + off = 0; + while (true) { + n = read(fd, data + off, xlrec.size - off); + if (n == EOF) + { + close(fd); + ereport(ERROR, + (errmsg("failed to read %s: %m", file))); + } + + off += n; + + if (xlrec.size - off == 0) + break; + } + + close(fd); + + xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfNeonFile); + XLogRegisterData((char *) data, xlrec.size); + + /* + * We must turn debugging off on anything where the Neon RMGR is not + * registered. Stash the original value for restoration later. 
+ */ +#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16 + wal_debug = XLOG_DEBUG; + XLOG_DEBUG = false; +#endif + + lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE); + +#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16 + XLOG_DEBUG = wal_debug; +#endif + + PG_RETURN_LSN(lsn); +} + +#endif + +#if PG_MAJORVERSION_NUM >= 15 + +/* + * Entry point for `postgres --wal-log`. + */ +PGDLLEXPORT void +WalLogMain(int argc, char *argv[]) +{ + int rc; + int fd; + off_t off; + struct stat st; + XLogRecPtr lsn; + void *data; + xl_neon_file xlrec; + /* TODO: should this be PATH_MAX? should we require an absolute path? */ + char file[MAXPGPATH]; + +#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16 + bool wal_debug; +#endif + + if (argc != 3) + ereport(ERROR, errmsg("wrong number of arguments passed to --wal-log")); + + if (!realpath(argv[2], file)) + ereport(ERROR, errmsg("failed to resolve path: %m")); + + ereport(LOG, errmsg("writing %s to WAL", file)); + + ChangeToDataDir(); + CreateDataDirLockFile(false); + LocalProcessControlFile(false); + InitializeMaxBackends(); + CreateSharedMemoryAndSemaphores(); + InitializeTimeouts(); + InitProcess(); + BaseInit(); + CreateAuxProcessResourceOwner(); + StartupXLOG(); + + /* Get the size of the file. Note that stat(2) follows symlinks. */ + rc = stat(file, &st); + if (rc != 0) + ereport(ERROR, + (errmsg("failed to get size of file (%s): %m", file))); + + xlrec.size = (size_t) st.st_size; + + fd = open(file, O_RDONLY); + if (fd == -1) + ereport(ERROR, + (errmsg("could not open %s: %m", file))); + + /* If the file is too large, error out. */ + + data = palloc(xlrec.size); + + /* Copy the file contents */ + off = 0; + while (true) { + ssize_t n; + + n = read(fd, data + off, xlrec.size - off); + if (n == EOF) + { + close(fd); + ereport(ERROR, + (errmsg("failed to read %s: %m", file))); + } + + off += n; + + if (xlrec.size - off == 0) + break; + } + + close(fd); + + xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL; + + /* ereport(LOG, errmsg("Current LSN: %X/%X" , LSN_FORMAT_ARGS(GetXLogWriteRecPtr()))); */ + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfNeonFile); + XLogRegisterData(data, xlrec.size); + + /* + * We must turn debugging off on anything where the Neon RMGR is not + * registered. Stash the original value for restoration later. 
+ */ +#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16 + wal_debug = XLOG_DEBUG; + XLOG_DEBUG = false; +#endif + + lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE); + +#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16 + XLOG_DEBUG = wal_debug; +#endif + + exit(0); +} + +#endif diff --git a/pgxn/neon/relsize_cache.c b/pgxn/neon/relsize_cache.c index 2a4c2dc799..46d3b22395 100644 --- a/pgxn/neon/relsize_cache.c +++ b/pgxn/neon/relsize_cache.c @@ -78,11 +78,16 @@ neon_smgr_shmem_startup(void) if (prev_shmem_startup_hook) prev_shmem_startup_hook(); + if (relsize_hash_size <= 0) + return; + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found); if (!found) { + elog(LOG, "neon_relsize: %d", relsize_hash_size); relsize_lock = (LWLockId) GetNamedLWLockTranche("neon_relsize"); + elog(LOG, "neon_relsize"); info.keysize = sizeof(RelTag); info.entrysize = sizeof(RelSizeEntry); relsize_hash = ShmemInitHash("neon_relsize", diff --git a/pgxn/neon_rmgr/neon_rmgr.c b/pgxn/neon_rmgr/neon_rmgr.c index c3f726db84..8aac9bd1f2 100644 --- a/pgxn/neon_rmgr/neon_rmgr.c +++ b/pgxn/neon_rmgr/neon_rmgr.c @@ -76,6 +76,8 @@ neon_rm_redo(XLogReaderState *record) case XLOG_NEON_HEAP_MULTI_INSERT: redo_neon_heap_multi_insert(record); break; + case XLOG_NEON_FILE: + break; default: elog(PANIC, "neon_rm_redo: unknown op code %u", info); } diff --git a/pgxn/neon_rmgr/neon_rmgr_decode.c b/pgxn/neon_rmgr/neon_rmgr_decode.c index f327e132e9..0d5d744b3f 100644 --- a/pgxn/neon_rmgr/neon_rmgr_decode.c +++ b/pgxn/neon_rmgr/neon_rmgr_decode.c @@ -57,6 +57,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (SnapBuildProcessChange(builder, xid, buf->origptr)) DecodeNeonMultiInsert(ctx, buf); break; + case XLOG_NEON_FILE: + break; default: elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info); break; @@ -401,4 +403,4 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple) } -#endif \ No newline at end of file +#endif diff --git a/pgxn/neon_rmgr/neon_rmgr_desc.c b/pgxn/neon_rmgr/neon_rmgr_desc.c index 8901c85ba2..c2bae7ef2a 100644 --- a/pgxn/neon_rmgr/neon_rmgr_desc.c +++ b/pgxn/neon_rmgr/neon_rmgr_desc.c @@ -134,6 +134,16 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record) xlrec->ntuples, &offset_elem_desc, NULL); } } + else if (info == XLOG_NEON_FILE) + { + const xl_neon_file *xlrec = (xl_neon_file *) rec; + switch ((xl_neon_file_filetype) xlrec->filetype) + { + case XL_NEON_FILE_UPGRADE_TARBALL: + appendStringInfo(buf, "filetype: upgrade tarball, size: %zu", xlrec->size); + break; + } + } } const char * @@ -173,6 +183,9 @@ neon_rm_identify(uint8 info) case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE: id = "MULTI_INSERT+INIT"; break; + case XLOG_NEON_FILE: + id = "FILE"; + break; } return id; diff --git a/test_runner/regress/test_upgrade.py b/test_runner/regress/test_upgrade.py new file mode 100644 index 0000000000..f09b7e236a --- /dev/null +++ b/test_runner/regress/test_upgrade.py @@ -0,0 +1,54 @@ +from time import sleep + +import pytest +import requests +from fixtures.neon_fixtures import NeonEnv +from fixtures.pg_version import PgVersion + + +def test_upgrade(pg_version: PgVersion, neon_simple_env: NeonEnv): + env = neon_simple_env + + upgrade_to: PgVersion + if pg_version == PgVersion.V14: + upgrade_to = PgVersion.V15 + elif pg_version == PgVersion.V15: + upgrade_to = PgVersion.V16 + else: + pytest.skip("Nothing to upgrade") + + env.neon_cli.create_timeline("test_upgrade") + 
endpoint = env.endpoints.create_start("test_upgrade") + resp = requests.post( + f"http://localhost:{endpoint.http_port}/upgrade", + json={ + "pg_version": upgrade_to, + }, + ) + assert resp.status_code == 202 + + while True: + resp = requests.get(f"http://localhost:{endpoint.http_port}/status") + assert resp.status_code == 200 + + data = resp.json() + if data["status"] == "upgrading": + sleep(1) + continue + elif data["status"] == "running": + break + else: + pytest.fail(f"Unexpected compute state during upgrade: {data['status']}") + + endpoint.stop_and_destroy() + + +def test_upgrade_bad_request(neon_simple_env: NeonEnv): + env = neon_simple_env + + env.neon_cli.create_timeline("test_upgrade_bad_request") + endpoint = env.endpoints.create_start("test_upgrade_bad_request") + resp = requests.post(f"http://localhost:{endpoint.http_port}/upgrade") + assert resp.status_code == 400 + + # TODO: Use postgres versions that are out of range. diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 6f6d77fb59..6840d2616e 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 6f6d77fb5960602fcd3fd130aca9f99ecb1619c9 +Subproject commit 6840d2616ef202e733dd9b5b260abab146f8ec36 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 0baa7346df..737cd9c696 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 0baa7346dfd42d61912eeca554c9bb0a190f0a1e +Subproject commit 737cd9c696cdb92ff59600ab8eddff0c3d38da0c
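For reference, steps 1 through 8 of ComputeNode::upgrade in compute.rs reduce to the following command sequence on the compute. This is a sketch under assumed locations (the /data and /usr/local paths are illustrative; --no-transfer exists only in the patched pg_upgrade from the tristan957 submodule branches, and step 5's tarball is produced in-process by create_zst_tarball):

    import subprocess

    OLD_BIN, NEW_BIN = "/usr/local/v15/bin", "/usr/local/v16/bin"  # <pgroot>/v{version}/bin
    OLD_DATA, NEW_DATA = "/data/pgdata", "/data/new-pgdata"  # new cluster sits next to the old one

    # Step 1: initdb a cluster for the target major version (env cleared, as in compute.rs).
    subprocess.run([f"{NEW_BIN}/initdb", "--pgdata", NEW_DATA,
                    "--username", "cloud_admin", "--encoding", "utf8",
                    "--auth-local", "trust"], env={}, check=True)

    # Step 2: pg_upgrade the old data directory into it without transferring heap files.
    subprocess.run([f"{NEW_BIN}/pg_upgrade",
                    "--old-bindir", OLD_BIN, "--old-datadir", OLD_DATA,
                    "--old-options", "-c neon.safekeepers=''",
                    "--new-bindir", NEW_BIN, "--new-datadir", NEW_DATA,
                    "--new-options", "-c neon.safekeepers=''",
                    "--username", "cloud_admin", "--no-transfer"], check=True)

    # Step 7: write the initdb.tar.zst tarball into the WAL with the old binaries;
    # NEON_PURPOSE=upgrade makes the neon extension's _PG_init return early.
    subprocess.run([f"{OLD_BIN}/postgres", "--wal-log", "/data/initdb.tar.zst"],
                   env={"PGDATA": OLD_DATA, "NEON_PURPOSE": "upgrade"}, check=True)

    # Step 8 then runs `postgres --sync-safekeepers` so the record reaches the
    # safekeepers, from which the pageserver ingests the upgraded datadir.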