Support pg 15

- Split postgres_ffi into two version specific files.
- Preserve pg_version in timeline metadata.
- Use pg_version in safekeeper code. Check for postgres major version mismatch.
- Clean up the code to use DEFAULT_PG_VERSION constant everywhere, instead of hardcoding.

-  Parameterize python tests: use DEFAULT_PG_VERSION env and pg_version fixture.
   To run tests using a specific PostgreSQL version, pass the DEFAULT_PG_VERSION environment variable:
   'DEFAULT_PG_VERSION='15' ./scripts/pytest test_runner/regress'
 Currently don't all tests pass, because rust code relies on the default version of PostgreSQL in a few places.
This commit is contained in:
Anastasia Lubennikova
2022-09-14 17:09:28 +03:00
parent e764c1e60f
commit 86bf491981
43 changed files with 777 additions and 292 deletions

View File

@@ -39,6 +39,8 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "14";
fn default_conf(etcd_binary_path: &Path) -> String {
format!(
r#"
@@ -105,6 +107,13 @@ fn main() -> Result<()> {
.takes_value(true)
.required(false);
let pg_version_arg = Arg::new("pg-version")
.long("pg-version")
.help("Postgres version to use for the initial tenant")
.required(false)
.takes_value(true)
.default_value(DEFAULT_PG_VERSION);
let port_arg = Arg::new("port")
.long("port")
.required(false)
@@ -146,6 +155,7 @@ fn main() -> Result<()> {
.required(false)
.value_name("config"),
)
.arg(pg_version_arg.clone())
)
.subcommand(
App::new("timeline")
@@ -164,7 +174,9 @@ fn main() -> Result<()> {
.subcommand(App::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone()))
.arg(branch_name_arg.clone())
.arg(pg_version_arg.clone())
)
.subcommand(App::new("import")
.about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone())
@@ -178,7 +190,9 @@ fn main() -> Result<()> {
.arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true)
.help("Wal to add after base"))
.arg(Arg::new("end-lsn").long("end-lsn").takes_value(true)
.help("Lsn the basebackup ends at")))
.help("Lsn the basebackup ends at"))
.arg(pg_version_arg.clone())
)
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
@@ -188,6 +202,7 @@ fn main() -> Result<()> {
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
.arg(pg_version_arg.clone())
)
.subcommand(App::new("config")
.arg(tenant_id_arg.clone())
@@ -239,8 +254,9 @@ fn main() -> Result<()> {
Arg::new("config-only")
.help("Don't do basebackup, create compute node with only config files")
.long("config-only")
.required(false)
))
.required(false))
.arg(pg_version_arg.clone())
)
.subcommand(App::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
@@ -248,7 +264,9 @@ fn main() -> Result<()> {
.arg(branch_name_arg.clone())
.arg(timeline_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone()))
.arg(port_arg.clone())
.arg(pg_version_arg.clone())
)
.subcommand(
App::new("stop")
.arg(pg_node_arg.clone())
@@ -501,9 +519,16 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
default_conf(&EtcdBroker::locate_etcd()?)
};
let pg_version = init_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let mut env =
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
env.init().context("Failed to initialize neon repository")?;
env.init(pg_version)
.context("Failed to initialize neon repository")?;
let initial_tenant_id = env
.default_tenant_id
.expect("default_tenant_id should be generated by the `env.init()` call above");
@@ -515,6 +540,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
Some(initial_tenant_id),
initial_timeline_id_arg,
&pageserver_config_overrides(init_match),
pg_version,
)
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e}");
@@ -557,8 +583,19 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
let timeline_info =
pageserver.timeline_create(new_tenant_id, new_timeline_id, None, None)?;
let pg_version = create_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
new_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info
.local
@@ -607,7 +644,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_branch_name = create_match
.value_of("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
let timeline_info = pageserver.timeline_create(tenant_id, None, None, None)?;
let pg_version = create_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info =
pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info
@@ -655,7 +700,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
let pg_version = import_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -682,6 +734,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None,
start_lsn,
Some(ancestor_timeline_id),
None,
)?;
let new_timeline_id = timeline_info.timeline_id;
@@ -797,7 +850,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
Some(p) => Some(p.parse()?),
None => None,
};
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port)?;
let pg_version = sub_args
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = match sub_args.value_of("port") {
@@ -835,16 +895,23 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.map(Lsn::from_str)
.transpose()
.context("Failed to parse Lsn from the request")?;
let pg_version = sub_args
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
// when used with custom port this results in non obvious behaviour
// port is remembered from first start command, i e
// start --port X
// stop
// start <-- will also use port X even without explicit port argument
println!(
"Starting new postgres {} on timeline {} ...",
node_name, timeline_id
"Starting new postgres (v{}) {} on timeline {} ...",
pg_version, node_name, timeline_id
);
let node = cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?;
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?;
}
}

View File

