diff --git a/Cargo.lock b/Cargo.lock index d390df94e0..c078510129 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2874,7 +2874,6 @@ dependencies = [ "serde", "thiserror", "utils", - "wal_craft", "workspace_hack", ] @@ -4894,7 +4893,9 @@ dependencies = [ "once_cell", "postgres", "postgres_ffi", + "regex", "tempfile", + "utils", "workspace_hack", ] diff --git a/Cargo.toml b/Cargo.toml index 1cb8d65948..d7bffe67e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,20 @@ members = [ "storage_broker", "workspace_hack", "trace", - "libs/*", + "libs/compute_api", + "libs/pageserver_api", + "libs/postgres_ffi", + "libs/safekeeper_api", + "libs/utils", + "libs/consumption_metrics", + "libs/postgres_backend", + "libs/pq_proto", + "libs/tenant_size_model", + "libs/metrics", + "libs/postgres_connection", + "libs/remote_storage", + "libs/tracing-utils", + "libs/postgres_ffi/wal_craft", ] [workspace.package] diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 159fc5946d..86e72f6bdd 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -24,7 +24,6 @@ workspace_hack.workspace = true [dev-dependencies] env_logger.workspace = true postgres.workspace = true -wal_craft = { path = "wal_craft" } [build-dependencies] anyhow.workspace = true diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index b8eb469cb0..cc115664d5 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -33,6 +33,7 @@ macro_rules! postgres_ffi { } pub mod controlfile_utils; pub mod nonrelfile_utils; + pub mod wal_craft_test_export; pub mod waldecoder_handler; pub mod xlog_utils; @@ -45,8 +46,15 @@ macro_rules! postgres_ffi { }; } -postgres_ffi!(v14); -postgres_ffi!(v15); +#[macro_export] +macro_rules! for_all_postgres_versions { + ($macro:tt) => { + $macro!(v14); + $macro!(v15); + }; +} + +for_all_postgres_versions! 
{ postgres_ffi } pub mod pg_constants; pub mod relfile_utils; diff --git a/libs/postgres_ffi/src/wal_craft_test_export.rs b/libs/postgres_ffi/src/wal_craft_test_export.rs new file mode 100644 index 0000000000..147567c442 --- /dev/null +++ b/libs/postgres_ffi/src/wal_craft_test_export.rs @@ -0,0 +1,6 @@ +//! This module exists so that the wal_craft crate can run its tests against postgres_ffi. It should not be imported in normal usage. + +pub use super::PG_MAJORVERSION; +pub use super::xlog_utils::*; +pub use super::bindings::*; +pub use crate::WAL_SEGMENT_SIZE; diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 4d7bb61883..61a9c38a84 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -481,220 +481,4 @@ pub fn encode_logical_message(prefix: &str, message: &str) -> Vec { wal } -#[cfg(test)] -mod tests { - use super::super::PG_MAJORVERSION; - use super::*; - use regex::Regex; - use std::cmp::min; - use std::fs; - use std::{env, str::FromStr}; - use utils::const_assert; - - fn init_logging() { - let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or( - format!("wal_craft=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"), - )) - .is_test(true) - .try_init(); - } - - fn test_end_of_wal(test_name: &str) { - use wal_craft::*; - - let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); - - // Craft some WAL - let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) - .join("..") - .join(".."); - let cfg = Conf { - pg_version, - pg_distrib_dir: top_path.join("pg_install"), - datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), - }; - if cfg.datadir.exists() { - fs::remove_dir_all(&cfg.datadir).unwrap(); - } - cfg.initdb().unwrap(); - let srv = cfg.start_server().unwrap(); - let (intermediate_lsns, expected_end_of_wal_partial) = - C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap(); - let intermediate_lsns: Vec = intermediate_lsns - .iter() - 
.map(|&lsn| u64::from(lsn).into()) - .collect(); - let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); - srv.kill(); - - // Check find_end_of_wal on the initial WAL - let last_segment = cfg - .wal_dir() - .read_dir() - .unwrap() - .map(|f| f.unwrap().file_name().into_string().unwrap()) - .filter(|fname| IsXLogFileName(fname)) - .max() - .unwrap(); - check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); - for start_lsn in intermediate_lsns - .iter() - .chain(std::iter::once(&expected_end_of_wal)) - { - // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. - // We assume that `start_lsn` is non-decreasing. - info!( - "Checking with start_lsn={}, erasing WAL before it", - start_lsn - ); - for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { - let fname = file.file_name().into_string().unwrap(); - if !IsXLogFileName(&fname) { - continue; - } - let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); - let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); - if seg_start_lsn > u64::from(*start_lsn) { - continue; - } - let mut f = File::options().write(true).open(file.path()).unwrap(); - const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; - f.write_all( - &ZEROS[0..min( - WAL_SEGMENT_SIZE, - (u64::from(*start_lsn) - seg_start_lsn) as usize, - )], - ) - .unwrap(); - } - check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); - } - } - - fn check_pg_waldump_end_of_wal( - cfg: &wal_craft::Conf, - last_segment: &str, - expected_end_of_wal: Lsn, - ) { - // Get the actual end of WAL by pg_waldump - let waldump_output = cfg - .pg_waldump("000000010000000000000001", last_segment) - .unwrap() - .stderr; - let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); - let caps = match Regex::new(r"invalid record length at (.+):") - .unwrap() - .captures(waldump_output) - { - Some(caps) => caps, - None => { - error!("Unable to parse pg_waldump's 
stderr:\n{}", waldump_output); - panic!(); - } - }; - let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); - info!( - "waldump erred on {}, expected wal end at {}", - waldump_wal_end, expected_end_of_wal - ); - assert_eq!(waldump_wal_end, expected_end_of_wal); - } - - fn check_end_of_wal( - cfg: &wal_craft::Conf, - last_segment: &str, - start_lsn: Lsn, - expected_end_of_wal: Lsn, - ) { - // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) - // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); - // info!( - // "find_end_of_wal returned wal_end={} with non-partial WAL segment", - // wal_end - // ); - // assert_eq!(wal_end, expected_end_of_wal_non_partial); - - // Rename file to partial to actually find last valid lsn, then rename it back. - fs::rename( - cfg.wal_dir().join(last_segment), - cfg.wal_dir().join(format!("{}.partial", last_segment)), - ) - .unwrap(); - let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); - info!( - "find_end_of_wal returned wal_end={} with partial WAL segment", - wal_end - ); - assert_eq!(wal_end, expected_end_of_wal); - fs::rename( - cfg.wal_dir().join(format!("{}.partial", last_segment)), - cfg.wal_dir().join(last_segment), - ) - .unwrap(); - } - - const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024); - - #[test] - pub fn test_find_end_of_wal_simple() { - init_logging(); - test_end_of_wal::("test_find_end_of_wal_simple"); - } - - #[test] - pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { - init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_crossing_segment_followed_by_small_one", - ); - } - - #[test] - pub fn test_find_end_of_wal_last_crossing_segment() { - init_logging(); - test_end_of_wal::( - "test_find_end_of_wal_last_crossing_segment", - ); - } - - /// Check the math in update_next_xid - /// - /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, - /// 
currently 1024. - #[test] - pub fn test_update_next_xid() { - let checkpoint_buf = [0u8; std::mem::size_of::()]; - let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); - - checkpoint.nextXid = FullTransactionId { value: 10 }; - assert_eq!(checkpoint.nextXid.value, 10); - - // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL - // boundary - checkpoint.update_next_xid(100); - assert_eq!(checkpoint.nextXid.value, 1024); - - // No change - checkpoint.update_next_xid(500); - assert_eq!(checkpoint.nextXid.value, 1024); - checkpoint.update_next_xid(1023); - assert_eq!(checkpoint.nextXid.value, 1024); - - // The function returns the *next* XID, given the highest XID seen so - // far. So when we pass 1024, the nextXid gets bumped up to the next - // XID_CHECKPOINT_INTERVAL boundary. - checkpoint.update_next_xid(1024); - assert_eq!(checkpoint.nextXid.value, 2048); - } - - #[test] - pub fn test_encode_logical_message() { - let expected = [ - 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, - 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, - 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, - ]; - let actual = encode_logical_message("prefix", "message"); - assert_eq!(expected, actual[..]); - } -} +// If you need to craft WAL and write tests for this module, put them in the wal_craft crate. 
diff --git a/libs/postgres_ffi/wal_craft/Cargo.toml b/libs/postgres_ffi/wal_craft/Cargo.toml index 992bf7460b..bea888b23e 100644 --- a/libs/postgres_ffi/wal_craft/Cargo.toml +++ b/libs/postgres_ffi/wal_craft/Cargo.toml @@ -15,3 +15,7 @@ postgres_ffi.workspace = true tempfile.workspace = true workspace_hack.workspace = true + +[dev-dependencies] +regex.workspace = true +utils.workspace = true diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index 9f3f4dc20d..d4aed88048 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -10,6 +10,20 @@ use std::process::Command; use std::time::{Duration, Instant}; use tempfile::{tempdir, TempDir}; +macro_rules! xlog_utils_test { + ($version:ident) => { + #[path = "."] + mod $version { + pub use postgres_ffi::$version::wal_craft_test_export::*; + #[allow(clippy::duplicate_mod)] + #[cfg(test)] + mod xlog_utils_test; + } + }; +} + +postgres_ffi::for_all_postgres_versions! { xlog_utils_test } + #[derive(Debug, Clone, PartialEq, Eq)] pub struct Conf { pub pg_version: u32, diff --git a/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs new file mode 100644 index 0000000000..6ff4c563b2 --- /dev/null +++ b/libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs @@ -0,0 +1,219 @@ +//! Tests for the postgres_ffi xlog_utils module. They live in wal_craft to break the cyclic dependency between the two crates. 
+ +use super::*; +use crate::{error, info}; +use regex::Regex; +use std::cmp::min; +use std::fs::{self, File}; +use std::io::Write; +use std::{env, str::FromStr}; +use utils::const_assert; +use utils::lsn::Lsn; + +fn init_logging() { + let _ = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or( + format!("wal_craft=info,postgres_ffi::{PG_MAJORVERSION}::xlog_utils=trace"), + )) + .is_test(true) + .try_init(); +} + +fn test_end_of_wal(test_name: &str) { + use crate::*; + + let pg_version = PG_MAJORVERSION[1..3].parse::().unwrap(); + + // Craft some WAL + let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("..") + .join("..") + .join(".."); + let cfg = Conf { + pg_version, + pg_distrib_dir: top_path.join("pg_install"), + datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), + }; + if cfg.datadir.exists() { + fs::remove_dir_all(&cfg.datadir).unwrap(); + } + cfg.initdb().unwrap(); + let srv = cfg.start_server().unwrap(); + let (intermediate_lsns, expected_end_of_wal_partial) = + C::craft(&mut srv.connect_with_timeout().unwrap()).unwrap(); + let intermediate_lsns: Vec = intermediate_lsns + .iter() + .map(|&lsn| u64::from(lsn).into()) + .collect(); + let expected_end_of_wal: Lsn = u64::from(expected_end_of_wal_partial).into(); + srv.kill(); + + // Check find_end_of_wal on the initial WAL + let last_segment = cfg + .wal_dir() + .read_dir() + .unwrap() + .map(|f| f.unwrap().file_name().into_string().unwrap()) + .filter(|fname| IsXLogFileName(fname)) + .max() + .unwrap(); + check_pg_waldump_end_of_wal(&cfg, &last_segment, expected_end_of_wal); + for start_lsn in intermediate_lsns + .iter() + .chain(std::iter::once(&expected_end_of_wal)) + { + // Erase all WAL before `start_lsn` to ensure it's not used by `find_end_of_wal`. + // We assume that `start_lsn` is non-decreasing. 
+ info!( + "Checking with start_lsn={}, erasing WAL before it", + start_lsn + ); + for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() { + let fname = file.file_name().into_string().unwrap(); + if !IsXLogFileName(&fname) { + continue; + } + let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE); + let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); + if seg_start_lsn > u64::from(*start_lsn) { + continue; + } + let mut f = File::options().write(true).open(file.path()).unwrap(); + const ZEROS: [u8; WAL_SEGMENT_SIZE] = [0u8; WAL_SEGMENT_SIZE]; + f.write_all( + &ZEROS[0..min( + WAL_SEGMENT_SIZE, + (u64::from(*start_lsn) - seg_start_lsn) as usize, + )], + ) + .unwrap(); + } + check_end_of_wal(&cfg, &last_segment, *start_lsn, expected_end_of_wal); + } +} + +fn check_pg_waldump_end_of_wal( + cfg: &crate::Conf, + last_segment: &str, + expected_end_of_wal: Lsn, +) { + // Get the actual end of WAL by pg_waldump + let waldump_output = cfg + .pg_waldump("000000010000000000000001", last_segment) + .unwrap() + .stderr; + let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); + let caps = match Regex::new(r"invalid record length at (.+):") + .unwrap() + .captures(waldump_output) + { + Some(caps) => caps, + None => { + error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output); + panic!(); + } + }; + let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); + info!( + "waldump erred on {}, expected wal end at {}", + waldump_wal_end, expected_end_of_wal + ); + assert_eq!(waldump_wal_end, expected_end_of_wal); +} + +fn check_end_of_wal( + cfg: &crate::Conf, + last_segment: &str, + start_lsn: Lsn, + expected_end_of_wal: Lsn, +) { + // Check end_of_wal on non-partial WAL segment (we treat it as fully populated) + // let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + // info!( + // "find_end_of_wal returned wal_end={} with non-partial WAL segment", + // wal_end + // ); + // 
assert_eq!(wal_end, expected_end_of_wal_non_partial); + + // Rename file to partial to actually find last valid lsn, then rename it back. + fs::rename( + cfg.wal_dir().join(last_segment), + cfg.wal_dir().join(format!("{}.partial", last_segment)), + ) + .unwrap(); + let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap(); + info!( + "find_end_of_wal returned wal_end={} with partial WAL segment", + wal_end + ); + assert_eq!(wal_end, expected_end_of_wal); + fs::rename( + cfg.wal_dir().join(format!("{}.partial", last_segment)), + cfg.wal_dir().join(last_segment), + ) + .unwrap(); +} + +const_assert!(WAL_SEGMENT_SIZE == 16 * 1024 * 1024); + +#[test] +pub fn test_find_end_of_wal_simple() { + init_logging(); + test_end_of_wal::("test_find_end_of_wal_simple"); +} + +#[test] +pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { + init_logging(); + test_end_of_wal::( + "test_find_end_of_wal_crossing_segment_followed_by_small_one", + ); +} + +#[test] +pub fn test_find_end_of_wal_last_crossing_segment() { + init_logging(); + test_end_of_wal::( + "test_find_end_of_wal_last_crossing_segment", + ); +} + +/// Check the math in update_next_xid +/// +/// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, +/// currently 1024. +#[test] +pub fn test_update_next_xid() { + let checkpoint_buf = [0u8; std::mem::size_of::()]; + let mut checkpoint = CheckPoint::decode(&checkpoint_buf).unwrap(); + + checkpoint.nextXid = FullTransactionId { value: 10 }; + assert_eq!(checkpoint.nextXid.value, 10); + + // The input XID gets rounded up to the next XID_CHECKPOINT_INTERVAL + // boundary + checkpoint.update_next_xid(100); + assert_eq!(checkpoint.nextXid.value, 1024); + + // No change + checkpoint.update_next_xid(500); + assert_eq!(checkpoint.nextXid.value, 1024); + checkpoint.update_next_xid(1023); + assert_eq!(checkpoint.nextXid.value, 1024); + + // The function returns the *next* XID, given the highest XID seen so + // far. 
So when we pass 1024, the nextXid gets bumped up to the next + // XID_CHECKPOINT_INTERVAL boundary. + checkpoint.update_next_xid(1024); + assert_eq!(checkpoint.nextXid.value, 2048); +} + +#[test] +pub fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, + 38, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, + 101, 102, 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix", "message"); + assert_eq!(expected, actual[..]); +}