From 86bf4919817d34a2e56590596eb5f8270ce8b79e Mon Sep 17 00:00:00 2001
From: Anastasia Lubennikova
Date: Wed, 14 Sep 2022 17:09:28 +0300
Subject: [PATCH] Support pg 15

- Split postgres_ffi into two version-specific files.
- Preserve pg_version in timeline metadata.
- Use pg_version in safekeeper code. Check for postgres major version mismatch.
- Clean up the code to use the DEFAULT_PG_VERSION constant everywhere instead of hardcoding the version.
- Parameterize python tests: use the DEFAULT_PG_VERSION env variable and the pg_version fixture.

To run tests using a specific PostgreSQL version, set the DEFAULT_PG_VERSION environment variable: 'DEFAULT_PG_VERSION=15 ./scripts/pytest test_runner/regress'

Currently not all tests pass, because the Rust code relies on the default PostgreSQL version in a few places.
---
 control_plane/src/bin/neon_local.rs | 95 +++++++++++--
 control_plane/src/compute.rs | 49 +++++--
 control_plane/src/local_env.rs | 48 +++++--
 control_plane/src/storage.rs | 22 ++-
 libs/postgres_ffi/src/lib.rs | 129 +++++++++++++++++-
 libs/postgres_ffi/src/nonrelfile_utils.rs | 2 +-
 libs/postgres_ffi/src/pg_constants.rs | 19 +--
 libs/postgres_ffi/src/pg_constants_v14.rs | 5 +
 libs/postgres_ffi/src/pg_constants_v15.rs | 10 ++
 libs/postgres_ffi/src/relfile_utils.rs | 25 ++--
 libs/postgres_ffi/src/waldecoder.rs | 49 +------
 libs/postgres_ffi/src/xlog_utils.rs | 38 +++++-
 pageserver/src/basebackup.rs | 82 +++++------
 pageserver/src/bin/update_metadata.rs | 2 +
 pageserver/src/config.rs | 45 ++++--
 pageserver/src/http/models.rs | 1 +
 pageserver/src/http/routes.rs | 1 +
 pageserver/src/import_datadir.rs | 20 +--
 pageserver/src/lib.rs | 2 +
 pageserver/src/page_service.rs | 31 ++++-
 pageserver/src/pgdatadir_mapping.rs | 10 +-
 pageserver/src/reltag.rs | 6 +-
 pageserver/src/storage_sync.rs | 12 +-
 pageserver/src/storage_sync/index.rs | 23 +++-
 pageserver/src/tenant.rs | 49 ++++---
 pageserver/src/tenant/metadata.rs | 9 ++
 pageserver/src/tenant/timeline.rs | 17 ++-
 pageserver/src/walingest.rs | 83 +++++++----
 .../src/walreceiver/connection_manager.rs | 2 +-
 .../src/walreceiver/walreceiver_connection.rs | 4 +-
 pageserver/src/walrecord.rs | 38 ++++--
 pageserver/src/walredo.rs | 30 ++--
 safekeeper/src/json_ctrl.rs | 11 +-
 safekeeper/src/safekeeper.rs | 19 ++-
 safekeeper/src/send_wal.rs | 2 +-
 safekeeper/src/wal_backup.rs | 3 +-
 safekeeper/src/wal_storage.rs | 10 +-
 test_runner/fixtures/neon_fixtures.py | 30 +++-
 test_runner/regress/test_import.py | 5 +
 test_runner/regress/test_pg_regress.py | 18 ++-
 test_runner/regress/test_wal_acceptor.py | 9 +-
 vendor/postgres-v14 | 2 +-
 vendor/postgres-v15 | 2 +-
 43 files changed, 777 insertions(+), 292 deletions(-)
 create mode 100644 libs/postgres_ffi/src/pg_constants_v14.rs
 create mode 100644 libs/postgres_ffi/src/pg_constants_v15.rs

diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs
index e16fd8764a..92782ea235 100644
--- a/control_plane/src/bin/neon_local.rs
+++ b/control_plane/src/bin/neon_local.rs
@@ -39,6 +39,8 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
 const DEFAULT_BRANCH_NAME: &str = "main";
 project_git_version!(GIT_VERSION);
 
+const DEFAULT_PG_VERSION: &str = "14";
+
 fn default_conf(etcd_binary_path: &Path) -> String {
     format!(
         r#"
@@ -105,6 +107,13 @@ fn main() -> Result<()> {
         .takes_value(true)
         .required(false);
 
+    let pg_version_arg = Arg::new("pg-version")
+        .long("pg-version")
+        .help("Postgres version to use for the initial tenant")
+        .required(false)
+        .takes_value(true)
+        .default_value(DEFAULT_PG_VERSION);
+
     let port_arg =
Arg::new("port") .long("port") .required(false) @@ -146,6 +155,7 @@ fn main() -> Result<()> { .required(false) .value_name("config"), ) + .arg(pg_version_arg.clone()) ) .subcommand( App::new("timeline") @@ -164,7 +174,9 @@ fn main() -> Result<()> { .subcommand(App::new("create") .about("Create a new blank timeline") .arg(tenant_id_arg.clone()) - .arg(branch_name_arg.clone())) + .arg(branch_name_arg.clone()) + .arg(pg_version_arg.clone()) + ) .subcommand(App::new("import") .about("Import timeline from basebackup directory") .arg(tenant_id_arg.clone()) @@ -178,7 +190,9 @@ fn main() -> Result<()> { .arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true) .help("Wal to add after base")) .arg(Arg::new("end-lsn").long("end-lsn").takes_value(true) - .help("Lsn the basebackup ends at"))) + .help("Lsn the basebackup ends at")) + .arg(pg_version_arg.clone()) + ) ).subcommand( App::new("tenant") .setting(AppSettings::ArgRequiredElseHelp) @@ -188,6 +202,7 @@ fn main() -> Result<()> { .arg(tenant_id_arg.clone()) .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) + .arg(pg_version_arg.clone()) ) .subcommand(App::new("config") .arg(tenant_id_arg.clone()) @@ -239,8 +254,9 @@ fn main() -> Result<()> { Arg::new("config-only") .help("Don't do basebackup, create compute node with only config files") .long("config-only") - .required(false) - )) + .required(false)) + .arg(pg_version_arg.clone()) + ) .subcommand(App::new("start") .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files") .arg(pg_node_arg.clone()) @@ -248,7 +264,9 @@ fn main() -> Result<()> { .arg(branch_name_arg.clone()) .arg(timeline_id_arg.clone()) .arg(lsn_arg.clone()) - .arg(port_arg.clone())) + .arg(port_arg.clone()) + .arg(pg_version_arg.clone()) + ) .subcommand( App::new("stop") .arg(pg_node_arg.clone()) @@ -501,9 +519,16 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { default_conf(&EtcdBroker::locate_etcd()?) 
}; + let pg_version = init_match + .value_of("pg-version") + .unwrap() + .parse::() + .context("Failed to parse postgres version from the argument string")?; + let mut env = LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?; - env.init().context("Failed to initialize neon repository")?; + env.init(pg_version) + .context("Failed to initialize neon repository")?; let initial_tenant_id = env .default_tenant_id .expect("default_tenant_id should be generated by the `env.init()` call above"); @@ -515,6 +540,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result { Some(initial_tenant_id), initial_timeline_id_arg, &pageserver_config_overrides(init_match), + pg_version, ) .unwrap_or_else(|e| { eprintln!("pageserver init failed: {e}"); @@ -557,8 +583,19 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an // Create an initial timeline for the new tenant let new_timeline_id = parse_timeline_id(create_match)?; - let timeline_info = - pageserver.timeline_create(new_tenant_id, new_timeline_id, None, None)?; + let pg_version = create_match + .value_of("pg-version") + .unwrap() + .parse::() + .context("Failed to parse postgres version from the argument string")?; + + let timeline_info = pageserver.timeline_create( + new_tenant_id, + new_timeline_id, + None, + None, + Some(pg_version), + )?; let new_timeline_id = timeline_info.timeline_id; let last_record_lsn = timeline_info .local @@ -607,7 +644,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - let new_branch_name = create_match .value_of("branch-name") .ok_or_else(|| anyhow!("No branch name provided"))?; - let timeline_info = pageserver.timeline_create(tenant_id, None, None, None)?; + + let pg_version = create_match + .value_of("pg-version") + .unwrap() + .parse::() + .context("Failed to parse postgres version from the argument string")?; + + let timeline_info = + pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?; let new_timeline_id = timeline_info.timeline_id; let last_record_lsn = timeline_info @@ -655,7 +700,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?; println!("Creating node for imported timeline ..."); env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; - cplane.new_node(tenant_id, name, timeline_id, None, None)?; + + let pg_version = import_match + .value_of("pg-version") + .unwrap() + .parse::() + .context("Failed to parse postgres version from the argument string")?; + + cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?; println!("Done"); } Some(("branch", branch_match)) => { @@ -682,6 +734,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - None, start_lsn, Some(ancestor_timeline_id), + None, )?; let new_timeline_id = timeline_info.timeline_id; @@ -797,7 +850,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { Some(p) => Some(p.parse()?), None => None, }; - cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port)?; + + let pg_version = sub_args + .value_of("pg-version") + .unwrap() + .parse::() + .context("Failed to parse postgres version from the argument string")?; + + cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?; } "start" => { let port: Option = match sub_args.value_of("port") { @@ -835,16 +895,23 @@ fn handle_pg(pg_match: &ArgMatches, env: 
&local_env::LocalEnv) -> Result<()> { .map(Lsn::from_str) .transpose() .context("Failed to parse Lsn from the request")?; + let pg_version = sub_args + .value_of("pg-version") + .unwrap() + .parse::() + .context("Failed to parse postgres version from the argument string")?; // when used with custom port this results in non obvious behaviour // port is remembered from first start command, i e // start --port X // stop // start <-- will also use port X even without explicit port argument println!( - "Starting new postgres {} on timeline {} ...", - node_name, timeline_id + "Starting new postgres (v{}) {} on timeline {} ...", + pg_version, node_name, timeline_id ); - let node = cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?; + + let node = + cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?; node.start(&auth_token)?; } } diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index b678d620df..89994c5647 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -18,7 +18,7 @@ use utils::{ postgres_backend::AuthType, }; -use crate::local_env::LocalEnv; +use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION}; use crate::postgresql_conf::PostgresConf; use crate::storage::PageServerNode; @@ -81,6 +81,7 @@ impl ComputeControlPlane { timeline_id: TimelineId, lsn: Option, port: Option, + pg_version: u32, ) -> Result> { let port = port.unwrap_or_else(|| self.get_port()); let node = Arc::new(PostgresNode { @@ -93,6 +94,7 @@ impl ComputeControlPlane { lsn, tenant_id, uses_wal_proposer: false, + pg_version, }); node.create_pgdata()?; @@ -118,6 +120,7 @@ pub struct PostgresNode { pub lsn: Option, // if it's a read-only node. None for primary pub tenant_id: TenantId, uses_wal_proposer: bool, + pg_version: u32, } impl PostgresNode { @@ -152,6 +155,14 @@ impl PostgresNode { let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?; let uses_wal_proposer = conf.get("neon.safekeepers").is_some(); + // Read postgres version from PG_VERSION file to determine which postgres version binary to use. + // If it doesn't exist, assume broken data directory and use default pg version. 
+ let pg_version_path = entry.path().join("PG_VERSION"); + + let pg_version_str = + fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string()); + let pg_version = u32::from_str(&pg_version_str)?; + // parse recovery_target_lsn, if any let recovery_target_lsn: Option = conf.parse_field_optional("recovery_target_lsn", &context)?; @@ -167,17 +178,24 @@ impl PostgresNode { lsn: recovery_target_lsn, tenant_id, uses_wal_proposer, + pg_version, }) } - fn sync_safekeepers(&self, auth_token: &Option) -> Result { - let pg_path = self.env.pg_bin_dir().join("postgres"); + fn sync_safekeepers(&self, auth_token: &Option, pg_version: u32) -> Result { + let pg_path = self.env.pg_bin_dir(pg_version).join("postgres"); let mut cmd = Command::new(&pg_path); cmd.arg("--sync-safekeepers") .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env( + "LD_LIBRARY_PATH", + self.env.pg_lib_dir(pg_version).to_str().unwrap(), + ) + .env( + "DYLD_LIBRARY_PATH", + self.env.pg_lib_dir(pg_version).to_str().unwrap(), + ) .env("PGDATA", self.pgdata().to_str().unwrap()) .stdout(Stdio::piped()) // Comment this to avoid capturing stderr (useful if command hangs) @@ -259,8 +277,8 @@ impl PostgresNode { }) } - // Connect to a page server, get base backup, and untar it to initialize a - // new data directory + // Write postgresql.conf with default configuration + // and PG_VERSION file to the data directory of a new node. fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> { let mut conf = PostgresConf::new(); conf.append("max_wal_senders", "10"); @@ -357,6 +375,9 @@ impl PostgresNode { let mut file = File::create(self.pgdata().join("postgresql.conf"))?; file.write_all(conf.to_string().as_bytes())?; + let mut file = File::create(self.pgdata().join("PG_VERSION"))?; + file.write_all(self.pg_version.to_string().as_bytes())?; + Ok(()) } @@ -368,7 +389,7 @@ impl PostgresNode { // latest data from the pageserver. That is a bit clumsy but whole bootstrap // procedure evolves quite actively right now, so let's think about it again // when things would be more stable (TODO). 
- let lsn = self.sync_safekeepers(auth_token)?; + let lsn = self.sync_safekeepers(auth_token, self.pg_version)?; if lsn == Lsn(0) { None } else { @@ -401,7 +422,7 @@ impl PostgresNode { } fn pg_ctl(&self, args: &[&str], auth_token: &Option) -> Result<()> { - let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl"); + let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl"); let mut cmd = Command::new(pg_ctl_path); cmd.args( [ @@ -417,8 +438,14 @@ impl PostgresNode { .concat(), ) .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); + .env( + "LD_LIBRARY_PATH", + self.env.pg_lib_dir(self.pg_version).to_str().unwrap(), + ) + .env( + "DYLD_LIBRARY_PATH", + self.env.pg_lib_dir(self.pg_version).to_str().unwrap(), + ); if let Some(token) = auth_token { cmd.env("ZENITH_AUTH_TOKEN", token); } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 7afaad26dc..14bb4cf346 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -20,6 +20,8 @@ use utils::{ use crate::safekeeper::SafekeeperNode; +pub const DEFAULT_PG_VERSION: u32 = 14; + // // This data structures represents neon_local CLI config // @@ -195,12 +197,40 @@ impl Default for SafekeeperConf { } impl LocalEnv { - // postgres installation paths - pub fn pg_bin_dir(&self) -> PathBuf { - self.pg_distrib_dir.join("bin") + pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf { + let mut path = self.pg_distrib_dir.clone(); + + if pg_version != DEFAULT_PG_VERSION { + // step up to the parent directory + // We assume that the pg_distrib subdirs + // for different pg versions + // are located in the same directory + // and follow the naming convention: v14, v15, etc. + path.pop(); + + match pg_version { + 14 => return path.join(format!("v{pg_version}")), + 15 => return path.join(format!("v{pg_version}")), + _ => panic!("Unsupported postgres version: {}", pg_version), + }; + } + + path } - pub fn pg_lib_dir(&self) -> PathBuf { - self.pg_distrib_dir.join("lib") + + pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf { + match pg_version { + 14 => self.pg_distrib_dir(pg_version).join("bin"), + 15 => self.pg_distrib_dir(pg_version).join("bin"), + _ => panic!("Unsupported postgres version: {}", pg_version), + } + } + pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf { + match pg_version { + 14 => self.pg_distrib_dir(pg_version).join("lib"), + 15 => self.pg_distrib_dir(pg_version).join("lib"), + _ => panic!("Unsupported postgres version: {}", pg_version), + } } pub fn pageserver_bin(&self) -> anyhow::Result { @@ -290,6 +320,8 @@ impl LocalEnv { // Find postgres binaries. // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install/v14". + // Note that later in the code we assume, that distrib dirs follow the same pattern + // for all postgres versions. if env.pg_distrib_dir == Path::new("") { if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { env.pg_distrib_dir = postgres_bin.into(); @@ -384,7 +416,7 @@ impl LocalEnv { // // Initialize a new Neon repository // - pub fn init(&mut self) -> anyhow::Result<()> { + pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> { // check if config already exists let base_path = &self.base_data_dir; ensure!( @@ -397,10 +429,10 @@ impl LocalEnv { "directory '{}' already exists. 
Perhaps already initialized?", base_path.display() ); - if !self.pg_distrib_dir.join("bin/postgres").exists() { + if !self.pg_bin_dir(pg_version).join("postgres").exists() { bail!( "Can't find postgres binary at {}", - self.pg_distrib_dir.display() + self.pg_bin_dir(pg_version).display() ); } for binary in ["pageserver", "safekeeper"] { diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 3bbbdc5865..95ade14fbf 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -112,11 +112,15 @@ impl PageServerNode { create_tenant: Option, initial_timeline_id: Option, config_overrides: &[&str], + pg_version: u32, ) -> anyhow::Result { let id = format!("id={}", self.env.pageserver.id); // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc. - let pg_distrib_dir_param = - format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display()); + let pg_distrib_dir_param = format!( + "pg_distrib_dir='{}'", + self.env.pg_distrib_dir(pg_version).display() + ); + let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type); let listen_http_addr_param = format!( "listen_http_addr='{}'", @@ -159,7 +163,7 @@ impl PageServerNode { self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?; let init_result = self - .try_init_timeline(create_tenant, initial_timeline_id) + .try_init_timeline(create_tenant, initial_timeline_id, pg_version) .context("Failed to create initial tenant and timeline for pageserver"); match &init_result { Ok(initial_timeline_id) => { @@ -175,10 +179,16 @@ impl PageServerNode { &self, new_tenant_id: Option, new_timeline_id: Option, + pg_version: u32, ) -> anyhow::Result { let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?; - let initial_timeline_info = - self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?; + let initial_timeline_info = self.timeline_create( + initial_tenant_id, + new_timeline_id, + None, + None, + Some(pg_version), + )?; Ok(initial_timeline_info.timeline_id) } @@ -497,6 +507,7 @@ impl PageServerNode { new_timeline_id: Option, ancestor_start_lsn: Option, ancestor_timeline_id: Option, + pg_version: Option, ) -> anyhow::Result { self.http_request( Method::POST, @@ -506,6 +517,7 @@ impl PageServerNode { new_timeline_id, ancestor_start_lsn, ancestor_timeline_id, + pg_version, }) .send()? .error_from_body()? diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index f43232ed0c..25e1f6029c 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -7,6 +7,8 @@ // https://github.com/rust-lang/rust-bindgen/issues/1651 #![allow(deref_nullptr)] +use bytes::Bytes; +use utils::bin_ser::SerializeError; use utils::lsn::Lsn; macro_rules! postgres_ffi { @@ -24,11 +26,11 @@ macro_rules! postgres_ffi { stringify!($version), ".rs" )); + + include!(concat!("pg_constants_", stringify!($version), ".rs")); } pub mod controlfile_utils; pub mod nonrelfile_utils; - pub mod pg_constants; - pub mod relfile_utils; pub mod waldecoder; pub mod xlog_utils; @@ -44,6 +46,9 @@ macro_rules! 
postgres_ffi { postgres_ffi!(v14); postgres_ffi!(v15); +pub mod pg_constants; +pub mod relfile_utils; + // Export some widely used datatypes that are unlikely to change across Postgres versions pub use v14::bindings::{uint32, uint64, Oid}; pub use v14::bindings::{BlockNumber, OffsetNumber}; @@ -52,8 +57,11 @@ pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo}; // Likewise for these, although the assumption that these don't change is a little more iffy. pub use v14::bindings::{MultiXactOffset, MultiXactStatus}; +pub use v14::bindings::{PageHeaderData, XLogRecord}; pub use v14::xlog_utils::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD}; +pub use v14::bindings::{CheckPoint, ControlFileData}; + // from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and // --with-segsize=SEGSIZE, but assume the defaults for now. pub const BLCKSZ: u16 = 8192; @@ -63,6 +71,50 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024; pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; +// Export some version independent functions that are used outside of this mod +pub use v14::xlog_utils::encode_logical_message; +pub use v14::xlog_utils::get_current_timestamp; +pub use v14::xlog_utils::to_pg_timestamp; +pub use v14::xlog_utils::XLogFileName; + +pub use v14::bindings::DBState_DB_SHUTDOWNED; + +pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool { + if version == 14 { + bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0 + } else { + assert_eq!(version, 15); + bimg_info & v15::bindings::BKPIMAGE_COMPRESS_PGLZ != 0 + || bimg_info & v15::bindings::BKPIMAGE_COMPRESS_LZ4 != 0 + || bimg_info & v15::bindings::BKPIMAGE_COMPRESS_ZSTD != 0 + } +} + +pub fn generate_wal_segment( + segno: u64, + system_id: u64, + pg_version: u32, +) -> Result { + match pg_version { + 14 => v14::xlog_utils::generate_wal_segment(segno, system_id), + 15 => v15::xlog_utils::generate_wal_segment(segno, system_id), + _ => Err(SerializeError::BadInput), + } +} + +pub fn generate_pg_control( + pg_control_bytes: &[u8], + checkpoint_bytes: &[u8], + lsn: Lsn, + pg_version: u32, +) -> anyhow::Result<(Bytes, u64)> { + match pg_version { + 14 => v14::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn), + 15 => v15::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn), + _ => anyhow::bail!("Unknown version {}", pg_version), + } +} + // PG timeline is always 1, changing it doesn't have any useful meaning in Neon. // // NOTE: this is not to be confused with Neon timelines; different concept! 
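Note on the structure above: the new facade functions (generate_wal_segment, generate_pg_control, bkpimage_is_compressed) all share one shape. A version-independent entry point matches on the numeric pg_version and forwards to the v14 or v15 module that the postgres_ffi! macro generates, failing on anything else. Below is a minimal, self-contained sketch of that shape; the modules, constants, and the xlog_page_magic helper are illustrative stand-ins, not the real generated bindings:

    // Stand-ins for the per-version modules that the postgres_ffi! macro
    // generates from the PostgreSQL headers (magic values for illustration).
    mod v14 {
        pub const XLOG_PAGE_MAGIC: u16 = 0xD10D;
    }
    mod v15 {
        pub const XLOG_PAGE_MAGIC: u16 = 0xD110;
    }

    // Version-independent facade: route on pg_version and reject unknown
    // versions, mirroring how generate_wal_segment and generate_pg_control
    // dispatch in this patch.
    fn xlog_page_magic(pg_version: u32) -> Result<u16, String> {
        match pg_version {
            14 => Ok(v14::XLOG_PAGE_MAGIC),
            15 => Ok(v15::XLOG_PAGE_MAGIC),
            _ => Err(format!("Unknown version {pg_version}")),
        }
    }

    fn main() {
        assert_eq!(xlog_page_magic(15).unwrap(), 0xD110);
        assert!(xlog_page_magic(13).is_err());
    }

Types that are genuinely version-independent (Oid, BlockNumber, the checkpoint structs, and so on) skip the match entirely and are re-exported straight from v14, as the pub use lines above do.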
@@ -74,7 +126,7 @@ pub const PG_TLI: u32 = 1; // See TransactionIdIsNormal in transam.h pub const fn transaction_id_is_normal(id: TransactionId) -> bool { - id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID + id > pg_constants::FIRST_NORMAL_TRANSACTION_ID } // See TransactionIdPrecedes in transam.c @@ -109,3 +161,74 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) { pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); } + +pub mod waldecoder { + + use crate::{v14, v15}; + use bytes::{Buf, Bytes, BytesMut}; + use std::num::NonZeroU32; + use thiserror::Error; + use utils::lsn::Lsn; + + pub enum State { + WaitingForRecord, + ReassemblingRecord { + recordbuf: BytesMut, + contlen: NonZeroU32, + }, + SkippingEverything { + skip_until_lsn: Lsn, + }, + } + + pub struct WalStreamDecoder { + pub lsn: Lsn, + pub pg_version: u32, + pub inputbuf: BytesMut, + pub state: State, + } + + #[derive(Error, Debug, Clone)] + #[error("{msg} at {lsn}")] + pub struct WalDecodeError { + pub msg: String, + pub lsn: Lsn, + } + + impl WalStreamDecoder { + pub fn new(lsn: Lsn, pg_version: u32) -> WalStreamDecoder { + WalStreamDecoder { + lsn, + pg_version, + inputbuf: BytesMut::new(), + state: State::WaitingForRecord, + } + } + + // The latest LSN position fed to the decoder. + pub fn available(&self) -> Lsn { + self.lsn + self.inputbuf.remaining() as u64 + } + + pub fn feed_bytes(&mut self, buf: &[u8]) { + self.inputbuf.extend_from_slice(buf); + } + + pub fn poll_decode(&mut self) -> Result, WalDecodeError> { + match self.pg_version { + 14 => { + use self::v14::waldecoder::WalStreamDecoderHandler; + self.poll_decode_internal() + } + 15 => { + use self::v15::waldecoder::WalStreamDecoderHandler; + self.poll_decode_internal() + } + _ => Err(WalDecodeError { + msg: format!("Unknown version {}", self.pg_version), + lsn: self.lsn, + }), + } + } + } +} diff --git a/libs/postgres_ffi/src/nonrelfile_utils.rs b/libs/postgres_ffi/src/nonrelfile_utils.rs index 1de1d367e0..01e5554b8a 100644 --- a/libs/postgres_ffi/src/nonrelfile_utils.rs +++ b/libs/postgres_ffi/src/nonrelfile_utils.rs @@ -1,7 +1,7 @@ //! //! Common utilities for dealing with PostgreSQL non-relation files. //! -use super::pg_constants; +use crate::pg_constants; use crate::transaction_id_precedes; use bytes::BytesMut; use log::*; diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 8cc9fa7af6..6aaa739a69 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -1,14 +1,16 @@ //! //! Misc constants, copied from PostgreSQL headers. //! +//! Only place version-independent constants here. +//! //! TODO: These probably should be auto-generated using bindgen, //! rather than copied by hand. Although on the other hand, it's nice //! to have them all here in one place, and have the ability to add //! comments on them. //! 
-use super::bindings::{PageHeaderData, XLogRecord}; use crate::BLCKSZ; +use crate::{PageHeaderData, XLogRecord}; // // From pg_tablespace_d.h @@ -16,14 +18,6 @@ use crate::BLCKSZ; pub const DEFAULTTABLESPACE_OID: u32 = 1663; pub const GLOBALTABLESPACE_OID: u32 = 1664; -// -// Fork numbers, from relpath.h -// -pub const MAIN_FORKNUM: u8 = 0; -pub const FSM_FORKNUM: u8 = 1; -pub const VISIBILITYMAP_FORKNUM: u8 = 2; -pub const INIT_FORKNUM: u8 = 3; - // From storage_xlog.h pub const XLOG_SMGR_CREATE: u8 = 0x10; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; @@ -114,7 +108,6 @@ pub const XLOG_NEXTOID: u8 = 0x30; pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_FPI_FOR_HINT: u8 = 0xA0; pub const XLOG_FPI: u8 = 0xB0; -pub const DB_SHUTDOWNED: u32 = 1; // From multixact.h pub const FIRST_MULTIXACT_ID: u32 = 1; @@ -169,10 +162,6 @@ pub const RM_HEAP_ID: u8 = 10; pub const XLR_INFO_MASK: u8 = 0x0F; pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; -// from dbcommands_xlog.h -pub const XLOG_DBASE_CREATE: u8 = 0x00; -pub const XLOG_DBASE_DROP: u8 = 0x10; - pub const XLOG_TBLSPC_CREATE: u8 = 0x00; pub const XLOG_TBLSPC_DROP: u8 = 0x10; @@ -197,8 +186,6 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous /* Information stored in bimg_info */ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ -pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ -pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ /* From transam.h */ pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; diff --git a/libs/postgres_ffi/src/pg_constants_v14.rs b/libs/postgres_ffi/src/pg_constants_v14.rs new file mode 100644 index 0000000000..810898ee80 --- /dev/null +++ b/libs/postgres_ffi/src/pg_constants_v14.rs @@ -0,0 +1,5 @@ +pub const XLOG_DBASE_CREATE: u8 = 0x00; +pub const XLOG_DBASE_DROP: u8 = 0x10; + +pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */ +pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */ diff --git a/libs/postgres_ffi/src/pg_constants_v15.rs b/libs/postgres_ffi/src/pg_constants_v15.rs new file mode 100644 index 0000000000..6fa5eb008c --- /dev/null +++ b/libs/postgres_ffi/src/pg_constants_v15.rs @@ -0,0 +1,10 @@ +pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8; + +pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00; +pub const XLOG_DBASE_CREATE_WAL_LOG: u8 = 0x00; +pub const XLOG_DBASE_DROP: u8 = 0x20; + +pub const BKPIMAGE_APPLY: u8 = 0x02; /* page image should be restored during replay */ +pub const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04; /* page image is compressed */ +pub const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08; /* page image is compressed */ +pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */ diff --git a/libs/postgres_ffi/src/relfile_utils.rs b/libs/postgres_ffi/src/relfile_utils.rs index f3476acc9c..1dc9f367ff 100644 --- a/libs/postgres_ffi/src/relfile_utils.rs +++ b/libs/postgres_ffi/src/relfile_utils.rs @@ -1,10 +1,17 @@ //! //! Common utilities for dealing with PostgreSQL relation files. //! 
-use super::pg_constants; use once_cell::sync::OnceCell; use regex::Regex; +// +// Fork numbers, from relpath.h +// +pub const MAIN_FORKNUM: u8 = 0; +pub const FSM_FORKNUM: u8 = 1; +pub const VISIBILITYMAP_FORKNUM: u8 = 2; +pub const INIT_FORKNUM: u8 = 3; + #[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] pub enum FilePathError { #[error("invalid relation fork name")] @@ -23,10 +30,10 @@ impl From for FilePathError { pub fn forkname_to_number(forkname: Option<&str>) -> Result { match forkname { // "main" is not in filenames, it's implicit if the fork name is not present - None => Ok(pg_constants::MAIN_FORKNUM), - Some("fsm") => Ok(pg_constants::FSM_FORKNUM), - Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM), - Some("init") => Ok(pg_constants::INIT_FORKNUM), + None => Ok(MAIN_FORKNUM), + Some("fsm") => Ok(FSM_FORKNUM), + Some("vm") => Ok(VISIBILITYMAP_FORKNUM), + Some("init") => Ok(INIT_FORKNUM), Some(_) => Err(FilePathError::InvalidForkName), } } @@ -34,10 +41,10 @@ pub fn forkname_to_number(forkname: Option<&str>) -> Result { /// Convert Postgres fork number to the right suffix of the relation data file. pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { match forknum { - pg_constants::MAIN_FORKNUM => None, - pg_constants::FSM_FORKNUM => Some("fsm"), - pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"), - pg_constants::INIT_FORKNUM => Some("init"), + MAIN_FORKNUM => None, + FSM_FORKNUM => Some("fsm"), + VISIBILITYMAP_FORKNUM => Some("vm"), + INIT_FORKNUM => Some("init"), _ => Some("UNKNOWN FORKNUM"), } } diff --git a/libs/postgres_ffi/src/waldecoder.rs b/libs/postgres_ffi/src/waldecoder.rs index 4d79e4b1d1..5b46d52321 100644 --- a/libs/postgres_ffi/src/waldecoder.rs +++ b/libs/postgres_ffi/src/waldecoder.rs @@ -8,6 +8,7 @@ //! to look deeper into the WAL records to also understand which blocks they modify, the code //! for that is in pageserver/src/walrecord.rs //! +use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder}; use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC}; use super::xlog_utils::*; use crate::WAL_SEGMENT_SIZE; @@ -16,55 +17,19 @@ use crc32c::*; use log::*; use std::cmp::min; use std::num::NonZeroU32; -use thiserror::Error; use utils::lsn::Lsn; -enum State { - WaitingForRecord, - ReassemblingRecord { - recordbuf: BytesMut, - contlen: NonZeroU32, - }, - SkippingEverything { - skip_until_lsn: Lsn, - }, -} - -pub struct WalStreamDecoder { - lsn: Lsn, - inputbuf: BytesMut, - state: State, -} - -#[derive(Error, Debug, Clone)] -#[error("{msg} at {lsn}")] -pub struct WalDecodeError { - msg: String, - lsn: Lsn, +pub trait WalStreamDecoderHandler { + fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>; + fn poll_decode_internal(&mut self) -> Result, WalDecodeError>; + fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>; } // // WalRecordStream is a Stream that returns a stream of WAL records // FIXME: This isn't a proper rust stream // -impl WalStreamDecoder { - pub fn new(lsn: Lsn) -> WalStreamDecoder { - WalStreamDecoder { - lsn, - inputbuf: BytesMut::new(), - state: State::WaitingForRecord, - } - } - - // The latest LSN position fed to the decoder. 
- pub fn available(&self) -> Lsn { - self.lsn + self.inputbuf.remaining() as u64 - } - - pub fn feed_bytes(&mut self, buf: &[u8]) { - self.inputbuf.extend_from_slice(buf); - } - +impl WalStreamDecoderHandler for WalStreamDecoder { fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> { let validate_impl = || { if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 { @@ -125,7 +90,7 @@ impl WalStreamDecoder { /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid. /// - pub fn poll_decode(&mut self) -> Result, WalDecodeError> { + fn poll_decode_internal(&mut self) -> Result, WalDecodeError> { // Run state machine that validates page headers, and reassembles records // that cross page boundaries. loop { diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index f8606b6e47..8389a6e971 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -9,12 +9,13 @@ use crc32c::crc32c_append; +use super::super::waldecoder::WalStreamDecoder; use super::bindings::{ - CheckPoint, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData, - XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC, + CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz, + XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC, }; -use super::pg_constants; -use super::waldecoder::WalStreamDecoder; +use super::PG_MAJORVERSION; +use crate::pg_constants; use crate::PG_TLI; use crate::{uint32, uint64, Oid}; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; @@ -113,6 +114,30 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn { } } +pub fn generate_pg_control( + pg_control_bytes: &[u8], + checkpoint_bytes: &[u8], + lsn: Lsn, +) -> anyhow::Result<(Bytes, u64)> { + let mut pg_control = ControlFileData::decode(pg_control_bytes)?; + let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?; + + // Generate new pg_control needed for bootstrap + checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0; + + //reset some fields we don't want to preserve + //TODO Check this. + //We may need to determine the value from twophase data. 
+ checkpoint.oldestActiveXid = 0; + + //save new values in pg_control + pg_control.checkPoint = 0; + pg_control.checkPointCopy = checkpoint; + pg_control.state = DBState_DB_SHUTDOWNED; + + Ok((pg_control.encode(), pg_control.system_identifier)) +} + pub fn get_current_timestamp() -> TimestampTz { to_pg_timestamp(SystemTime::now()) } @@ -144,7 +169,10 @@ pub fn find_end_of_wal( let mut result = start_lsn; let mut curr_lsn = start_lsn; let mut buf = [0u8; XLOG_BLCKSZ]; - let mut decoder = WalStreamDecoder::new(start_lsn); + let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); + info!("find_end_of_wal PG_VERSION: {}", pg_version); + + let mut decoder = WalStreamDecoder::new(start_lsn, pg_version); // loop over segments loop { diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index eca6a3c87f..d0a57a473b 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -25,10 +25,10 @@ use tracing::*; use crate::reltag::{RelTag, SlruKind}; use crate::tenant::Timeline; -use postgres_ffi::v14::pg_constants; -use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName}; -use postgres_ffi::v14::{CheckPoint, ControlFileData}; +use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID}; +use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA}; use postgres_ffi::TransactionId; +use postgres_ffi::XLogFileName; use postgres_ffi::PG_TLI; use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE}; use utils::lsn::Lsn; @@ -129,15 +129,15 @@ where // TODO include checksum // Create pgdata subdirs structure - for dir in pg_constants::PGDATA_SUBDIRS.iter() { + for dir in PGDATA_SUBDIRS.iter() { let header = new_tar_header_dir(*dir)?; self.ar.append(&header, &mut io::empty())?; } // Send empty config files. 
- for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() { + for filepath in PGDATA_SPECIAL_FILES.iter() { if *filepath == "pg_hba.conf" { - let data = pg_constants::PG_HBA.as_bytes(); + let data = PG_HBA.as_bytes(); let header = new_tar_header(filepath, data.len() as u64)?; self.ar.append(&header, data)?; } else { @@ -267,16 +267,12 @@ where None }; - // TODO pass this as a parameter - let pg_version = "14"; + if spcnode == GLOBALTABLESPACE_OID { + let pg_version_str = self.timeline.pg_version.to_string(); + let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?; + self.ar.append(&header, pg_version_str.as_bytes())?; - if spcnode == pg_constants::GLOBALTABLESPACE_OID { - let version_bytes = pg_version.as_bytes(); - let header = new_tar_header("PG_VERSION", version_bytes.len() as u64)?; - self.ar.append(&header, version_bytes)?; - - let header = new_tar_header("global/PG_VERSION", version_bytes.len() as u64)?; - self.ar.append(&header, version_bytes)?; + info!("timeline.pg_version {}", self.timeline.pg_version); if let Some(img) = relmap_img { // filenode map for global tablespace @@ -305,7 +301,7 @@ where return Ok(()); } // User defined tablespaces are not supported - ensure!(spcnode == pg_constants::DEFAULTTABLESPACE_OID); + ensure!(spcnode == DEFAULTTABLESPACE_OID); // Append dir path for each database let path = format!("base/{}", dbnode); @@ -314,9 +310,10 @@ where if let Some(img) = relmap_img { let dst_path = format!("base/{}/PG_VERSION", dbnode); - let version_bytes = pg_version.as_bytes(); - let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; - self.ar.append(&header, version_bytes)?; + + let pg_version_str = self.timeline.pg_version.to_string(); + let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?; + self.ar.append(&header, pg_version_str.as_bytes())?; let relmap_path = format!("base/{}/pg_filenode.map", dbnode); let header = new_tar_header(&relmap_path, img.len() as u64)?; @@ -348,30 +345,6 @@ where // Also send zenith.signal file with extra bootstrap data. // fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { - let checkpoint_bytes = self - .timeline - .get_checkpoint(self.lsn) - .context("failed to get checkpoint bytes")?; - let pg_control_bytes = self - .timeline - .get_control_file(self.lsn) - .context("failed get control bytes")?; - let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; - let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; - - // Generate new pg_control needed for bootstrap - checkpoint.redo = normalize_lsn(self.lsn, WAL_SEGMENT_SIZE).0; - - //reset some fields we don't want to preserve - //TODO Check this. - //We may need to determine the value from twophase data. 
- checkpoint.oldestActiveXid = 0; - - //save new values in pg_control - pg_control.checkPoint = 0; - pg_control.checkPointCopy = checkpoint; - pg_control.state = pg_constants::DB_SHUTDOWNED; - // add zenith.signal file let mut zenith_signal = String::new(); if self.prev_record_lsn == Lsn(0) { @@ -388,8 +361,23 @@ where zenith_signal.as_bytes(), )?; + let checkpoint_bytes = self + .timeline + .get_checkpoint(self.lsn) + .context("failed to get checkpoint bytes")?; + let pg_control_bytes = self + .timeline + .get_control_file(self.lsn) + .context("failed get control bytes")?; + + let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control( + &pg_control_bytes, + &checkpoint_bytes, + self.lsn, + self.timeline.pg_version, + )?; + //send pg_control - let pg_control_bytes = pg_control.encode(); let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; self.ar.append(&header, &pg_control_bytes[..])?; @@ -398,8 +386,10 @@ where let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE); let wal_file_path = format!("pg_wal/{}", wal_file_name); let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?; - let wal_seg = generate_wal_segment(segno, pg_control.system_identifier) - .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; + + let wal_seg = + postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version) + .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; ensure!(wal_seg.len() == WAL_SEGMENT_SIZE); self.ar.append(&header, &wal_seg[..])?; Ok(()) diff --git a/pageserver/src/bin/update_metadata.rs b/pageserver/src/bin/update_metadata.rs index 16359c2532..e66049c457 100644 --- a/pageserver/src/bin/update_metadata.rs +++ b/pageserver/src/bin/update_metadata.rs @@ -50,6 +50,7 @@ fn main() -> Result<()> { meta.ancestor_lsn(), meta.latest_gc_cutoff_lsn(), meta.initdb_lsn(), + meta.pg_version(), ); update_meta = true; } @@ -62,6 +63,7 @@ fn main() -> Result<()> { meta.ancestor_lsn(), meta.latest_gc_cutoff_lsn(), meta.initdb_lsn(), + meta.pg_version(), ); update_meta = true; } diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 945ee098ea..a4346c0190 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -21,6 +21,7 @@ use utils::{ use crate::tenant::TIMELINES_SEGMENT_NAME; use crate::tenant_config::{TenantConf, TenantConfOpt}; +use crate::DEFAULT_PG_VERSION; /// The name of the metadata file pageserver creates per timeline. pub const METADATA_FILE_NAME: &str = "metadata"; @@ -209,7 +210,7 @@ impl Default for PageServerConfigBuilder { workdir: Set(PathBuf::new()), pg_distrib_dir: Set(env::current_dir() .expect("cannot access current directory") - .join("pg_install/v14")), + .join(format!("pg_install/v{}", DEFAULT_PG_VERSION))), auth_type: Set(AuthType::Trust), auth_validation_public_key_path: Set(None), remote_storage_config: Set(None), @@ -374,13 +375,40 @@ impl PageServerConf { // // Postgres distribution paths // + pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf { + let mut path = self.pg_distrib_dir.clone(); - pub fn pg_bin_dir(&self) -> PathBuf { - self.pg_distrib_dir.join("bin") + if pg_version != DEFAULT_PG_VERSION { + // step up to the parent directory + // We assume that the pg_distrib subdirs + // for different pg versions + // are located in the same directory + // and follow the naming convention: v14, v15, etc. 
+ path.pop(); + + match pg_version { + 14 => return path.join(format!("v{pg_version}")), + 15 => return path.join(format!("v{pg_version}")), + _ => panic!("Unsupported postgres version: {}", pg_version), + }; + } + + path } - pub fn pg_lib_dir(&self) -> PathBuf { - self.pg_distrib_dir.join("lib") + pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf { + match pg_version { + 14 => self.pg_distrib_dir(pg_version).join("bin"), + 15 => self.pg_distrib_dir(pg_version).join("bin"), + _ => panic!("Unsupported postgres version: {}", pg_version), + } + } + pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf { + match pg_version { + 14 => self.pg_distrib_dir(pg_version).join("lib"), + 15 => self.pg_distrib_dir(pg_version).join("lib"), + _ => panic!("Unsupported postgres version: {}", pg_version), + } } /// Parse a configuration file (pageserver.toml) into a PageServerConf struct, @@ -449,10 +477,11 @@ impl PageServerConf { ); } - if !conf.pg_distrib_dir.join("bin/postgres").exists() { + let pg_version = DEFAULT_PG_VERSION; + if !conf.pg_bin_dir(pg_version).join("postgres").exists() { bail!( "Can't find postgres binary at {}", - conf.pg_distrib_dir.display() + conf.pg_bin_dir(pg_version).display() ); } @@ -863,7 +892,7 @@ broker_endpoints = ['{broker_endpoint}'] let workdir = tempdir_path.join("workdir"); fs::create_dir_all(&workdir)?; - let pg_distrib_dir = tempdir_path.join("pg_distrib"); + let pg_distrib_dir = tempdir_path.join(format!("pg_distrib/v{DEFAULT_PG_VERSION}")); fs::create_dir_all(&pg_distrib_dir)?; let postgres_bin_dir = pg_distrib_dir.join("bin"); fs::create_dir_all(&postgres_bin_dir)?; diff --git a/pageserver/src/http/models.rs b/pageserver/src/http/models.rs index 2d7d560d2a..851fa881a0 100644 --- a/pageserver/src/http/models.rs +++ b/pageserver/src/http/models.rs @@ -21,6 +21,7 @@ pub struct TimelineCreateRequest { #[serde(default)] #[serde_as(as = "Option")] pub ancestor_start_lsn: Option, + pub pg_version: Option, } #[serde_as] diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index c676dfacd2..6892c0b391 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -173,6 +173,7 @@ async fn timeline_create_handler(mut request: Request) -> Result { // Created. Construct a TimelineInfo for it. diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index c1e736d552..23c4351b4e 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -16,11 +16,13 @@ use crate::reltag::{RelTag, SlruKind}; use crate::tenant::Timeline; use crate::walingest::WalIngest; use crate::walrecord::DecodedWALRecord; -use postgres_ffi::v14::relfile_utils::*; -use postgres_ffi::v14::waldecoder::*; -use postgres_ffi::v14::xlog_utils::*; -use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; +use postgres_ffi::pg_constants; +use postgres_ffi::relfile_utils::*; +use postgres_ffi::waldecoder::WalStreamDecoder; +use postgres_ffi::ControlFileData; +use postgres_ffi::DBState_DB_SHUTDOWNED; use postgres_ffi::Oid; +use postgres_ffi::XLogFileName; use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE}; use utils::lsn::Lsn; @@ -236,7 +238,7 @@ fn import_slru( /// Scan PostgreSQL WAL files in given directory and load all records between /// 'startpoint' and 'endpoint' into the repository. 
fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> { - let mut waldecoder = WalStreamDecoder::new(startpoint); + let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version); let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE); @@ -354,7 +356,7 @@ pub fn import_wal_from_tar( end_lsn: Lsn, ) -> Result<()> { // Set up walingest mutable state - let mut waldecoder = WalStreamDecoder::new(start_lsn); + let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version); let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE); let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE); let mut last_lsn = start_lsn; @@ -439,7 +441,7 @@ fn import_file( len: usize, ) -> Result> { if file_path.starts_with("global") { - let spcnode = pg_constants::GLOBALTABLESPACE_OID; + let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; let dbnode = 0; match file_path @@ -467,7 +469,7 @@ fn import_file( debug!("imported relmap file") } "PG_VERSION" => { - debug!("ignored"); + debug!("ignored PG_VERSION file"); } _ => { import_rel(modification, file_path, spcnode, dbnode, reader, len)?; @@ -495,7 +497,7 @@ fn import_file( debug!("imported relmap file") } "PG_VERSION" => { - debug!("ignored"); + debug!("ignored PG_VERSION file"); } _ => { import_rel(modification, file_path, spcnode, dbnode, reader, len)?; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index e918a39457..0bd5e242d3 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -36,6 +36,8 @@ use crate::task_mgr::TaskKind; /// format, bump this! pub const STORAGE_FORMAT_VERSION: u16 = 3; +pub const DEFAULT_PG_VERSION: u32 = 14; + // Magic constants used to identify different kinds of files pub const IMAGE_FILE_MAGIC: u16 = 0x5A60; pub const DELTA_FILE_MAGIC: u16 = 0x5A61; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 7de6403b83..fed5d0dcc4 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -43,9 +43,9 @@ use crate::task_mgr::TaskKind; use crate::tenant::Timeline; use crate::tenant_mgr; use crate::CheckpointConfig; -use postgres_ffi::v14::xlog_utils::to_pg_timestamp; -use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID; +use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID; +use postgres_ffi::to_pg_timestamp; use postgres_ffi::BLCKSZ; // Wrapped in libpq CopyData @@ -498,12 +498,16 @@ impl PageServerHandler { timeline_id: TimelineId, base_lsn: Lsn, _end_lsn: Lsn, + pg_version: u32, ) -> anyhow::Result<()> { task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); // Create empty timeline info!("creating new timeline"); - let timeline = tenant_mgr::get_tenant(tenant_id, true)? - .create_empty_timeline(timeline_id, base_lsn)?; + let timeline = tenant_mgr::get_tenant(tenant_id, true)?.create_empty_timeline( + timeline_id, + base_lsn, + pg_version, + )?; // TODO mark timeline as not ready until it reaches end_lsn. 
// We might have some wal to import as well, and we should prevent compute @@ -958,16 +962,31 @@ impl postgres_backend_async::Handler for PageServerHandler { // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN" let (_, params_raw) = query_string.split_at("import basebackup ".len()); let params = params_raw.split_whitespace().collect::>(); - ensure!(params.len() == 4); + ensure!(params.len() >= 4); let tenant_id = TenantId::from_str(params[0])?; let timeline_id = TimelineId::from_str(params[1])?; let base_lsn = Lsn::from_str(params[2])?; let end_lsn = Lsn::from_str(params[3])?; + let pg_version = if params.len() == 5 { + u32::from_str(params[4])? + } else { + // If version is not provided, assume default. + // TODO: this may lead to weird errors if the version is wrong. + crate::DEFAULT_PG_VERSION + }; + self.check_permission(Some(tenant_id))?; match self - .handle_import_basebackup(pgb, tenant_id, timeline_id, base_lsn, end_lsn) + .handle_import_basebackup( + pgb, + tenant_id, + timeline_id, + base_lsn, + end_lsn, + pg_version, + ) .await { Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 9d4b438dc4..fc9867dc05 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,7 +13,7 @@ use crate::tenant::Timeline; use crate::walrecord::NeonWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; -use postgres_ffi::v14::pg_constants; +use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; use postgres_ffi::{Oid, TimestampTz, TransactionId}; use serde::{Deserialize, Serialize}; @@ -125,8 +125,7 @@ impl Timeline { return Ok(nblocks); } - if (tag.forknum == pg_constants::FSM_FORKNUM - || tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM) + if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM) && !self.get_rel_exists(tag, lsn, latest)? 
{ // FIXME: Postgres sometimes calls smgrcreate() to create @@ -1090,6 +1089,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); // 03 misc // controlfile // checkpoint +// pg_version // // Below is a full list of the keyspace allocation: // @@ -1128,7 +1128,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); // // Checkpoint: // 03 00000000 00000000 00000000 00 00000001 - //-- Section 01: relation data and metadata const DBDIR_KEY: Key = Key { @@ -1402,8 +1401,9 @@ fn is_slru_block_key(key: Key) -> bool { pub fn create_test_timeline( tenant: &crate::tenant::Tenant, timeline_id: utils::id::TimelineId, + pg_version: u32, ) -> Result> { - let tline = tenant.create_empty_timeline(timeline_id, Lsn(8))?; + let tline = tenant.create_empty_timeline(timeline_id, Lsn(8), pg_version)?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; m.commit()?; diff --git a/pageserver/src/reltag.rs b/pageserver/src/reltag.rs index e3d08f8b3d..43d38bd986 100644 --- a/pageserver/src/reltag.rs +++ b/pageserver/src/reltag.rs @@ -2,8 +2,8 @@ use serde::{Deserialize, Serialize}; use std::cmp::Ordering; use std::fmt; -use postgres_ffi::v14::pg_constants; -use postgres_ffi::v14::relfile_utils::forknumber_to_name; +use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID; +use postgres_ffi::relfile_utils::forknumber_to_name; use postgres_ffi::Oid; /// @@ -78,7 +78,7 @@ impl fmt::Display for RelTag { impl RelTag { pub fn to_segfile_name(&self, segno: u32) -> String { - let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID { + let mut name = if self.spcnode == GLOBALTABLESPACE_OID { "global/".to_string() } else { format!("base/{}/", self.dbnode) diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index 489d0ad4ed..892a34a76f 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -1445,7 +1445,17 @@ mod test_utils { } pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata { - TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0)) + TimelineMetadata::new( + disk_consistent_lsn, + None, + None, + Lsn(0), + Lsn(0), + Lsn(0), + // Any version will do + // but it should be consistent with the one in the tests + crate::DEFAULT_PG_VERSION, + ) } } diff --git a/pageserver/src/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs index 13495ffefe..db37c7b411 100644 --- a/pageserver/src/storage_sync/index.rs +++ b/pageserver/src/storage_sync/index.rs @@ -341,13 +341,21 @@ mod tests { use super::*; use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; + use crate::DEFAULT_PG_VERSION; #[test] fn index_part_conversion() { let harness = TenantHarness::create("index_part_conversion").unwrap(); let timeline_path = harness.timeline_path(&TIMELINE_ID); - let metadata = - TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1)); + let metadata = TimelineMetadata::new( + Lsn(5).align(), + Some(Lsn(4)), + None, + Lsn(3), + Lsn(2), + Lsn(1), + DEFAULT_PG_VERSION, + ); let remote_timeline = RemoteTimeline { timeline_layers: HashSet::from([ timeline_path.join("layer_1"), @@ -464,8 +472,15 @@ mod tests { fn index_part_conversion_negatives() { let harness = TenantHarness::create("index_part_conversion_negatives").unwrap(); let timeline_path = harness.timeline_path(&TIMELINE_ID); - let metadata = - TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1)); + let metadata = TimelineMetadata::new( + Lsn(5).align(), + 
Some(Lsn(4)), + None, + Lsn(3), + Lsn(2), + Lsn(1), + DEFAULT_PG_VERSION, + ); let conversion_result = IndexPart::from_remote_timeline( &timeline_path, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ca97796870..5860e13534 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -171,6 +171,7 @@ impl Tenant { &self, new_timeline_id: TimelineId, initdb_lsn: Lsn, + pg_version: u32, ) -> Result> { // XXX: keep the lock to avoid races during timeline creation let mut timelines = self.timelines.lock().unwrap(); @@ -186,7 +187,7 @@ impl Tenant { } let new_metadata = - TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn); + TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn, pg_version,); let new_timeline = self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?; new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); @@ -387,6 +388,11 @@ impl Tenant { let mut timelines_accessor = self.timelines.lock().unwrap(); for (timeline_id, metadata) in sorted_timelines { + info!( + "Attaching timeline {} pg_version {}", + timeline_id, + metadata.pg_version() + ); let timeline = self .initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor) .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?; @@ -613,7 +619,7 @@ impl Tenant { }; let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn(); - + let pg_version = new_metadata.pg_version(); let new_timeline = Arc::new(Timeline::new( self.conf, Arc::clone(&self.tenant_conf), @@ -623,6 +629,7 @@ impl Tenant { self.tenant_id, Arc::clone(&self.walredo_mgr), self.upload_layers, + pg_version, )); new_timeline @@ -984,6 +991,7 @@ impl Tenant { start_lsn, *src_timeline.latest_gc_cutoff_lsn.read(), src_timeline.initdb_lsn, + src_timeline.pg_version, ); let new_timeline = self.create_initialized_timeline(dst, metadata, &mut timelines)?; info!("branched timeline {dst} from {src} at {start_lsn}"); @@ -1319,6 +1327,7 @@ pub mod harness { lsn: Lsn, base_img: Option, records: Vec<(Lsn, NeonWalRecord)>, + _pg_version: u32, ) -> Result { let s = format!( "redo for {} to get to {}, with {} and {} records", @@ -1345,6 +1354,7 @@ mod tests { use crate::keyspace::KeySpaceAccum; use crate::repository::{Key, Value}; use crate::tenant::harness::*; + use crate::DEFAULT_PG_VERSION; use bytes::BytesMut; use hex_literal::hex; use once_cell::sync::Lazy; @@ -1356,7 +1366,7 @@ mod tests { #[test] fn test_basic() -> Result<()> { let tenant = TenantHarness::create("test_basic")?.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -1378,9 +1388,9 @@ mod tests { #[test] fn no_duplicate_timelines() -> Result<()> { let tenant = TenantHarness::create("no_duplicate_timelines")?.load(); - let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; - match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0)) { + match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION) { Ok(_) => panic!("duplicate timeline creation should fail"), Err(e) => assert_eq!( e.to_string(), @@ -1404,7 +1414,7 @@ mod tests { #[test] fn test_branch() -> Result<()> { let tenant = TenantHarness::create("test_branch")?.load(); - let tline = 
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; let writer = tline.writer(); use std::str::from_utf8; @@ -1499,7 +1509,7 @@ mod tests { let tenant = TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? .load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; make_some_layers(tline.as_ref(), Lsn(0x20))?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 @@ -1529,7 +1539,7 @@ mod tests { let tenant = TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load(); - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?; + tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)?; // try to branch at lsn 0x25, should fail because initdb lsn is 0x50 match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { Ok(_) => panic!("branching should have failed"), @@ -1555,7 +1565,7 @@ mod tests { RepoHarness::create("test_prohibit_get_for_garbage_collected_data")? .load(); - let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; make_some_layers(tline.as_ref(), Lsn(0x20))?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; @@ -1573,7 +1583,7 @@ mod tests { fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> { let tenant = TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; make_some_layers(tline.as_ref(), Lsn(0x20))?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; @@ -1590,7 +1600,7 @@ mod tests { fn test_parent_keeps_data_forever_after_branching() -> Result<()> { let tenant = TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; make_some_layers(tline.as_ref(), Lsn(0x20))?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; @@ -1618,7 +1628,8 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; { let tenant = harness.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?; + let tline = + tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?; make_some_layers(tline.as_ref(), Lsn(0x8000))?; tline.checkpoint(CheckpointConfig::Forced)?; } @@ -1638,7 +1649,7 @@ mod tests { // create two timelines { let tenant = harness.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; make_some_layers(tline.as_ref(), Lsn(0x20))?; tline.checkpoint(CheckpointConfig::Forced)?; @@ -1674,7 +1685,7 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; let tenant = harness.load(); - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; drop(tenant); let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME); @@ -1711,7 +1722,7 @@ mod tests { #[test] fn test_images() -> Result<()> { let tenant = 
TenantHarness::create("test_images")?.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -1761,7 +1772,7 @@ mod tests { #[test] fn test_bulk_insert() -> Result<()> { let tenant = TenantHarness::create("test_bulk_insert")?.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; let mut lsn = Lsn(0x10); @@ -1801,7 +1812,7 @@ mod tests { #[test] fn test_random_updates() -> Result<()> { let tenant = TenantHarness::create("test_random_updates")?.load(); - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; const NUM_KEYS: usize = 1000; @@ -1871,7 +1882,7 @@ mod tests { #[test] fn test_traverse_branches() -> Result<()> { let tenant = TenantHarness::create("test_traverse_branches")?.load(); - let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; const NUM_KEYS: usize = 1000; @@ -1950,7 +1961,7 @@ mod tests { #[test] fn test_traverse_ancestors() -> Result<()> { let tenant = TenantHarness::create("test_traverse_ancestors")?.load(); - let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; + let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?; const NUM_KEYS: usize = 100; const NUM_TLINES: usize = 50; diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 606acbf2f1..41790b4d11 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -63,6 +63,7 @@ struct TimelineMetadataBody { ancestor_lsn: Lsn, latest_gc_cutoff_lsn: Lsn, initdb_lsn: Lsn, + pg_version: u32, } impl TimelineMetadata { @@ -73,6 +74,7 @@ impl TimelineMetadata { ancestor_lsn: Lsn, latest_gc_cutoff_lsn: Lsn, initdb_lsn: Lsn, + pg_version: u32, ) -> Self { Self { hdr: TimelineMetadataHeader { @@ -87,6 +89,7 @@ impl TimelineMetadata { ancestor_lsn, latest_gc_cutoff_lsn, initdb_lsn, + pg_version, }, } } @@ -160,6 +163,10 @@ impl TimelineMetadata { pub fn initdb_lsn(&self) -> Lsn { self.body.initdb_lsn } + + pub fn pg_version(&self) -> u32 { + self.body.pg_version + } } /// Save timeline metadata to file @@ -212,6 +219,8 @@ mod tests { Lsn(0), Lsn(0), Lsn(0), + // Any version will do here, so use the default + crate::DEFAULT_PG_VERSION, ); let metadata_bytes = original_metadata diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 6de1d44876..019de81d64 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -37,7 +37,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::reltag::RelTag; use crate::tenant_config::TenantConfOpt; -use postgres_ffi::v14::xlog_utils::to_pg_timestamp; +use postgres_ffi::to_pg_timestamp; use utils::{ id::{TenantId, TimelineId}, lsn::{AtomicLsn, Lsn, RecordLsn}, @@ -61,6 +61,8 @@ pub struct Timeline { pub tenant_id: TenantId, pub timeline_id: TimelineId, + pub pg_version: u32, + pub layers: RwLock, last_freeze_at: AtomicLsn, @@ -533,6 +535,7 @@ impl Timeline { tenant_id: TenantId, walredo_mgr: Arc, upload_layers: bool, + pg_version: u32, ) -> Timeline { let disk_consistent_lsn = metadata.disk_consistent_lsn(); @@ 
-541,6 +544,7 @@ impl Timeline { tenant_conf, timeline_id, tenant_id, + pg_version, layers: RwLock::new(LayerMap::default()), walredo_mgr, @@ -1260,6 +1264,7 @@ impl Timeline { self.ancestor_lsn, *self.latest_gc_cutoff_lsn.read(), self.initdb_lsn, + self.pg_version, ); fail_point!("checkpoint-before-saving-metadata", |x| bail!( @@ -2133,9 +2138,13 @@ impl Timeline { let last_rec_lsn = data.records.last().unwrap().0; - let img = - self.walredo_mgr - .request_redo(key, request_lsn, base_img, data.records)?; + let img = self.walredo_mgr.request_redo( + key, + request_lsn, + base_img, + data.records, + self.pg_version, + )?; if img.len() == page_cache::PAGE_SZ { let cache = page_cache::get(); diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index bede4ac13e..1d5cab38b9 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -34,8 +34,9 @@ use crate::pgdatadir_mapping::*; use crate::reltag::{RelTag, SlruKind}; use crate::tenant::Timeline; use crate::walrecord::*; +use postgres_ffi::pg_constants; +use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment; -use postgres_ffi::v14::pg_constants; use postgres_ffi::v14::xlog_utils::*; use postgres_ffi::v14::CheckPoint; use postgres_ffi::TransactionId; @@ -82,7 +83,8 @@ impl<'a> WalIngest<'a> { decoded: &mut DecodedWALRecord, ) -> Result<()> { modification.lsn = lsn; - decode_wal_record(recdata, decoded).context("failed decoding wal record")?; + decode_wal_record(recdata, decoded, self.timeline.pg_version) + .context("failed decoding wal record")?; let mut buf = decoded.record.clone(); buf.advance(decoded.main_data_offset); @@ -113,18 +115,49 @@ impl<'a> WalIngest<'a> { let truncate = XlSmgrTruncate::decode(&mut buf); self.ingest_xlog_smgr_truncate(modification, &truncate)?; } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { - if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_DBASE_CREATE - { - let createdb = XlCreateDatabase::decode(&mut buf); - self.ingest_xlog_dbase_create(modification, &createdb)?; - } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) - == pg_constants::XLOG_DBASE_DROP - { - let dropdb = XlDropDatabase::decode(&mut buf); - for tablespace_id in dropdb.tablespace_ids { - trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); - modification.drop_dbdir(tablespace_id, dropdb.db_id)?; + debug!( + "handle RM_DBASE_ID for Postgres version {:?}", + self.timeline.pg_version + ); + if self.timeline.pg_version == 14 { + if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE + { + let createdb = XlCreateDatabase::decode(&mut buf); + debug!("XLOG_DBASE_CREATE v14"); + + self.ingest_xlog_dbase_create(modification, &createdb)?; + } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == postgres_ffi::v14::bindings::XLOG_DBASE_DROP + { + let dropdb = XlDropDatabase::decode(&mut buf); + for tablespace_id in dropdb.tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); + modification.drop_dbdir(tablespace_id, dropdb.db_id)?; + } + } + } else if self.timeline.pg_version == 15 { + if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG + { + debug!("XLOG_DBASE_CREATE_WAL_LOG: noop"); + } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY + { + // The XLOG 
record was renamed between v14 and v15, + // but the record format is the same. + // So we can reuse XlCreateDatabase here. + debug!("XLOG_DBASE_CREATE_FILE_COPY"); + let createdb = XlCreateDatabase::decode(&mut buf); + self.ingest_xlog_dbase_create(modification, &createdb)?; + } else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) + == postgres_ffi::v15::bindings::XLOG_DBASE_DROP + { + let dropdb = XlDropDatabase::decode(&mut buf); + for tablespace_id in dropdb.tablespace_ids { + trace!("Drop db {}, {}", tablespace_id, dropdb.db_id); + modification.drop_dbdir(tablespace_id, dropdb.db_id)?; + } } } } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { @@ -291,7 +324,7 @@ impl<'a> WalIngest<'a> { && (decoded.xl_info == pg_constants::XLOG_FPI || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) // compression of WAL is not yet supported: fall back to storing the original WAL record - && (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 + && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version) { // Extract page image from FPI record let img_len = blk.bimg_len as usize; @@ -392,7 +425,7 @@ impl<'a> WalIngest<'a> { // Clear the VM bits if required. if new_heap_blkno.is_some() || old_heap_blkno.is_some() { let vm_rel = RelTag { - forknum: pg_constants::VISIBILITYMAP_FORKNUM, + forknum: VISIBILITYMAP_FORKNUM, spcnode: decoded.blocks[0].rnode_spcnode, dbnode: decoded.blocks[0].rnode_dbnode, relnode: decoded.blocks[0].rnode_relnode, @@ -568,7 +601,7 @@ impl<'a> WalIngest<'a> { spcnode, dbnode, relnode, - forknum: pg_constants::MAIN_FORKNUM, + forknum: MAIN_FORKNUM, }; self.put_rel_truncation(modification, rel, rec.blkno)?; } @@ -577,7 +610,7 @@ impl<'a> WalIngest<'a> { spcnode, dbnode, relnode, - forknum: pg_constants::FSM_FORKNUM, + forknum: FSM_FORKNUM, }; // FIXME: 'blkno' stored in the WAL record is the new size of the @@ -600,7 +633,7 @@ impl<'a> WalIngest<'a> { spcnode, dbnode, relnode, - forknum: pg_constants::VISIBILITYMAP_FORKNUM, + forknum: VISIBILITYMAP_FORKNUM, }; // FIXME: Like with the FSM above, the logic to truncate the VM @@ -672,7 +705,7 @@ impl<'a> WalIngest<'a> { )?; for xnode in &parsed.xnodes { - for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { + for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM { let rel = RelTag { forknum, spcnode: xnode.spcnode, @@ -1032,6 +1065,8 @@ mod tests { use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT; use postgres_ffi::RELSEG_SIZE; + use crate::DEFAULT_PG_VERSION; + /// Arbitrary relation tag, for testing. 
const TESTREL_A: RelTag = RelTag { spcnode: 0, @@ -1059,7 +1094,7 @@ mod tests { #[test] fn test_relsize() -> Result<()> { let tenant = TenantHarness::create("test_relsize")?.load(); - let tline = create_test_timeline(&tenant, TIMELINE_ID)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&*tline)?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1187,7 +1222,7 @@ mod tests { #[test] fn test_drop_extend() -> Result<()> { let tenant = TenantHarness::create("test_drop_extend")?.load(); - let tline = create_test_timeline(&tenant, TIMELINE_ID)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&*tline)?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1227,7 +1262,7 @@ mod tests { #[test] fn test_truncate_extend() -> Result<()> { let tenant = TenantHarness::create("test_truncate_extend")?.load(); - let tline = create_test_timeline(&tenant, TIMELINE_ID)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&*tline)?; // Create a 20 MB relation (the size is arbitrary) @@ -1315,7 +1350,7 @@ mod tests { #[test] fn test_large_rel() -> Result<()> { let tenant = TenantHarness::create("test_large_rel")?.load(); - let tline = create_test_timeline(&tenant, TIMELINE_ID)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?; let mut walingest = init_walingest_test(&*tline)?; let mut lsn = 0x10; diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 148372c9d0..a82e69e5ba 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -1366,7 +1366,7 @@ mod tests { }, timeline: harness .load() - .create_empty_timeline(TIMELINE_ID, Lsn(0)) + .create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION) .expect("Failed to create an empty timeline for dummy wal connection manager"), wal_connect_timeout: Duration::from_secs(1), lagging_wal_timeout: Duration::from_secs(1), diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 29c4cea882..5ac9a3ef7a 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -29,7 +29,7 @@ use crate::{ walingest::WalIngest, walrecord::DecodedWALRecord, }; -use postgres_ffi::v14::waldecoder::WalStreamDecoder; +use postgres_ffi::waldecoder::WalStreamDecoder; use utils::id::TenantTimelineId; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback}; @@ -166,7 +166,7 @@ pub async fn handle_walreceiver_connection( let physical_stream = ReplicationStream::new(copy_stream); pin!(physical_stream); - let mut waldecoder = WalStreamDecoder::new(startpoint); + let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version); let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?; diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index dbf9bf9d33..258e1a445f 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -3,12 +3,11 @@ //! 
 use anyhow::Result;
 use bytes::{Buf, Bytes};
-use postgres_ffi::v14::pg_constants;
-use postgres_ffi::v14::xlog_utils::XLOG_SIZE_OF_XLOG_RECORD;
-use postgres_ffi::v14::XLogRecord;
+use postgres_ffi::pg_constants;
 use postgres_ffi::BLCKSZ;
 use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz};
 use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
+use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
 use serde::{Deserialize, Serialize};
 use tracing::*;
 use utils::bin_ser::DeserializeError;
@@ -390,6 +389,16 @@ impl XlXactParsedRecord {
             xid = buf.get_u32_le();
             trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
         }
+
+        if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
+            let nitems = buf.get_i32_le();
+            debug!(
+                "XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STATS nitems {}",
+                nitems
+            );
+            // FIXME: do we need to handle dropped stats here?
+        }
+
         XlXactParsedRecord {
             xid,
             info,
@@ -517,6 +526,7 @@ impl XlMultiXactTruncate {
 pub fn decode_wal_record(
     record: Bytes,
     decoded: &mut DecodedWALRecord,
+    pg_version: u32,
 ) -> Result<(), DeserializeError> {
     let mut rnode_spcnode: u32 = 0;
     let mut rnode_dbnode: u32 = 0;
@@ -610,9 +620,18 @@ pub fn decode_wal_record(
                 blk.hole_offset = buf.get_u16_le();
                 blk.bimg_info = buf.get_u8();
 
-                blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0;
+                blk.apply_image = if pg_version == 14 {
+                    (blk.bimg_info & postgres_ffi::v14::bindings::BKPIMAGE_APPLY) != 0
+                } else {
+                    assert_eq!(pg_version, 15);
+                    (blk.bimg_info & postgres_ffi::v15::bindings::BKPIMAGE_APPLY) != 0
+                };
 
-                if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 {
+                let blk_img_is_compressed =
+                    postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
+
+                if blk_img_is_compressed {
+                    debug!("compressed block image, pg_version = {}", pg_version);
                     if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
                         blk.hole_length = buf.get_u16_le();
                     } else {
@@ -665,9 +684,7 @@ pub fn decode_wal_record(
                  * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
                  * flag is set.
                  */
-                if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0)
-                    && blk.bimg_len == BLCKSZ
-                {
+                if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
                     // TODO
                     /*
                     report_invalid_record(state,
@@ -683,7 +700,7 @@ pub fn decode_wal_record(
                  * IS_COMPRESSED flag is set.
*/ if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 - && blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0 + && !blk_img_is_compressed && blk.bimg_len != BLCKSZ { // TODO diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 79c2edc96e..15a9408dc9 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -46,11 +46,12 @@ use crate::reltag::{RelTag, SlruKind}; use crate::repository::Key; use crate::walrecord::NeonWalRecord; use crate::{config::PageServerConf, TEMP_FILE_SUFFIX}; +use postgres_ffi::pg_constants; +use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM; use postgres_ffi::v14::nonrelfile_utils::{ mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, transaction_id_set_status, }; -use postgres_ffi::v14::pg_constants; use postgres_ffi::BLCKSZ; /// @@ -82,6 +83,7 @@ pub trait WalRedoManager: Send + Sync { lsn: Lsn, base_img: Option, records: Vec<(Lsn, NeonWalRecord)>, + pg_version: u32, ) -> Result; } @@ -144,6 +146,7 @@ impl WalRedoManager for PostgresRedoManager { lsn: Lsn, base_img: Option, records: Vec<(Lsn, NeonWalRecord)>, + pg_version: u32, ) -> Result { if records.is_empty() { error!("invalid WAL redo request with no records"); @@ -166,6 +169,7 @@ impl WalRedoManager for PostgresRedoManager { img, &records[batch_start..i], self.conf.wal_redo_timeout, + pg_version, ) }; img = Some(result?); @@ -184,6 +188,7 @@ impl WalRedoManager for PostgresRedoManager { img, &records[batch_start..], self.conf.wal_redo_timeout, + pg_version, ) } } @@ -212,6 +217,7 @@ impl PostgresRedoManager { base_img: Option, records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, + pg_version: u32, ) -> Result { let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?; @@ -222,7 +228,7 @@ impl PostgresRedoManager { // launch the WAL redo process on first use if process_guard.is_none() { - let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id)?; + let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?; *process_guard = Some(p); } let process = process_guard.as_mut().unwrap(); @@ -326,7 +332,7 @@ impl PostgresRedoManager { // sanity check that this is modifying the correct relation let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?; assert!( - rel.forknum == pg_constants::VISIBILITYMAP_FORKNUM, + rel.forknum == VISIBILITYMAP_FORKNUM, "ClearVisibilityMapFlags record on unexpected rel {}", rel ); @@ -570,7 +576,11 @@ impl PostgresRedoProcess { // // Start postgres binary in special WAL redo mode. // - fn launch(conf: &PageServerConf, tenant_id: &TenantId) -> Result { + fn launch( + conf: &PageServerConf, + tenant_id: &TenantId, + pg_version: u32, + ) -> Result { // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // just create one with constant name. That fails if you try to launch more than // one WAL redo manager concurrently. 
@@ -588,12 +598,12 @@ impl PostgresRedoProcess { fs::remove_dir_all(&datadir)?; } info!("running initdb in {}", datadir.display()); - let initdb = Command::new(conf.pg_bin_dir().join("initdb")) + let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb")) .args(&["-D", &datadir.to_string_lossy()]) .arg("-N") .env_clear() - .env("LD_LIBRARY_PATH", conf.pg_lib_dir()) - .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) + .env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version)) + .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version)) .close_fds() .output() .map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?; @@ -619,14 +629,14 @@ impl PostgresRedoProcess { } // Start postgres itself - let mut child = Command::new(conf.pg_bin_dir().join("postgres")) + let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres")) .arg("--wal-redo") .stdin(Stdio::piped()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) .env_clear() - .env("LD_LIBRARY_PATH", conf.pg_lib_dir()) - .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) + .env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version)) + .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version)) .env("PGDATA", &datadir) // The redo process is not trusted, so it runs in seccomp mode // (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 2456eb0752..3de410d117 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -22,7 +22,7 @@ use crate::safekeeper::{ use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; use crate::timeline::Timeline; use crate::GlobalTimelines; -use postgres_ffi::v14::xlog_utils; +use postgres_ffi::encode_logical_message; use postgres_ffi::WAL_SEGMENT_SIZE; use utils::{ lsn::Lsn, @@ -47,6 +47,7 @@ pub struct AppendLogicalMessage { epoch_start_lsn: Lsn, begin_lsn: Lsn, truncate_lsn: Lsn, + pg_version: u32, } #[derive(Serialize, Deserialize)] @@ -68,7 +69,7 @@ pub fn handle_json_ctrl( info!("JSON_CTRL request: {:?}", append_request); // need to init safekeeper state before AppendRequest - let tli = prepare_safekeeper(spg.ttid)?; + let tli = prepare_safekeeper(spg.ttid, append_request.pg_version)?; // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { @@ -95,11 +96,11 @@ pub fn handle_json_ctrl( /// Prepare safekeeper to process append requests without crashes, /// by sending ProposerGreeting with default server.wal_seg_size. -fn prepare_safekeeper(ttid: TenantTimelineId) -> Result> { +fn prepare_safekeeper(ttid: TenantTimelineId, pg_version: u32) -> Result> { GlobalTimelines::create( ttid, ServerInfo { - pg_version: 0, // unknown + pg_version, wal_seg_size: WAL_SEGMENT_SIZE as u32, system_id: 0, }, @@ -135,7 +136,7 @@ struct InsertedWAL { /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. 
 fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> {
-    let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message);
+    let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
 
     let sk_state = tli.get_state().1;
     let begin_lsn = msg.begin_lsn;
diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs
index 65340ac0ed..eec24faf2f 100644
--- a/safekeeper/src/safekeeper.rs
+++ b/safekeeper/src/safekeeper.rs
@@ -27,7 +27,7 @@ use utils::{
 pub const SK_MAGIC: u32 = 0xcafeceefu32;
 pub const SK_FORMAT_VERSION: u32 = 6;
 const SK_PROTOCOL_VERSION: u32 = 2;
-const UNKNOWN_SERVER_VERSION: u32 = 0;
+pub const UNKNOWN_SERVER_VERSION: u32 = 0;
 
 /// Consensus logical timestamp.
 pub type Term = u64;
@@ -594,15 +594,20 @@ where
                 SK_PROTOCOL_VERSION
             );
         }
-        // Postgres upgrade is not treated as fatal error
-        if msg.pg_version != self.state.server.pg_version
+        /* A Postgres major version mismatch is treated as a fatal error,
+         * because safekeepers parse WAL headers and the WAL format
+         * may change between major versions.
+         */
+        if msg.pg_version / 10000 != self.state.server.pg_version / 10000
             && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
         {
-            warn!(
+            bail!(
                 "incompatible server version {}, expected {}",
-                msg.pg_version, self.state.server.pg_version
+                msg.pg_version,
+                self.state.server.pg_version
             );
         }
+
         if msg.tenant_id != self.state.tenant_id {
             bail!(
                 "invalid tenant ID, got {}, expected {}",
@@ -634,6 +639,10 @@ where
             let mut state = self.state.clone();
             state.server.system_id = msg.system_id;
+            state.server.wal_seg_size = msg.wal_seg_size;
+            if msg.pg_version != UNKNOWN_SERVER_VERSION {
+                state.server.pg_version = msg.pg_version;
+            }
             self.state.persist(&state)?;
         }
diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs
index 5a38558e9c..2829c875ed 100644
--- a/safekeeper/src/send_wal.rs
+++ b/safekeeper/src/send_wal.rs
@@ -8,7 +8,7 @@ use crate::GlobalTimelines;
 use anyhow::{bail, Context, Result};
 use bytes::Bytes;
-use postgres_ffi::v14::xlog_utils::get_current_timestamp;
+use postgres_ffi::get_current_timestamp;
 use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
 use serde::{Deserialize, Serialize};
 use std::cmp::min;
diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs
index 0d5321fb3a..c82a003161 100644
--- a/safekeeper/src/wal_backup.rs
+++ b/safekeeper/src/wal_backup.rs
@@ -11,7 +11,8 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
 
-use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNoOffsetToRecPtr};
+use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
+use postgres_ffi::XLogFileName;
 use postgres_ffi::{XLogSegNo, PG_TLI};
 use remote_storage::GenericRemoteStorage;
 use tokio::fs::File;
diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs
index 692bd18342..44dc313ef6 100644
--- a/safekeeper/src/wal_storage.rs
+++ b/safekeeper/src/wal_storage.rs
@@ -29,13 +29,14 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
 
 use crate::metrics::{time_io_closure, WalStorageMetrics};
 use crate::safekeeper::SafeKeeperState;
+use crate::safekeeper::UNKNOWN_SERVER_VERSION;
 use crate::wal_backup::read_object;
 use crate::SafeKeeperConf;
 
-use postgres_ffi::v14::xlog_utils::XLogFileName;
+use postgres_ffi::XLogFileName;
 use postgres_ffi::XLOG_BLCKSZ;
 
-use postgres_ffi::v14::waldecoder::WalStreamDecoder;
+use postgres_ffi::waldecoder::WalStreamDecoder;
 
 use tokio::io::{AsyncReadExt, AsyncSeekExt};
@@ -139,7 +140,7 @@ impl PhysicalStorage {
             write_lsn,
             write_record_lsn:
write_lsn, flush_record_lsn: flush_lsn, - decoder: WalStreamDecoder::new(write_lsn), + decoder: WalStreamDecoder::new(write_lsn, UNKNOWN_SERVER_VERSION), file: None, }) } @@ -291,7 +292,8 @@ impl Storage for PhysicalStorage { self.decoder.available(), startpos, ); - self.decoder = WalStreamDecoder::new(startpos); + let pg_version = self.decoder.pg_version; + self.decoder = WalStreamDecoder::new(startpos, pg_version); } self.decoder.feed_bytes(buf); loop { diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 1e83ee3839..c1ebc6aa7d 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -59,7 +59,7 @@ Env = Dict[str, str] Fn = TypeVar("Fn", bound=Callable[..., Any]) DEFAULT_OUTPUT_DIR = "test_output" -DEFAULT_POSTGRES_DIR = "pg_install/v14" +DEFAULT_PG_VERSION_DEFAULT = "14" DEFAULT_BRANCH_NAME = "main" BASE_PORT = 15000 @@ -71,6 +71,7 @@ base_dir = "" neon_binpath = "" pg_distrib_dir = "" top_output_dir = "" +pg_version = "" def pytest_configure(config): @@ -100,12 +101,21 @@ def pytest_configure(config): Path(top_output_dir).mkdir(exist_ok=True) # Find the postgres installation. + global pg_version + pg_version = os.environ.get("DEFAULT_PG_VERSION", DEFAULT_PG_VERSION_DEFAULT) + global pg_distrib_dir + + # TODO get rid of the POSTGRES_DISTRIB_DIR env var ? + # use DEFAULT_PG_VERSION instead to generate the path env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR") if env_postgres_bin: pg_distrib_dir = env_postgres_bin else: - pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR)) + pg_distrib_dir = os.path.normpath( + os.path.join(base_dir, "pg_install/v{}".format(pg_version)) + ) + log.info(f"pg_distrib_dir is {pg_distrib_dir}") if os.getenv("REMOTE_ENV"): # When testing against a remote server, we only need the client binary. 
@@ -1185,6 +1195,7 @@ class AbstractNeonCli(abc.ABC): env_vars = os.environ.copy() env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir) env_vars["POSTGRES_DISTRIB_DIR"] = str(pg_distrib_dir) + env_vars["DEFAULT_PG_VERSION"] = str(pg_version) if self.env.rust_log_override is not None: env_vars["RUST_LOG"] = self.env.rust_log_override for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items(): @@ -1251,6 +1262,8 @@ class NeonCli(AbstractNeonCli): str(tenant_id), "--timeline-id", str(timeline_id), + "--pg-version", + pg_version, ] ) else: @@ -1262,6 +1275,8 @@ class NeonCli(AbstractNeonCli): str(tenant_id), "--timeline-id", str(timeline_id), + "--pg-version", + pg_version, ] + sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), []) ) @@ -1296,6 +1311,8 @@ class NeonCli(AbstractNeonCli): new_branch_name, "--tenant-id", str(tenant_id or self.env.initial_tenant), + "--pg-version", + pg_version, ] res = self.raw_cli(cmd) @@ -1317,6 +1334,8 @@ class NeonCli(AbstractNeonCli): branch_name, "--tenant-id", str(tenant_id or self.env.initial_tenant), + "--pg-version", + pg_version, ] res = self.raw_cli(cmd) @@ -1395,6 +1414,9 @@ class NeonCli(AbstractNeonCli): cmd = ["init", f"--config={tmp.name}"] if initial_timeline_id: cmd.extend(["--timeline-id", str(initial_timeline_id)]) + + cmd.extend(["--pg-version", pg_version]) + append_pageserver_param_overrides( params_to_update=cmd, remote_storage=self.env.remote_storage, @@ -1476,6 +1498,8 @@ class NeonCli(AbstractNeonCli): str(tenant_id or self.env.initial_tenant), "--branch-name", branch_name, + "--pg-version", + pg_version, ] if lsn is not None: args.extend(["--lsn", str(lsn)]) @@ -1500,6 +1524,8 @@ class NeonCli(AbstractNeonCli): "start", "--tenant-id", str(tenant_id or self.env.initial_tenant), + "--pg-version", + pg_version, ] if lsn is not None: args.append(f"--lsn={lsn}") diff --git a/test_runner/regress/test_import.py b/test_runner/regress/test_import.py index 885a0dc26f..417595ae4d 100644 --- a/test_runner/regress/test_import.py +++ b/test_runner/regress/test_import.py @@ -14,6 +14,7 @@ from fixtures.neon_fixtures import ( PgBin, Postgres, pg_distrib_dir, + pg_version, wait_for_last_record_lsn, wait_for_upload, ) @@ -96,6 +97,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build end_lsn, "--wal-tarfile", wal, + "--pg-version", + pg_version, ] ) @@ -248,6 +251,8 @@ def _import( str(lsn), "--base-tarfile", os.path.join(tar_output_file), + "--pg-version", + pg_version, ] ) diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index aa5a65f446..4934fb9354 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -5,7 +5,13 @@ import os from pathlib import Path import pytest -from fixtures.neon_fixtures import NeonEnv, base_dir, check_restored_datadir_content, pg_distrib_dir +from fixtures.neon_fixtures import ( + NeonEnv, + base_dir, + check_restored_datadir_content, + pg_distrib_dir, + pg_version, +) # Run the main PostgreSQL regression tests, in src/test/regress. @@ -26,8 +32,8 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, cap (runpath / "testtablespace").mkdir(parents=True) # Compute all the file locations that pg_regress will need. 
-    build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
-    src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/regress")
+    build_path = os.path.join(pg_distrib_dir, "../build/v{}/src/test/regress".format(pg_version))
+    src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress".format(pg_version))
     bindir = os.path.join(pg_distrib_dir, "bin")
     schedule = os.path.join(src_path, "parallel_schedule")
     pg_regress = os.path.join(build_path, "pg_regress")
@@ -80,8 +86,8 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
     (runpath / "testtablespace").mkdir(parents=True)
 
     # Compute all the file locations that pg_isolation_regress will need.
-    build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/isolation")
-    src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/isolation")
+    build_path = os.path.join(pg_distrib_dir, "../build/v{}/src/test/isolation".format(pg_version))
+    src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/isolation".format(pg_version))
     bindir = os.path.join(pg_distrib_dir, "bin")
     schedule = os.path.join(src_path, "isolation_schedule")
     pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
@@ -124,7 +130,7 @@ def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, ca
     # Compute all the file locations that pg_regress will need.
     # This test runs neon specific tests
-    build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
+    build_path = os.path.join(pg_distrib_dir, "../build/v{}/src/test/regress".format(pg_version))
     src_path = os.path.join(base_dir, "test_runner/sql_regress")
     bindir = os.path.join(pg_distrib_dir, "bin")
     schedule = os.path.join(src_path, "parallel_schedule")
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 931de0f1e3..73e26bd207 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -29,6 +29,7 @@ from fixtures.neon_fixtures import (
     SafekeeperPort,
     available_remote_storages,
     neon_binpath,
+    pg_version,
     wait_for_last_record_lsn,
     wait_for_upload,
 )
@@ -634,6 +635,9 @@ class ProposerPostgres(PgProtocol):
         }
 
         basepath = self.pg_bin.run_capture(command, env)
+
+        log.info(f"postgres --sync-safekeepers output: {basepath}")
+
         stdout_filename = basepath + ".stdout"
 
         with open(stdout_filename, "r") as stdout_f:
@@ -662,7 +666,9 @@ class ProposerPostgres(PgProtocol):
 
 # insert wal in all safekeepers and run sync on proposer
 def test_sync_safekeepers(
-    neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
+    neon_env_builder: NeonEnvBuilder,
+    pg_bin: PgBin,
+    port_distributor: PortDistributor,
 ):
 
     # We don't really need the full environment for this test, just the
@@ -699,6 +705,7 @@ def test_sync_safekeepers(
                 "begin_lsn": int(begin_lsn),
                 "epoch_start_lsn": int(epoch_start_lsn),
                 "truncate_lsn": int(epoch_start_lsn),
+                "pg_version": int(pg_version) * 10000,
             },
         )
         lsn = Lsn(res["inserted_wal"]["end_lsn"])
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index 19d948fd47..796770565f 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit 19d948fd47f45d83367062d9a54709cf2d9c8921
+Subproject commit 796770565ff668b585e80733b8d679961ad50e93
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index 5b8b3eeef5..9383aaa9c2 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit 5b8b3eeef5ec34c0cad9377833906a1387841d04
+Subproject commit 9383aaa9c2616fd81cfafb058fe0d692f5e43ac3
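
A note on the version dispatch this patch threads through the WAL decoder: several call
sites above (decode_wal_record in pageserver/src/walrecord.rs and the FPI fast path in
pageserver/src/walingest.rs) delegate the compression test to
postgres_ffi::bkpimage_is_compressed(bimg_info, pg_version). Below is a minimal sketch of
that dispatch, assuming the flag values from the upstream v14/v15 xlogrecord.h headers;
the constants are inlined here for illustration, whereas the patch itself takes them from
the generated per-version bindings.

    /// Returns true if the block image in a WAL record is compressed.
    /// Sketch only: flag values follow upstream xlogrecord.h.
    pub fn bkpimage_is_compressed(bimg_info: u8, pg_version: u32) -> bool {
        // v14 has a single "is compressed" bit.
        const BKPIMAGE_IS_COMPRESSED_V14: u8 = 0x02;
        // v15 replaced it with one bit per compression method.
        const BKPIMAGE_COMPRESS_PGLZ_V15: u8 = 0x04;
        const BKPIMAGE_COMPRESS_LZ4_V15: u8 = 0x08;
        const BKPIMAGE_COMPRESS_ZSTD_V15: u8 = 0x10;

        match pg_version {
            14 => (bimg_info & BKPIMAGE_IS_COMPRESSED_V14) != 0,
            15 => {
                (bimg_info
                    & (BKPIMAGE_COMPRESS_PGLZ_V15
                        | BKPIMAGE_COMPRESS_LZ4_V15
                        | BKPIMAGE_COMPRESS_ZSTD_V15))
                    != 0
            }
            _ => panic!("unsupported Postgres version {}", pg_version),
        }
    }

The same header reshuffle is why decode_wal_record reads BKPIMAGE_APPLY from the
version-specific bindings: when v15 split the compression flag into per-method bits, the
APPLY bit moved from 0x04 to 0x02, so no single constant is correct for both versions.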