@@ -18,7 +18,7 @@ use utils::{
postgres_backend::AuthType,
};
use crate::local_env::LocalEnv;
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::postgresql_conf::PostgresConf;
use crate::storage::PageServerNode;
@@ -81,6 +81,7 @@ impl ComputeControlPlane {
timeline_id: TimelineId,
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<PostgresNode>> {
let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode {
@@ -93,6 +94,7 @@ impl ComputeControlPlane {
lsn,
tenant_id,
uses_wal_proposer: false,
pg_version,
});
node.create_pgdata()?;
@@ -118,6 +120,7 @@ pub struct PostgresNode {
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
uses_wal_proposer: bool,
pg_version: u32,
}
impl PostgresNode {
@@ -152,6 +155,14 @@ impl PostgresNode {
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
let uses_wal_proposer = conf.get("neon.safekeepers").is_some();
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
@@ -167,17 +178,24 @@ impl PostgresNode {
lsn: recovery_target_lsn,
tenant_id,
uses_wal_proposer,
pg_version,
})
}
fn sync_safekeepers(&self, auth_token: &Option<String>) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir().join("postgres");
fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir(pg_version).join("postgres");
let mut cmd = Command::new(&pg_path);
cmd.arg("--sync-safekeepers")
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env("PGDATA", self.pgdata().to_str().unwrap())
.stdout(Stdio::piped())
// Comment this to avoid capturing stderr (useful if command hangs)
@@ -259,8 +277,8 @@ impl PostgresNode {
})
}
// Connect to a page server, get base backup, and untar it to initialize a
// new data directory
// Write postgresql.conf with default configuration
// and PG_VERSION file to the data directory of a new node.
fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
@@ -357,6 +375,9 @@ impl PostgresNode {
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
file.write_all(conf.to_string().as_bytes())?;
let mut file = File::create(self.pgdata().join("PG_VERSION"))?;
file.write_all(self.pg_version.to_string().as_bytes())?;
Ok(())
}
@@ -368,7 +389,7 @@ impl PostgresNode {
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token)?;
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
@@ -401,7 +422,7 @@ impl PostgresNode {
}
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path);
cmd.args(
[
@@ -417,8 +438,14 @@ impl PostgresNode {
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
);
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);
}

View File

