From c08fa9d5627ec1d3a9a424484fcb018a0613cae2 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Fri, 24 Jun 2022 22:58:09 +0300 Subject: [PATCH] postgres_ffi/wal_generate: support generating WAL for an already running Postgres server * ensure_server_config() function is added to ensure the server does not have background processes which interfere with WAL generation * Rework command line syntax * Add `print-postgres-config` subcommand which prints the required server configuration --- Cargo.lock | 1 + libs/postgres_ffi/wal_generate/Cargo.toml | 1 + .../wal_generate/src/bin/wal_generate.rs | 118 ++++++++++++------ libs/postgres_ffi/wal_generate/src/lib.rs | 38 ++++-- 4 files changed, 110 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e812ce7eab..1f4cb8f3d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3760,6 +3760,7 @@ dependencies = [ "clap 3.0.14", "env_logger", "log", + "once_cell", "postgres", "tempfile", ] diff --git a/libs/postgres_ffi/wal_generate/Cargo.toml b/libs/postgres_ffi/wal_generate/Cargo.toml index a10671dddd..7edb36937d 100644 --- a/libs/postgres_ffi/wal_generate/Cargo.toml +++ b/libs/postgres_ffi/wal_generate/Cargo.toml @@ -10,5 +10,6 @@ anyhow = "1.0" clap = "3.0" env_logger = "0.9" log = "0.4" +once_cell = "1.8.0" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tempfile = "3.2" diff --git a/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs b/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs index 07ceb31c7f..0da47f32c1 100644 --- a/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs +++ b/libs/postgres_ffi/wal_generate/src/bin/wal_generate.rs @@ -1,5 +1,6 @@ use anyhow::*; -use clap::{App, Arg}; +use clap::{App, Arg, ArgMatches}; +use std::str::FromStr; use wal_generate::*; fn main() -> Result<()> { @@ -7,52 +8,91 @@ fn main() -> Result<()> { env_logger::Env::default().default_filter_or("wal_generate=info"), ) .init(); + let type_arg = 
&Arg::new("type") + .takes_value(true) + .help("Type of WAL to generate") + .possible_values([ + "simple", + "last_wal_record_crossing_segment", + "wal_record_crossing_segment_followed_by_small_one", + ]) + .required(true); let arg_matches = App::new("Postgres WAL generator") .about("Generates Postgres databases with specific WAL properties") - .arg( - Arg::new("datadir") - .short('D') - .long("datadir") - .takes_value(true) - .help("Data directory for the Postgres server") - .required(true) + .subcommand( + App::new("print-postgres-config") + .about("Print the configuration required for PostgreSQL server before running this script") ) - .arg( - Arg::new("pg-distrib-dir") - .long("pg-distrib-dir") - .takes_value(true) - .help("Directory with Postgres distribution (bin and lib directories, e.g. tmp_install)") - .default_value("/usr/local") + .subcommand( + App::new("with-initdb") + .about("Generate WAL in a new data directory first initialized with initdb") + .arg(type_arg) + .arg( + Arg::new("datadir") + .takes_value(true) + .help("Data directory for the Postgres server") + .required(true) + ) + .arg( + Arg::new("pg-distrib-dir") + .long("pg-distrib-dir") + .takes_value(true) + .help("Directory with Postgres distribution (bin and lib directories, e.g. tmp_install)") + .default_value("/usr/local") + ) ) - .arg( - Arg::new("type") - .long("type") - .takes_value(true) - .help("Type of WAL to generate") - .possible_values(["simple", "last_wal_record_crossing_segment", "wal_record_crossing_segment_followed_by_small_one"]) - .required(true) + .subcommand( + App::new("in-existing") + .about("Generate WAL at an existing recently created Postgres database. 
Note that server may append new WAL entries on shutdown.") + .arg(type_arg) + .arg( + Arg::new("connection") + .takes_value(true) + .help("Connection string to the Postgres database to populate") + .required(true) + ) ) .get_matches(); - let cfg = Conf { - pg_distrib_dir: arg_matches.value_of("pg-distrib-dir").unwrap().into(), - datadir: arg_matches.value_of("datadir").unwrap().into(), + let wal_generate = |arg_matches: &ArgMatches, client| { + let lsn = match arg_matches.value_of("type").unwrap() { + "simple" => generate_simple(client)?, + "last_wal_record_crossing_segment" => { + generate_last_wal_record_crossing_segment(client)? + } + "wal_record_crossing_segment_followed_by_small_one" => { + generate_wal_record_crossing_segment_followed_by_small_one(client)? + } + a => panic!("Unknown --type argument: {}", a), + }; + println!("end_of_wal = {}", lsn); + Ok(()) }; - cfg.initdb()?; - let mut srv = cfg.start_server()?; - let lsn = match arg_matches.value_of("type").unwrap() { - "simple" => generate_simple(&mut srv.connect_with_timeout()?)?, - "last_wal_record_crossing_segment" => { - generate_last_wal_record_crossing_segment(&mut srv.connect_with_timeout()?)? + + match arg_matches.subcommand() { + None => panic!("No subcommand provided"), + Some(("print-postgres-config", _)) => { + for cfg in REQUIRED_POSTGRES_CONFIG.iter() { + println!("{}", cfg); + } + Ok(()) } - "wal_record_crossing_segment_followed_by_small_one" => { - generate_wal_record_crossing_segment_followed_by_small_one( - &mut srv.connect_with_timeout()?, - )? 
+ Some(("with-initdb", arg_matches)) => { + let cfg = Conf { + pg_distrib_dir: arg_matches.value_of("pg-distrib-dir").unwrap().into(), + datadir: arg_matches.value_of("datadir").unwrap().into(), + }; + cfg.initdb()?; + let mut srv = cfg.start_server()?; + wal_generate(arg_matches, &mut srv.connect_with_timeout()?)?; + srv.kill(); + Ok(()) } - a => panic!("Unknown --type argument: {}", a), - }; - println!("end_of_wal = {}", lsn); - srv.kill(); - Ok(()) + Some(("in-existing", arg_matches)) => wal_generate( + arg_matches, + &mut postgres::Config::from_str(arg_matches.value_of("connection").unwrap())? + .connect(postgres::NoTls)?, + ), + Some(_) => panic!("Unknown subcommand"), + } } diff --git a/libs/postgres_ffi/wal_generate/src/lib.rs b/libs/postgres_ffi/wal_generate/src/lib.rs index 2b3f5ef703..78ce320515 100644 --- a/libs/postgres_ffi/wal_generate/src/lib.rs +++ b/libs/postgres_ffi/wal_generate/src/lib.rs @@ -1,6 +1,7 @@ use anyhow::*; use core::time::Duration; use log::*; +use once_cell::sync::Lazy; use postgres::types::PgLsn; use postgres::Client; use std::cmp::Ordering; @@ -22,6 +23,16 @@ pub struct PostgresServer { client_config: postgres::Config, } +pub static REQUIRED_POSTGRES_CONFIG: Lazy> = Lazy::new(|| { + vec![ + "wal_keep_size=50MB", // Ensure old WAL is not removed + "shared_preload_libraries=neon", // can only be loaded at startup + // Disable background processes as much as possible + "wal_writer_delay=10s", + "autovacuum=off", + ] +}); + impl Conf { fn pg_bin_dir(&self) -> PathBuf { self.pg_distrib_dir.join("bin") @@ -85,12 +96,8 @@ impl Conf { .arg(unix_socket_dir_path.as_os_str()) .arg("-D") .arg(self.datadir.as_os_str()) - .args(&["-c", "wal_keep_size=50MB"]) // Ensure old WAL is not removed .args(&["-c", "logging_collector=on"]) // stderr will mess up with tests output - .args(&["-c", "shared_preload_libraries=neon"]) // can only be loaded at startup - // Disable background processes as much as possible - .args(&["-c", "wal_writer_delay=10s"]) - 
.args(&["-c", "autovacuum=off"]) + .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg])) .stderr(Stdio::from(log_file)) .spawn()?; let server = PostgresServer { @@ -181,12 +188,16 @@ pub trait PostgresClientExt: postgres::GenericClient { impl PostgresClientExt for C {} -fn generate_internal( - client: &mut C, - f: impl Fn(&mut C, PgLsn) -> Result>, -) -> Result { +pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result<()> { client.execute("create extension if not exists neon_test_utils", &[])?; + let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0); + ensure!(wal_keep_size == "50MB"); + let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0); + ensure!(wal_writer_delay == "10s"); + let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0); + ensure!(autovacuum == "off"); + let wal_segment_size = client.query_one( "select cast(setting as bigint) as setting, unit \ from pg_settings where name = 'wal_segment_size'", @@ -201,6 +212,15 @@ fn generate_internal( "Unexpected wal_segment_size in bytes" ); + Ok(()) +} + +fn generate_internal( + client: &mut C, + f: impl Fn(&mut C, PgLsn) -> Result>, +) -> Result { + ensure_server_config(client)?; + let initial_lsn = client.pg_current_wal_insert_lsn()?; info!("LSN initial = {}", initial_lsn);