mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
We need checksums to verify data integrity, when we read it from untrusted place (e.g. local disk) or via untrusted communication channel (e.g. network). At the same time, we trust pageserver <-> redo process communication channel, as it is just a pipe. Here we enable calculation of data checksums in the wal redo process and when we extract FPI during WAL injestion. Compute node (Postgres) will verify checksum of every page after receiving it back from pageserver. So it is pretty similar to how vanilla Postgres checks them. There are two other places where we should verify checksums to detect data corruption earlier: - when we receive WAL records from safekeepers (already implemented, see: WalStreamDecoder::poll_decode) - when we write layer files to disk and read back in memory from local disk or S3
287 lines
9.6 KiB
Rust
287 lines
9.6 KiB
Rust
use anyhow::*;
|
|
use core::time::Duration;
|
|
use log::*;
|
|
use postgres::types::PgLsn;
|
|
use postgres::Client;
|
|
use std::cmp::Ordering;
|
|
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
use std::process::{Command, Stdio};
|
|
use std::time::Instant;
|
|
use tempfile::{tempdir, TempDir};
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
|
pub struct Conf {
|
|
pub pg_distrib_dir: PathBuf,
|
|
pub datadir: PathBuf,
|
|
}
|
|
|
|
pub struct PostgresServer {
|
|
process: std::process::Child,
|
|
_unix_socket_dir: TempDir,
|
|
client_config: postgres::Config,
|
|
}
|
|
|
|
impl Conf {
|
|
fn pg_bin_dir(&self) -> PathBuf {
|
|
self.pg_distrib_dir.join("bin")
|
|
}
|
|
|
|
fn pg_lib_dir(&self) -> PathBuf {
|
|
self.pg_distrib_dir.join("lib")
|
|
}
|
|
|
|
fn new_pg_command(&self, command: impl AsRef<Path>) -> Result<Command> {
|
|
let path = self.pg_bin_dir().join(command);
|
|
ensure!(path.exists(), "Command {:?} does not exist", path);
|
|
let mut cmd = Command::new(path);
|
|
cmd.env_clear()
|
|
.env("LD_LIBRARY_PATH", self.pg_lib_dir())
|
|
.env("DYLD_LIBRARY_PATH", self.pg_lib_dir());
|
|
Ok(cmd)
|
|
}
|
|
|
|
pub fn initdb(&self) -> Result<()> {
|
|
if let Some(parent) = self.datadir.parent() {
|
|
info!("Pre-creating parent directory {:?}", parent);
|
|
// Tests may be run concurrently and there may be a race to create `test_output/`.
|
|
// std::fs::create_dir_all is guaranteed to have no races with another thread creating directories.
|
|
std::fs::create_dir_all(parent)?;
|
|
}
|
|
info!(
|
|
"Running initdb in {:?} with user \"postgres\"",
|
|
self.datadir
|
|
);
|
|
let output = self
|
|
.new_pg_command("initdb")?
|
|
.arg("-D")
|
|
.arg("--data-checksums")
|
|
.arg(self.datadir.as_os_str())
|
|
.args(&["-U", "postgres", "--no-instructions", "--no-sync"])
|
|
.output()?;
|
|
debug!("initdb output: {:?}", output);
|
|
ensure!(
|
|
output.status.success(),
|
|
"initdb failed, stdout and stderr follow:\n{}{}",
|
|
String::from_utf8_lossy(&output.stdout),
|
|
String::from_utf8_lossy(&output.stderr),
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
pub fn start_server(&self) -> Result<PostgresServer> {
|
|
info!("Starting Postgres server in {:?}", self.datadir);
|
|
let log_file = fs::File::create(self.datadir.join("pg.log")).with_context(|| {
|
|
format!(
|
|
"Failed to create pg.log file in directory {}",
|
|
self.datadir.display()
|
|
)
|
|
})?;
|
|
let unix_socket_dir = tempdir()?; // We need a directory with a short name for Unix socket (up to 108 symbols)
|
|
let unix_socket_dir_path = unix_socket_dir.path().to_owned();
|
|
let server_process = self
|
|
.new_pg_command("postgres")?
|
|
.args(&["-c", "listen_addresses="])
|
|
.arg("-k")
|
|
.arg(unix_socket_dir_path.as_os_str())
|
|
.arg("-D")
|
|
.arg(self.datadir.as_os_str())
|
|
.args(&["-c", "wal_keep_size=50MB"]) // Ensure old WAL is not removed
|
|
.args(&["-c", "logging_collector=on"]) // stderr will mess up with tests output
|
|
.args(&["-c", "shared_preload_libraries=neon"]) // can only be loaded at startup
|
|
// Disable background processes as much as possible
|
|
.args(&["-c", "wal_writer_delay=10s"])
|
|
.args(&["-c", "autovacuum=off"])
|
|
.stderr(Stdio::from(log_file))
|
|
.spawn()?;
|
|
let server = PostgresServer {
|
|
process: server_process,
|
|
_unix_socket_dir: unix_socket_dir,
|
|
client_config: {
|
|
let mut c = postgres::Config::new();
|
|
c.host_path(&unix_socket_dir_path);
|
|
c.user("postgres");
|
|
c.connect_timeout(Duration::from_millis(1000));
|
|
c
|
|
},
|
|
};
|
|
Ok(server)
|
|
}
|
|
|
|
pub fn pg_waldump(
|
|
&self,
|
|
first_segment_name: &str,
|
|
last_segment_name: &str,
|
|
) -> Result<std::process::Output> {
|
|
let first_segment_file = self.datadir.join(first_segment_name);
|
|
let last_segment_file = self.datadir.join(last_segment_name);
|
|
info!(
|
|
"Running pg_waldump for {} .. {}",
|
|
first_segment_file.display(),
|
|
last_segment_file.display()
|
|
);
|
|
let output = self
|
|
.new_pg_command("pg_waldump")?
|
|
.args(&[
|
|
&first_segment_file.as_os_str(),
|
|
&last_segment_file.as_os_str(),
|
|
])
|
|
.output()?;
|
|
debug!("waldump output: {:?}", output);
|
|
Ok(output)
|
|
}
|
|
}
|
|
|
|
impl PostgresServer {
|
|
pub fn connect_with_timeout(&self) -> Result<Client> {
|
|
let retry_until = Instant::now() + *self.client_config.get_connect_timeout().unwrap();
|
|
while Instant::now() < retry_until {
|
|
use std::result::Result::Ok;
|
|
if let Ok(client) = self.client_config.connect(postgres::NoTls) {
|
|
return Ok(client);
|
|
}
|
|
std::thread::sleep(Duration::from_millis(100));
|
|
}
|
|
bail!("Connection timed out");
|
|
}
|
|
|
|
pub fn kill(&mut self) {
|
|
self.process.kill().unwrap();
|
|
self.process.wait().unwrap();
|
|
}
|
|
}
|
|
|
|
impl Drop for PostgresServer {
|
|
fn drop(&mut self) {
|
|
use std::result::Result::Ok;
|
|
match self.process.try_wait() {
|
|
Ok(Some(_)) => return,
|
|
Ok(None) => {
|
|
warn!("Server was not terminated, will be killed");
|
|
}
|
|
Err(e) => {
|
|
error!("Unable to get status of the server: {}, will be killed", e);
|
|
}
|
|
}
|
|
let _ = self.process.kill();
|
|
}
|
|
}
|
|
|
|
pub trait PostgresClientExt: postgres::GenericClient {
|
|
fn pg_current_wal_insert_lsn(&mut self) -> Result<PgLsn> {
|
|
Ok(self
|
|
.query_one("SELECT pg_current_wal_insert_lsn()", &[])?
|
|
.get(0))
|
|
}
|
|
fn pg_current_wal_flush_lsn(&mut self) -> Result<PgLsn> {
|
|
Ok(self
|
|
.query_one("SELECT pg_current_wal_flush_lsn()", &[])?
|
|
.get(0))
|
|
}
|
|
}
|
|
|
|
impl<C: postgres::GenericClient> PostgresClientExt for C {}
|
|
|
|
fn generate_internal<C: postgres::GenericClient>(
|
|
client: &mut C,
|
|
f: impl Fn(&mut C, PgLsn) -> Result<Option<PgLsn>>,
|
|
) -> Result<PgLsn> {
|
|
client.execute("create extension if not exists neon_test_utils", &[])?;
|
|
|
|
let wal_segment_size = client.query_one(
|
|
"select cast(setting as bigint) as setting, unit \
|
|
from pg_settings where name = 'wal_segment_size'",
|
|
&[],
|
|
)?;
|
|
ensure!(
|
|
wal_segment_size.get::<_, String>("unit") == "B",
|
|
"Unexpected wal_segment_size unit"
|
|
);
|
|
ensure!(
|
|
wal_segment_size.get::<_, i64>("setting") == 16 * 1024 * 1024,
|
|
"Unexpected wal_segment_size in bytes"
|
|
);
|
|
|
|
let initial_lsn = client.pg_current_wal_insert_lsn()?;
|
|
info!("LSN initial = {}", initial_lsn);
|
|
|
|
let last_lsn = match f(client, initial_lsn)? {
|
|
None => client.pg_current_wal_insert_lsn()?,
|
|
Some(last_lsn) => match last_lsn.cmp(&client.pg_current_wal_insert_lsn()?) {
|
|
Ordering::Less => bail!("Some records were inserted after the generated WAL"),
|
|
Ordering::Equal => last_lsn,
|
|
Ordering::Greater => bail!("Reported LSN is greater than insert_lsn"),
|
|
},
|
|
};
|
|
|
|
// Some records may be not flushed, e.g. non-transactional logical messages.
|
|
client.execute("select neon_xlogflush(pg_current_wal_insert_lsn())", &[])?;
|
|
match last_lsn.cmp(&client.pg_current_wal_flush_lsn()?) {
|
|
Ordering::Less => bail!("Some records were flushed after the generated WAL"),
|
|
Ordering::Equal => {}
|
|
Ordering::Greater => bail!("Reported LSN is greater than flush_lsn"),
|
|
}
|
|
Ok(last_lsn)
|
|
}
|
|
|
|
pub fn generate_simple(client: &mut impl postgres::GenericClient) -> Result<PgLsn> {
|
|
generate_internal(client, |client, _| {
|
|
client.execute("CREATE table t(x int)", &[])?;
|
|
Ok(None)
|
|
})
|
|
}
|
|
|
|
fn generate_single_logical_message(
|
|
client: &mut impl postgres::GenericClient,
|
|
transactional: bool,
|
|
) -> Result<PgLsn> {
|
|
generate_internal(client, |client, initial_lsn| {
|
|
ensure!(
|
|
initial_lsn < PgLsn::from(0x0200_0000 - 1024 * 1024),
|
|
"Initial LSN is too far in the future"
|
|
);
|
|
|
|
let message_lsn: PgLsn = client
|
|
.query_one(
|
|
"select pg_logical_emit_message($1, 'big-16mb-msg', \
|
|
concat(repeat('abcd', 16 * 256 * 1024), 'end')) as message_lsn",
|
|
&[&transactional],
|
|
)?
|
|
.get("message_lsn");
|
|
ensure!(
|
|
message_lsn > PgLsn::from(0x0200_0000 + 4 * 8192),
|
|
"Logical message did not cross the segment boundary"
|
|
);
|
|
ensure!(
|
|
message_lsn < PgLsn::from(0x0400_0000),
|
|
"Logical message crossed two segments"
|
|
);
|
|
|
|
if transactional {
|
|
// Transactional logical messages are part of a transaction, so the one above is
|
|
// followed by a small COMMIT record.
|
|
|
|
let after_message_lsn = client.pg_current_wal_insert_lsn()?;
|
|
ensure!(
|
|
message_lsn < after_message_lsn,
|
|
"No record found after the emitted message"
|
|
);
|
|
Ok(Some(after_message_lsn))
|
|
} else {
|
|
Ok(Some(message_lsn))
|
|
}
|
|
})
|
|
}
|
|
|
|
pub fn generate_wal_record_crossing_segment_followed_by_small_one(
|
|
client: &mut impl postgres::GenericClient,
|
|
) -> Result<PgLsn> {
|
|
generate_single_logical_message(client, true)
|
|
}
|
|
|
|
pub fn generate_last_wal_record_crossing_segment<C: postgres::GenericClient>(
|
|
client: &mut C,
|
|
) -> Result<PgLsn> {
|
|
generate_single_logical_message(client, false)
|
|
}
|