@@ -20,6 +20,8 @@ use utils::{
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 14;
//
// This data structures represents neon_local CLI config
//
@@ -195,12 +197,40 @@ impl Default for SafekeeperConf {
}
impl LocalEnv {
// postgres installation paths
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let mut path = self.pg_distrib_dir.clone();
if pg_version != DEFAULT_PG_VERSION {
// step up to the parent directory
// We assume that the pg_distrib subdirs
// for different pg versions
// are located in the same directory
// and follow the naming convention: v14, v15, etc.
path.pop();
match pg_version {
14 => return path.join(format!("v{pg_version}")),
15 => return path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
};
}
path
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pageserver_bin(&self) -> anyhow::Result<PathBuf> {
@@ -290,6 +320,8 @@ impl LocalEnv {
// Find postgres binaries.
// Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install/v14".
// Note that later in the code we assume, that distrib dirs follow the same pattern
// for all postgres versions.
if env.pg_distrib_dir == Path::new("") {
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
env.pg_distrib_dir = postgres_bin.into();
@@ -384,7 +416,7 @@ impl LocalEnv {
//
// Initialize a new Neon repository
//
pub fn init(&mut self) -> anyhow::Result<()> {
pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
// check if config already exists
let base_path = &self.base_data_dir;
ensure!(
@@ -397,10 +429,10 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if !self.pg_distrib_dir.join("bin/postgres").exists() {
if !self.pg_bin_dir(pg_version).join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_distrib_dir.display()
self.pg_bin_dir(pg_version).display()
);
}
for binary in ["pageserver", "safekeeper"] {

View File

@@ -112,11 +112,15 @@ impl PageServerNode {
create_tenant: Option<TenantId>,
initial_timeline_id: Option<TimelineId>,
config_overrides: &[&str],
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let pg_distrib_dir_param = format!(
"pg_distrib_dir='{}'",
self.env.pg_distrib_dir(pg_version).display()
);
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
@@ -159,7 +163,7 @@ impl PageServerNode {
self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
let init_result = self
.try_init_timeline(create_tenant, initial_timeline_id)
.try_init_timeline(create_tenant, initial_timeline_id, pg_version)
.context("Failed to create initial tenant and timeline for pageserver");
match &init_result {
Ok(initial_timeline_id) => {
@@ -175,10 +179,16 @@ impl PageServerNode {
&self,
new_tenant_id: Option<TenantId>,
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info =
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
Ok(initial_timeline_info.timeline_id)
}
@@ -497,6 +507,7 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
) -> anyhow::Result<TimelineInfo> {
self.http_request(
Method::POST,
@@ -506,6 +517,7 @@ impl PageServerNode {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
pg_version,
})
.send()?
.error_from_body()?

View File

@@ -7,6 +7,8 @@
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
macro_rules! postgres_ffi {
@@ -24,11 +26,11 @@ macro_rules! postgres_ffi {
stringify!($version),
".rs"
));
include!(concat!("pg_constants_", stringify!($version), ".rs"));
}
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod waldecoder;
pub mod xlog_utils;
@@ -44,6 +46,9 @@ macro_rules! postgres_ffi {
postgres_ffi!(v14);
postgres_ffi!(v15);
pub mod pg_constants;
pub mod relfile_utils;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
@@ -52,8 +57,11 @@ pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
@@ -63,6 +71,50 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
if version == 14 {
bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0
} else {
assert_eq!(version, 15);
bimg_info & v15::bindings::BKPIMAGE_COMPRESS_PGLZ != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_LZ4 != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_ZSTD != 0
}
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
) -> Result<Bytes, SerializeError> {
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
_ => Err(SerializeError::BadInput),
}
}
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64)> {
match pg_version {
14 => v14::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
15 => v15::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
_ => anyhow::bail!("Unknown version {}", pg_version),
}
}
// PG timeline is always 1, changing it doesn't have any useful meaning in Neon.
//
// NOTE: this is not to be confused with Neon timelines; different concept!
@@ -74,7 +126,7 @@ pub const PG_TLI: u32 = 1;
// See TransactionIdIsNormal in transam.h
pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID
id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
}
// See TransactionIdPrecedes in transam.c
@@ -109,3 +161,74 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}
pub mod waldecoder {
use crate::{v14, v15};
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;
pub enum State {
WaitingForRecord,
ReassemblingRecord {
recordbuf: BytesMut,
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
pub lsn: Lsn,
pub pg_version: u32,
pub inputbuf: BytesMut,
pub state: State,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
pub msg: String,
pub lsn: Lsn,
}
impl WalStreamDecoder {
pub fn new(lsn: Lsn, pg_version: u32) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
pg_version,
inputbuf: BytesMut::new(),
state: State::WaitingForRecord,
}
}
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
match self.pg_version {
14 => {
use self::v14::waldecoder::WalStreamDecoderHandler;
self.poll_decode_internal()
}
15 => {
use self::v15::waldecoder::WalStreamDecoderHandler;
self.poll_decode_internal()
}
_ => Err(WalDecodeError {
msg: format!("Unknown version {}", self.pg_version),
lsn: self.lsn,
}),
}
}
}
}

View File

@@ -1,7 +1,7 @@
//!
//! Common utilities for dealing with PostgreSQL non-relation files.
//!
use super::pg_constants;
use crate::pg_constants;
use crate::transaction_id_precedes;
use bytes::BytesMut;
use log::*;

View File

@@ -1,14 +1,16 @@
//!
//! Misc constants, copied from PostgreSQL headers.
//!
//! Only place version-independent constants here.
//!
//! TODO: These probably should be auto-generated using bindgen,
//! rather than copied by hand. Although on the other hand, it's nice
//! to have them all here in one place, and have the ability to add
//! comments on them.
//!
use super::bindings::{PageHeaderData, XLogRecord};
use crate::BLCKSZ;
use crate::{PageHeaderData, XLogRecord};
//
// From pg_tablespace_d.h
@@ -16,14 +18,6 @@ use crate::BLCKSZ;
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
// From storage_xlog.h
pub const XLOG_SMGR_CREATE: u8 = 0x10;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
@@ -114,7 +108,6 @@ pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0;
pub const DB_SHUTDOWNED: u32 = 1;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
@@ -169,10 +162,6 @@ pub const RM_HEAP_ID: u8 = 10;
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
// from dbcommands_xlog.h
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10;
@@ -197,8 +186,6 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous
/* Information stored in bimg_info */
pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
/* From transam.h */
pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;

View File

@@ -0,0 +1,5 @@
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */

View File

@@ -0,0 +1,10 @@
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;
pub const XLOG_DBASE_CREATE_WAL_LOG: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x20;
pub const BKPIMAGE_APPLY: u8 = 0x02; /* page image should be restored during replay */
pub const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */

View File

@@ -1,10 +1,17 @@
//!
//! Common utilities for dealing with PostgreSQL relation files.
//!
use super::pg_constants;
use once_cell::sync::OnceCell;
use regex::Regex;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
@@ -23,10 +30,10 @@ impl From<core::num::ParseIntError> for FilePathError {
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(pg_constants::MAIN_FORKNUM),
Some("fsm") => Ok(pg_constants::FSM_FORKNUM),
Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM),
Some("init") => Ok(pg_constants::INIT_FORKNUM),
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
@@ -34,10 +41,10 @@ pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
pg_constants::MAIN_FORKNUM => None,
pg_constants::FSM_FORKNUM => Some("fsm"),
pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"),
pg_constants::INIT_FORKNUM => Some("init"),
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}

View File

@@ -8,6 +8,7 @@
//! to look deeper into the WAL records to also understand which blocks they modify, the code
//! for that is in pageserver/src/walrecord.rs
//!
use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
use super::xlog_utils::*;
use crate::WAL_SEGMENT_SIZE;
@@ -16,55 +17,19 @@ use crc32c::*;
use log::*;
use std::cmp::min;
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;
enum State {
WaitingForRecord,
ReassemblingRecord {
recordbuf: BytesMut,
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
lsn: Lsn,
inputbuf: BytesMut,
state: State,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
lsn: Lsn,
pub trait WalStreamDecoderHandler {
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
}
//
// WalRecordStream is a Stream that returns a stream of WAL records
// FIXME: This isn't a proper rust stream
//
impl WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
inputbuf: BytesMut::new(),
state: State::WaitingForRecord,
}
}
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
impl WalStreamDecoderHandler for WalStreamDecoder {
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
let validate_impl = || {
if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
@@ -125,7 +90,7 @@ impl WalStreamDecoder {
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
/// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
///
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
// Run state machine that validates page headers, and reassembles records
// that cross page boundaries.
loop {

View File

@@ -9,12 +9,13 @@
use crc32c::crc32c_append;
use super::super::waldecoder::WalStreamDecoder;
use super::bindings::{
CheckPoint, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData,
XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
};
use super::pg_constants;
use super::waldecoder::WalStreamDecoder;
use super::PG_MAJORVERSION;
use crate::pg_constants;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -113,6 +114,30 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
}
}
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;
Ok((pg_control.encode(), pg_control.system_identifier))
}
pub fn get_current_timestamp() -> TimestampTz {
to_pg_timestamp(SystemTime::now())
}
@@ -144,7 +169,10 @@ pub fn find_end_of_wal(
let mut result = start_lsn;
let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ];
let mut decoder = WalStreamDecoder::new(start_lsn);
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
info!("find_end_of_wal PG_VERSION: {}", pg_version);
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
// loop over segments
loop {

View File

@@ -25,10 +25,10 @@ use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName};
use postgres_ffi::v14::{CheckPoint, ControlFileData};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
@@ -129,15 +129,15 @@ where
// TODO include checksum
// Create pgdata subdirs structure
for dir in pg_constants::PGDATA_SUBDIRS.iter() {
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(*dir)?;
self.ar.append(&header, &mut io::empty())?;
}
// Send empty config files.
for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() {
for filepath in PGDATA_SPECIAL_FILES.iter() {
if *filepath == "pg_hba.conf" {
let data = pg_constants::PG_HBA.as_bytes();
let data = PG_HBA.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data)?;
} else {
@@ -267,16 +267,12 @@ where
None
};
// TODO pass this as a parameter
let pg_version = "14";
if spcnode == GLOBALTABLESPACE_OID {
let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
if spcnode == pg_constants::GLOBALTABLESPACE_OID {
let version_bytes = pg_version.as_bytes();
let header = new_tar_header("PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
let header = new_tar_header("global/PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
info!("timeline.pg_version {}", self.timeline.pg_version);
if let Some(img) = relmap_img {
// filenode map for global tablespace
@@ -305,7 +301,7 @@ where
return Ok(());
}
// User defined tablespaces are not supported
ensure!(spcnode == pg_constants::DEFAULTTABLESPACE_OID);
ensure!(spcnode == DEFAULTTABLESPACE_OID);
// Append dir path for each database
let path = format!("base/{}", dbnode);
@@ -314,9 +310,10 @@ where
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
let version_bytes = pg_version.as_bytes();
let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?;
@@ -348,30 +345,6 @@ where
// Also send zenith.signal file with extra bootstrap data.
//
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.context("failed get control bytes")?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(self.lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
@@ -388,8 +361,23 @@ where
zenith_signal.as_bytes(),
)?;
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
//send pg_control
let pg_control_bytes = pg_control.encode();
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?;
@@ -398,8 +386,10 @@ where
let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
let wal_seg =
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..])?;
Ok(())

View File

@@ -50,6 +50,7 @@ fn main() -> Result<()> {
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
@@ -62,6 +63,7 @@ fn main() -> Result<()> {
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}

View File

@@ -21,6 +21,7 @@ use utils::{
use crate::tenant::TIMELINES_SEGMENT_NAME;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::DEFAULT_PG_VERSION;
/// The name of the metadata file pageserver creates per timeline.
pub const METADATA_FILE_NAME: &str = "metadata";
@@ -209,7 +210,7 @@ impl Default for PageServerConfigBuilder {
workdir: Set(PathBuf::new()),
pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory")
.join("pg_install/v14")),
.join(format!("pg_install/v{}", DEFAULT_PG_VERSION))),
auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
@@ -374,13 +375,40 @@ impl PageServerConf {
//
// Postgres distribution paths
//
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let mut path = self.pg_distrib_dir.clone();
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
if pg_version != DEFAULT_PG_VERSION {
// step up to the parent directory
// We assume that the pg_distrib subdirs
// for different pg versions
// are located in the same directory
// and follow the naming convention: v14, v15, etc.
path.pop();
match pg_version {
14 => return path.join(format!("v{pg_version}")),
15 => return path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
};
}
path
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
/// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
@@ -449,10 +477,11 @@ impl PageServerConf {
);
}
if !conf.pg_distrib_dir.join("bin/postgres").exists() {
let pg_version = DEFAULT_PG_VERSION;
if !conf.pg_bin_dir(pg_version).join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
conf.pg_distrib_dir.display()
conf.pg_bin_dir(pg_version).display()
);
}
@@ -863,7 +892,7 @@ broker_endpoints = ['{broker_endpoint}']
let workdir = tempdir_path.join("workdir");
fs::create_dir_all(&workdir)?;
let pg_distrib_dir = tempdir_path.join("pg_distrib");
let pg_distrib_dir = tempdir_path.join(format!("pg_distrib/v{DEFAULT_PG_VERSION}"));
fs::create_dir_all(&pg_distrib_dir)?;
let postgres_bin_dir = pg_distrib_dir.join("bin");
fs::create_dir_all(&postgres_bin_dir)?;

View File

@@ -21,6 +21,7 @@ pub struct TimelineCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[serde_as]

View File

@@ -173,6 +173,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.new_timeline_id.map(TimelineId::from),
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version
).await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.

View File

@@ -16,11 +16,13 @@ use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::v14::relfile_utils::*;
use postgres_ffi::v14::waldecoder::*;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
@@ -236,7 +238,7 @@ fn import_slru<Reader: Read>(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
@@ -354,7 +356,7 @@ pub fn import_wal_from_tar<Reader: Read>(
end_lsn: Lsn,
) -> Result<()> {
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn);
let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
@@ -439,7 +441,7 @@ fn import_file<Reader: Read>(
len: usize,
) -> Result<Option<ControlFileData>> {
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_path
@@ -467,7 +469,7 @@ fn import_file<Reader: Read>(
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
@@ -495,7 +497,7 @@ fn import_file<Reader: Read>(
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;

View File

@@ -36,6 +36,8 @@ use crate::task_mgr::TaskKind;
/// format, bump this!
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 14;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

View File

@@ -43,9 +43,9 @@ use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::CheckpointConfig;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
@@ -498,12 +498,16 @@ impl PageServerHandler {
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<()> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let timeline = tenant_mgr::get_tenant(tenant_id, true)?
.create_empty_timeline(timeline_id, base_lsn)?;
let timeline = tenant_mgr::get_tenant(tenant_id, true)?.create_empty_timeline(
timeline_id,
base_lsn,
pg_version,
)?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -958,16 +962,31 @@ impl postgres_backend_async::Handler for PageServerHandler {
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN"
let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
ensure!(params.len() >= 4);
let tenant_id = TenantId::from_str(params[0])?;
let timeline_id = TimelineId::from_str(params[1])?;
let base_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?;
let pg_version = if params.len() == 5 {
u32::from_str(params[4])?
} else {
// If version is not provided, assume default.
// TODO: this may lead to weird errors if the version is wrong.
crate::DEFAULT_PG_VERSION
};
self.check_permission(Some(tenant_id))?;
match self
.handle_import_basebackup(pgb, tenant_id, timeline_id, base_lsn, end_lsn)
.handle_import_basebackup(
pgb,
tenant_id,
timeline_id,
base_lsn,
end_lsn,
pg_version,
)
.await
{
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,

View File

@@ -13,7 +13,7 @@ use crate::tenant::Timeline;
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
@@ -125,8 +125,7 @@ impl Timeline {
return Ok(nblocks);
}
if (tag.forknum == pg_constants::FSM_FORKNUM
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, lsn, latest)?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
@@ -1090,6 +1089,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// 03 misc
// controlfile
// checkpoint
// pg_version
//
// Below is a full list of the keyspace allocation:
//
@@ -1128,7 +1128,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
//
// Checkpoint:
// 03 00000000 00000000 00000000 00 00000001
//-- Section 01: relation data and metadata
const DBDIR_KEY: Key = Key {
@@ -1402,8 +1401,9 @@ fn is_slru_block_key(key: Key) -> bool {
pub fn create_test_timeline(
tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId,
pg_version: u32,
) -> Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = tenant.create_empty_timeline(timeline_id, Lsn(8), pg_version)?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;

View File

@@ -2,8 +2,8 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::relfile_utils::forknumber_to_name;
use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::Oid;
///
@@ -78,7 +78,7 @@ impl fmt::Display for RelTag {
impl RelTag {
pub fn to_segfile_name(&self, segno: u32) -> String {
let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
let mut name = if self.spcnode == GLOBALTABLESPACE_OID {
"global/".to_string()
} else {
format!("base/{}/", self.dbnode)

View File

@@ -1445,7 +1445,17 @@ mod test_utils {
}
pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0))
TimelineMetadata::new(
disk_consistent_lsn,
None,
None,
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do
// but it should be consistent with the one in the tests
crate::DEFAULT_PG_VERSION,
)
}
}

View File

@@ -341,13 +341,21 @@ mod tests {
use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use crate::DEFAULT_PG_VERSION;
#[test]
fn index_part_conversion() {
let harness = TenantHarness::create("index_part_conversion").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
let metadata = TimelineMetadata::new(
Lsn(5).align(),
Some(Lsn(4)),
None,
Lsn(3),
Lsn(2),
Lsn(1),
DEFAULT_PG_VERSION,
);
let remote_timeline = RemoteTimeline {
timeline_layers: HashSet::from([
timeline_path.join("layer_1"),
@@ -464,8 +472,15 @@ mod tests {
fn index_part_conversion_negatives() {
let harness = TenantHarness::create("index_part_conversion_negatives").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
let metadata = TimelineMetadata::new(
Lsn(5).align(),
Some(Lsn(4)),
None,
Lsn(3),
Lsn(2),
Lsn(1),
DEFAULT_PG_VERSION,
);
let conversion_result = IndexPart::from_remote_timeline(
&timeline_path,

View File

@@ -171,6 +171,7 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
) -> Result<Arc<Timeline>> {
// XXX: keep the lock to avoid races during timeline creation
let mut timelines = self.timelines.lock().unwrap();
@@ -186,7 +187,7 @@ impl Tenant {
}
let new_metadata =
TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn, pg_version,);
let new_timeline =
self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?;
new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -387,6 +388,11 @@ impl Tenant {
let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {} pg_version {}",
timeline_id,
metadata.pg_version()
);
let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
@@ -613,7 +619,7 @@ impl Tenant {
};
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
let new_timeline = Arc::new(Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
@@ -623,6 +629,7 @@ impl Tenant {
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
pg_version,
));
new_timeline
@@ -984,6 +991,7 @@ impl Tenant {
start_lsn,
*src_timeline.latest_gc_cutoff_lsn.read(),
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
let new_timeline = self.create_initialized_timeline(dst, metadata, &mut timelines)?;
info!("branched timeline {dst} from {src} at {start_lsn}");
@@ -1319,6 +1327,7 @@ pub mod harness {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
@@ -1345,6 +1354,7 @@ mod tests {
use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::DEFAULT_PG_VERSION;
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
@@ -1356,7 +1366,7 @@ mod tests {
#[test]
fn test_basic() -> Result<()> {
let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1378,9 +1388,9 @@ mod tests {
#[test]
fn no_duplicate_timelines() -> Result<()> {
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -1404,7 +1414,7 @@ mod tests {
#[test]
fn test_branch() -> Result<()> {
let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer();
use std::str::from_utf8;
@@ -1499,7 +1509,7 @@ mod tests {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -1529,7 +1539,7 @@ mod tests {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
Ok(_) => panic!("branching should have failed"),
@@ -1555,7 +1565,7 @@ mod tests {
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
@@ -1573,7 +1583,7 @@ mod tests {
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -1590,7 +1600,7 @@ mod tests {
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -1618,7 +1628,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
{
let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
@@ -1638,7 +1649,7 @@ mod tests {
// create two timelines
{
let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?;
@@ -1674,7 +1685,7 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load();
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -1711,7 +1722,7 @@ mod tests {
#[test]
fn test_images() -> Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1761,7 +1772,7 @@ mod tests {
#[test]
fn test_bulk_insert() -> Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let mut lsn = Lsn(0x10);
@@ -1801,7 +1812,7 @@ mod tests {
#[test]
fn test_random_updates() -> Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 1000;
@@ -1871,7 +1882,7 @@ mod tests {
#[test]
fn test_traverse_branches() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 1000;
@@ -1950,7 +1961,7 @@ mod tests {
#[test]
fn test_traverse_ancestors() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;

View File

@@ -63,6 +63,7 @@ struct TimelineMetadataBody {
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
}
impl TimelineMetadata {
@@ -73,6 +74,7 @@ impl TimelineMetadata {
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
) -> Self {
Self {
hdr: TimelineMetadataHeader {
@@ -87,6 +89,7 @@ impl TimelineMetadata {
ancestor_lsn,
latest_gc_cutoff_lsn,
initdb_lsn,
pg_version,
},
}
}
@@ -160,6 +163,10 @@ impl TimelineMetadata {
pub fn initdb_lsn(&self) -> Lsn {
self.body.initdb_lsn
}
pub fn pg_version(&self) -> u32 {
self.body.pg_version
}
}
/// Save timeline metadata to file
@@ -212,6 +219,8 @@ mod tests {
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let metadata_bytes = original_metadata

View File

@@ -37,7 +37,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::to_pg_timestamp;
use utils::{
id::{TenantId, TimelineId},
lsn::{AtomicLsn, Lsn, RecordLsn},
@@ -61,6 +61,8 @@ pub struct Timeline {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pg_version: u32,
pub layers: RwLock<LayerMap>,
last_freeze_at: AtomicLsn,
@@ -533,6 +535,7 @@ impl Timeline {
tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
pg_version: u32,
) -> Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -541,6 +544,7 @@ impl Timeline {
tenant_conf,
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
@@ -1260,6 +1264,7 @@ impl Timeline {
self.ancestor_lsn,
*self.latest_gc_cutoff_lsn.read(),
self.initdb_lsn,
self.pg_version,
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -2133,9 +2138,13 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
let img =
self.walredo_mgr
.request_redo(key, request_lsn, base_img, data.records)?;
let img = self.walredo_mgr.request_redo(
key,
request_lsn,
base_img,
data.records,
self.pg_version,
)?;
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();

View File

@@ -34,8 +34,9 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
@@ -82,7 +83,8 @@ impl<'a> WalIngest<'a> {
decoded: &mut DecodedWALRecord,
) -> Result<()> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
decode_wal_record(recdata, decoded, self.timeline.pg_version)
.context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -113,18 +115,49 @@ impl<'a> WalIngest<'a> {
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
debug!(
"handle RM_DBASE_ID for Postgres version {:?}",
self.timeline.pg_version
);
if self.timeline.pg_version == 14 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
debug!("XLOG_DBASE_CREATE v14");
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
} else if self.timeline.pg_version == 15 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
}
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
@@ -291,7 +324,7 @@ impl<'a> WalIngest<'a> {
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
@@ -392,7 +425,7 @@ impl<'a> WalIngest<'a> {
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
forknum: VISIBILITYMAP_FORKNUM,
spcnode: decoded.blocks[0].rnode_spcnode,
dbnode: decoded.blocks[0].rnode_dbnode,
relnode: decoded.blocks[0].rnode_relnode,
@@ -568,7 +601,7 @@ impl<'a> WalIngest<'a> {
spcnode,
dbnode,
relnode,
forknum: pg_constants::MAIN_FORKNUM,
forknum: MAIN_FORKNUM,
};
self.put_rel_truncation(modification, rel, rec.blkno)?;
}
@@ -577,7 +610,7 @@ impl<'a> WalIngest<'a> {
spcnode,
dbnode,
relnode,
forknum: pg_constants::FSM_FORKNUM,
forknum: FSM_FORKNUM,
};
// FIXME: 'blkno' stored in the WAL record is the new size of the
@@ -600,7 +633,7 @@ impl<'a> WalIngest<'a> {
spcnode,
dbnode,
relnode,
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
forknum: VISIBILITYMAP_FORKNUM,
};
// FIXME: Like with the FSM above, the logic to truncate the VM
@@ -672,7 +705,7 @@ impl<'a> WalIngest<'a> {
)?;
for xnode in &parsed.xnodes {
for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM {
for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
let rel = RelTag {
forknum,
spcnode: xnode.spcnode,
@@ -1032,6 +1065,8 @@ mod tests {
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
use postgres_ffi::RELSEG_SIZE;
use crate::DEFAULT_PG_VERSION;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag {
spcnode: 0,
@@ -1059,7 +1094,7 @@ mod tests {
#[test]
fn test_relsize() -> Result<()> {
let tenant = TenantHarness::create("test_relsize")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1187,7 +1222,7 @@ mod tests {
#[test]
fn test_drop_extend() -> Result<()> {
let tenant = TenantHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1227,7 +1262,7 @@ mod tests {
#[test]
fn test_truncate_extend() -> Result<()> {
let tenant = TenantHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1315,7 +1350,7 @@ mod tests {
#[test]
fn test_large_rel() -> Result<()> {
let tenant = TenantHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut lsn = 0x10;

View File

@@ -1366,7 +1366,7 @@ mod tests {
},
timeline: harness
.load()
.create_empty_timeline(TIMELINE_ID, Lsn(0))
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION)
.expect("Failed to create an empty timeline for dummy wal connection manager"),
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),

View File

@@ -29,7 +29,7 @@ use crate::{
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::id::TenantTimelineId;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback};
@@ -166,7 +166,7 @@ pub async fn handle_walreceiver_connection(
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;

View File

@@ -3,12 +3,11 @@
//!
use anyhow::Result;
use bytes::{Buf, Bytes};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::XLOG_SIZE_OF_XLOG_RECORD;
use postgres_ffi::v14::XLogRecord;
use postgres_ffi::pg_constants;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz};
use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::bin_ser::DeserializeError;
@@ -390,6 +389,16 @@ impl XlXactParsedRecord {
xid = buf.get_u32_le();
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
}
if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
let nitems = buf.get_i32_le();
debug!(
"XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
nitems
);
//FIXME: do we need to handle dropped stats here?
}
XlXactParsedRecord {
xid,
info,
@@ -517,6 +526,7 @@ impl XlMultiXactTruncate {
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
pg_version: u32,
) -> Result<(), DeserializeError> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
@@ -610,9 +620,21 @@ pub fn decode_wal_record(
blk.hole_offset = buf.get_u16_le();
blk.bimg_info = buf.get_u8();
blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0;
blk.apply_image = if pg_version == 14 {
(blk.bimg_info & postgres_ffi::v14::bindings::BKPIMAGE_APPLY) != 0
} else {
assert_eq!(pg_version, 15);
(blk.bimg_info & postgres_ffi::v15::bindings::BKPIMAGE_APPLY) != 0
};
if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 {
let blk_img_is_compressed =
postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version);
if blk_img_is_compressed {
debug!("compressed block image , pg_version = {}", pg_version);
}
if blk_img_is_compressed {
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
blk.hole_length = buf.get_u16_le();
} else {
@@ -665,9 +687,7 @@ pub fn decode_wal_record(
* cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
* flag is set.
*/
if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0)
&& blk.bimg_len == BLCKSZ
{
if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
// TODO
/*
report_invalid_record(state,
@@ -683,7 +703,7 @@ pub fn decode_wal_record(
* IS_COMPRESSED flag is set.
*/
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0
&& !blk_img_is_compressed
&& blk.bimg_len != BLCKSZ
{
// TODO

View File

@@ -46,11 +46,12 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status,
};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::BLCKSZ;
///
@@ -82,6 +83,7 @@ pub trait WalRedoManager: Send + Sync {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError>;
}
@@ -144,6 +146,7 @@ impl WalRedoManager for PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
if records.is_empty() {
error!("invalid WAL redo request with no records");
@@ -166,6 +169,7 @@ impl WalRedoManager for PostgresRedoManager {
img,
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
)
};
img = Some(result?);
@@ -184,6 +188,7 @@ impl WalRedoManager for PostgresRedoManager {
img,
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
)
}
}
@@ -212,6 +217,7 @@ impl PostgresRedoManager {
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
@@ -222,7 +228,7 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id)?;
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
@@ -326,7 +332,7 @@ impl PostgresRedoManager {
// sanity check that this is modifying the correct relation
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert!(
rel.forknum == pg_constants::VISIBILITYMAP_FORKNUM,
rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}",
rel
);
@@ -570,7 +576,11 @@ impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
fn launch(conf: &PageServerConf, tenant_id: &TenantId) -> Result<PostgresRedoProcess, Error> {
fn launch(
conf: &PageServerConf,
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
@@ -588,12 +598,12 @@ impl PostgresRedoProcess {
fs::remove_dir_all(&datadir)?;
}
info!("running initdb in {}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.close_fds()
.output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
@@ -619,14 +629,14 @@ impl PostgresRedoProcess {
}
// Start postgres itself
let mut child = Command::new(conf.pg_bin_dir().join("postgres"))
let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres"))
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("PGDATA", &datadir)
// The redo process is not trusted, so it runs in seccomp mode
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't

View File

@@ -22,7 +22,7 @@ use crate::safekeeper::{
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_ffi::v14::xlog_utils;
use postgres_ffi::encode_logical_message;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{
lsn::Lsn,
@@ -47,6 +47,7 @@ pub struct AppendLogicalMessage {
epoch_start_lsn: Lsn,
begin_lsn: Lsn,
truncate_lsn: Lsn,
pg_version: u32,
}
#[derive(Serialize, Deserialize)]
@@ -68,7 +69,7 @@ pub fn handle_json_ctrl(
info!("JSON_CTRL request: {:?}", append_request);
// need to init safekeeper state before AppendRequest
let tli = prepare_safekeeper(spg.ttid)?;
let tli = prepare_safekeeper(spg.ttid, append_request.pg_version)?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
@@ -95,11 +96,11 @@ pub fn handle_json_ctrl(
/// Prepare safekeeper to process append requests without crashes,
/// by sending ProposerGreeting with default server.wal_seg_size.
fn prepare_safekeeper(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
fn prepare_safekeeper(ttid: TenantTimelineId, pg_version: u32) -> Result<Arc<Timeline>> {
GlobalTimelines::create(
ttid,
ServerInfo {
pg_version: 0, // unknown
pg_version,
wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0,
},
@@ -135,7 +136,7 @@ struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> {
let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = tli.get_state().1;
let begin_lsn = msg.begin_lsn;

View File

@@ -27,7 +27,7 @@ use utils::{
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 6;
const SK_PROTOCOL_VERSION: u32 = 2;
const UNKNOWN_SERVER_VERSION: u32 = 0;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
pub type Term = u64;
@@ -594,15 +594,20 @@ where
SK_PROTOCOL_VERSION
);
}
// Postgres upgrade is not treated as fatal error
if msg.pg_version != self.state.server.pg_version
/* Postgres major version mismatch is treated as fatal error
* because safekeepers parse WAL headers and the format
* may change between versions.
*/
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
warn!(
bail!(
"incompatible server version {}, expected {}",
msg.pg_version, self.state.server.pg_version
msg.pg_version,
self.state.server.pg_version
);
}
if msg.tenant_id != self.state.tenant_id {
bail!(
"invalid tenant ID, got {}, expected {}",
@@ -634,6 +639,10 @@ where
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}
self.state.persist(&state)?;
}

View File

@@ -8,7 +8,7 @@ use crate::GlobalTimelines;
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use postgres_ffi::v14::xlog_utils::get_current_timestamp;
use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::min;

View File

@@ -11,7 +11,8 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNoOffsetToRecPtr};
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::GenericRemoteStorage;
use tokio::fs::File;

View File

@@ -29,13 +29,14 @@ use utils::{id::TenantTimelineId, lsn::Lsn};
use crate::metrics::{time_io_closure, WalStorageMetrics};
use crate::safekeeper::SafeKeeperState;
use crate::safekeeper::UNKNOWN_SERVER_VERSION;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::v14::xlog_utils::XLogFileName;
use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
use postgres_ffi::waldecoder::WalStreamDecoder;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
@@ -139,7 +140,7 @@ impl PhysicalStorage {
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn),
decoder: WalStreamDecoder::new(write_lsn, UNKNOWN_SERVER_VERSION),
file: None,
})
}
@@ -291,7 +292,8 @@ impl Storage for PhysicalStorage {
self.decoder.available(),
startpos,
);
self.decoder = WalStreamDecoder::new(startpos);
let pg_version = self.decoder.pg_version;
self.decoder = WalStreamDecoder::new(startpos, pg_version);
}
self.decoder.feed_bytes(buf);
loop {

View File

@@ -59,7 +59,7 @@ Env = Dict[str, str]
Fn = TypeVar("Fn", bound=Callable[..., Any])
DEFAULT_OUTPUT_DIR = "test_output"
DEFAULT_POSTGRES_DIR = "pg_install/v14"
DEFAULT_PG_VERSION_DEFAULT = "14"
DEFAULT_BRANCH_NAME = "main"
BASE_PORT = 15000
@@ -71,6 +71,7 @@ base_dir = ""
neon_binpath = ""
pg_distrib_dir = ""
top_output_dir = ""
pg_version = ""
def pytest_configure(config):
@@ -100,12 +101,21 @@ def pytest_configure(config):
Path(top_output_dir).mkdir(exist_ok=True)
# Find the postgres installation.
global pg_version
pg_version = os.environ.get("DEFAULT_PG_VERSION", DEFAULT_PG_VERSION_DEFAULT)
global pg_distrib_dir
# TODO get rid of the POSTGRES_DISTRIB_DIR env var ?
# use DEFAULT_PG_VERSION instead to generate the path
env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR")
if env_postgres_bin:
pg_distrib_dir = env_postgres_bin
else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR))
pg_distrib_dir = os.path.normpath(
os.path.join(base_dir, "pg_install/v{}".format(pg_version))
)
log.info(f"pg_distrib_dir is {pg_distrib_dir}")
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
@@ -1185,6 +1195,7 @@ class AbstractNeonCli(abc.ABC):
env_vars = os.environ.copy()
env_vars["NEON_REPO_DIR"] = str(self.env.repo_dir)
env_vars["POSTGRES_DISTRIB_DIR"] = str(pg_distrib_dir)
env_vars["DEFAULT_PG_VERSION"] = str(pg_version)
if self.env.rust_log_override is not None:
env_vars["RUST_LOG"] = self.env.rust_log_override
for (extra_env_key, extra_env_value) in (extra_env_vars or {}).items():
@@ -1251,6 +1262,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--pg-version",
pg_version,
]
)
else:
@@ -1262,6 +1275,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--pg-version",
pg_version,
]
+ sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), [])
)
@@ -1296,6 +1311,8 @@ class NeonCli(AbstractNeonCli):
new_branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
pg_version,
]
res = self.raw_cli(cmd)
@@ -1317,6 +1334,8 @@ class NeonCli(AbstractNeonCli):
branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
pg_version,
]
res = self.raw_cli(cmd)
@@ -1395,6 +1414,9 @@ class NeonCli(AbstractNeonCli):
cmd = ["init", f"--config={tmp.name}"]
if initial_timeline_id:
cmd.extend(["--timeline-id", str(initial_timeline_id)])
cmd.extend(["--pg-version", pg_version])
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=self.env.remote_storage,
@@ -1476,6 +1498,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id or self.env.initial_tenant),
"--branch-name",
branch_name,
"--pg-version",
pg_version,
]
if lsn is not None:
args.extend(["--lsn", str(lsn)])
@@ -1500,6 +1524,8 @@ class NeonCli(AbstractNeonCli):
"start",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
pg_version,
]
if lsn is not None:
args.append(f"--lsn={lsn}")

View File

@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
PgBin,
Postgres,
pg_distrib_dir,
pg_version,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -96,6 +97,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
end_lsn,
"--wal-tarfile",
wal,
"--pg-version",
pg_version,
]
)
@@ -248,6 +251,8 @@ def _import(
str(lsn),
"--base-tarfile",
os.path.join(tar_output_file),
"--pg-version",
pg_version,
]
)

View File

@@ -5,7 +5,13 @@ import os
from pathlib import Path
import pytest
from fixtures.neon_fixtures import NeonEnv, base_dir, check_restored_datadir_content, pg_distrib_dir
from fixtures.neon_fixtures import (
NeonEnv,
base_dir,
check_restored_datadir_content,
pg_distrib_dir,
pg_version,
)
# Run the main PostgreSQL regression tests, in src/test/regress.
@@ -26,8 +32,8 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, cap
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/regress")
build_path = os.path.join(pg_distrib_dir, "../build/v{}/src/test/regress").format(pg_version)
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(pg_version)
bindir = os.path.join(pg_distrib_dir, "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
@@ -80,8 +86,8 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/isolation")
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/isolation")
build_path = os.path.join(pg_distrib_dir, "../build/v{}/src/test/isolation".format(pg_version))
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/isolation".format(pg_version))
bindir = os.path.join(pg_distrib_dir, "bin")
schedule = os.path.join(src_path, "isolation_schedule")
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
@@ -124,7 +130,7 @@ def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, ca
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
build_path = os.path.join(pg_distrib_dir, "../build/v{}/src/test/regress").format(pg_version)
src_path = os.path.join(base_dir, "test_runner/sql_regress")
bindir = os.path.join(pg_distrib_dir, "bin")
schedule = os.path.join(src_path, "parallel_schedule")

View File

@@ -29,6 +29,7 @@ from fixtures.neon_fixtures import (
SafekeeperPort,
available_remote_storages,
neon_binpath,
pg_version,
wait_for_last_record_lsn,
wait_for_upload,
)
@@ -634,6 +635,9 @@ class ProposerPostgres(PgProtocol):
}
basepath = self.pg_bin.run_capture(command, env)
log.info(f"postgres --sync-safekeepers output: {basepath}")
stdout_filename = basepath + ".stdout"
with open(stdout_filename, "r") as stdout_f:
@@ -662,7 +666,9 @@ class ProposerPostgres(PgProtocol):
# insert wal in all safekeepers and run sync on proposer
def test_sync_safekeepers(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor,
):
# We don't really need the full environment for this test, just the
@@ -699,6 +705,7 @@ def test_sync_safekeepers(
"begin_lsn": int(begin_lsn),
"epoch_start_lsn": int(epoch_start_lsn),
"truncate_lsn": int(epoch_start_lsn),
"pg_version": int(pg_version) * 10000,
},
)
lsn = Lsn(res["inserted_wal"]["end_lsn"])