From 3c6890bf1dd72722c646d918b984d2392a010ce2 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Thu, 21 Apr 2022 14:54:22 +0300 Subject: [PATCH] postgres_ffi: add complex WAL tests for find_end_of_wal * Actual generation logic is in a separate crate `postgres_ffi/wal_generate` * The create also provides a binary for debug purposes akin to `initdb` * Two tests currently fail and are ignored * There is no easy way to test this directly in Safekeeper as it starts restoring from commit_lsn. So testing would require disconnecting Safekeeper just after it has received the WAL, but before it is committed. --- Cargo.lock | 15 + libs/postgres_ffi/Cargo.toml | 5 + libs/postgres_ffi/src/xlog_utils.rs | 143 ++++++--- libs/postgres_ffi/wal_generate/Cargo.toml | 14 + .../wal_generate/src/bin/wal_generate.rs | 58 ++++ libs/postgres_ffi/wal_generate/src/lib.rs | 278 ++++++++++++++++++ 6 files changed, 466 insertions(+), 47 deletions(-) create mode 100644 libs/postgres_ffi/wal_generate/Cargo.toml create mode 100644 libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs create mode 100644 libs/postgres_ffi/wal_generate/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6a320ee274..6acad6dac8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2047,15 +2047,18 @@ dependencies = [ "bytes", "chrono", "crc32c", + "env_logger", "hex", "lazy_static", "log", "memoffset", + "postgres", "rand", "regex", "serde", "thiserror", "utils", + "wal_generate", "workspace_hack", ] @@ -3627,6 +3630,18 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "wal_generate" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap 3.0.14", + "env_logger", + "log", + "postgres", + "tempfile", +] + [[package]] name = "walkdir" version = "2.3.2" diff --git a/libs/postgres_ffi/Cargo.toml b/libs/postgres_ffi/Cargo.toml index 7be5ca1b93..129c93cf6d 100644 --- a/libs/postgres_ffi/Cargo.toml +++ b/libs/postgres_ffi/Cargo.toml @@ -20,5 +20,10 @@ serde = { version = "1.0", features = ["derive"] } utils = { path = "../utils" } workspace_hack = { version = "0.1", path = "../../workspace_hack" } +[dev-dependencies] +env_logger = "0.9" +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +wal_generate = { path = "wal_generate" } + [build-dependencies] bindgen = "0.59.1" diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 7882058868..3e30f9905e 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -476,78 +476,127 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result anyhow::Result, + expected_end_of_wal_non_partial: Lsn, + last_segment: &str, + ) { + use wal_generate::*; + // 1. Generate some WAL let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) .join("..") .join(".."); - let data_dir = top_path.join("test_output/test_find_end_of_wal"); - let initdb_path = top_path.join("tmp_install/bin/initdb"); - let lib_path = top_path.join("tmp_install/lib"); - if data_dir.exists() { - fs::remove_dir_all(&data_dir).unwrap(); + let cfg = Conf { + pg_distrib_dir: top_path.join("tmp_install"), + datadir: top_path.join(format!("test_output/{}", test_name)), + }; + if cfg.datadir.exists() { + fs::remove_dir_all(&cfg.datadir).unwrap(); } - println!("Using initdb from '{}'", initdb_path.display()); - println!("Data directory '{}'", data_dir.display()); - let initdb_output = Command::new(initdb_path) - .args(&["-D", data_dir.to_str().unwrap()]) - .arg("--no-instructions") - .arg("--no-sync") - .env_clear() - .env("LD_LIBRARY_PATH", &lib_path) - .env("DYLD_LIBRARY_PATH", &lib_path) - .output() - .unwrap(); - assert!( - initdb_output.status.success(), - "initdb failed. Status: '{}', stdout: '{}', stderr: '{}'", - initdb_output.status, - String::from_utf8_lossy(&initdb_output.stdout), - String::from_utf8_lossy(&initdb_output.stderr), - ); + cfg.initdb().unwrap(); + let mut srv = cfg.start_server().unwrap(); + let expected_wal_end: Lsn = + u64::from(generate_wal(&mut srv.connect_with_timeout().unwrap()).unwrap()).into(); + srv.kill(); // 2. Pick WAL generated by initdb - let wal_dir = data_dir.join("pg_wal"); + let wal_dir = cfg.datadir.join("pg_wal"); let wal_seg_size = 16 * 1024 * 1024; // 3. Check end_of_wal on non-partial WAL segment (we treat it as fully populated) let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap(); let wal_end = Lsn(wal_end); - println!("wal_end={}, tli={}", wal_end, tli); - assert_eq!(wal_end, "0/2000000".parse::().unwrap()); + info!( + "find_end_of_wal returned (wal_end={}, tli={})", + wal_end, tli + ); + assert_eq!(wal_end, expected_end_of_wal_non_partial); // 4. Get the actual end of WAL by pg_waldump - let waldump_path = top_path.join("tmp_install/bin/pg_waldump"); - let waldump_output = Command::new(waldump_path) - .arg(wal_dir.join("000000010000000000000001")) - .env_clear() - .env("LD_LIBRARY_PATH", &lib_path) - .env("DYLD_LIBRARY_PATH", &lib_path) - .output() - .unwrap(); - let waldump_output = std::str::from_utf8(&waldump_output.stderr).unwrap(); - println!("waldump_output = '{}'", &waldump_output); - let re = Regex::new(r"invalid record length at (.+):").unwrap(); - let caps = re.captures(waldump_output).unwrap(); + let waldump_output = cfg + .pg_waldump("000000010000000000000001", last_segment) + .unwrap() + .stderr; + let waldump_output = std::str::from_utf8(&waldump_output).unwrap(); + let caps = match Regex::new(r"invalid record length at (.+):") + .unwrap() + .captures(waldump_output) + { + Some(caps) => caps, + None => { + error!("Unable to parse pg_waldump's stderr:\n{}", waldump_output); + panic!(); + } + }; let waldump_wal_end = Lsn::from_str(caps.get(1).unwrap().as_str()).unwrap(); + info!( + "waldump erred on {}, expected wal end at {}", + waldump_wal_end, expected_wal_end + ); + assert_eq!(waldump_wal_end, expected_wal_end); // 5. Rename file to partial to actually find last valid lsn fs::rename( - wal_dir.join("000000010000000000000001"), - wal_dir.join("000000010000000000000001.partial"), + wal_dir.join(last_segment), + wal_dir.join(format!("{}.partial", last_segment)), ) .unwrap(); let (wal_end, tli) = find_end_of_wal(&wal_dir, wal_seg_size, true, Lsn(0)).unwrap(); let wal_end = Lsn(wal_end); - println!("wal_end={}, tli={}", wal_end, tli); + info!( + "find_end_of_wal returned (wal_end={}, tli={})", + wal_end, tli + ); assert_eq!(wal_end, waldump_wal_end); } + #[test] + pub fn test_find_end_of_wal_simple() { + init_logging(); + test_end_of_wal( + "test_find_end_of_wal_simple", + wal_generate::generate_simple, + "0/2000000".parse::().unwrap(), + "000000010000000000000001", + ); + } + + #[test] + #[ignore = "not yet fixed, needs correct skipping of contrecord"] // TODO + pub fn test_find_end_of_wal_crossing_segment_followed_by_small_one() { + init_logging(); + test_end_of_wal( + "test_find_end_of_wal_crossing_segment_followed_by_small_one", + wal_generate::generate_wal_record_crossing_segment_followed_by_small_one, + "0/3000000".parse::().unwrap(), + "000000010000000000000002", + ); + } + + #[test] + #[ignore = "not yet fixed, needs correct parsing of pre-last segments"] // TODO + pub fn test_find_end_of_wal_last_crossing_segment() { + init_logging(); + test_end_of_wal( + "test_find_end_of_wal_last_crossing_segment", + wal_generate::generate_last_wal_record_crossing_segment, + "0/3000000".parse::().unwrap(), + "000000010000000000000002", + ); + } + /// Check the math in update_next_xid /// /// NOTE: These checks are sensitive to the value of XID_CHECKPOINT_INTERVAL, diff --git a/libs/postgres_ffi/wal_generate/Cargo.toml b/libs/postgres_ffi/wal_generate/Cargo.toml new file mode 100644 index 0000000000..a10671dddd --- /dev/null +++ b/libs/postgres_ffi/wal_generate/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "wal_generate" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +clap = "3.0" +env_logger = "0.9" +log = "0.4" +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +tempfile = "3.2" diff --git a/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs b/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs new file mode 100644 index 0000000000..07ceb31c7f --- /dev/null +++ b/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs @@ -0,0 +1,58 @@ +use anyhow::*; +use clap::{App, Arg}; +use wal_generate::*; + +fn main() -> Result<()> { + env_logger::Builder::from_env( + env_logger::Env::default().default_filter_or("wal_generate=info"), + ) + .init(); + let arg_matches = App::new("Postgres WAL generator") + .about("Generates Postgres databases with specific WAL properties") + .arg( + Arg::new("datadir") + .short('D') + .long("datadir") + .takes_value(true) + .help("Data directory for the Postgres server") + .required(true) + ) + .arg( + Arg::new("pg-distrib-dir") + .long("pg-distrib-dir") + .takes_value(true) + .help("Directory with Postgres distribution (bin and lib directories, e.g. tmp_install)") + .default_value("/usr/local") + ) + .arg( + Arg::new("type") + .long("type") + .takes_value(true) + .help("Type of WAL to generate") + .possible_values(["simple", "last_wal_record_crossing_segment", "wal_record_crossing_segment_followed_by_small_one"]) + .required(true) + ) + .get_matches(); + + let cfg = Conf { + pg_distrib_dir: arg_matches.value_of("pg-distrib-dir").unwrap().into(), + datadir: arg_matches.value_of("datadir").unwrap().into(), + }; + cfg.initdb()?; + let mut srv = cfg.start_server()?; + let lsn = match arg_matches.value_of("type").unwrap() { + "simple" => generate_simple(&mut srv.connect_with_timeout()?)?, + "last_wal_record_crossing_segment" => { + generate_last_wal_record_crossing_segment(&mut srv.connect_with_timeout()?)? + } + "wal_record_crossing_segment_followed_by_small_one" => { + generate_wal_record_crossing_segment_followed_by_small_one( + &mut srv.connect_with_timeout()?, + )? + } + a => panic!("Unknown --type argument: {}", a), + }; + println!("end_of_wal = {}", lsn); + srv.kill(); + Ok(()) +} diff --git a/libs/postgres_ffi/wal_generate/src/lib.rs b/libs/postgres_ffi/wal_generate/src/lib.rs new file mode 100644 index 0000000000..a5cd81d68a --- /dev/null +++ b/libs/postgres_ffi/wal_generate/src/lib.rs @@ -0,0 +1,278 @@ +use anyhow::*; +use core::time::Duration; +use log::*; +use postgres::types::PgLsn; +use postgres::Client; +use std::cmp::Ordering; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::time::Instant; +use tempfile::{tempdir, TempDir}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Conf { + pub pg_distrib_dir: PathBuf, + pub datadir: PathBuf, +} + +pub struct PostgresServer { + process: std::process::Child, + _unix_socket_dir: TempDir, + client_config: postgres::Config, +} + +impl Conf { + fn pg_bin_dir(&self) -> PathBuf { + self.pg_distrib_dir.join("bin") + } + + fn pg_lib_dir(&self) -> PathBuf { + self.pg_distrib_dir.join("lib") + } + + fn new_pg_command(&self, command: impl AsRef) -> Result { + let path = self.pg_bin_dir().join(command); + ensure!(path.exists(), "Command {:?} does not exist", path); + let mut cmd = Command::new(path); + cmd.env_clear() + .env("LD_LIBRARY_PATH", self.pg_lib_dir()) + .env("DYLD_LIBRARY_PATH", self.pg_lib_dir()); + Ok(cmd) + } + + pub fn initdb(&self) -> Result<()> { + if let Some(parent) = self.datadir.parent() { + info!("Pre-creating parent directory {:?}", parent); + // Tests may be run concurrently and there may be a race to create `test_output/`. + // std::fs::create_dir_all is guaranteed to have no races with another thread creating directories. + std::fs::create_dir_all(parent)?; + } + info!( + "Running initdb in {:?} with user \"postgres\"", + self.datadir + ); + let output = self + .new_pg_command("initdb")? + .arg("-D") + .arg(self.datadir.as_os_str()) + .args(&["-U", "postgres", "--no-instructions", "--no-sync"]) + .output()?; + debug!("initdb output: {:?}", output); + ensure!( + output.status.success(), + "initdb failed, stdout and stderr follow:\n{}{}", + String::from_utf8_lossy(&output.stdout), + String::from_utf8_lossy(&output.stderr), + ); + Ok(()) + } + + pub fn start_server(&self) -> Result { + info!("Starting Postgres server in {:?}", self.datadir); + let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols) + let unix_socket_dir_path = unix_socket_dir.path().to_owned(); + let server_process = self + .new_pg_command("postgres")? + .args(&["-c", "listen_addresses="]) + .arg("-k") + .arg(unix_socket_dir_path.as_os_str()) + .arg("-D") + .arg(self.datadir.as_os_str()) + .args(&["-c", "wal_keep_size=50MB"]) // Ensure old WAL is not removed + .args(&["-c", "logging_collector=on"]) // stderr will mess up with tests output + .args(&["-c", "shared_preload_libraries=zenith"]) // can only be loaded at startup + // Disable background processes as much as possible + .args(&["-c", "wal_writer_delay=10s"]) + .args(&["-c", "autovacuum=off"]) + .stderr(Stdio::null()) + .spawn()?; + let server = PostgresServer { + process: server_process, + _unix_socket_dir: unix_socket_dir, + client_config: { + let mut c = postgres::Config::new(); + c.host_path(&unix_socket_dir_path); + c.user("postgres"); + c.connect_timeout(Duration::from_millis(1000)); + c + }, + }; + Ok(server) + } + + pub fn pg_waldump( + &self, + first_segment_name: &str, + last_segment_name: &str, + ) -> Result { + let first_segment_file = self.datadir.join(first_segment_name); + let last_segment_file = self.datadir.join(last_segment_name); + info!( + "Running pg_waldump for {} .. {}", + first_segment_file.display(), + last_segment_file.display() + ); + let output = self + .new_pg_command("pg_waldump")? + .args(&[ + &first_segment_file.as_os_str(), + &last_segment_file.as_os_str(), + ]) + .output()?; + debug!("waldump output: {:?}", output); + Ok(output) + } +} + +impl PostgresServer { + pub fn connect_with_timeout(&self) -> Result { + let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap(); + while Instant::now() < retry_until { + use std::result::Result::Ok; + if let Ok(client) = self.client_config.connect(postgres::NoTls) { + return Ok(client); + } + std::thread::sleep(Duration::from_millis(100)); + } + bail!("Connection timed out"); + } + + pub fn kill(&mut self) { + self.process.kill().unwrap(); + self.process.wait().unwrap(); + } +} + +impl Drop for PostgresServer { + fn drop(&mut self) { + use std::result::Result::Ok; + match self.process.try_wait() { + Ok(Some(_)) => return, + Ok(None) => { + warn!("Server was not terminated, will be killed"); + } + Err(e) => { + error!("Unable to get status of the server: {}, will be killed", e); + } + } + let _ = self.process.kill(); + } +} + +pub trait PostgresClientExt: postgres::GenericClient { + fn pg_current_wal_insert_lsn(&mut self) -> Result { + Ok(self + .query_one("SELECT pg_current_wal_insert_lsn()", &[])? + .get(0)) + } + fn pg_current_wal_flush_lsn(&mut self) -> Result { + Ok(self + .query_one("SELECT pg_current_wal_flush_lsn()", &[])? + .get(0)) + } +} + +impl PostgresClientExt for C {} + +fn generate_internal( + client: &mut C, + f: impl Fn(&mut C, PgLsn) -> Result>, +) -> Result { + client.execute("create extension if not exists zenith_test_utils", &[])?; + + let wal_segment_size = client.query_one( + "select cast(setting as bigint) as setting, unit \ + from pg_settings where name = 'wal_segment_size'", + &[], + )?; + ensure!( + wal_segment_size.get::<_, String>("unit") == "B", + "Unexpected wal_segment_size unit" + ); + ensure!( + wal_segment_size.get::<_, i64>("setting") == 16 * 1024 * 1024, + "Unexpected wal_segment_size in bytes" + ); + + let initial_lsn = client.pg_current_wal_insert_lsn()?; + info!("LSN initial = {}", initial_lsn); + + let last_lsn = match f(client, initial_lsn)? { + None => client.pg_current_wal_insert_lsn()?, + Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) { + Ordering::Less => bail!("Some records were inserted after the generated WAL"), + Ordering::Equal => last_lsn, + Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"), + }, + }; + + // Some records may be not flushed, e.g. non-transactional logical messages. + client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?; + match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) { + Ordering::Less => bail!("Some records were flushed after the generated WAL"), + Ordering::Equal => {} + Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"), + } + Ok(last_lsn) +} + +pub fn generate_simple(client: &mut impl postgres::GenericClient) -> Result { + generate_internal(client, |client, _| { + client.execute("CREATE table t(x int)", &[])?; + Ok(None) + }) +} + +fn generate_single_logical_message( + client: &mut impl postgres::GenericClient, + transactional: bool, +) -> Result { + generate_internal(client, |client, initial_lsn| { + ensure!( + initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024), + "Initial LSN is too far in the future" + ); + + let message_lsn: PgLsn = client + .query_one( + "select pg_logical_emit_message($1, 'big-16mb-msg', \ + concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn", + &[&transactional], + )? + .get("message_lsn"); + ensure!( + message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192), + "Logical message did not cross the segment boundary" + ); + ensure!( + message_lsn < PgLsn::from(0x0400_0000), + "Logical message crossed two segments" + ); + + if transactional { + // Transactional logical messages are part of a transaction, so the one above is + // followed by a small COMMIT record. + + let after_message_lsn = client.pg_current_wal_insert_lsn()?; + ensure!( + message_lsn < after_message_lsn, + "No record found after the emitted message" + ); + Ok(Some(after_message_lsn)) + } else { + Ok(Some(message_lsn)) + } + }) +} + +pub fn generate_wal_record_crossing_segment_followed_by_small_one( + client: &mut impl postgres::GenericClient, +) -> Result { + generate_single_logical_message(client, true) +} + +pub fn generate_last_wal_record_crossing_segment( + client: &mut C, +) -> Result { + generate_single_logical_message(client, false) +}