From 37b0236e9a48ceebb0d5cc8e0f656f13a89643e5 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Fri, 11 Jun 2021 19:26:48 +0300 Subject: [PATCH] Move wal acceptor tests to python. Includes fixtures for wal acceptors and associated setup. Nothing really new here, but surprisingly this caught some issues in walproposer. ref #182 --- .circleci/config.yml | 3 +- Cargo.lock | 14 - Cargo.toml | 1 - control_plane/src/compute.rs | 45 -- integration_tests/.gitignore | 1 - integration_tests/Cargo.toml | 18 - integration_tests/src/lib.rs | 386 ------------------ integration_tests/tests/test_wal_acceptor.rs | 330 --------------- test_runner/batch_others/test_config.py | 2 +- test_runner/batch_others/test_twophase.py | 5 +- test_runner/batch_others/test_wal_acceptor.py | 201 +++++++++ .../batch_pg_regress/test_isolation.py | 2 +- test_runner/fixtures/zenith_fixtures.py | 181 +++++++- 13 files changed, 379 insertions(+), 810 deletions(-) delete mode 100644 integration_tests/.gitignore delete mode 100644 integration_tests/Cargo.toml delete mode 100644 integration_tests/src/lib.rs delete mode 100644 integration_tests/tests/test_wal_acceptor.rs create mode 100644 test_runner/batch_others/test_wal_acceptor.py diff --git a/.circleci/config.yml b/.circleci/config.yml index 9c68256bd2..e430dde192 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -119,8 +119,7 @@ jobs: - target # Run rust unit tests - # FIXME: remove -p zenith_utils once integration tests are moved to python - - run: cargo test -p zenith_utils + - run: cargo test # Install the rust binaries, for use by test jobs # `--locked` is required; otherwise, `cargo install` will ignore Cargo.lock. diff --git a/Cargo.lock b/Cargo.lock index 6fa7f1b3c3..6a130d30b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,20 +786,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "integration_tests" -version = "0.1.0" -dependencies = [ - "anyhow", - "control_plane", - "lazy_static", - "nix", - "pageserver", - "postgres", - "rand", - "walkeeper", -] - [[package]] name = "ipnet" version = "2.3.0" diff --git a/Cargo.toml b/Cargo.toml index 8a9230c159..c4f2c21c11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,5 @@ [workspace] members = [ - "integration_tests", "pageserver", "walkeeper", "zenith", diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index fd41b7a946..eeba29d82d 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -97,51 +97,6 @@ impl ComputeControlPlane { Ok(node) } - pub fn new_test_node(&mut self, branch_name: &str) -> Arc { - let timeline_id = self - .pageserver - .branch_get_by_name(branch_name) - .expect("failed to get timeline_id") - .timeline_id; - - let node = self.new_from_page_server(true, timeline_id, branch_name); - let node = node.unwrap(); - - // Configure the node to stream WAL directly to the pageserver - node.append_conf( - "postgresql.conf", - format!( - "shared_preload_libraries = zenith\n\ - zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping - node.connstr() - ) - .as_str(), - ) - .unwrap(); - - node - } - - pub fn new_test_master_node(&mut self, branch_name: &str) -> Arc { - let timeline_id = self - .pageserver - .branch_get_by_name(branch_name) - .expect("failed to get timeline_id") - .timeline_id; - - let node = self - .new_from_page_server(true, timeline_id, branch_name) - .unwrap(); - - node.append_conf( - "postgresql.conf", - "synchronous_standby_names = 'walproposer'\n", - ) - .unwrap(); - - node - } - pub fn new_node(&mut self, branch_name: &str) -> Result> { let timeline_id = self.pageserver.branch_get_by_name(branch_name)?.timeline_id; diff --git a/integration_tests/.gitignore b/integration_tests/.gitignore deleted file mode 100644 index 80006e4280..0000000000 --- a/integration_tests/.gitignore +++ /dev/null @@ -1 +0,0 @@ -tmp_check/ \ No newline at end of file diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml deleted file mode 100644 index 53247ac5cb..0000000000 --- a/integration_tests/Cargo.toml +++ /dev/null @@ -1,18 +0,0 @@ -[package] -name = "integration_tests" -version = "0.1.0" -authors = ["Stas Kelvich "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -lazy_static = "1.4.0" -rand = "0.8.3" -anyhow = "1.0" -nix = "0.20" -postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } - -pageserver = { path = "../pageserver" } -walkeeper = { path = "../walkeeper" } -control_plane = { path = "../control_plane" } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs deleted file mode 100644 index b0dafec499..0000000000 --- a/integration_tests/src/lib.rs +++ /dev/null @@ -1,386 +0,0 @@ -use anyhow::{bail, Result}; -use nix::sys::signal::{kill, Signal}; -use nix::unistd::Pid; -use std::collections::BTreeMap; -use std::convert::TryInto; -use std::fs::{self, File}; -use std::io::Read; -use std::net::SocketAddr; -use std::path::{Path, PathBuf}; -use std::process::{Command, ExitStatus}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use control_plane::compute::PostgresNode; -use control_plane::read_pidfile; -use control_plane::{local_env::LocalEnv, storage::PageServerNode}; - -// Find the directory where the binaries were put (i.e. target/debug/) -fn cargo_bin_dir() -> PathBuf { - let mut pathbuf = std::env::current_exe().unwrap(); - - pathbuf.pop(); - if pathbuf.ends_with("deps") { - pathbuf.pop(); - } - - pathbuf -} - -// local compute env for tests -pub fn create_test_env(testname: &str) -> LocalEnv { - let base_path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../tmp_check/") - .join(testname); - - let base_path_str = base_path.to_str().unwrap(); - - // Remove remnants of old test repo - let _ = fs::remove_dir_all(&base_path); - - fs::create_dir_all(&base_path) - .unwrap_or_else(|_| panic!("could not create directory for {}", base_path_str)); - - let pgdatadirs_path = base_path.join("pgdatadirs"); - fs::create_dir(&pgdatadirs_path) - .unwrap_or_else(|_| panic!("could not create directory {:?}", pgdatadirs_path)); - - LocalEnv { - pageserver_connstring: "postgresql://127.0.0.1:64000".to_string(), - pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"), - zenith_distrib_dir: Some(cargo_bin_dir()), - base_data_dir: base_path, - remotes: BTreeMap::default(), - } -} - -// -// Collection of several example deployments useful for tests. -// -// I'm intendedly modelling storage and compute control planes as a separate entities -// as it is closer to the actual setup. -// -pub struct TestStorageControlPlane { - pub wal_acceptors: Vec, - pub pageserver: Arc, - pub test_done: AtomicBool, -} - -impl TestStorageControlPlane { - // postgres <-> page_server - // - // Initialize a new repository and configure a page server to run in it - // - pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane { - let pserver = Arc::new(PageServerNode { - env: local_env.clone(), - kill_on_exit: true, - listen_address: None, - }); - pserver.init().unwrap(); - pserver.start().unwrap(); - - TestStorageControlPlane { - wal_acceptors: Vec::new(), - pageserver: pserver, - test_done: AtomicBool::new(false), - } - } - - // postgres <-> {wal_acceptor1, wal_acceptor2, ...} - pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane { - let mut cplane = TestStorageControlPlane { - wal_acceptors: Vec::new(), - pageserver: Arc::new(PageServerNode { - env: local_env.clone(), - kill_on_exit: true, - listen_address: None, - }), - test_done: AtomicBool::new(false), - // repopath, - }; - cplane.pageserver.init().unwrap(); - cplane.pageserver.start().unwrap(); - - const WAL_ACCEPTOR_PORT: usize = 54321; - - let datadir_base = local_env.base_data_dir.join("safekeepers"); - fs::create_dir_all(&datadir_base).unwrap(); - - for i in 0..redundancy { - let wal_acceptor = WalAcceptorNode { - listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i) - .parse() - .unwrap(), - data_dir: datadir_base.join(format!("wal_acceptor_{}", i)), - env: local_env.clone(), - pass_to_pageserver: true, - }; - wal_acceptor.init(); - wal_acceptor.start(); - cplane.wal_acceptors.push(wal_acceptor); - } - cplane - } - - pub fn stop(&self) { - for wa in self.wal_acceptors.iter() { - let _ = wa.stop(); - } - self.test_done.store(true, Ordering::Relaxed); - } - - pub fn get_wal_acceptor_conn_info(&self) -> String { - self.wal_acceptors - .iter() - .map(|wa| wa.listen.to_string()) - .collect::>() - .join(",") - } - - pub fn is_running(&self) -> bool { - self.test_done.load(Ordering::Relaxed) - } -} - -impl Drop for TestStorageControlPlane { - fn drop(&mut self) { - self.stop(); - } -} - -/////////////////////////////////////////////////////////////////////////////// -// -// PostgresNodeExt -// -/////////////////////////////////////////////////////////////////////////////// - -/// -/// Testing utilities for PostgresNode type -/// -pub trait PostgresNodeExt { - fn pg_regress(&self) -> ExitStatus; - fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus; - fn open_psql(&self, db: &str) -> postgres::Client; - fn dump_log_file(&self); - fn safe_psql(&self, db: &str, sql: &str) -> Vec; -} - -impl PostgresNodeExt for PostgresNode { - fn pg_regress(&self) -> ExitStatus { - self.safe_psql("postgres", "CREATE DATABASE regression"); - - let regress_run_path = self.env.base_data_dir.join("regress"); - fs::create_dir_all(®ress_run_path).unwrap(); - fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); - std::env::set_current_dir(regress_run_path).unwrap(); - - let regress_build_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); - let regress_src_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); - - let regress_check = Command::new(regress_build_path.join("pg_regress")) - .args(&[ - "--bindir=''", - "--use-existing", - format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(), - format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), - format!( - "--schedule={}", - regress_src_path.join("parallel_schedule").to_str().unwrap() - ) - .as_str(), - format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), - ]) - .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("PGPORT", self.address.port().to_string()) - .env("PGUSER", self.whoami()) - .env("PGHOST", self.address.ip().to_string()) - .status() - .expect("pg_regress failed"); - if !regress_check.success() { - if let Ok(mut file) = File::open("regression.diffs") { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("--------------- regression.diffs:\n{}", buffer); - } - self.dump_log_file(); - } - regress_check - } - - fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus { - let port = self.address.port().to_string(); - let clients = clients.to_string(); - let seconds = seconds.to_string(); - let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench")) - .args(&["-i", "-p", port.as_str(), "postgres"]) - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("pgbench -i"); - let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench")) - .args(&[ - "-p", - port.as_str(), - "-T", - seconds.as_str(), - "-P", - "1", - "-c", - clients.as_str(), - "-M", - "prepared", - "postgres", - ]) - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("pgbench run"); - pg_bench_run - } - - fn dump_log_file(&self) { - if let Ok(mut file) = File::open(self.env.pageserver_data_dir().join("pageserver.log")) { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("--------------- pageserver.log:\n{}", buffer); - } - } - - fn safe_psql(&self, db: &str, sql: &str) -> Vec { - let connstring = format!( - "host={} port={} dbname={} user={}", - self.address.ip(), - self.address.port(), - db, - self.whoami() - ); - let mut client = postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap(); - - println!("Running {}", sql); - let result = client.query(sql, &[]); - if result.is_err() { - self.dump_log_file(); - } - result.unwrap() - } - - fn open_psql(&self, db: &str) -> postgres::Client { - let connstring = format!( - "host={} port={} dbname={} user={}", - self.address.ip(), - self.address.port(), - db, - self.whoami() - ); - postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap() - } -} - -/////////////////////////////////////////////////////////////////////////////// -// -// WalAcceptorNode -// -/////////////////////////////////////////////////////////////////////////////// - -// -// Control routines for WalAcceptor. -// -// Now used only in test setups. -// -pub struct WalAcceptorNode { - listen: SocketAddr, - data_dir: PathBuf, - env: LocalEnv, - pass_to_pageserver: bool, -} - -impl WalAcceptorNode { - pub fn init(&self) { - if self.data_dir.exists() { - fs::remove_dir_all(self.data_dir.clone()).unwrap(); - } - fs::create_dir_all(self.data_dir.clone()).unwrap(); - } - - pub fn start(&self) { - println!( - "Starting wal_acceptor in {} listening '{}'", - self.data_dir.to_str().unwrap(), - self.listen - ); - - let ps_arg = if self.pass_to_pageserver { - // Tell page server it can receive WAL from this WAL safekeeper - ["--pageserver", "127.0.0.1:64000", "--recall", "1 second"].to_vec() - } else { - [].to_vec() - }; - - let status = Command::new( - self.env - .zenith_distrib_dir - .as_ref() - .unwrap() - .join("wal_acceptor"), - ) - .args(&["-D", self.data_dir.to_str().unwrap()]) - .args(&["-l", self.listen.to_string().as_str()]) - .args(&ps_arg) - .arg("-d") - .arg("-n") - .status() - .expect("failed to start wal_acceptor"); - - if !status.success() { - panic!("wal_acceptor start failed"); - } - } - - pub fn stop(&self) -> Result<()> { - println!("Stopping wal acceptor on {}", self.listen); - let pidfile = self.data_dir.join("wal_acceptor.pid"); - let pid = read_pidfile(&pidfile)?; - let pid = Pid::from_raw(pid); - if kill(pid, Signal::SIGTERM).is_err() { - bail!("Failed to kill wal_acceptor with pid {}", pid); - } - Ok(()) - } -} - -impl Drop for WalAcceptorNode { - fn drop(&mut self) { - // Ignore errors. - let _ = self.stop(); - } -} - -/////////////////////////////////////////////////////////////////////////////// -// -// WalProposerNode -// -/////////////////////////////////////////////////////////////////////////////// - -pub struct WalProposerNode { - pub pid: u32, -} - -impl WalProposerNode { - pub fn stop(&self) { - // std::process::Child::id() returns u32, we need i32. - let pid: i32 = self.pid.try_into().unwrap(); - let pid = Pid::from_raw(pid); - kill(pid, Signal::SIGTERM).expect("failed to execute kill"); - } -} - -impl Drop for WalProposerNode { - fn drop(&mut self) { - self.stop(); - } -} diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs deleted file mode 100644 index 2dfe1b831e..0000000000 --- a/integration_tests/tests/test_wal_acceptor.rs +++ /dev/null @@ -1,330 +0,0 @@ -use rand::Rng; -use std::sync::Arc; -use std::time::SystemTime; -use std::{thread, time}; - -use control_plane::compute::{ComputeControlPlane, PostgresNode}; - -use integration_tests::PostgresNodeExt; -use integration_tests::TestStorageControlPlane; - -const DOWNTIME: u64 = 2; - -fn start_node_with_wal_proposer( - timeline: &str, - compute_cplane: &mut ComputeControlPlane, - wal_acceptors: &str, -) -> Arc { - let node = compute_cplane.new_test_master_node(timeline); - let _node = node.append_conf( - "postgresql.conf", - &format!("wal_acceptors='{}'\n", wal_acceptors), - ); - node.start().unwrap(); - node -} - -#[test] -fn test_embedded_wal_proposer() { - let local_env = integration_tests::create_test_env("test_embedded_wal_proposer"); - - const REDUNDANCY: usize = 3; - let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); - let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - - // start postgres - let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); - - // check basic work with table - node.safe_psql( - "postgres", - "CREATE TABLE t(key int primary key, value text)", - ); - node.safe_psql( - "postgres", - "INSERT INTO t SELECT generate_series(1,100000), 'payload'", - ); - let count: i64 = node - .safe_psql("postgres", "SELECT sum(key) FROM t") - .first() - .unwrap() - .get(0); - println!("sum = {}", count); - assert_eq!(count, 5000050000); - // check wal files equality -} - -#[test] -fn test_acceptors_normal_work() { - let local_env = integration_tests::create_test_env("test_acceptors_normal_work"); - - const REDUNDANCY: usize = 3; - let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); - let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - - // start postgres - let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); - - // check basic work with table - node.safe_psql( - "postgres", - "CREATE TABLE t(key int primary key, value text)", - ); - node.safe_psql( - "postgres", - "INSERT INTO t SELECT generate_series(1,100000), 'payload'", - ); - let count: i64 = node - .safe_psql("postgres", "SELECT sum(key) FROM t") - .first() - .unwrap() - .get(0); - println!("sum = {}", count); - assert_eq!(count, 5000050000); - // check wal files equality -} - -// Run page server and multiple safekeepers, and multiple compute nodes running -// against different timelines. -#[test] -fn test_many_timelines() { - // Initialize a new repository, and set up WAL safekeepers and page server. - const REDUNDANCY: usize = 3; - const N_TIMELINES: usize = 5; - let local_env = integration_tests::create_test_env("test_many_timelines"); - let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); - let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - - // Create branches - let mut timelines: Vec = vec!["main".to_string()]; - - for i in 1..N_TIMELINES { - let branchname = format!("experimental{}", i); - storage_cplane - .pageserver - .branch_create(&branchname, "main") - .unwrap(); - timelines.push(branchname); - } - - // start postgres on each timeline - let mut nodes = Vec::new(); - for tli_name in timelines { - let node = start_node_with_wal_proposer(&tli_name, &mut compute_cplane, &wal_acceptors); - nodes.push(node.clone()); - } - - // create schema - for node in &nodes { - node.safe_psql( - "postgres", - "CREATE TABLE t(key int primary key, value text)", - ); - } - - // Populate data - for node in &nodes { - node.safe_psql( - "postgres", - "INSERT INTO t SELECT generate_series(1,100000), 'payload'", - ); - } - - // Check data - for node in &nodes { - let count: i64 = node - .safe_psql("postgres", "SELECT sum(key) FROM t") - .first() - .unwrap() - .get(0); - println!("sum = {}", count); - assert_eq!(count, 5000050000); - } -} - -// Majority is always alive -#[test] -fn test_acceptors_restarts() { - let local_env = integration_tests::create_test_env("test_acceptors_restarts"); - - // Start pageserver that reads WAL directly from that postgres - const REDUNDANCY: usize = 3; - const FAULT_PROBABILITY: f32 = 0.01; - - let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); - let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - let mut rng = rand::thread_rng(); - - // start postgres - let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); - - let mut failed_node: Option = None; - - // check basic work with table - node.safe_psql( - "postgres", - "CREATE TABLE t(key int primary key, value text)", - ); - let mut psql = node.open_psql("postgres"); - for i in 1..=1000 { - psql.execute("INSERT INTO t values ($1, 'payload')", &[&i]) - .unwrap(); - let prob: f32 = rng.gen(); - if prob <= FAULT_PROBABILITY { - if let Some(node) = failed_node { - storage_cplane.wal_acceptors[node].start(); - failed_node = None; - } else { - let node: usize = rng.gen_range(0..REDUNDANCY); - failed_node = Some(node); - storage_cplane.wal_acceptors[node].stop().unwrap(); - } - } - } - let count: i64 = node - .safe_psql("postgres", "SELECT sum(key) FROM t") - .first() - .unwrap() - .get(0); - println!("sum = {}", count); - assert_eq!(count, 500500); -} - -fn start_acceptor(cplane: &Arc, no: usize) { - let cp = cplane.clone(); - thread::spawn(move || { - thread::sleep(time::Duration::from_secs(DOWNTIME)); - cp.wal_acceptors[no].start(); - }); -} - -// Stop majority of acceptors while compute is under the load. Boot -// them again and check that nothing was losed. Repeat. -// N_CRASHES env var -#[test] -fn test_acceptors_unavailability() { - let local_env = integration_tests::create_test_env("test_acceptors_unavailability"); - - // Start pageserver that reads WAL directly from that postgres - const REDUNDANCY: usize = 2; - - let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); - let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - - // start postgres - let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); - - // check basic work with table - node.safe_psql( - "postgres", - "CREATE TABLE t(key int primary key, value text)", - ); - let mut psql = node.open_psql("postgres"); - psql.execute("INSERT INTO t values (1, 'payload')", &[]) - .unwrap(); - - // Shut down all wal acceptors - storage_cplane.wal_acceptors[0].stop().unwrap(); - let cp = Arc::new(storage_cplane); - start_acceptor(&cp, 0); - let now = SystemTime::now(); - psql.execute("INSERT INTO t values (2, 'payload')", &[]) - .unwrap(); - // Here we check that the query above was hanging - // while wal_acceptor was unavailiable - assert!(now.elapsed().unwrap().as_secs() >= DOWNTIME); - psql.execute("INSERT INTO t values (3, 'payload')", &[]) - .unwrap(); - - cp.wal_acceptors[1].stop().unwrap(); - start_acceptor(&cp, 1); - psql.execute("INSERT INTO t values (4, 'payload')", &[]) - .unwrap(); - // Here we check that the query above was hanging - // while wal_acceptor was unavailiable - assert!(now.elapsed().unwrap().as_secs() >= 2 * DOWNTIME); - - psql.execute("INSERT INTO t values (5, 'payload')", &[]) - .unwrap(); - - let count: i64 = node - .safe_psql("postgres", "SELECT sum(key) FROM t") - .first() - .unwrap() - .get(0); - println!("sum = {}", count); - // Ensure that all inserts succeeded. - // Including ones that were waiting for wal acceptor restart. - assert_eq!(count, 15); -} - -fn simulate_failures(cplane: Arc) { - let mut rng = rand::thread_rng(); - let n_acceptors = cplane.wal_acceptors.len(); - let failure_period = time::Duration::from_secs(1); - while cplane.is_running() { - thread::sleep(failure_period); - let mask: u32 = rng.gen_range(0..(1 << n_acceptors)); - for i in 0..n_acceptors { - if (mask & (1 << i)) != 0 { - cplane.wal_acceptors[i].stop().unwrap(); - } - } - thread::sleep(failure_period); - for i in 0..n_acceptors { - if (mask & (1 << i)) != 0 { - cplane.wal_acceptors[i].start(); - } - } - } -} - -// Race condition test -#[test] -fn test_race_conditions() { - let local_env = integration_tests::create_test_env("test_race_conditions"); - - // Start pageserver that reads WAL directly from that postgres - const REDUNDANCY: usize = 3; - - let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant( - &local_env, REDUNDANCY, - )); - let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); - - // start postgres - let node = start_node_with_wal_proposer("main", &mut compute_cplane, &wal_acceptors); - - // check basic work with table - node.safe_psql( - "postgres", - "CREATE TABLE t(key int primary key, value text)", - ); - - let cp = storage_cplane.clone(); - let failures_thread = thread::spawn(move || { - simulate_failures(cp); - }); - - let mut psql = node.open_psql("postgres"); - for i in 1..=1000 { - psql.execute("INSERT INTO t values ($1, 'payload')", &[&i]) - .unwrap(); - } - let count: i64 = node - .safe_psql("postgres", "SELECT sum(key) FROM t") - .first() - .unwrap() - .get(0); - println!("sum = {}", count); - assert_eq!(count, 500500); - - storage_cplane.stop(); - failures_thread.join().unwrap(); -} diff --git a/test_runner/batch_others/test_config.py b/test_runner/batch_others/test_config.py index eda70e89d9..2b0b04238e 100644 --- a/test_runner/batch_others/test_config.py +++ b/test_runner/batch_others/test_config.py @@ -11,7 +11,7 @@ def test_config(zenith_cli, pageserver, postgres, pg_bin): zenith_cli.run(["branch", "test_config", "empty"]) # change config - pg = postgres.create_start('test_config', ['log_min_messages=debug1']) + pg = postgres.create_start('test_config', config_lines=['log_min_messages=debug1']) print('postgres is running on test_config branch') with psycopg2.connect(pg.connstr()) as conn: diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py index 610f003494..4f95103fba 100644 --- a/test_runner/batch_others/test_twophase.py +++ b/test_runner/batch_others/test_twophase.py @@ -9,7 +9,7 @@ pytest_plugins = ("fixtures.zenith_fixtures") def test_twophase(zenith_cli, pageserver, postgres, pg_bin): zenith_cli.run(["branch", "test_twophase", "empty"]) - pg = postgres.create_start('test_twophase', ['max_prepared_transactions=5']) + pg = postgres.create_start('test_twophase', config_lines=['max_prepared_transactions=5']) print("postgres is running on 'test_twophase' branch") conn = psycopg2.connect(pg.connstr()) @@ -31,7 +31,8 @@ def test_twophase(zenith_cli, pageserver, postgres, pg_bin): # Create a branch with the transaction in prepared state zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"]) - pg2 = postgres.create_start('test_twophase_prepared', ['max_prepared_transactions=5']) + pg2 = postgres.create_start('test_twophase_prepared', + config_lines=['max_prepared_transactions=5']) conn2 = psycopg2.connect(pg2.connstr()) conn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cur2 = conn2.cursor() diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py new file mode 100644 index 0000000000..f709be1b6d --- /dev/null +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -0,0 +1,201 @@ +import psycopg2 +import pytest +import random +import time + +from multiprocessing import Process, Value + +pytest_plugins = ("fixtures.zenith_fixtures") + + +# basic test, write something in setup with wal acceptors, ensure that commits +# succeed and data is written +def test_normal_work(zenith_cli, pageserver, postgres, wa_factory): + zenith_cli.run(["branch", "test_wal_acceptors_normal_work", "empty"]) + wa_factory.start_n_new(3) + pg = postgres.create_start('test_wal_acceptors_normal_work', + wal_acceptors=wa_factory.get_connstrs()) + + pg_conn = psycopg2.connect(pg.connstr()) + # do commit after each statement as waiting for acceptors happens there + pg_conn.autocommit = True + + cur = pg_conn.cursor() + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (5000050000, ) + + +# Run page server and multiple acceptors, and multiple compute nodes running +# against different timelines. +def test_many_timelines(zenith_cli, pageserver, postgres, wa_factory): + n_timelines = 2 + + wa_factory.start_n_new(3) + + branches = ["test_wal_acceptors_many_timelines_{}".format(tlin) for tlin in range(n_timelines)] + + # start postgres on each timeline + pgs = [] + for branch in branches: + zenith_cli.run(["branch", branch, "empty"]) + pgs.append(postgres.create_start(branch, wal_acceptors=wa_factory.get_connstrs())) + + # Do everything in different loops to have actions on different timelines + # interleaved. + # create schema + for pg in pgs: + pg.safe_psql("CREATE TABLE t(key int primary key, value text)") + + # Populate data + for pg in pgs: + pg.safe_psql("INSERT INTO t SELECT generate_series(1,100000), 'payload'") + + # Check data + for pg in pgs: + res = pg.safe_psql("SELECT sum(key) FROM t") + assert res[0] == (5000050000, ) + + +# Check that dead minority doesn't prevent the commits: execute insert n_inserts +# times, with fault_probability chance of getting a wal acceptor down or up +# along the way. 2 of 3 are always alive, so the work keeps going. +def test_restarts(zenith_cli, pageserver, postgres, wa_factory): + fault_probability = 0.01 + n_inserts = 1000 + n_acceptors = 3 + + wa_factory.start_n_new(n_acceptors) + + zenith_cli.run(["branch", "test_wal_acceptors_restarts", "empty"]) + pg = postgres.create_start('test_wal_acceptors_restarts', + wal_acceptors=wa_factory.get_connstrs()) + + pg_conn = psycopg2.connect(pg.connstr()) + # do commit after each statement as waiting for acceptors happens there + pg_conn.autocommit = True + + cur = pg_conn.cursor() + + failed_node = None + cur.execute('CREATE TABLE t(key int primary key, value text)') + for i in range(n_inserts): + cur.execute("INSERT INTO t values (%s, 'payload');", (i + 1, )) + + if random.random() <= fault_probability: + if failed_node is None: + failed_node = wa_factory.instances[random.randrange(0, n_acceptors)] + failed_node.stop() + else: + failed_node.start() + failed_node = None + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (500500, ) + + +start_delay_sec = 2 + + +def delayed_wal_acceptor_start(wa): + time.sleep(start_delay_sec) + wa.start() + + +# When majority of acceptors is offline, commits are expected to be frozen +def test_unavailability(zenith_cli, pageserver, postgres, wa_factory): + wa_factory.start_n_new(2) + + zenith_cli.run(["branch", "test_wal_acceptors_unavailability", "empty"]) + pg = postgres.create_start('test_wal_acceptors_unavailability', + wal_acceptors=wa_factory.get_connstrs()) + + pg_conn = psycopg2.connect(pg.connstr()) + # do commit after each statement as waiting for acceptors happens there + pg_conn.autocommit = True + cur = pg_conn.cursor() + + # check basic work with table + cur.execute('CREATE TABLE t(key int primary key, value text)') + cur.execute("INSERT INTO t values (1, 'payload')") + + # shutdown one of two acceptors, that is, majority + wa_factory.instances[0].stop() + + proc = Process(target=delayed_wal_acceptor_start, args=(wa_factory.instances[0], )) + proc.start() + + start = time.time() + cur.execute("INSERT INTO t values (2, 'payload')") + # ensure that the query above was hanging while acceptor was down + assert (time.time() - start) >= start_delay_sec + proc.join() + + # for the world's balance, do the same with second acceptor + wa_factory.instances[1].stop() + + proc = Process(target=delayed_wal_acceptor_start, args=(wa_factory.instances[1], )) + proc.start() + + start = time.time() + cur.execute("INSERT INTO t values (3, 'payload')") + # ensure that the query above was hanging while acceptor was down + assert (time.time() - start) >= start_delay_sec + proc.join() + + cur.execute("INSERT INTO t values (4, 'payload')") + + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (10, ) + + +# shut down random subset of acceptors, sleep, wake them up, rinse, repeat +def xmas_garland(acceptors, stop): + while not bool(stop.value): + victims = [] + for wa in acceptors: + if random.random() >= 0.5: + victims.append(wa) + for v in victims: + v.stop() + time.sleep(1) + for v in victims: + v.start() + time.sleep(1) + + +# value which gets unset on exit +@pytest.fixture +def stop_value(): + stop = Value('i', 0) + yield stop + stop.value = 1 + + +# do inserts while concurrently getting up/down subsets of acceptors +def test_race_conditions(zenith_cli, pageserver, postgres, wa_factory, stop_value): + + wa_factory.start_n_new(3) + + zenith_cli.run(["branch", "test_wal_acceptors_race_conditions", "empty"]) + pg = postgres.create_start('test_wal_acceptors_race_conditions', + wal_acceptors=wa_factory.get_connstrs()) + + pg_conn = psycopg2.connect(pg.connstr()) + # do commit after each statement as waiting for acceptors happens there + pg_conn.autocommit = True + cur = pg_conn.cursor() + + cur.execute('CREATE TABLE t(key int primary key, value text)') + + proc = Process(target=xmas_garland, args=(wa_factory.instances, stop_value)) + proc.start() + + for i in range(1000): + cur.execute("INSERT INTO t values (%s, 'payload');", (i + 1, )) + + cur.execute('SELECT sum(key) FROM t') + assert cur.fetchone() == (500500, ) + + stop_value.value = 1 + proc.join() diff --git a/test_runner/batch_pg_regress/test_isolation.py b/test_runner/batch_pg_regress/test_isolation.py index 38c007af64..c19f538540 100644 --- a/test_runner/batch_pg_regress/test_isolation.py +++ b/test_runner/batch_pg_regress/test_isolation.py @@ -14,7 +14,7 @@ def test_isolation(pageserver, postgres, pg_bin, zenith_cli, test_output_dir, pg # Connect to postgres and create a database called "regression". # isolation tests use prepared transactions, so enable them - pg = postgres.create_start('test_isolation', ['max_prepared_transactions=100']) + pg = postgres.create_start('test_isolation', config_lines=['max_prepared_transactions=100']) pg_conn = psycopg2.connect(pg.connstr()) pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cur = pg_conn.cursor() diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index fee626b616..167a4efe27 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1,8 +1,12 @@ import getpass import os +import signal import pytest import shutil import subprocess +import psycopg2 + +from pathlib import Path from typing import Any, Callable, Dict, Iterator, List, Optional, TypeVar, Literal @@ -31,6 +35,8 @@ Fn = TypeVar('Fn', bound=Callable[..., Any]) DEFAULT_OUTPUT_DIR = 'test_output' DEFAULT_POSTGRES_DIR = 'tmp_install' +DEFAULT_PAGESERVER_PORT = 64000 + def determine_scope(fixture_name: str, config: Any) -> str: return 'session' @@ -155,7 +161,9 @@ class ZenithPageserver: # returns a libpq connection string for connecting to it. def connstr(self) -> str: username = getpass.getuser() - conn_str = 'host={} port={} dbname=postgres user={}'.format('localhost', 64000, username) + conn_str = 'host={} port={} dbname=postgres user={}'.format('localhost', + DEFAULT_PAGESERVER_PORT, + username) return conn_str @@ -197,9 +205,14 @@ class Postgres: self.branch: Optional[str] = None # dubious, see asserts below # path to conf is /pgdatadirs//postgresql.conf - def create(self, branch: str, config_lines: Optional[List[str]] = None) -> 'Postgres': + def create(self, + branch: str, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None) -> 'Postgres': """ Create the pg data directory. + If wal_acceptors is not None, node will use wal acceptors; config is + adjusted accordingly. Returns self. """ @@ -208,6 +221,10 @@ class Postgres: self.zenith_cli.run(['pg', 'create', branch]) self.branch = branch + if wal_acceptors is not None: + self.adjust_for_wal_acceptors(wal_acceptors) + if config_lines is None: + config_lines = [] self.config(config_lines) return self @@ -224,6 +241,32 @@ class Postgres: return self + """ Path to postgresql.conf """ + + def config_file_path(self) -> str: + filename = 'pgdatadirs/{}/postgresql.conf'.format(self.branch) + return os.path.join(self.repo_dir, filename) + + """ + Adjust instance config for working with wal acceptors instead of + pageserver (pre-configured by CLI) directly. + """ + + def adjust_for_wal_acceptors(self, wal_acceptors) -> 'Postgres': + with open(self.config_file_path(), "r") as f: + cfg_lines = f.readlines() + with open(self.config_file_path(), "w") as f: + for cfg_line in cfg_lines: + # walproposer uses different application_name + if ("synchronous_standby_names" in cfg_line or + # don't ask pageserver to fetch WAL from compute + "callmemaybe_connstring" in cfg_line): + continue + f.write(cfg_line) + f.write("synchronous_standby_names = 'walproposer'\n") + f.write("wal_acceptors = '{}'\n".format(wal_acceptors)) + return self + def config(self, lines: List[str]) -> 'Postgres': """ Add lines to postgresql.conf. @@ -231,9 +274,7 @@ class Postgres: Returns self. """ - filename = 'pgdatadirs/{}/postgresql.conf'.format(self.branch) - config_name = os.path.join(self.repo_dir, filename) - with open(config_name, 'a') as conf: + with open(self.config_file_path(), 'a') as conf: for line in lines: conf.write(line) conf.write('\n') @@ -264,13 +305,16 @@ class Postgres: return self - def create_start(self, branch: str, config_lines: Optional[List[str]] = None) -> 'Postgres': + def create_start(self, + branch: str, + wal_acceptors: Optional[str] = None, + config_lines: Optional[List[str]] = None) -> 'Postgres': """ Create a Postgres instance, then start it. Returns self. """ - self.create(branch, config_lines).start() + self.create(branch, wal_acceptors, config_lines).start() return self @@ -284,6 +328,17 @@ class Postgres: return conn_str + def safe_psql(self, query, dbname='postgres', username=None): + """ + Execute query against the node and return all (fetchall) results + """ + with psycopg2.connect(self.connstr(dbname, username)) as conn: + with conn.cursor() as curs: + curs.execute(query) + if curs.description is None: + return [] # query didn't return data + return curs.fetchall() + class PostgresFactory: """ An object representing multiple running postgres daemons. """ @@ -296,13 +351,14 @@ class PostgresFactory: def create_start(self, branch: str = "main", + wal_acceptors: Optional[str] = None, config_lines: Optional[List[str]] = None) -> Postgres: pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1) self.num_instances += 1 self.instances.append(pg) - return pg.create_start(branch, config_lines) + return pg.create_start(branch, wal_acceptors, config_lines) def stop_all(self) -> 'PostgresFactory': for pg in self.instances: @@ -382,12 +438,119 @@ def pg_bin(test_output_dir: str, pg_distrib_dir: str) -> PgBin: return PgBin(test_output_dir, pg_distrib_dir) +""" Read content of file into number """ + + +def read_pid(path): + return int(Path(path).read_text()) + + +""" An object representing a running wal acceptor daemon. """ + + +class WalAcceptor: + def __init__(self, wa_binpath, data_dir, port, num): + self.wa_binpath = wa_binpath + self.data_dir = data_dir + self.port = port + self.num = num # identifier for logging + + def start(self) -> 'WalAcceptor': + # create data directory if not exists + Path(self.data_dir).mkdir(parents=True, exist_ok=True) + + cmd = [self.wa_binpath] + cmd.extend(["-D", self.data_dir]) + cmd.extend(["-l", "127.0.0.1:{}".format(self.port)]) + cmd.append("--daemonize") + cmd.append("--no-sync") + # Tell page server it can receive WAL from this WAL safekeeper + cmd.extend(["--pageserver", "127.0.0.1:{}".format(DEFAULT_PAGESERVER_PORT)]) + cmd.extend(["--recall", "1 second"]) + print('Running command "{}"'.format(' '.join(cmd))) + subprocess.run(cmd, check=True) + + return self + + def stop(self) -> 'WalAcceptor': + print('Stopping wal acceptor {}'.format(self.num)) + pidfile_path = os.path.join(self.data_dir, "wal_acceptor.pid") + try: + pid = read_pid(pidfile_path) + try: + os.kill(pid, signal.SIGTERM) + except Exception: + pass # pidfile might be obsolete + # TODO: cleanup pid file on exit in wal acceptor + return self + # for _ in range(5): + # print('waiting wal acceptor {} (pid {}) to stop...', self.num, pid) + # try: + # read_pid(pidfile_path) + # except FileNotFoundError: + # return # done + # time.sleep(1) + # raise Exception('Failed to wait for wal acceptor {} shutdown'.format(self.num)) + except FileNotFoundError: + print("Wal acceptor {} is not running".format(self.num)) + return self + + +class WalAcceptorFactory: + """ An object representing multiple running wal acceptors. """ + def __init__(self, zenith_binpath, data_dir): + self.wa_binpath = os.path.join(zenith_binpath, 'wal_acceptor') + self.data_dir = data_dir + self.instances = [] + self.initial_port = 54321 + """ + Start new wal acceptor. + """ + + def start_new(self) -> WalAcceptor: + wa_num = len(self.instances) + wa = WalAcceptor(self.wa_binpath, + os.path.join(self.data_dir, "wal_acceptor_{}".format(wa_num)), + self.initial_port + wa_num, wa_num) + wa.start() + self.instances.append(wa) + return wa + + """ + Start n new wal acceptors + """ + + def start_n_new(self, n): + for _ in range(n): + self.start_new() + + def stop_all(self) -> 'WalAcceptorFactory': + for wa in self.instances: + wa.stop() + return self + + """ Get list of wal acceptor endpoints suitable for wal_acceptors GUC """ + + def get_connstrs(self) -> str: + return ','.join(["127.0.0.1:{}".format(wa.port) for wa in self.instances]) + + +@zenfixture +def wa_factory(zenith_binpath, repo_dir) -> Iterator[WalAcceptorFactory]: + """ Gives WalAcceptorFactory providing wal acceptors. """ + wafactory = WalAcceptorFactory(zenith_binpath, os.path.join(repo_dir, "wal_acceptors")) + yield wafactory + # After the yield comes any cleanup code we need. + print('Starting wal acceptors cleanup') + wafactory.stop_all() + + @zenfixture def base_dir() -> str: """ find the base directory (currently this is the git root) """ base_dir = os.path.normpath(os.path.join(get_self_dir(), '../..')) - print('base_dir is', base_dir) + print('\nbase_dir is', base_dir) return base_dir