mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Verbosity in this case is good when reading the code. Short options are better when operating in an interactive shell. Signed-off-by: Tristan Partin <tristan@neon.tech>
450 lines
17 KiB
Rust
450 lines
17 KiB
Rust
use anyhow::{bail, ensure};
|
|
use camino_tempfile::{tempdir, Utf8TempDir};
|
|
use log::*;
|
|
use postgres::types::PgLsn;
|
|
use postgres::Client;
|
|
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
|
|
use postgres_ffi::{
|
|
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
|
|
};
|
|
use std::ffi::OsStr;
|
|
use std::path::{Path, PathBuf};
|
|
use std::process::Command;
|
|
use std::time::{Duration, Instant};
|
|
|
|
macro_rules! xlog_utils_test {
|
|
($version:ident) => {
|
|
#[path = "."]
|
|
mod $version {
|
|
#[allow(unused_imports)]
|
|
pub use postgres_ffi::$version::wal_craft_test_export::*;
|
|
#[allow(clippy::duplicate_mod)]
|
|
#[cfg(test)]
|
|
mod xlog_utils_test;
|
|
}
|
|
};
|
|
}
|
|
|
|
postgres_ffi::for_all_postgres_versions! { xlog_utils_test }
|
|
|
|
pub struct Conf {
|
|
pub pg_version: u32,
|
|
pub pg_distrib_dir: PathBuf,
|
|
pub datadir: PathBuf,
|
|
}
|
|
|
|
pub struct PostgresServer {
|
|
process: std::process::Child,
|
|
_unix_socket_dir: Utf8TempDir,
|
|
client_config: postgres::Config,
|
|
}
|
|
|
|
pub static REQUIRED_POSTGRES_CONFIG: [&str; 4] = [
|
|
"wal_keep_size=50MB", // Ensure old WAL is not removed
|
|
"shared_preload_libraries=neon", // can only be loaded at startup
|
|
// Disable background processes as much as possible
|
|
"wal_writer_delay=10s",
|
|
"autovacuum=off",
|
|
];
|
|
|
|
impl Conf {
|
|
pub fn pg_distrib_dir(&self) -> anyhow::Result<PathBuf> {
|
|
let path = self.pg_distrib_dir.clone();
|
|
|
|
#[allow(clippy::manual_range_patterns)]
|
|
match self.pg_version {
|
|
14 | 15 | 16 | 17 => Ok(path.join(format!("v{}", self.pg_version))),
|
|
_ => bail!("Unsupported postgres version: {}", self.pg_version),
|
|
}
|
|
}
|
|
|
|
fn pg_bin_dir(&self) -> anyhow::Result<PathBuf> {
|
|
Ok(self.pg_distrib_dir()?.join("bin"))
|
|
}
|
|
|
|
fn pg_lib_dir(&self) -> anyhow::Result<PathBuf> {
|
|
Ok(self.pg_distrib_dir()?.join("lib"))
|
|
}
|
|
|
|
pub fn wal_dir(&self) -> PathBuf {
|
|
self.datadir.join("pg_wal")
|
|
}
|
|
|
|
fn new_pg_command(&self, command: impl AsRef<Path>) -> anyhow::Result<Command> {
|
|
let path = self.pg_bin_dir()?.join(command);
|
|
ensure!(path.exists(), "Command {:?} does not exist", path);
|
|
let mut cmd = Command::new(path);
|
|
cmd.env_clear()
|
|
.env("LD_LIBRARY_PATH", self.pg_lib_dir()?)
|
|
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir()?);
|
|
Ok(cmd)
|
|
}
|
|
|
|
pub fn initdb(&self) -> anyhow::Result<()> {
|
|
if let Some(parent) = self.datadir.parent() {
|
|
info!("Pre-creating parent directory {:?}", parent);
|
|
// Tests may be run concurrently and there may be a race to create `test_output/`.
|
|
// std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
|
|
std::fs::create_dir_all(parent)?;
|
|
}
|
|
info!(
|
|
"Running initdb in {:?} with user \"postgres\"",
|
|
self.datadir
|
|
);
|
|
let output = self
|
|
.new_pg_command("initdb")?
|
|
.arg("--pgdata")
|
|
.arg(&self.datadir)
|
|
.args(["--username", "postgres", "--no-instructions", "--no-sync"])
|
|
.output()?;
|
|
debug!("initdb output: {:?}", output);
|
|
ensure!(
|
|
output.status.success(),
|
|
"initdb failed, stdout and stderr follow:\n{}{}",
|
|
String::from_utf8_lossy(&output.stdout),
|
|
String::from_utf8_lossy(&output.stderr),
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
pub fn start_server(&self) -> anyhow::Result<PostgresServer> {
|
|
info!("Starting Postgres server in {:?}", self.datadir);
|
|
let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
|
|
let unix_socket_dir_path = unix_socket_dir.path().to_owned();
|
|
let server_process = self
|
|
.new_pg_command("postgres")?
|
|
.args(["-c", "listen_addresses="])
|
|
.arg("-k")
|
|
.arg(&unix_socket_dir_path)
|
|
.arg("-D")
|
|
.arg(&self.datadir)
|
|
.args(REQUIRED_POSTGRES_CONFIG.iter().flat_map(|cfg| ["-c", cfg]))
|
|
.spawn()?;
|
|
let server = PostgresServer {
|
|
process: server_process,
|
|
_unix_socket_dir: unix_socket_dir,
|
|
client_config: {
|
|
let mut c = postgres::Config::new();
|
|
c.host_path(&unix_socket_dir_path);
|
|
c.user("postgres");
|
|
c.connect_timeout(Duration::from_millis(10000));
|
|
c
|
|
},
|
|
};
|
|
Ok(server)
|
|
}
|
|
|
|
pub fn pg_waldump(
|
|
&self,
|
|
first_segment_name: &OsStr,
|
|
last_segment_name: &OsStr,
|
|
) -> anyhow::Result<std::process::Output> {
|
|
let first_segment_file = self.datadir.join(first_segment_name);
|
|
let last_segment_file = self.datadir.join(last_segment_name);
|
|
info!(
|
|
"Running pg_waldump for {} .. {}",
|
|
first_segment_file.display(),
|
|
last_segment_file.display()
|
|
);
|
|
let output = self
|
|
.new_pg_command("pg_waldump")?
|
|
.args([&first_segment_file, &last_segment_file])
|
|
.output()?;
|
|
debug!("waldump output: {:?}", output);
|
|
Ok(output)
|
|
}
|
|
}
|
|
|
|
impl PostgresServer {
|
|
pub fn connect_with_timeout(&self) -> anyhow::Result<Client> {
|
|
let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
|
|
while Instant::now() < retry_until {
|
|
if let Ok(client) = self.client_config.connect(postgres::NoTls) {
|
|
return Ok(client);
|
|
}
|
|
std::thread::sleep(Duration::from_millis(100));
|
|
}
|
|
bail!("Connection timed out");
|
|
}
|
|
|
|
pub fn kill(mut self) {
|
|
self.process.kill().unwrap();
|
|
self.process.wait().unwrap();
|
|
}
|
|
}
|
|
|
|
impl Drop for PostgresServer {
|
|
fn drop(&mut self) {
|
|
match self.process.try_wait() {
|
|
Ok(Some(_)) => return,
|
|
Ok(None) => {
|
|
warn!("Server was not terminated, will be killed");
|
|
}
|
|
Err(e) => {
|
|
error!("Unable to get status of the server: {}, will be killed", e);
|
|
}
|
|
}
|
|
let _ = self.process.kill();
|
|
}
|
|
}
|
|
|
|
pub trait PostgresClientExt: postgres::GenericClient {
|
|
fn pg_current_wal_insert_lsn(&mut self) -> anyhow::Result<PgLsn> {
|
|
Ok(self
|
|
.query_one("SELECT pg_current_wal_insert_lsn()", &[])?
|
|
.get(0))
|
|
}
|
|
fn pg_current_wal_flush_lsn(&mut self) -> anyhow::Result<PgLsn> {
|
|
Ok(self
|
|
.query_one("SELECT pg_current_wal_flush_lsn()", &[])?
|
|
.get(0))
|
|
}
|
|
}
|
|
|
|
impl<C: postgres::GenericClient> PostgresClientExt for C {}
|
|
|
|
pub fn ensure_server_config(client: &mut impl postgres::GenericClient) -> anyhow::Result<()> {
|
|
client.execute("create extension if not exists neon_test_utils", &[])?;
|
|
|
|
let wal_keep_size: String = client.query_one("SHOW wal_keep_size", &[])?.get(0);
|
|
ensure!(wal_keep_size == "50MB");
|
|
let wal_writer_delay: String = client.query_one("SHOW wal_writer_delay", &[])?.get(0);
|
|
ensure!(wal_writer_delay == "10s");
|
|
let autovacuum: String = client.query_one("SHOW autovacuum", &[])?.get(0);
|
|
ensure!(autovacuum == "off");
|
|
|
|
let wal_segment_size = client.query_one(
|
|
"select cast(setting as bigint) as setting, unit \
|
|
from pg_settings where name = 'wal_segment_size'",
|
|
&[],
|
|
)?;
|
|
ensure!(
|
|
wal_segment_size.get::<_, String>("unit") == "B",
|
|
"Unexpected wal_segment_size unit"
|
|
);
|
|
ensure!(
|
|
wal_segment_size.get::<_, i64>("setting") == WAL_SEGMENT_SIZE as i64,
|
|
"Unexpected wal_segment_size in bytes"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub trait Crafter {
|
|
const NAME: &'static str;
|
|
|
|
/// Generates WAL using the client `client`. Returns a vector of some valid
|
|
/// "interesting" intermediate LSNs which one may start reading from.
|
|
/// test_end_of_wal uses this to check various starting points.
|
|
///
|
|
/// Note that postgres is generally keen about writing some WAL. While we
|
|
/// try to disable it (autovacuum, big wal_writer_delay, etc) it is always
|
|
/// possible, e.g. xl_running_xacts are dumped each 15s. So checks about
|
|
/// stable WAL end would be flaky unless postgres is shut down. For this
|
|
/// reason returning potential end of WAL here is pointless. Most of the
|
|
/// time this doesn't happen though, so it is reasonable to create needed
|
|
/// WAL structure and immediately kill postgres like test_end_of_wal does.
|
|
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>>;
|
|
}
|
|
|
|
/// Wraps some WAL craft function, providing current LSN to it before the
|
|
/// insertion and flushing WAL afterwards. Also pushes initial LSN to the
|
|
/// result.
|
|
fn craft_internal<C: postgres::GenericClient>(
|
|
client: &mut C,
|
|
f: impl Fn(&mut C, PgLsn) -> anyhow::Result<Vec<PgLsn>>,
|
|
) -> anyhow::Result<Vec<PgLsn>> {
|
|
ensure_server_config(client)?;
|
|
|
|
let initial_lsn = client.pg_current_wal_insert_lsn()?;
|
|
info!("LSN initial = {}", initial_lsn);
|
|
|
|
let mut intermediate_lsns = f(client, initial_lsn)?;
|
|
if !intermediate_lsns.starts_with(&[initial_lsn]) {
|
|
intermediate_lsns.insert(0, initial_lsn);
|
|
}
|
|
|
|
// Some records may be not flushed, e.g. non-transactional logical messages. Flush now.
|
|
//
|
|
// If the previous WAL record ended exactly at page boundary, pg_current_wal_insert_lsn
|
|
// returns the position just after the page header on the next page. That's where the next
|
|
// record will be inserted. But the page header hasn't actually been written to the WAL
|
|
// yet, and if you try to flush it, you get a "request to flush past end of generated WAL"
|
|
// error. Because of that, if the insert location is just after a page header, back off to
|
|
// previous page boundary.
|
|
let mut lsn = u64::from(client.pg_current_wal_insert_lsn()?);
|
|
if lsn % WAL_SEGMENT_SIZE as u64 == XLOG_SIZE_OF_XLOG_LONG_PHD as u64 {
|
|
lsn -= XLOG_SIZE_OF_XLOG_LONG_PHD as u64;
|
|
} else if lsn % XLOG_BLCKSZ as u64 == XLOG_SIZE_OF_XLOG_SHORT_PHD as u64 {
|
|
lsn -= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64;
|
|
}
|
|
client.execute("select neon_xlogflush($1)", &[&PgLsn::from(lsn)])?;
|
|
Ok(intermediate_lsns)
|
|
}
|
|
|
|
pub struct Simple;
|
|
impl Crafter for Simple {
|
|
const NAME: &'static str = "simple";
|
|
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
|
craft_internal(client, |client, _| {
|
|
client.execute("CREATE table t(x int)", &[])?;
|
|
Ok(Vec::new())
|
|
})
|
|
}
|
|
}
|
|
|
|
pub struct LastWalRecordXlogSwitch;
|
|
impl Crafter for LastWalRecordXlogSwitch {
|
|
const NAME: &'static str = "last_wal_record_xlog_switch";
|
|
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
|
// Do not use craft_internal because here we end up with flush_lsn exactly on
|
|
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
|
ensure_server_config(client)?;
|
|
|
|
client.execute("CREATE table t(x int)", &[])?;
|
|
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
|
// pg_switch_wal returns end of last record of the switched segment,
|
|
// i.e. end of SWITCH itself.
|
|
let xlog_switch_record_end: PgLsn = client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
|
let before_xlog_switch_u64 = u64::from(before_xlog_switch);
|
|
let next_segment = PgLsn::from(
|
|
before_xlog_switch_u64 - (before_xlog_switch_u64 % WAL_SEGMENT_SIZE as u64)
|
|
+ WAL_SEGMENT_SIZE as u64,
|
|
);
|
|
ensure!(
|
|
xlog_switch_record_end <= next_segment,
|
|
"XLOG_SWITCH record ended after the expected segment boundary: {} > {}",
|
|
xlog_switch_record_end,
|
|
next_segment
|
|
);
|
|
Ok(vec![before_xlog_switch, xlog_switch_record_end])
|
|
}
|
|
}
|
|
|
|
pub struct LastWalRecordXlogSwitchEndsOnPageBoundary;
|
|
/// Craft xlog SWITCH record ending at page boundary.
|
|
impl Crafter for LastWalRecordXlogSwitchEndsOnPageBoundary {
|
|
const NAME: &'static str = "last_wal_record_xlog_switch_ends_on_page_boundary";
|
|
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
|
// Do not use generate_internal because here we end up with flush_lsn exactly on
|
|
// the segment boundary and insert_lsn after the initial page header, which is unusual.
|
|
ensure_server_config(client)?;
|
|
|
|
client.execute("CREATE table t(x int)", &[])?;
|
|
|
|
// Add padding so the XLOG_SWITCH record ends exactly on XLOG_BLCKSZ boundary. We
|
|
// will use carefully-sized logical messages to advance WAL insert location such
|
|
// that there is just enough space on the page for the XLOG_SWITCH record.
|
|
loop {
|
|
// We start with measuring how much WAL it takes for one logical message,
|
|
// considering all alignments and headers.
|
|
let before_lsn = client.pg_current_wal_insert_lsn()?;
|
|
client.execute(
|
|
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', 10))",
|
|
&[],
|
|
)?;
|
|
let after_lsn = client.pg_current_wal_insert_lsn()?;
|
|
|
|
// Did the record cross a page boundary? If it did, start over. Crossing a
|
|
// page boundary adds to the apparent size of the record because of the page
|
|
// header, which throws off the calculation.
|
|
if u64::from(before_lsn) / XLOG_BLCKSZ as u64
|
|
!= u64::from(after_lsn) / XLOG_BLCKSZ as u64
|
|
{
|
|
continue;
|
|
}
|
|
// base_size is the size of a logical message without the payload
|
|
let base_size = u64::from(after_lsn) - u64::from(before_lsn) - 10;
|
|
|
|
// Is there enough space on the page for another logical message and an
|
|
// XLOG_SWITCH? If not, start over.
|
|
let page_remain = XLOG_BLCKSZ as u64 - u64::from(after_lsn) % XLOG_BLCKSZ as u64;
|
|
if page_remain < base_size + XLOG_SIZE_OF_XLOG_RECORD as u64 {
|
|
continue;
|
|
}
|
|
|
|
// We will write another logical message, such that after the logical message
|
|
// record, there will be space for exactly one XLOG_SWITCH. How large should
|
|
// the logical message's payload be? An XLOG_SWITCH record has no data => its
|
|
// size is exactly XLOG_SIZE_OF_XLOG_RECORD.
|
|
let repeats = page_remain - base_size - XLOG_SIZE_OF_XLOG_RECORD as u64;
|
|
|
|
client.execute(
|
|
"SELECT pg_logical_emit_message(false, 'swch', REPEAT('a', $1))",
|
|
&[&(repeats as i32)],
|
|
)?;
|
|
info!(
|
|
"current_wal_insert_lsn={}, XLOG_SIZE_OF_XLOG_RECORD={}",
|
|
client.pg_current_wal_insert_lsn()?,
|
|
XLOG_SIZE_OF_XLOG_RECORD
|
|
);
|
|
|
|
// Emit the XLOG_SWITCH
|
|
let before_xlog_switch = client.pg_current_wal_insert_lsn()?;
|
|
let xlog_switch_record_end: PgLsn =
|
|
client.query_one("SELECT pg_switch_wal()", &[])?.get(0);
|
|
|
|
if u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
|
|
!= XLOG_SIZE_OF_XLOG_SHORT_PHD
|
|
{
|
|
warn!(
|
|
"XLOG_SWITCH message ended not on page boundary: {}, offset = {}, repeating",
|
|
xlog_switch_record_end,
|
|
u64::from(xlog_switch_record_end) as usize % XLOG_BLCKSZ
|
|
);
|
|
continue;
|
|
}
|
|
return Ok(vec![before_xlog_switch, xlog_switch_record_end]);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Write ~16MB logical message; it should cross WAL segment.
|
|
fn craft_seg_size_logical_message(
|
|
client: &mut impl postgres::GenericClient,
|
|
transactional: bool,
|
|
) -> anyhow::Result<Vec<PgLsn>> {
|
|
craft_internal(client, |client, initial_lsn| {
|
|
ensure!(
|
|
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
|
|
"Initial LSN is too far in the future"
|
|
);
|
|
|
|
let message_lsn: PgLsn = client
|
|
.query_one(
|
|
"select pg_logical_emit_message($1, 'big-16mb-msg', \
|
|
concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
|
|
&[&transactional],
|
|
)?
|
|
.get("message_lsn");
|
|
ensure!(
|
|
message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
|
|
"Logical message did not cross the segment boundary"
|
|
);
|
|
ensure!(
|
|
message_lsn < PgLsn::from(0x0400_0000),
|
|
"Logical message crossed two segments"
|
|
);
|
|
|
|
Ok(vec![message_lsn])
|
|
})
|
|
}
|
|
|
|
pub struct WalRecordCrossingSegmentFollowedBySmallOne;
|
|
impl Crafter for WalRecordCrossingSegmentFollowedBySmallOne {
|
|
const NAME: &'static str = "wal_record_crossing_segment_followed_by_small_one";
|
|
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
|
// Transactional message crossing WAL segment will be followed by small
|
|
// commit record.
|
|
craft_seg_size_logical_message(client, true)
|
|
}
|
|
}
|
|
|
|
pub struct LastWalRecordCrossingSegment;
|
|
impl Crafter for LastWalRecordCrossingSegment {
|
|
const NAME: &'static str = "last_wal_record_crossing_segment";
|
|
fn craft(client: &mut impl postgres::GenericClient) -> anyhow::Result<Vec<PgLsn>> {
|
|
craft_seg_size_logical_message(client, false)
|
|
}
|
|
}
|