diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index 13c37f8259..d9d04abfd9 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -3,7 +3,7 @@ // use log::*; -use std::{fs::File, str::FromStr}; +use std::{fs::File, str::FromStr, fs::OpenOptions}; use std::io; use std::path::PathBuf; use std::thread; @@ -129,8 +129,8 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { // There should'n be any logging to stdin/stdout. Redirect it to the main log so // that we will see any accidental manual fpritf's or backtraces. - let stdout = File::create(conf.data_dir.join("pageserver.log")).unwrap(); - let stderr = File::create(conf.data_dir.join("pageserver.log")).unwrap(); + let stdout = OpenOptions::new().create(true).append(true).open(conf.data_dir.join("pageserver.log")).unwrap(); + let stderr = OpenOptions::new().create(true).append(true).open(conf.data_dir.join("pageserver.log")).unwrap(); let daemonize = Daemonize::new() .pid_file(conf.data_dir.join("pageserver.pid")) diff --git a/src/bin/wal_acceptor.rs b/src/bin/wal_acceptor.rs index 589046aa91..1c507d5bb1 100644 --- a/src/bin/wal_acceptor.rs +++ b/src/bin/wal_acceptor.rs @@ -2,9 +2,12 @@ // Main entry point for the wal_acceptor executable // use log::*; +use std::{fs::File, fs::OpenOptions}; use std::io; use std::path::PathBuf; use std::thread; +use daemonize::Daemonize; +use std::path::Path; use clap::{App, Arg}; @@ -29,6 +32,11 @@ fn main() -> Result<(), io::Error> { .long("listen") .takes_value(true) .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)")) + .arg(Arg::with_name("daemonize") + .short("d") + .long("daemonize") + .takes_value(false) + .help("Run in the background")) .arg(Arg::with_name("no-sync") .short("n") .long("no-sync") @@ -38,6 +46,7 @@ fn main() -> Result<(), io::Error> { let mut conf = WalAcceptorConf { data_dir: PathBuf::from("./"), + daemonize: false, no_sync: false, listen_addr: "127.0.0.1:5454".parse().unwrap() }; @@ -50,6 
+59,10 @@ fn main() -> Result<(), io::Error> { conf.no_sync = true; } + if arg_matches.is_present("daemonize") { + conf.daemonize = true; + } + if let Some(addr) = arg_matches.value_of("listen") { conf.listen_addr = addr.parse().unwrap(); } @@ -59,12 +72,32 @@ fn main() -> Result<(), io::Error> { fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> { // Initialize logger - let _scope_guard = init_noninteractive_logging(); + let _scope_guard = init_logging(&conf); let _log_guard = slog_stdlog::init().unwrap(); // Note: this `info!(...)` macro comes from `log` crate info!("standard logging redirected to slog"); - let mut threads = Vec::new(); + if conf.daemonize { + info!("daemonizing..."); + + // There shouldn't be any logging to stdin/stdout. Redirect it to the main log so + // that we will see any accidental manual fprintf's or backtraces. + let stdout = OpenOptions::new().create(true).append(true).open(conf.data_dir.join("wal_acceptor.log")).unwrap(); + let stderr = OpenOptions::new().create(true).append(true).open(conf.data_dir.join("wal_acceptor.log")).unwrap(); + + let daemonize = Daemonize::new() + .pid_file(conf.data_dir.join("wal_acceptor.pid")) + .working_directory(Path::new(".")) + .stdout(stdout) + .stderr(stderr); + + match daemonize.start() { + Ok(_) => info!("Success, daemonized"), + Err(e) => error!("Error, {}", e), + } + } + + let mut threads = Vec::new(); let wal_acceptor_thread = thread::Builder::new() .name("WAL acceptor thread".into()) .spawn(|| { @@ -80,10 +113,20 @@ fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> { Ok(()) } -fn init_noninteractive_logging() -> slog_scope::GlobalLoggerGuard { - let decorator = slog_term::TermDecorator::new().build(); - let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let drain = slog_async::Async::new(drain).chan_size(1000).build().fuse(); - let logger = slog::Logger::root(drain, slog::o!()); - return slog_scope::set_global_logger(logger); +fn 
init_logging(conf: &WalAcceptorConf) -> slog_scope::GlobalLoggerGuard { + if conf.daemonize { + let log = conf.data_dir.join("wal_acceptor.log"); + let log_file = File::create(log).unwrap_or_else(|_| panic!("Could not create log file")); + let decorator = slog_term::PlainSyncDecorator::new(log_file); + let drain = slog_term::CompactFormat::new(decorator).build(); + let drain = std::sync::Mutex::new(drain).fuse(); + let logger = slog::Logger::root(drain, slog::o!()); + slog_scope::set_global_logger(logger) + } else { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).chan_size(1000).build().fuse(); + let logger = slog::Logger::root(drain, slog::o!()); + return slog_scope::set_global_logger(logger); + } } diff --git a/src/control_plane.rs b/src/control_plane.rs index d7fbd862b8..b105de4127 100644 --- a/src/control_plane.rs +++ b/src/control_plane.rs @@ -7,7 +7,7 @@ // local installations. // -use std::{fs::{self, OpenOptions}, rc::Rc}; +use std::fs::{self, OpenOptions}; use std::path::{Path, PathBuf}; use std::process::Command; use std::str; @@ -15,6 +15,7 @@ use std::{ io::Write, net::{IpAddr, Ipv4Addr, SocketAddr}, }; +use std::sync::Arc; use std::fs::File; use postgres::{Client, NoTls}; @@ -39,8 +40,8 @@ lazy_static! { // as it is closer to the actual setup. 
// pub struct StorageControlPlane { - wal_acceptors: Vec<WalAcceptorNode>, - page_servers: Vec<PageServerNode>, + pub wal_acceptors: Vec<WalAcceptorNode>, + pub page_servers: Vec<PageServerNode>, } impl StorageControlPlane { @@ -62,7 +63,27 @@ impl StorageControlPlane { cplane } - // // postgres <-> wal_acceptor x3 <-> page_server + + pub fn fault_tolerant(redundancy : usize) -> StorageControlPlane { + let mut cplane = StorageControlPlane { + wal_acceptors: Vec::new(), + page_servers: Vec::new(), + }; + const WAL_ACCEPTOR_PORT : usize = 54321; + + for i in 0..redundancy { + let wal_acceptor = WalAcceptorNode { + listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i).parse().unwrap(), + data_dir: TEST_WORKDIR.join(format!("wal_acceptor_{}", i)) + }; + wal_acceptor.init(); + wal_acceptor.start(); + cplane.wal_acceptors.push(wal_acceptor); + } + cplane + } + + // // postgres <-> wal_acceptor x3 <-> page_server // fn local(&mut self) -> StorageControlPlane { // } @@ -70,7 +91,9 @@ impl StorageControlPlane { &self.page_servers[0].page_service_addr } - fn get_wal_acceptor_conn_info() {} + pub fn get_wal_acceptor_conn_info(&self) -> String { + self.wal_acceptors.iter().map(|wa|wa.listen.to_string().to_string()).collect::<Vec<String>>().join(",") + } pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::Row> { let addr = &self.page_servers[0].page_service_addr; @@ -161,7 +184,47 @@ pub struct WalAcceptorNode { data_dir: PathBuf, } -impl WalAcceptorNode {} +impl WalAcceptorNode { + pub fn init(&self) { + fs::remove_dir_all(self.data_dir.clone()).unwrap(); + fs::create_dir_all(self.data_dir.clone()).unwrap(); + } + + pub fn start(&self) { + println!("Starting wal_acceptor in {} listening '{}'", self.data_dir.to_str().unwrap(), self.listen); + + let status = Command::new(CARGO_BIN_DIR.join("wal_acceptor")) + .args(&["-D", self.data_dir.to_str().unwrap()]) + .args(&["-l", self.listen.to_string().as_str()]) + .arg("-d") + .arg("-n") + .status() + .expect("failed to start wal_acceptor"); + + if !status.success() { + panic!("wal_acceptor start failed"); + } + } + + 
pub fn stop(&self) { + let pidfile = self.data_dir.join("wal_acceptor.pid"); + if let Ok(pid) = fs::read_to_string(pidfile) { + let _status = Command::new("kill") + .arg(pid) + .env_clear() + .status() + .expect("failed to execute kill"); + } + } +} + + +impl Drop for WalAcceptorNode { + fn drop(&mut self) { + self.stop(); + // fs::remove_dir_all(self.data_dir.clone()).unwrap(); + } +} /////////////////////////////////////////////////////////////////////////////// @@ -173,7 +236,7 @@ pub struct ComputeControlPlane<'a> { work_dir: PathBuf, last_assigned_port: u16, storage_cplane: &'a StorageControlPlane, - nodes: Vec<Rc<PostgresNode>>, + nodes: Vec<Arc<PostgresNode>>, } impl ComputeControlPlane<'_> { @@ -195,7 +258,7 @@ impl ComputeControlPlane<'_> { port } - pub fn new_vanilla_node<'a>(&mut self) -> &Rc<PostgresNode> { + pub fn new_vanilla_node<'a>(&mut self) -> &Arc<PostgresNode> { // allocate new node entry with generated port let node_id = self.nodes.len() + 1; let node = PostgresNode { @@ -205,7 +268,7 @@ impl ComputeControlPlane<'_> { pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), pg_bin_dir: self.pg_bin_dir.clone(), }; - self.nodes.push(Rc::new(node)); + self.nodes.push(Arc::new(node)); let node = self.nodes.last().unwrap(); // initialize data directory @@ -259,7 +322,7 @@ impl ComputeControlPlane<'_> { pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), pg_bin_dir: self.pg_bin_dir.clone(), }; - self.nodes.push(Rc::new(node)); + self.nodes.push(Arc::new(node)); let node = self.nodes.last().unwrap(); // initialize data directory w/o files @@ -297,7 +360,7 @@ impl ComputeControlPlane<'_> { node } - pub fn new_node(&mut self) -> Rc<PostgresNode> { + pub fn new_node(&mut self) -> Arc<PostgresNode> { let storage_cplane = self.storage_cplane; let node = self.new_vanilla_node(); @@ -310,6 +373,40 @@ impl ComputeControlPlane<'_> { node.clone() } + + pub fn new_master_node(&mut self) -> Arc<PostgresNode> { + let node = self.new_vanilla_node(); + + node.append_conf("postgresql.conf", "synchronous_standby_names = 'safekeeper_proxy'\n\ + "); + 
node.clone() + } +} + +/////////////////////////////////////////////////////////////////////////////// + +pub struct WalProposerNode { + pid: u32 +} + +impl WalProposerNode { + pub fn stop(&self) { + let status = Command::new("kill") + .arg(self.pid.to_string()) + .env_clear() + .status() + .expect("failed to execute kill"); + + if !status.success() { + panic!("kill start failed"); + } + } +} + +impl Drop for WalProposerNode { + fn drop(&mut self) { + self.stop(); + } } /////////////////////////////////////////////////////////////////////////////// @@ -352,9 +449,10 @@ impl PostgresNode { } pub fn start(&self, storage_cplane: &StorageControlPlane) { - let _res = storage_cplane - .page_server_psql(format!("callmemaybe {}", self.connstr()).as_str()); - + if storage_cplane.page_servers.len() != 0 { + let _res = storage_cplane + .page_server_psql(format!("callmemaybe {}", self.connstr()).as_str()); + } println!("Starting postgres node at '{}'", self.connstr()); self.pg_ctl(&["start"], true); } @@ -394,10 +492,21 @@ impl PostgresNode { ); let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); - println!("Running {}", sql); + println!("Running {}", sql); client.query(sql, &[]).unwrap() } + pub fn open_psql(&self, db: &str) -> Client { + let connstring = format!( + "host={} port={} dbname={} user={}", + self.ip, + self.port, + db, + self.whoami() + ); + Client::connect(connstring.as_str(), NoTls).unwrap() + } + pub fn get_pgdata(&self) -> Option<&str> { self.pgdata.to_str() @@ -427,6 +536,23 @@ impl PostgresNode { } } + pub fn start_proxy(&self, wal_acceptors : String) -> WalProposerNode { + let proxy_path = PG_BIN_DIR.join("safekeeper_proxy"); + match Command::new(proxy_path.as_path()) + .args(&["-s", &wal_acceptors]) + .args(&["-h", &self.ip.to_string()]) + .args(&["-p", &self.port.to_string()]) + .arg("-v") + .stderr(File::create(TEST_WORKDIR.join("safepkeeper_proxy.log")).unwrap()) + .spawn() + { + Ok(child) => + WalProposerNode { pid: child.id() }, + 
Err(e) => + panic!("Failed to launch {:?}: {}", proxy_path, e) + } + } + // TODO pub fn pg_bench() {} pub fn pg_regress() {} diff --git a/src/lib.rs b/src/lib.rs index 6f34a0743a..8056f86747 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ pub struct PageServerConf { #[derive(Debug, Clone)] pub struct WalAcceptorConf { pub data_dir: PathBuf, + pub daemonize: bool, pub no_sync: bool, pub listen_addr: SocketAddr, } diff --git a/tests/test_wal_acceptor.rs b/tests/test_wal_acceptor.rs index e829b89115..078211bcae 100644 --- a/tests/test_wal_acceptor.rs +++ b/tests/test_wal_acceptor.rs @@ -1,9 +1,91 @@ // Restart acceptors one by one while compute is under the load. +use pageserver::control_plane::ComputeControlPlane; +use pageserver::control_plane::StorageControlPlane; +use rand::Rng; +use std::{thread, time}; +use std::sync::Arc; +use std::time::SystemTime; + +#[test] +fn test_acceptors_normal_work() { + // Start pageserver that reads WAL directly from that postgres + const REDUNDANCY : usize = 3; + let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); + let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); + + // start postgre + let node = compute_cplane.new_master_node(); + node.start(&storage_cplane); + + // start proxy + let _proxy = node.start_proxy(wal_acceptors); + + // check basic work with table + node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); + node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'"); + let count: i64 = node + .safe_psql("postgres", "SELECT sum(key) FROM t") + .first() + .unwrap() + .get(0); + println!("sum = {}", count); + assert_eq!(count, 5000050000); + // check wal files equality +} + // Majority is always alive #[test] fn test_acceptors_restarts() { + // Start pageserver that reads WAL directly from that postgres + const REDUNDANCY : usize = 3; + const 
FAULT_PROBABILITY : f32 = 0.01; - // check wal files equality + let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); + let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); + let mut rng = rand::thread_rng(); + + // start postgres + let node = compute_cplane.new_master_node(); + node.start(&storage_cplane); + + // start proxy + let _proxy = node.start_proxy(wal_acceptors); + let mut failed_node : Option<usize> = None; + + // check basic work with table + node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); + let mut psql = node.open_psql("postgres"); + for i in 1..=1000 { + psql.execute("INSERT INTO t values ($1, 'payload')", &[&i]).unwrap(); + let prob : f32 = rng.gen(); + if prob <= FAULT_PROBABILITY { + if let Some(node) = failed_node { + storage_cplane.wal_acceptors[node].start(); + failed_node = None; + } else { + let node : usize = rng.gen_range(0..REDUNDANCY); + failed_node = Some(node); + storage_cplane.wal_acceptors[node].stop(); + } + } + } + let count: i64 = node + .safe_psql("postgres", "SELECT sum(key) FROM t") + .first() + .unwrap() + .get(0); + println!("sum = {}", count); + assert_eq!(count, 500500); +} + +fn start_acceptor(plane : &Arc<StorageControlPlane>, no : usize) { + let sp = plane.clone(); + thread::spawn(move || { + thread::sleep(time::Duration::from_millis(1000)); + sp.wal_acceptors[no].start(); + }); } // Stop majority of acceptors while compute is under the load. 
Boot @@ -11,6 +93,45 @@ fn test_acceptors_restarts() { // N_CRASHES env var #[test] fn test_acceptors_unavalability() { + // Start pageserver that reads WAL directly from that postgres + const REDUNDANCY : usize = 2; - // check wal files equality + let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); + let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); + + // start postgre + let node = compute_cplane.new_master_node(); + node.start(&storage_cplane); + + // start proxy + let _proxy = node.start_proxy(wal_acceptors); + + // check basic work with table + node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); + let mut psql = node.open_psql("postgres"); + psql.execute("INSERT INTO t values (1, 'payload')", &[]).unwrap(); + + storage_cplane.wal_acceptors[0].stop(); + let ap = Arc::new(storage_cplane); + start_acceptor(&ap, 0); + let now = SystemTime::now(); + psql.execute("INSERT INTO t values (2, 'payload')", &[]).unwrap(); + assert!(now.elapsed().unwrap().as_secs() > 1); + psql.execute("INSERT INTO t values (3, 'payload')", &[]).unwrap(); + + ap.wal_acceptors[1].stop(); + start_acceptor(&ap, 1); + psql.execute("INSERT INTO t values (4, 'payload')", &[]).unwrap(); + assert!(now.elapsed().unwrap().as_secs() > 2); + + psql.execute("INSERT INTO t values (5, 'payload')", &[]).unwrap(); + + let count: i64 = node + .safe_psql("postgres", "SELECT sum(key) FROM t") + .first() + .unwrap() + .get(0); + println!("sum = {}", count); + assert_eq!(count, 15); }