diff --git a/libs/postgres_ffi/wal_craft/src/lib.rs b/libs/postgres_ffi/wal_craft/src/lib.rs index 88ae41c636..9f3f4dc20d 100644 --- a/libs/postgres_ffi/wal_craft/src/lib.rs +++ b/libs/postgres_ffi/wal_craft/src/lib.rs @@ -1,5 +1,4 @@ -use anyhow::*; -use core::time::Duration; +use anyhow::{bail, ensure}; use log::*; use postgres::types::PgLsn; use postgres::Client; @@ -8,7 +7,7 @@ use postgres_ffi::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD}; use std::cmp::Ordering; use std::path::{Path, PathBuf}; use std::process::Command; -use std::time::Instant; +use std::time::{Duration, Instant}; use tempfile::{tempdir, TempDir}; #[derive(Debug, Clone, PartialEq, Eq)] @@ -55,7 +54,7 @@ impl Conf { self.datadir.join("pg_wal") } - fn new_pg_command(&self, command: impl AsRef) -> Result { + fn new_pg_command(&self, command: impl AsRef) -> anyhow::Result { let path = self.pg_bin_dir()?.join(command); ensure!(path.exists(), "Command {:?} does not exist", path); let mut cmd = Command::new(path); @@ -65,7 +64,7 @@ impl Conf { Ok(cmd) } - pub fn initdb(&self) -> Result<()> { + pub fn initdb(&self) -> anyhow::Result<()> { if let Some(parent) = self.datadir.parent() { info!("Pre-creating parent directory {:?}", parent); // Tests may be run concurrently and there may be a race to create `test_output/`. @@ -79,7 +78,7 @@ impl Conf { let output = self .new_pg_command("initdb")? .arg("-D") - .arg(self.datadir.as_os_str()) + .arg(&self.datadir) .args(["-U", "postgres", "--no-instructions", "--no-sync"]) .output()?; debug!("initdb output: {:?}", output); @@ -92,7 +91,7 @@ impl Conf { Ok(()) } - pub fn start_server(&self) -> Result { + pub fn start_server(&self) -> anyhow::Result { info!("Starting Postgres server in {:?}", self.datadir); let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols) let unix_socket_dir_path = unix_socket_dir.path().to_owned(); @@ -100,9 +99,9 @@ impl Conf { .new_pg_command("postgres")? .args(["-c", "listen_addresses="]) .arg("-k") - .arg(unix_socket_dir_path.as_os_str()) + .arg(&unix_socket_dir_path) .arg("-D") - .arg(self.datadir.as_os_str()) + .arg(&self.datadir) .args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg])) .spawn()?; let server = PostgresServer { @@ -123,7 +122,7 @@ impl Conf { &self, first_segment_name: &str, last_segment_name: &str, - ) -> Result { + ) -> anyhow::Result { let first_segment_file = self.datadir.join(first_segment_name); let last_segment_file = self.datadir.join(last_segment_name); info!( @@ -133,10 +132,7 @@ impl Conf { ); let output = self .new_pg_command("pg_waldump")? - .args([ - &first_segment_file.as_os_str(), - &last_segment_file.as_os_str(), - ]) + .args([&first_segment_file, &last_segment_file]) .output()?; debug!("waldump output: {:?}", output); Ok(output) @@ -144,10 +140,9 @@ impl Conf { } impl PostgresServer { - pub fn connect_with_timeout(&self) -> Result { + pub fn connect_with_timeout(&self) -> anyhow::Result { let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap(); while Instant::now() < retry_until { - use std::result::Result::Ok; if let Ok(client) = self.client_config.connect(postgres::NoTls) { return Ok(client); } @@ -164,7 +159,6 @@ impl PostgresServer { impl Drop for PostgresServer { fn drop(&mut self) { - use std::result::Result::Ok; match self.process.try_wait() { Ok(Some(_)) => return, Ok(None) => { @@ -179,12 +173,12 @@ impl Drop for PostgresServer { } pub trait PostgresClientExt: postgres::GenericClient { - fn pg_current_wal_insert_lsn(&mut self) -> Result { + fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result { Ok(self .query_one("SELECT pg_current_wal_insert_lsn()", &[])? .get(0)) } - fn pg_current_wal_flush_lsn(&mut self) -> Result { + fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result { Ok(self .query_one("SELECT pg_current_wal_flush_lsn()", &[])? .get(0)) @@ -193,7 +187,7 @@ pub trait PostgresClientExt: postgres::GenericClient { impl PostgresClientExt for C {} -pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> Result<()> { +pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> { client.execute("create extension if not exists neon_test_utils", &[])?; let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0); @@ -227,13 +221,13 @@ pub trait Crafter { /// * A vector of some valid "interesting" intermediate LSNs which one may start reading from. /// May include or exclude Lsn(0) and the end-of-wal. /// * The expected end-of-wal LSN. - fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec, PgLsn)>; + fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec, PgLsn)>; } fn craft_internal( client: &mut C, - f: impl Fn(&mut C, PgLsn) -> Result<(Vec, Option)>, -) -> Result<(Vec, PgLsn)> { + f: impl Fn(&mut C, PgLsn) -> anyhow::Result<(Vec, Option)>, +) -> anyhow::Result<(Vec, PgLsn)> { ensure_server_config(client)?; let initial_lsn = client.pg_current_wal_insert_lsn()?; @@ -265,7 +259,7 @@ fn craft_internal( pub struct Simple; impl Crafter for Simple { const NAME: &'static str = "simple"; - fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec, PgLsn)> { + fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec, PgLsn)> { craft_internal(client, |client, _| { client.execute("CREATE table t(x int)", &[])?; Ok((Vec::new(), None)) @@ -276,7 +270,7 @@ impl Crafter for Simple { pub struct LastWalRecordXlogSwitch; impl Crafter for LastWalRecordXlogSwitch { const NAME: &'static str = "last_wal_record_xlog_switch"; - fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec, PgLsn)> { + fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec, PgLsn)> { // Do not use generate_internal because here we end up with flush_lsn exactly on // the segment boundary and insert_lsn after the initial page header, which is unusual. ensure_server_config(client)?; @@ -298,7 +292,7 @@ impl Crafter for LastWalRecordXlogSwitch { pub struct LastWalRecordXlogSwitchEndsOnPageBoundary; impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary { const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary"; - fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec, PgLsn)> { + fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec, PgLsn)> { // Do not use generate_internal because here we end up with flush_lsn exactly on // the segment boundary and insert_lsn after the initial page header, which is unusual. ensure_server_config(client)?; @@ -365,7 +359,7 @@ impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary { fn craft_single_logical_message( client: &mut impl postgres::GenericClient, transactional: bool, -) -> Result<(Vec, PgLsn)> { +) -> anyhow::Result<(Vec, PgLsn)> { craft_internal(client, |client, initial_lsn| { ensure!( initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024), @@ -407,7 +401,7 @@ fn craft_single_logical_message( pub struct WalRecordCrossingSegmentFollowedBySmallOne; impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne { const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one"; - fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec, PgLsn)> { + fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec, PgLsn)> { craft_single_logical_message(client, true) } } @@ -415,7 +409,7 @@ impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne { pub struct LastWalRecordCrossingSegment; impl Crafter for LastWalRecordCrossingSegment { const NAME: &'static str = "last_wal_record_crossing_segment"; - fn craft(client: &mut impl postgres::GenericClient) -> Result<(Vec, PgLsn)> { + fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<(Vec, PgLsn)> { craft_single_logical_message(client, false) } }