Use pg_upgrade to upgrade projects from one Postgres major version to the next

Tristan Partin
2024-02-16 11:08:18 -06:00
parent 3d07b6a483
commit 5e71d8fddc
32 changed files with 834 additions and 167 deletions

.gitmodules (vendored)

@@ -5,8 +5,8 @@
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
branch = tristan957/15/pg_upgrade
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon
branch = tristan957/pg_upgrade

Cargo.lock (generated)

@@ -1220,6 +1220,7 @@ dependencies = [
"anyhow",
"async-compression",
"bytes",
"camino",
"cfg-if",
"chrono",
"clap",
@@ -1237,6 +1238,7 @@ dependencies = [
"reqwest 0.12.4",
"rlimit",
"rust-ini",
"scopeguard",
"serde",
"serde_json",
"signal-hook",


@@ -12,6 +12,7 @@ testing = []
[dependencies]
anyhow.workspace = true
async-compression.workspace = true
camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
@@ -24,6 +25,7 @@ num_cpus.workspace = true
opentelemetry.workspace = true
postgres.workspace = true
regex.workspace = true
scopeguard.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true


@@ -43,7 +43,8 @@ use std::{thread, time::Duration};
use anyhow::{Context, Result};
use chrono::Utc;
use clap::Arg;
use clap::{Arg, ArgAction};
use compute_api::spec::ComputeMode;
use compute_tools::lsn_lease::launch_lsn_lease_bg_task_for_static;
use signal_hook::consts::{SIGQUIT, SIGTERM};
use signal_hook::{consts::SIGINT, iterator::Signals};
@@ -57,7 +58,6 @@ use compute_tools::compute::{
forward_termination_signal, ComputeNode, ComputeState, ParsedSpec, PG_PID,
};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::get_pg_version;
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -121,11 +121,14 @@ fn init() -> Result<(String, clap::ArgMatches)> {
}
fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
let pgbin_default = "postgres";
let pgbin = matches
.get_one::<String>("pgbin")
let pgroot = matches
.get_one::<String>("pgroot")
.map(|s| s.as_str())
.unwrap_or(pgbin_default);
.expect("pgroot is required");
let pgversion = matches
.get_one::<String>("pgversion")
.map(|s| s.as_str())
.expect("pgversion is required");
let ext_remote_storage = matches
.get_one::<String>("remote-ext-config")
@@ -156,7 +159,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
Ok(ProcessCliResult {
connstr,
pgdata,
pgbin,
pgroot,
pgversion,
ext_remote_storage,
http_port,
spec_json,
@@ -168,7 +172,8 @@ fn process_cli(matches: &clap::ArgMatches) -> Result<ProcessCliResult> {
struct ProcessCliResult<'clap> {
connstr: &'clap str,
pgdata: &'clap str,
pgbin: &'clap str,
pgroot: &'clap str,
pgversion: &'clap str,
ext_remote_storage: Option<&'clap str>,
http_port: u16,
spec_json: Option<&'clap String>,
@@ -290,8 +295,9 @@ fn wait_spec(
build_tag: String,
ProcessCliResult {
connstr,
pgroot,
pgversion,
pgdata,
pgbin,
ext_remote_storage,
resize_swap_on_bind,
http_port,
@@ -316,8 +322,9 @@ fn wait_spec(
let compute_node = ComputeNode {
connstr: Url::parse(connstr).context("cannot parse connstr as a URL")?,
pgdata: pgdata.to_string(),
pgbin: pgbin.to_string(),
pgversion: get_pg_version(pgbin),
pgroot: pgroot.to_string(),
pgversion: pgversion.to_string(),
http_port,
live_config_allowed,
state: Mutex::new(new_state),
state_changed: Condvar::new(),
@@ -337,7 +344,7 @@ fn wait_spec(
// Launch http service first, so that we can serve control-plane requests
// while configuration is still in progress.
let _http_handle =
let http_handle =
launch_http_server(http_port, &compute).expect("cannot launch http endpoint thread");
if !spec_set {
@@ -372,15 +379,12 @@ fn wait_spec(
Ok(WaitSpecResult {
compute,
http_port,
resize_swap_on_bind,
})
}
struct WaitSpecResult {
compute: Arc<ComputeNode>,
// passed through from ProcessCliResult
http_port: u16,
resize_swap_on_bind: bool,
}
@@ -389,7 +393,6 @@ async fn start_postgres(
#[allow(unused_variables)] matches: &clap::ArgMatches,
WaitSpecResult {
compute,
http_port,
resize_swap_on_bind,
}: WaitSpecResult,
) -> Result<(Option<PostgresHandle>, StartPostgresResult)> {
@@ -442,12 +445,10 @@ async fn start_postgres(
}
}
let extension_server_port: u16 = http_port;
// Start Postgres
let mut pg = None;
if !prestartup_failed {
pg = match compute.start_compute(extension_server_port).await {
pg = match compute.start_compute() {
Ok(pg) => Some(pg),
Err(err) => {
error!("could not start the compute node: {:#}", err);
@@ -687,11 +688,17 @@ fn cli() -> clap::Command {
.required(true),
)
.arg(
Arg::new("pgbin")
.short('b')
.long("pgbin")
.default_value("postgres")
.value_name("POSTGRES_PATH"),
Arg::new("pgroot")
.short('R')
.long("pgroot")
.value_name("POSTGRES_ROOT")
.required(true),
)
.arg(
Arg::new("pgversion")
.long("pgversion")
.value_name("POSTGRES_VERSION")
.required(true),
)
.arg(
Arg::new("spec")
@@ -751,6 +758,11 @@ fn cli() -> clap::Command {
.long("resize-swap-on-bind")
.action(clap::ArgAction::SetTrue),
)
.arg(
Arg::new("no-postgres")
.long("no-postgres")
.action(ArgAction::SetTrue),
)
}
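For context, a minimal sketch of launching compute_ctl with the new flags. The install root is hypothetical, and the remaining required arguments (connection string, pgdata, spec) are omitted for brevity:

use std::process::Command;

fn main() -> std::io::Result<()> {
    // --pgroot points at an install root expected to contain v14/, v15/,
    // v16/ subdirectories, each with its own bin/; --pgversion selects one.
    let status = Command::new("compute_ctl")
        .args(["--pgroot", "/usr/local/pg_install"]) // hypothetical path
        .args(["--pgversion", "15"])
        .status()?;
    assert!(status.success());
    Ok(())
}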
/// When compute_ctl is killed, send also termination signal to sync-safekeepers


@@ -55,9 +55,7 @@ pub async fn get_database_schema(
compute: &Arc<ComputeNode>,
dbname: &str,
) -> Result<impl Stream<Item = Result<bytes::Bytes, std::io::Error>>, SchemaDumpError> {
let pgbin = &compute.pgbin;
let basepath = Path::new(pgbin).parent().unwrap();
let pgdump = basepath.join("pg_dump");
let pgdump = compute.get_my_pg_binary("pg_dump");
let mut connstr = compute.connstr.clone();
connstr.set_path(dbname);
let mut cmd = Command::new(pgdump)


@@ -3,6 +3,7 @@ use std::env;
use std::fs;
use std::os::unix::fs::{symlink, PermissionsExt};
use std::path::Path;
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::str::FromStr;
use std::sync::atomic::AtomicU32;
@@ -13,6 +14,7 @@ use std::time::Instant;
use anyhow::{Context, Result};
use bytes::{Buf, BufMut};
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use futures::future::join_all;
use futures::stream::FuturesUnordered;
@@ -20,9 +22,12 @@ use futures::StreamExt;
use nix::unistd::Pid;
use postgres::error::SqlState;
use postgres::{Client, NoTls};
use tokio;
use tokio_postgres;
use tracing::{debug, error, info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::zstd::create_zst_tarball;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec};
@@ -46,8 +51,9 @@ pub struct ComputeNode {
// Url type maintains proper escaping
pub connstr: url::Url,
pub pgdata: String,
pub pgbin: String,
pub pgroot: String,
pub pgversion: String,
pub http_port: u16,
/// We should only allow live re- / configuration of the compute node if
/// it uses 'pull model', i.e. it can go to control-plane and fetch
/// the latest configuration. Otherwise, there could be a case:
@@ -308,6 +314,13 @@ impl ComputeNode {
self.state.lock().unwrap().status
}
/// Get the mode of this compute.
pub fn get_mode(&self) -> ComputeMode {
let state = self.state.lock().unwrap();
state.pspec.as_ref().unwrap().spec.mode
}
// Remove `pgdata` directory and create it again with right permissions.
fn create_pgdata(&self) -> Result<()> {
// Ignore removal error, likely it is a 'No such file or directory (os error 2)'.
@@ -319,6 +332,35 @@ impl ComputeNode {
Ok(())
}
/// Get path pointing to requested binary directory.
pub fn get_pg_bindir(&self, version: &str) -> PathBuf {
Path::new(&self.pgroot)
.join(format!("v{}", version))
.join("bin")
}
/// Get path to requested Postgres binary.
pub fn get_pg_binary(&self, version: &str, binary: &str) -> String {
self.get_pg_bindir(version)
.join(binary)
.into_os_string()
.into_string()
.expect(&format!(
"path to {}-{} cannot be represented as UTF-8",
binary, version
))
}
/// Get path to Postgres binary directory of this compute.
pub fn get_my_pg_bindir(&self) -> PathBuf {
self.get_pg_bindir(&self.pgversion)
}
/// Get path to specified Postgres binary of this compute.
pub fn get_my_pg_binary(&self, binary: &str) -> String {
self.get_pg_binary(&self.pgversion, binary)
}
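A minimal sketch of the directory layout these helpers assume, with a hypothetical install root:

use std::path::Path;

fn main() {
    // get_pg_bindir("16") resolves to <pgroot>/v16/bin, and
    // get_pg_binary("16", "pg_upgrade") appends the binary name.
    let pgroot = "/usr/local/pg_install"; // hypothetical
    let bindir = Path::new(pgroot).join(format!("v{}", "16")).join("bin");
    assert_eq!(
        bindir.join("pg_upgrade").to_str().unwrap(),
        "/usr/local/pg_install/v16/bin/pg_upgrade"
    );
}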
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip_all, fields(%lsn))]
@@ -537,7 +579,8 @@ impl ComputeNode {
pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let mut sync_handle = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut sync_handle = maybe_cgexec(&postgres)
.args(["--sync-safekeepers"])
.env("PGDATA", &self.pgdata) // we cannot use -D in this mode
.envs(if let Some(storage_auth_token) = &storage_auth_token {
@@ -593,11 +636,7 @@ impl ComputeNode {
/// Do all the preparations like PGDATA directory creation, configuration,
/// safekeepers sync, basebackup, etc.
#[instrument(skip_all)]
pub async fn prepare_pgdata(
&self,
compute_state: &ComputeState,
extension_server_port: u16,
) -> Result<()> {
pub async fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
@@ -607,7 +646,7 @@ impl ComputeNode {
config::write_postgres_conf(
&pgdata_path.join("postgresql.conf"),
&pspec.spec,
Some(extension_server_port),
Some(self.http_port),
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -635,6 +674,10 @@ impl ComputeNode {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
ComputeMode::Upgrade => {
info!("Starting upgrade node at latest Pageserver LSN");
Lsn(0)
}
};
info!(
@@ -696,7 +739,7 @@ impl ComputeNode {
symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?;
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary | ComputeMode::Upgrade => {}
ComputeMode::Replica | ComputeMode::Static(..) => {
add_standby_signal(pgdata_path)?;
}
@@ -715,7 +758,7 @@ impl ComputeNode {
// Run initdb to completion
info!("running initdb");
let initdb_bin = Path::new(&self.pgbin).parent().unwrap().join("initdb");
let initdb_bin = self.get_my_pg_binary("initdb");
Command::new(initdb_bin)
.args(["-D", pgdata])
.output()
@@ -731,7 +774,8 @@ impl ComputeNode {
// Start postgres
info!("starting postgres");
let mut pg = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut pg = maybe_cgexec(&postgres)
.args(["-D", pgdata])
.spawn()
.expect("cannot start postgres process");
@@ -764,7 +808,8 @@ impl ComputeNode {
let pgdata_path = Path::new(&self.pgdata);
// Run postgres as a child process.
let mut pg = maybe_cgexec(&self.pgbin)
let postgres = self.get_my_pg_binary("postgres");
let mut pg = maybe_cgexec(&postgres)
.args(["-D", &self.pgdata])
.envs(if let Some(storage_auth_token) = &storage_auth_token {
vec![("NEON_AUTH_TOKEN", storage_auth_token)]
@@ -785,6 +830,152 @@ impl ComputeNode {
Ok((pg, logs_handle))
}
pub async fn upgrade(&self, pg_version: &str) -> Result<()> {
let old_bindir = self.get_my_pg_bindir();
let new_bindir = self.get_pg_bindir(pg_version);
let old_datadir = Utf8Path::new(&self.pgdata);
let parent_dir = old_datadir.parent().unwrap();
let new_datadir = parent_dir.join("new-pgdata");
// Delete the new data directory before attempting the upgrade, in case it already exists
let _ = std::fs::remove_dir_all(&new_datadir);
// Step 1: Create new cluster
info!(
"Running initdb to start a cluster upgrade from v{} to v{}",
self.pgversion, pg_version
);
let initdb_bin = self.get_pg_binary(pg_version, "initdb");
let mut initdb_cmd = Command::new(&initdb_bin);
initdb_cmd
.args(["--pgdata", new_datadir.as_str()])
.args(["--username", "cloud_admin"])
.args(["--encoding", "utf8"])
.args(["--auth-local", "trust"])
.env_clear()
.stdout(Stdio::inherit())
.stderr(Stdio::inherit());
match initdb_cmd.status() {
Ok(status) => {
if !status.success() {
return Err(anyhow::anyhow!("failed to initialize the new database"));
}
info!("Initialized v{} database", pg_version);
}
Err(_) => {
return Err(anyhow::anyhow!(
"failed to spawn initdb for the new database"
))
}
};
// Step 2: Run pg_upgrade
info!(
"Running pg_upgrade to upgrade from v{} to v{}",
self.pgversion, pg_version
);
let pg_upgrade_bin = self.get_pg_binary(pg_version, "pg_upgrade");
let mut pg_upgrade_cmd = Command::new(&pg_upgrade_bin);
let mut child = pg_upgrade_cmd
.args([
"--old-bindir",
&old_bindir.into_os_string().into_string().unwrap(),
])
.args(["--old-datadir", old_datadir.as_str()])
.args(["--old-options", "-c neon.safekeepers=''"])
.args([
"--new-bindir",
&new_bindir.into_os_string().into_string().unwrap(),
])
.args(["--new-datadir", new_datadir.as_str()])
.args(["--new-options", "-c neon.safekeepers=''"])
.args(["--username", "cloud_admin"])
.arg("--no-transfer")
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()?;
let status = child.wait()?;
if status.success() {
info!("pg_upgrade was successful");
} else {
return Err(anyhow::anyhow!(
"pg_upgrade failed with exit code {}",
status.code().unwrap()
));
}
/* Step 3: Delete the script that pg_upgrade generates, which is created in the current
* working directory
*/
// TODO: We should write a patch for upstream to not generate this file
if cfg!(windows) {
let _ = std::fs::remove_file("delete_old_cluster.bat");
} else {
let _ = std::fs::remove_file("delete_old_cluster.sh");
}
/* Step 4: Re-prepare the pgdata directory to work with the latest basebackup from the
* pageserver
*/
{
let state = self.state.lock().unwrap().clone();
self.prepare_pgdata(&state).await?;
}
// Step 5: Create a tarball of the new data directory, minus things like pg_dynshmem, etc.
info!("Creating tarball of upgraded initdb directory, minus some files");
let initdb_tar_path = parent_dir.join("initdb.tar.zst");
let _ = std::fs::remove_file(&initdb_tar_path);
create_zst_tarball(&new_datadir, &initdb_tar_path).await?;
// Step 6: Copy postgresql.conf from the old data directory into the upgraded one
std::fs::copy(
old_datadir.join("postgresql.conf"),
new_datadir.join("postgresql.conf"),
)?;
// Step 7: Write the tarball into the Postgres WAL
info!("Writing initdb.tar.zst to WAL");
let postgres_bin = self.get_my_pg_binary("postgres");
let mut wal_log_cmd = Command::new(&postgres_bin);
child = wal_log_cmd
.args(["--wal-log", initdb_tar_path.as_str()])
.env_clear()
.env("PGDATA", old_datadir.as_str())
.env("NEON_PURPOSE", "upgrade")
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.expect("postgres --wal-log failed to start");
match child.wait() {
Ok(s) => {
if !s.success() {
return Err(anyhow::anyhow!("Could not wal log upgrade tarball"));
}
}
Err(e) => return Err(e.into()),
};
// Step 8: Sync the safekeepers to push WAL record to Neon
self.sync_safekeepers(None)?;
/* Note that any errors that occur after this point are unimportant: ALWAYS return success
 * from here on. The compute will be terminated immediately after the upgrade. Remember
 * that this is an upgrade-only compute, and it will not accept connections from users.
 */
Ok(())
}
/// Do post configuration of the already started Postgres. This function spawns a background thread to
/// configure the database after applying the compute spec. Currently, it upgrades the neon extension
/// version. In the future, it may upgrade all 3rd-party extensions.
@@ -902,8 +1093,8 @@ impl ComputeNode {
// `pg_ctl` for start / stop.
#[instrument(skip_all)]
fn pg_reload_conf(&self) -> Result<()> {
let pgctl_bin = Path::new(&self.pgbin).parent().unwrap().join("pg_ctl");
Command::new(pgctl_bin)
let pgctl_bin = self.get_my_pg_binary("pg_ctl");
Command::new(&pgctl_bin)
.args(["reload", "-D", &self.pgdata])
.output()
.expect("cannot run pg_ctl process");
@@ -987,43 +1178,25 @@ impl ComputeNode {
Ok(())
}
/// Prepares the compute for Postgres operations, including downloading
/// remote extensions and preparing the pgdata directory.
///
/// The caller must hold the state mutex.
#[instrument(skip_all)]
pub async fn start_compute(
&self,
extension_server_port: u16,
) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
pub async fn prepare_compute(&self) -> Result<()> {
let state = self.state.lock().unwrap().clone();
let pspec = state.pspec.as_ref().expect("spec must be set");
info!(
"starting compute for project {}, operation {}, tenant {}, timeline {}",
"preparing compute for project {}, operation {}, tenant {}, timeline {}",
pspec.spec.cluster.cluster_id.as_deref().unwrap_or("None"),
pspec.spec.operation_uuid.as_deref().unwrap_or("None"),
pspec.tenant_id,
pspec.timeline_id,
);
// tune pgbouncer
if let Some(pgbouncer_settings) = &pspec.spec.pgbouncer_settings {
info!("tuning pgbouncer");
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to create rt");
// Spawn a thread to do the tuning,
// so that we don't block the main thread that starts Postgres.
let pgbouncer_settings = pgbouncer_settings.clone();
let _handle = thread::spawn(move || {
let res = rt.block_on(tune_pgbouncer(pgbouncer_settings));
if let Err(err) = res {
error!("error while tuning pgbouncer: {err:?}");
}
});
}
info!(
"start_compute spec.remote_extensions {:?}",
"prepare_compute spec.remote_extensions {:?}",
pspec.spec.remote_extensions
);
@@ -1031,7 +1204,8 @@ impl ComputeNode {
// remote shared_preload_libraries before postgres start (if any)
if let Some(remote_extensions) = &pspec.spec.remote_extensions {
// First, create control files for all available extensions
extension_server::create_control_files(remote_extensions, &self.pgbin);
let postgres_bin = self.get_my_pg_binary("postgres");
extension_server::create_control_files(remote_extensions, &postgres_bin);
let library_load_start_time = Utc::now();
let remote_ext_metrics = self.prepare_preload_libraries(&pspec.spec)?;
@@ -1053,8 +1227,17 @@ impl ComputeNode {
info!("{:?}", remote_ext_metrics);
}
self.prepare_pgdata(&compute_state, extension_server_port)
.await?;
self.prepare_pgdata(&state).await?;
self.set_status(ComputeStatus::Prepared);
Ok(())
}
#[instrument(skip_all)]
pub fn start_compute(&self) -> Result<(std::process::Child, std::thread::JoinHandle<()>)> {
let compute_state = self.state.lock().unwrap().clone();
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
let pg_process = self.start_postgres(pspec.storage_auth_token.clone())?;
@@ -1160,9 +1343,11 @@ impl ComputeNode {
core_path.display()
);
let postgres_bin = self.get_my_pg_binary("postgres");
// Try first with gdb
let backtrace = Command::new("gdb")
.args(["--batch", "-q", "-ex", "bt", &self.pgbin])
.args(["--batch", "-q", "-ex", "bt", &postgres_bin])
.arg(&core_path)
.output();
@@ -1295,11 +1480,12 @@ LIMIT 100",
// then we try to download it here
info!("downloading new extension {ext_archive_name}");
let postgres_bin = self.get_my_pg_binary("postgres");
let download_size = extension_server::download_extension(
&real_ext_name,
&ext_path,
ext_remote_storage,
&self.pgbin,
&postgres_bin,
)
.await
.map_err(DownloadError::Other);


@@ -74,7 +74,7 @@ pub fn write_postgres_conf(
}
match spec.mode {
ComputeMode::Primary => {}
ComputeMode::Primary | ComputeMode::Upgrade => {}
ComputeMode::Static(lsn) => {
// hot_standby is 'on' by default, but let's be explicit
writeln!(file, "hot_standby=on")?;


@@ -75,7 +75,6 @@ use anyhow::Result;
use anyhow::{bail, Context};
use bytes::Bytes;
use compute_api::spec::RemoteExtSpec;
use regex::Regex;
use remote_storage::*;
use reqwest::StatusCode;
use std::path::Path;
@@ -103,34 +102,6 @@ fn get_pg_config(argument: &str, pgbin: &str) -> String {
.to_string()
}
pub fn get_pg_version(pgbin: &str) -> String {
// pg_config --version returns a (platform specific) human readable string
// such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
let human_version = get_pg_config("--version", pgbin);
return parse_pg_version(&human_version).to_string();
}
fn parse_pg_version(human_version: &str) -> &str {
// Normal releases have version strings like "PostgreSQL 15.4". But there
// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
// configure option, you can tack any string to the version number,
// e.g. "PostgreSQL 15.4foobar".
match Regex::new(r"^PostgreSQL (?<major>\d+).+")
.unwrap()
.captures(human_version)
{
Some(captures) if captures.len() == 2 => match &captures["major"] {
"14" => return "v14",
"15" => return "v15",
"16" => return "v16",
_ => {}
},
_ => {}
}
panic!("Unsuported postgres version {human_version}");
}
// download the archive for a given extension,
// unzip it, and place files in the appropriate locations (share/lib)
pub async fn download_extension(
@@ -255,42 +226,3 @@ async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Res
),
}
}
#[cfg(test)]
mod tests {
use super::parse_pg_version;
#[test]
fn test_parse_pg_version() {
assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
assert_eq!(
parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
"v15"
);
assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
assert_eq!(
parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
"v14"
);
assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
}
#[test]
#[should_panic]
fn test_parse_pg_unsupported_version() {
parse_pg_version("PostgreSQL 13.14");
}
#[test]
#[should_panic]
fn test_parse_pg_incorrect_version_format() {
parse_pg_version("PostgreSQL 14");
}
}


@@ -10,6 +10,7 @@ use crate::catalog::{get_database_schema, get_dbs_and_roles};
use crate::compute::forward_termination_signal;
use crate::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_api::requests::ConfigurationRequest;
use compute_api::requests::UpgradeRequest;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse, GenericAPIError};
use anyhow::Result;
@@ -137,6 +138,20 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
}
}
(&Method::POST, "/upgrade") => {
info!("serving /upgrade POST request");
match handle_upgrade_request(req, compute).await {
Ok(_) => Response::builder()
.status(StatusCode::ACCEPTED)
.body(Body::from("Starting upgrade"))
.unwrap(),
Err((e, status)) => {
error!("error handling /upgrade request: {e}");
render_json_error(&format!("{}", e), status)
}
}
}
(&Method::GET, "/dbs_and_roles") => {
info!("serving /dbs_and_roles GET request",);
match get_dbs_and_roles(compute).await {
@@ -397,6 +412,59 @@ async fn handle_terminate_request(compute: &Arc<ComputeNode>) -> Result<(), (Str
Ok(())
}
async fn handle_upgrade_request(
req: Request<Body>,
compute: &Arc<ComputeNode>,
) -> Result<(), (anyhow::Error, StatusCode)> {
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
let body_str = String::from_utf8(body_bytes.to_vec()).unwrap();
let body = match serde_json::from_str::<UpgradeRequest>(&body_str) {
Ok(r) => r,
Err(e) => return Err((Into::into(e), StatusCode::BAD_REQUEST)),
};
// No sense in trying to upgrade to the same version.
let curr_version = compute.pgversion.clone();
let new_version = body.pg_version;
if curr_version == new_version {
return Err((
anyhow::anyhow!("cannot upgrade endpoint to the same version"),
StatusCode::UNPROCESSABLE_ENTITY,
));
}
// Check that we are in the prepared state before trying to upgrade.
match compute.get_status() {
ComputeStatus::Prepared => (),
ComputeStatus::Upgrading => {
return Err((
anyhow::anyhow!("upgrade already in progress"),
StatusCode::CONFLICT,
));
}
_ => {
return Err((
anyhow::anyhow!("expected compute to be in the prepared state"),
StatusCode::CONFLICT,
));
}
}
compute.set_status(ComputeStatus::Upgrading);
let c = compute.clone();
tokio::spawn(async move {
if let Err(e) = c.upgrade(&new_version).await {
error!("Failed to upgrade database: {}", e);
}
c.set_status(ComputeStatus::Running);
});
Ok(())
}
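For reference, a minimal sketch of driving this endpoint from Rust, assuming reqwest's blocking client with the json feature; the port and target version are hypothetical. The handler returns 202 Accepted and runs the upgrade in a background task, so callers should poll /status afterwards:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resp = reqwest::blocking::Client::new()
        .post("http://localhost:3080/upgrade") // hypothetical port
        .json(&serde_json::json!({ "pg_version": "16" }))
        .send()?;
    // The upgrade proceeds asynchronously after this returns.
    assert_eq!(resp.status().as_u16(), 202);
    Ok(())
}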
// Main Hyper HTTP server function that runs it and blocks waiting on it forever.
#[tokio::main]
async fn serve(port: u16, state: Arc<ComputeNode>) {


@@ -212,6 +212,21 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/GenericError"
/upgrade:
post:
tags:
- Upgrade
summary: Upgrade project to another Postgres major version.
operationId: upgradePostgres
responses:
202:
description: Upgrade request in progress.
409:
description: Upgrade already in progress.
422:
description: Upgrade request could not be processed.
500:
description: Upgrade request failed.
/terminate:
post:


@@ -798,14 +798,19 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let upgrade_only = sub_args
.get_one::<bool>("upgrade-only")
.copied()
.unwrap_or(false);
let allow_multiple = sub_args.get_flag("allow-multiple");
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
(None, false) => ComputeMode::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
let mode = match (lsn, hot_standby, upgrade_only) {
(Some(lsn), false, false) => ComputeMode::Static(lsn),
(None, true, false) => ComputeMode::Replica,
(None, false, false) => ComputeMode::Primary,
(None, false, true) => ComputeMode::Upgrade,
// Hitting this arm means conflicts_with is not set correctly on the clap arguments.
_ => anyhow::bail!("Invalid command line invocation"),
};
match (mode, hot_standby) {
@@ -1501,7 +1506,8 @@ fn cli() -> Command {
let lsn_arg = Arg::new("lsn")
.long("lsn")
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
.required(false)
.conflicts_with("hot-standby");
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
@@ -1718,6 +1724,18 @@ fn cli() -> Command {
.arg(hot_standby_arg.clone())
.arg(update_catalog)
.arg(allow_multiple.clone())
.arg(
Arg::new("upgrade-only")
.help("Mark this compute as an upgrade compute")
.long("upgrade-only")
.action(ArgAction::SetTrue)
.conflicts_with_all(&[
"config-only",
"hot-standby",
// Perhaps we could offer upgrades at a specific LSN in the future.
"lsn",
])
)
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")


@@ -182,6 +182,7 @@ impl ComputeControlPlane {
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<()> {
// TODO: It really feels like I need to do some protection here
if matches!(mode, ComputeMode::Primary) {
// this check is not complete, as you could have a concurrent attempt at
// creating another primary, both reading the state before checking it here,
@@ -393,6 +394,7 @@ impl Endpoint {
conf.append("recovery_prefetch", "off");
}
}
ComputeMode::Upgrade => {}
}
Ok(conf)
@@ -624,13 +626,16 @@ impl Endpoint {
self.endpoint_path().join("spec.json").to_str().unwrap(),
])
.args([
"--pgbin",
self.env
.pg_bin_dir(self.pg_version)?
.join("postgres")
.to_str()
"--pgroot",
&self
.env
.pg_distrib_dir
.clone()
.into_os_string()
.into_string()
.unwrap(),
])
.args(["--pgversion", &self.pg_version.to_string()])
.stdin(std::process::Stdio::null())
.stderr(logfile.try_clone()?)
.stdout(logfile);
@@ -674,7 +679,7 @@ impl Endpoint {
}
// keep retrying
}
ComputeStatus::Running => {
ComputeStatus::Running | ComputeStatus::Prepared => {
// All good!
break;
}
@@ -688,6 +693,7 @@ impl Endpoint {
);
}
ComputeStatus::Empty
| ComputeStatus::Upgrading
| ComputeStatus::ConfigurationPending
| ComputeStatus::Configuration
| ComputeStatus::TerminationPending


@@ -12,3 +12,9 @@ use serde::Deserialize;
pub struct ConfigurationRequest {
pub spec: ComputeSpec,
}
/// Request body of the /upgrade API
#[derive(Deserialize, Debug)]
pub struct UpgradeRequest {
pub pg_version: String,
}
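A hedged round-trip of the request body (the struct is restated so the example is self-contained; versions are bare major numbers, matching the v15/v16 directory scheme):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
pub struct UpgradeRequest {
    pub pg_version: String,
}

fn main() {
    let req: UpgradeRequest =
        serde_json::from_str(r#"{"pg_version": "16"}"#).unwrap();
    assert_eq!(req.pg_version, "16");
    // A body that fails to deserialize maps to 400 BAD_REQUEST in the handler.
    assert!(serde_json::from_str::<UpgradeRequest>("{}").is_err());
}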


@@ -44,10 +44,15 @@ pub enum ComputeStatus {
// Compute node has spec and initial startup and
// configuration is in progress.
Init,
// Compute has been prepared, meaning that remote extensions have been
// downloaded and the data directory has been prepared.
Prepared,
// Compute is configured and running.
Running,
// New spec is being applied.
Configuration,
// Compute is upgrading Postgres.
Upgrading,
// Either startup or configuration failed,
// compute will exit soon or is waiting for
// control-plane to terminate it.
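A minimal sketch of the lifecycle the new states imply, assuming the guard in handle_upgrade_request above (upgrades are only accepted from Prepared); the enum is restated with some variants omitted, and the helper is hypothetical:

#[derive(Clone, Copy, Debug, PartialEq)]
enum ComputeStatus {
    Empty,
    Init,
    Prepared,
    Running,
    Configuration,
    Upgrading,
}

// Hypothetical helper mirroring the check in handle_upgrade_request.
fn may_start_upgrade(status: ComputeStatus) -> bool {
    status == ComputeStatus::Prepared
}

fn main() {
    assert!(may_start_upgrade(ComputeStatus::Prepared));
    assert!(!may_start_upgrade(ComputeStatus::Running));
}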


@@ -199,6 +199,11 @@ pub enum ComputeMode {
/// Future versions may want to distinguish between replicas with hot standby
/// feedback and other kinds of replication configurations.
Replica,
/// An upgrade-only node
///
/// This node will not accept remote Postgres connections. Its only
/// purpose is to upgrade a timeline.
Upgrade,
}
#[derive(Clone, Debug, Default, Deserialize, Serialize)]


@@ -181,9 +181,27 @@ pub const XLOG_NEON_HEAP_UPDATE: u8 = 0x20;
pub const XLOG_NEON_HEAP_HOT_UPDATE: u8 = 0x30;
pub const XLOG_NEON_HEAP_LOCK: u8 = 0x40;
pub const XLOG_NEON_HEAP_MULTI_INSERT: u8 = 0x50;
pub const XLOG_NEON_FILE: u8 = 0x60;
pub const XLOG_NEON_HEAP_VISIBLE: u8 = 0x40;
#[repr(C)]
#[derive(Debug)]
pub enum XlNeonFileFiletype {
UPGRADE_TARBALL,
}
impl TryFrom<u8> for XlNeonFileFiletype {
type Error = ();
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(XlNeonFileFiletype::UPGRADE_TARBALL),
_ => Err(()),
}
}
}
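A short demonstration of the conversion; the enum and impl are restated so the example is self-contained:

// Mirrors the enum above; byte 0 is the only currently-defined filetype.
#[allow(non_camel_case_types)]
#[derive(Debug, PartialEq)]
enum XlNeonFileFiletype {
    UPGRADE_TARBALL,
}

impl TryFrom<u8> for XlNeonFileFiletype {
    type Error = ();
    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(XlNeonFileFiletype::UPGRADE_TARBALL),
            _ => Err(()),
        }
    }
}

fn main() {
    assert_eq!(
        XlNeonFileFiletype::try_from(0),
        Ok(XlNeonFileFiletype::UPGRADE_TARBALL)
    );
    assert!(XlNeonFileFiletype::try_from(7).is_err());
}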
// from xlogreader.h
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;


@@ -506,6 +506,10 @@ async fn import_file(
return Ok(None);
}
if file_name == "pg_internal.init" {
return Ok(None);
}
if file_path.starts_with("global") {
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;


@@ -14,7 +14,7 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use arc_swap::ArcSwap;
use bytes::Bytes;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
@@ -4357,6 +4357,10 @@ impl Timeline {
)
}
pub fn get_path(&self) -> Utf8PathBuf {
self.conf.timelines_path(&self.tenant_shard_id)
}
/// Detach this timeline from its ancestor by copying all of ancestors layers as this
/// Timelines layers up to the ancestor_lsn.
///


@@ -24,6 +24,7 @@
use std::time::Duration;
use std::time::SystemTime;
use camino::Utf8PathBuf;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
@@ -33,8 +34,10 @@ use bytes::{Buf, Bytes, BytesMut};
use tracing::*;
use utils::failpoint_support;
use utils::rate_limit::RateLimit;
use utils::zstd::extract_zst_tarball;
use crate::context::RequestContext;
use crate::import_datadir;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::{DatadirModification, Version};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -69,7 +72,9 @@ impl CheckPoint {
}
}
pub struct WalIngest {
pub struct WalIngest<'t> {
timeline: &'t Timeline,
timeline_path: Utf8PathBuf,
shard: ShardIdentity,
checkpoint: CheckPoint,
checkpoint_modified: bool,
@@ -82,12 +87,12 @@ struct WarnIngestLag {
timestamp_invalid_msg_ratelimit: RateLimit,
}
impl WalIngest {
impl<'t> WalIngest<'t> {
pub async fn new(
timeline: &Timeline,
timeline: &'t Timeline,
startpoint: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<WalIngest> {
) -> anyhow::Result<WalIngest<'t>> {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
@@ -100,6 +105,8 @@ impl WalIngest {
});
Ok(WalIngest {
timeline,
timeline_path: timeline.get_path(),
shard: *timeline.get_shard_identity(),
checkpoint,
checkpoint_modified: false,
@@ -458,6 +465,18 @@ impl WalIngest {
modification.drop_replorigin(xlrec.node_id).await?
}
}
pg_constants::RM_BTREE_ID
| pg_constants::RM_HASH_ID
| pg_constants::RM_GIN_ID
| pg_constants::RM_GIST_ID
| pg_constants::RM_SEQ_ID
| pg_constants::RM_SPGIST_ID
| pg_constants::RM_BRIN_ID
| pg_constants::RM_COMMIT_TS_ID
| pg_constants::RM_REPLORIGIN_ID
| pg_constants::RM_GENERIC_ID => {
// No special handling currently for these resource managers
}
_x => {
// TODO: should probably log & fail here instead of blindly
// doing something without understanding the protocol
@@ -923,6 +942,33 @@ impl WalIngest {
assert_eq!(decoded.xl_rmid, pg_constants::RM_NEON_ID);
match pg_version {
15 => {
let info = decoded.xl_info;
match info {
pg_constants::XLOG_NEON_FILE => {
info!(
"tristan: last_record_lsn={}",
self.timeline.get_last_record_lsn()
);
let xlrec = v16::rm_neon::XlNeonFile::decode(buf);
let pgdata_path = self.timeline_path.join("new-pgdata");
extract_zst_tarball(&pgdata_path, &*xlrec.data).await?;
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?;
info!("LSN from pg_upgraded controlfile: {lsn}");
Box::pin(import_datadir::import_timeline_from_postgres_datadir(
self.timeline,
&pgdata_path,
lsn,
ctx,
))
.await?;
}
_ => return Err(anyhow::anyhow!("Unknown XLOG xl_info field: {}", info)),
}
}
16 => {
let info = decoded.xl_info & pg_constants::XLOG_HEAP_OPMASK;


@@ -526,6 +526,29 @@ pub mod v16 {
}
}
}
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonFile {
pub filetype: u8,
pub size: u32,
pub data: Bytes,
}
impl XlNeonFile {
pub fn decode(buf: &mut Bytes) -> Self {
let filetype = buf.get_u8();
// Skip the padding
buf.advance(std::mem::size_of::<u8>() * 3);
let size = buf.get_u32_le();
Self {
filetype,
size,
data: buf.copy_to_bytes(buf.remaining()),
}
}
}
}
}
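A minimal sketch of the on-wire layout decode() expects: one filetype byte, three bytes of C struct padding, a little-endian u32 size, then the payload. The sample bytes are hypothetical:

use bytes::{Buf, Bytes};

fn main() {
    // filetype = 0 (upgrade tarball), 3 padding bytes, size = 4 (LE), "neon"
    let raw = vec![0u8, 0, 0, 0, 4, 0, 0, 0, b'n', b'e', b'o', b'n'];
    let mut buf = Bytes::from(raw);
    let filetype = buf.get_u8();
    buf.advance(3); // skip the struct padding
    let size = buf.get_u32_le();
    let data = buf.copy_to_bytes(buf.remaining());
    assert_eq!((filetype, size), (0, 4));
    assert_eq!(&data[..], &b"neon"[..]);
}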


@@ -23,7 +23,18 @@ SHLIB_LINK_INTERNAL = $(libpq)
SHLIB_LINK = -lcurl
EXTENSION = neon
DATA = neon--1.0.sql neon--1.0--1.1.sql neon--1.1--1.2.sql neon--1.2--1.3.sql neon--1.3--1.2.sql neon--1.2--1.1.sql neon--1.1--1.0.sql neon--1.3--1.4.sql neon--1.4--1.3.sql
DATA = \
neon--1.0.sql \
neon--1.0--1.1.sql \
neon--1.1--1.2.sql \
neon--1.2--1.3.sql \
neon--1.3--1.4.sql \
neon--1.4--1.5.sql \
neon--1.5--1.4.sql \
neon--1.4--1.3.sql \
neon--1.3--1.2.sql \
neon--1.2--1.1.sql \
neon--1.1--1.0.sql
PGFILEDESC = "neon - cloud storage for PostgreSQL"
EXTRA_CLEAN = \


@@ -7,3 +7,7 @@ LANGUAGE C PARALLEL SAFE;
GRANT EXECUTE ON FUNCTION approximate_working_set_size_seconds(integer) TO pg_monitor;
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;


@@ -0,0 +1,8 @@
\echo Use "ALTER EXTENSION neon UPDATE TO '1.5'" to load this file. \quit
CREATE FUNCTION wal_log_file(path text)
RETURNS pg_lsn
AS 'MODULE_PATHNAME', 'wal_log_file'
LANGUAGE C STRICT PARALLEL UNSAFE;
GRANT EXECUTE ON FUNCTION wal_log_file TO pg_monitor;
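A hedged sketch of calling the new function from Rust with the postgres crate; the connection string and file path are hypothetical:

use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    let mut client =
        Client::connect("host=localhost user=cloud_admin dbname=postgres", NoTls)?;
    // wal_log_file() writes the file's contents into the WAL as an
    // XLOG_NEON_FILE record and returns the insert LSN.
    let row = client.query_one("SELECT wal_log_file('/tmp/initdb.tar.zst')::text", &[])?;
    let lsn: String = row.get(0);
    println!("file logged at LSN {lsn}");
    Ok(())
}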


@@ -0,0 +1 @@
DROP FUNCTION IF EXISTS wal_log_file(text) CASCADE;


@@ -11,6 +11,8 @@
#include "postgres.h"
#include "fmgr.h"
#include <sys/stat.h>
#include "miscadmin.h"
#include "access/subtrans.h"
#include "access/twophase.h"
@@ -29,11 +31,19 @@
#include "tcop/tcopprot.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/xloginsert.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
#include "utils/timeout.h"
#include "utils/wait_event.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#if PG_MAJORVERSION_NUM >= 15
#include "access/neon_xlog.h"
#endif
#include "extension_server.h"
#include "neon.h"
@@ -629,10 +639,17 @@ ReportSearchPath(void)
void
_PG_init(void)
{
const char *purpose;
purpose = getenv("NEON_PURPOSE");
if (purpose && strcmp(purpose, "upgrade") == 0)
return;
/*
* Also load 'neon_rmgr'. This makes it unnecessary to list both 'neon'
* and 'neon_rmgr' in shared_preload_libraries.
*/
#if PG_VERSION_NUM >= 160000
load_file("$libdir/neon_rmgr", false);
#endif
@@ -676,6 +693,9 @@ _PG_init(void)
PG_FUNCTION_INFO_V1(pg_cluster_size);
PG_FUNCTION_INFO_V1(backpressure_lsns);
PG_FUNCTION_INFO_V1(backpressure_throttling_time);
#if PG_MAJORVERSION_NUM >= 15
PG_FUNCTION_INFO_V1(wal_log_file);
#endif
Datum
pg_cluster_size(PG_FUNCTION_ARGS)
@@ -721,3 +741,200 @@ backpressure_throttling_time(PG_FUNCTION_ARGS)
{
PG_RETURN_UINT64(BackpressureThrottlingTime());
}
#if PG_MAJORVERSION_NUM >= 15
Datum
wal_log_file(PG_FUNCTION_ARGS)
{
int rc;
int fd;
ssize_t n;
text *path;
size_t off;
char *data;
short nargs;
struct stat st;
XLogRecPtr lsn;
size_t path_len;
xl_neon_file xlrec;
char file[MAXPGPATH];
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
bool wal_debug;
#endif
path = PG_GETARG_TEXT_PP(0);
path_len = VARSIZE(path) - VARHDRSZ;
memcpy(file, VARDATA(path), path_len);
file[path_len] = '\0';
/* Get the size of the file. Note that stat(2) follows symlinks. */
rc = stat(file, &st);
if (rc != 0)
ereport(ERROR,
(errmsg("failed to get size of file (%s): %m", file)));
xlrec.size = (size_t) st.st_size;
fd = open(file, O_RDONLY);
if (fd == -1)
ereport(ERROR,
(errmsg("could not open %s: %m", file)));
/* If the file is too large, error out. */
data = palloc(xlrec.size);
/* Copy the file contents */
off = 0;
while (true) {
n = read(fd, data + off, xlrec.size - off);
if (n == EOF)
{
close(fd);
ereport(ERROR,
(errmsg("failed to read %s: %m", file)));
}
off += n;
if (xlrec.size - off == 0)
break;
}
close(fd);
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
XLogRegisterData((char *) data, xlrec.size);
/*
* We must turn debugging off on anything where the Neon RMGR is not
* registered. Stash the original value for restoration later.
*/
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
wal_debug = XLOG_DEBUG;
XLOG_DEBUG = false;
#endif
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
XLOG_DEBUG = wal_debug;
#endif
PG_RETURN_LSN(lsn);
}
#endif
#if PG_MAJORVERSION_NUM >= 15
/*
* Entry point for `postgres --wal-log`.
*/
PGDLLEXPORT void
WalLogMain(int argc, char *argv[])
{
int rc;
int fd;
off_t off;
struct stat st;
XLogRecPtr lsn;
void *data;
xl_neon_file xlrec;
/* TODO: should this be PATH_MAX? should we require an absolute path? */
char file[MAXPGPATH];
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
bool wal_debug;
#endif
if (argc != 3)
ereport(ERROR, errmsg("wrong number of arguments passed to --wal-log"));
if (!realpath(argv[2], file))
ereport(ERROR, errmsg("failed to resolve path: %m"));
ereport(LOG, errmsg("writing %s to WAL", file));
ChangeToDataDir();
CreateDataDirLockFile(false);
LocalProcessControlFile(false);
InitializeMaxBackends();
CreateSharedMemoryAndSemaphores();
InitializeTimeouts();
InitProcess();
BaseInit();
CreateAuxProcessResourceOwner();
StartupXLOG();
/* Get the size of the file. Note that stat(2) follows symlinks. */
rc = stat(file, &st);
if (rc != 0)
ereport(ERROR,
(errmsg("failed to get size of file (%s): %m", file)));
xlrec.size = (size_t) st.st_size;
fd = open(file, O_RDONLY);
if (fd == -1)
ereport(ERROR,
(errmsg("could not open %s: %m", file)));
/* If the file is too large, error out. */
data = palloc(xlrec.size);
/* Copy the file contents */
off = 0;
while (true) {
ssize_t n;
n = read(fd, data + off, xlrec.size - off);
if (n == EOF)
{
close(fd);
ereport(ERROR,
(errmsg("failed to read %s: %m", file)));
}
off += n;
if (xlrec.size - off == 0)
break;
}
close(fd);
xlrec.filetype = XL_NEON_FILE_UPGRADE_TARBALL;
/* ereport(LOG, errmsg("Current LSN: %X/%X" , LSN_FORMAT_ARGS(GetXLogWriteRecPtr()))); */
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, SizeOfNeonFile);
XLogRegisterData(data, xlrec.size);
/*
* We must turn debugging off on anything where the Neon RMGR is not
* registered. Stash the original value for restoration later.
*/
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
wal_debug = XLOG_DEBUG;
XLOG_DEBUG = false;
#endif
lsn = XLogInsert(RM_NEON_ID, XLOG_NEON_FILE);
#if defined(WAL_DEBUG) && PG_MAJORVERSION_NUM < 16
XLOG_DEBUG = wal_debug;
#endif
exit(0);
}
#endif


@@ -78,11 +78,16 @@ neon_smgr_shmem_startup(void)
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
if (relsize_hash_size <= 0)
return;
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
relsize_ctl = (RelSizeHashControl *) ShmemInitStruct("relsize_hash", sizeof(RelSizeHashControl), &found);
if (!found)
{
elog(LOG, "neon_relsize: %d", relsize_hash_size);
relsize_lock = (LWLockId) GetNamedLWLockTranche("neon_relsize");
elog(LOG, "neon_relsize");
info.keysize = sizeof(RelTag);
info.entrysize = sizeof(RelSizeEntry);
relsize_hash = ShmemInitHash("neon_relsize",


@@ -76,6 +76,8 @@ neon_rm_redo(XLogReaderState *record)
case XLOG_NEON_HEAP_MULTI_INSERT:
redo_neon_heap_multi_insert(record);
break;
case XLOG_NEON_FILE:
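/* The payload is consumed by the pageserver during WAL ingest;
 * there is nothing for Postgres itself to redo. */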
break;
default:
elog(PANIC, "neon_rm_redo: unknown op code %u", info);
}


@@ -57,6 +57,8 @@ neon_rm_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (SnapBuildProcessChange(builder, xid, buf->origptr))
DecodeNeonMultiInsert(ctx, buf);
break;
case XLOG_NEON_FILE:
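/* File records carry no logical changes to decode; skip them. */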
break;
default:
elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
break;
@@ -401,4 +403,4 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
}
#endif
#endif


@@ -134,6 +134,16 @@ neon_rm_desc(StringInfo buf, XLogReaderState *record)
xlrec->ntuples, &offset_elem_desc, NULL);
}
}
else if (info == XLOG_NEON_FILE)
{
const xl_neon_file *xlrec = (xl_neon_file *) rec;
switch ((xl_neon_file_filetype) xlrec->filetype)
{
case XL_NEON_FILE_UPGRADE_TARBALL:
appendStringInfo(buf, "filetype: upgrade tarball, size: %zu", xlrec->size);
break;
}
}
}
const char *
@@ -173,6 +183,9 @@ neon_rm_identify(uint8 info)
case XLOG_NEON_HEAP_MULTI_INSERT | XLOG_NEON_INIT_PAGE:
id = "MULTI_INSERT+INIT";
break;
case XLOG_NEON_FILE:
id = "FILE";
break;
}
return id;


@@ -0,0 +1,54 @@
from time import sleep
import pytest
import requests
from fixtures.neon_fixtures import NeonEnv
from fixtures.pg_version import PgVersion
def test_upgrade(pg_version: PgVersion, neon_simple_env: NeonEnv):
env = neon_simple_env
upgrade_to: PgVersion
if pg_version == PgVersion.V14:
upgrade_to = PgVersion.V15
elif pg_version == PgVersion.V15:
upgrade_to = PgVersion.V16
else:
pytest.skip("Nothing to upgrade")
env.neon_cli.create_timeline("test_upgrade")
endpoint = env.endpoints.create_start("test_upgrade")
resp = requests.post(
f"http://localhost:{endpoint.http_port}/upgrade",
json={
"pg_version": upgrade_to,
},
)
assert resp.status_code == 202
while True:
resp = requests.get(f"http://localhost:{endpoint.http_port}/status")
assert resp.status_code == 200
data = resp.json()
if data["status"] == "upgrading":
sleep(1)
continue
elif data["status"] == "running":
break
else:
pytest.fail(f"Unexpected compute state during upgrade: {data['status']}")
endpoint.stop_and_destroy()
def test_upgrade_bad_request(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_timeline("test_upgrade_bad_request")
endpoint = env.endpoints.create_start("test_upgrade_bad_request")
resp = requests.post(f"http://localhost:{endpoint.http_port}/upgrade")
assert resp.status_code == 400
# TODO: Use postgres versions that are out of range.