diff --git a/.gitignore b/.gitignore
index 2f22547efd..20348359a4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,3 @@
 /target
-/tmp_check
+/tmp_check/
 /tmp_install
diff --git a/Cargo.lock b/Cargo.lock
index 0ac61eb60d..04d3842934 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -378,6 +378,21 @@ version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc"
 
+[[package]]
+name = "control_plane"
+version = "0.1.0"
+dependencies = [
+ "home",
+ "pageserver",
+ "postgres",
+ "rand 0.8.3",
+ "serde",
+ "serde_derive",
+ "tokio-postgres",
+ "toml",
+ "walkeeper",
+]
+
 [[package]]
 name = "core-foundation"
 version = "0.9.1"
@@ -795,6 +810,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "home"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2456aef2e6b6a9784192ae780c0f15bc57df0e918585282325e8c8ac27737654"
+dependencies = [
+ "winapi",
+]
+
 [[package]]
 name = "http"
 version = "0.2.3"
@@ -900,6 +924,7 @@ dependencies = [
 name = "integration_tests"
 version = "0.1.0"
 dependencies = [
+ "control_plane",
  "lazy_static",
  "pageserver",
  "postgres",
@@ -2094,6 +2119,15 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "toml"
+version = "0.5.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "tower-service"
 version = "0.3.1"
@@ -2417,3 +2451,11 @@ name = "xml-rs"
 version = "0.8.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
+
+[[package]]
+name = "zenith"
+version = "0.1.0"
+dependencies = [
+ "clap",
+ "control_plane",
+]
diff --git a/Cargo.toml b/Cargo.toml
index b6809e2ad7..f4d6314283 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,4 +3,6 @@ members = [
     "integration_tests",
     "pageserver",
     "walkeeper",
+    "zenith",
+    "control_plane",
 ]
diff --git a/control_plane/.gitignore b/control_plane/.gitignore
new file mode 100644
index 0000000000..c1e54a6bcb
--- /dev/null
+++ b/control_plane/.gitignore
@@ -0,0 +1 @@
+tmp_check/
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
new file mode 100644
index 0000000000..dac78ea356
--- /dev/null
+++ b/control_plane/Cargo.toml
@@ -0,0 +1,20 @@
+[package]
+name = "control_plane"
+version = "0.1.0"
+authors = ["Stas Kelvich "]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+rand = "0.8.3"
+postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
+tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
+
+serde = ""
+serde_derive = ""
+toml = ""
+home = "0.5.3"
+
+pageserver = { path = "../pageserver" }
+walkeeper = { path = "../walkeeper" }
diff --git a/integration_tests/tests/control_plane/mod.rs b/control_plane/src/lib.rs
similarity index 69%
rename from integration_tests/tests/control_plane/mod.rs
rename to control_plane/src/lib.rs
index eab3f345af..09e172ec4a 100644
--- a/integration_tests/tests/control_plane/mod.rs
+++ b/control_plane/src/lib.rs
@@ -9,76 +9,68 @@
 use std::fs::File;
 use std::fs::{self, OpenOptions};
+use std::net::TcpStream;
 use std::path::{Path, PathBuf};
 use std::process::Command;
 use std::str;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
 use std::{
     io::Write,
     net::{IpAddr, Ipv4Addr, SocketAddr},
 };
 
-use lazy_static::lazy_static;
+pub mod local_env;
+use local_env::LocalEnv;
 use postgres::{Client, NoTls};
 
-lazy_static! {
-    // postgres would be there if it was build by 'make postgres' here in the repo
-    pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
-        .join("../tmp_install/bin");
-    pub static ref PG_LIB_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
-        .join("../tmp_install/lib");
-
-    pub static ref BIN_DIR : PathBuf = cargo_bin_dir();
-
-    pub static ref TEST_WORKDIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
-        .join("tmp_check");
-}
-
-// Find the directory where the binaries were put (i.e. target/debug/)
-pub fn cargo_bin_dir() -> PathBuf {
-    let mut pathbuf = std::env::current_exe().ok().unwrap();
-
-    pathbuf.pop();
-    if pathbuf.ends_with("deps") {
-        pathbuf.pop();
-    }
-
-    return pathbuf;
-}
-
+//
+// Collection of several example deployments useful for tests.
 //
 // I'm intendedly modelling storage and compute control planes as a separate entities
 // as it is closer to the actual setup.
 //
-pub struct StorageControlPlane {
+pub struct TestStorageControlPlane {
     pub wal_acceptors: Vec<WalAcceptorNode>,
-    pub page_servers: Vec<PageServerNode>,
+    pub pageserver: Arc<PageServerNode>,
+    pub test_done: AtomicBool,
 }
 
-impl StorageControlPlane {
+impl TestStorageControlPlane {
     // postgres <-> page_server
-    pub fn one_page_server() -> StorageControlPlane {
-        let mut cplane = StorageControlPlane {
-            wal_acceptors: Vec::new(),
-            page_servers: Vec::new(),
-        };
+    pub fn one_page_server() -> TestStorageControlPlane {
+        let env = local_env::test_env();
 
-        let pserver = PageServerNode {
-            page_service_addr: "127.0.0.1:65200".parse().unwrap(),
-            data_dir: TEST_WORKDIR.join("pageserver"),
-        };
+        let pserver = Arc::new(PageServerNode {
+            env: env.clone(),
+            kill_on_exit: true,
+        });
         pserver.init();
         pserver.start();
 
-        cplane.page_servers.push(pserver);
-        cplane
+        TestStorageControlPlane {
+            wal_acceptors: Vec::new(),
+            pageserver: pserver,
+            test_done: AtomicBool::new(false),
+        }
     }
 
-    pub fn fault_tolerant(redundancy: usize) -> StorageControlPlane {
-        let mut cplane = StorageControlPlane {
+    // postgres <-> {wal_acceptor1, wal_acceptor2, ...}
+    pub fn fault_tolerant(redundancy: usize) -> TestStorageControlPlane {
+        let env = local_env::test_env();
+        let mut cplane = TestStorageControlPlane {
             wal_acceptors: Vec::new(),
-            page_servers: Vec::new(),
+            pageserver: Arc::new(PageServerNode {
+                env: env.clone(),
+                kill_on_exit: true,
+            }),
+            test_done: AtomicBool::new(false),
         };
+        cplane.pageserver.init();
+        cplane.pageserver.start();
 
         const WAL_ACCEPTOR_PORT: usize = 54321;
 
         for i in 0..redundancy {
@@ -86,7 +78,8 @@ impl StorageControlPlane {
                 listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
                     .parse()
                     .unwrap(),
-                data_dir: TEST_WORKDIR.join(format!("wal_acceptor_{}", i)),
+                data_dir: env.data_dir.join(format!("wal_acceptor_{}", i)),
+                env: env.clone(),
             };
             wal_acceptor.init();
             wal_acceptor.start();
@@ -96,17 +89,7 @@ impl StorageControlPlane {
     }
 
     pub fn stop(&self) {
-        for wa in self.wal_acceptors.iter() {
-            wa.stop();
-        }
-    }
-
-    // // postgres <-> wal_acceptor x3 <-> page_server
-    // fn local(&mut self) -> StorageControlPlane {
-    // }
-
-    pub fn page_server_addr(&self) -> &SocketAddr {
-        &self.page_servers[0].page_service_addr
+        self.test_done.store(true, Ordering::Relaxed);
     }
 
     pub fn get_wal_acceptor_conn_info(&self) -> String {
@@ -117,13 +100,99 @@ impl StorageControlPlane {
             .join(",")
     }
 
+    pub fn is_running(&self) -> bool {
+        self.test_done.load(Ordering::Relaxed)
+    }
+}
+
+impl Drop for TestStorageControlPlane {
+    fn drop(&mut self) {
+        self.stop();
+    }
+}
+
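The pageserver is now shared between the storage and compute sides as an `Arc<PageServerNode>`, so a test wires the two planes together by handing the compute plane a clone of that `Arc`. A minimal sketch of that flow, assuming the helpers behave as the updated `test_pageserver.rs` below uses them (the SQL is illustrative only):

```rust
use control_plane::{ComputeControlPlane, TestStorageControlPlane};

#[test]
fn smoke_test() {
    // Storage side: a single pageserver backed by the LocalEnv test environment.
    let storage_cplane = TestStorageControlPlane::one_page_server();

    // Compute side: shares the same Arc<PageServerNode>.
    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);

    // Allocate and start a postgres instance; start() registers the node
    // with the pageserver via the 'callmemaybe' command before pg_ctl start.
    let node = compute_cplane.new_node();
    node.start();

    // Illustrative query; any SQL works here.
    node.safe_psql("postgres", "SELECT 1");
}
```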
+//
+// Control routines for pageserver.
+//
+// Used in CLI and tests.
+//
+pub struct PageServerNode {
+    kill_on_exit: bool,
+    env: LocalEnv,
+}
+
+impl PageServerNode {
+    pub fn init(&self) {
+        fs::create_dir_all(self.env.pageserver_data_dir()).unwrap();
+    }
+
+    pub fn start(&self) {
+        println!(
+            "Starting pageserver at '{}'",
+            self.env.pageserver.listen_address
+        );
+
+        let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
+            .args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
+            .args(&[
+                "-l",
+                self.env.pageserver.listen_address.to_string().as_str(),
+            ])
+            .arg("-d")
+            .arg("--skip-recovery")
+            .env_clear()
+            .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postgres-wal-redo binary
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .status()
+            .expect("failed to start pageserver");
+
+        if !status.success() {
+            panic!("pageserver start failed");
+        }
+    }
+
+    pub fn stop(&self) {
+        let pidfile = self.env.pageserver_pidfile();
+        let pid = fs::read_to_string(pidfile).unwrap();
+
+        let status = Command::new("kill")
+            .arg(pid)
+            .env_clear()
+            .status()
+            .expect("failed to execute kill");
+
+        if !status.success() {
+            panic!("kill failed");
+        }
+
+        // wait for pageserver to stop
+        for _ in 0..5 {
+            let stream = TcpStream::connect(self.env.pageserver.listen_address);
+            if let Err(_e) = stream {
+                return;
+            }
+            println!(
+                "Stopping pageserver on {}",
+                self.env.pageserver.listen_address
+            );
+            thread::sleep(Duration::from_secs(1));
+        }
+
+        // ok, we failed to stop pageserver, let's panic
+        panic!("Failed to stop pageserver");
+    }
+
+    pub fn address(&self) -> &std::net::SocketAddr {
+        &self.env.pageserver.listen_address
+    }
+
     pub fn page_server_psql(&self, sql: &str) -> Vec {
-        let addr = &self.page_servers[0].page_service_addr;
+        // let addr = &self.page_servers[0].env.pageserver.listen_address;
         let connstring = format!(
             "host={} port={} dbname={} user={}",
-            addr.ip(),
-            addr.port(),
+            self.address().ip(),
+            self.address().port(),
             "no_db",
             "no_user",
         );
@@ -134,83 +203,23 @@ impl StorageControlPlane {
     }
 }
 
-impl Drop for StorageControlPlane {
-    fn drop(&mut self) {
-        self.stop();
-    }
-}
-
-pub struct PageServerNode {
-    page_service_addr: SocketAddr,
-    data_dir: PathBuf,
-}
-
-impl PageServerNode {
-    // TODO: method to force redo on a specific relation
-
-    // TODO: make wal-redo-postgres workable without data directory?
-    pub fn init(&self) {
-        fs::create_dir_all(self.data_dir.clone()).unwrap();
-
-        let datadir_path = self.data_dir.join("wal_redo_pgdata");
-        fs::remove_dir_all(datadir_path.to_str().unwrap()).ok();
-
-        let initdb = Command::new(PG_BIN_DIR.join("initdb"))
-            .args(&["-D", datadir_path.to_str().unwrap()])
-            .arg("-N")
-            .arg("--no-instructions")
-            .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-            .status()
-            .expect("failed to execute initdb");
-        if !initdb.success() {
-            panic!("initdb failed");
-        }
-    }
-
-    pub fn start(&self) {
-        println!("Starting pageserver at '{}'", self.page_service_addr);
-
-        let status = Command::new(BIN_DIR.join("pageserver"))
-            .args(&["-D", self.data_dir.to_str().unwrap()])
-            .args(&["-l", self.page_service_addr.to_string().as_str()])
-            .arg("-d")
-            .arg("--skip-recovery")
-            .env_clear()
-            .env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
-            .status()
-            .expect("failed to start pageserver");
-
-        if !status.success() {
-            panic!("pageserver start failed");
-        }
-    }
-
-    pub fn stop(&self) {
-        let pidfile = self.data_dir.join("pageserver.pid");
-        let pid = fs::read_to_string(pidfile).unwrap();
-        let status = Command::new("kill")
-            .arg(pid)
-            .env_clear()
-            .status()
-            .expect("failed to execute kill");
-
-        if !status.success() {
-            panic!("kill start failed");
-        }
-    }
-}
-
 impl Drop for PageServerNode {
     fn drop(&mut self) {
-        self.stop();
-        // fs::remove_dir_all(self.data_dir.clone()).unwrap();
+        if self.kill_on_exit {
+            self.stop();
+        }
     }
 }
 
+//
+// Control routines for WalAcceptor.
+//
+// Now used only in test setups.
+//
 pub struct WalAcceptorNode {
     listen: SocketAddr,
     data_dir: PathBuf,
+    env: LocalEnv,
 }
 
 impl WalAcceptorNode {
@@ -228,7 +237,7 @@ impl WalAcceptorNode {
             self.listen
         );
 
-        let status = Command::new(BIN_DIR.join("wal_acceptor"))
+        let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor"))
            .args(&["-D", self.data_dir.to_str().unwrap()])
            .args(&["-l", self.listen.to_string().as_str()])
            .arg("-d")
@@ -242,6 +251,7 @@ impl WalAcceptorNode {
     }
 
     pub fn stop(&self) {
+        println!("Stopping wal acceptor on {}", self.listen);
         let pidfile = self.data_dir.join("wal_acceptor.pid");
         if let Ok(pid) = fs::read_to_string(pidfile) {
             let _status = Command::new("kill")
@@ -256,7 +266,6 @@ impl WalAcceptorNode {
 impl Drop for WalAcceptorNode {
     fn drop(&mut self) {
         self.stop();
-        // fs::remove_dir_all(self.data_dir.clone()).unwrap();
     }
 }
 
@@ -265,22 +274,25 @@ impl Drop for WalAcceptorNode {
 //
 // ComputeControlPlane
 //
-pub struct ComputeControlPlane<'a> {
+pub struct ComputeControlPlane {
     pg_bin_dir: PathBuf,
     work_dir: PathBuf,
     last_assigned_port: u16,
-    storage_cplane: &'a StorageControlPlane,
+    pageserver: Arc<PageServerNode>,
     nodes: Vec<Arc<PostgresNode>>,
+    env: LocalEnv,
 }
 
-impl ComputeControlPlane<'_> {
-    pub fn local(storage_cplane: &StorageControlPlane) -> ComputeControlPlane {
+impl ComputeControlPlane {
+    pub fn local(pageserver: &Arc<PageServerNode>) -> ComputeControlPlane {
+        let env = local_env::test_env();
         ComputeControlPlane {
-            pg_bin_dir: PG_BIN_DIR.to_path_buf(),
-            work_dir: TEST_WORKDIR.to_path_buf(),
+            pg_bin_dir: env.pg_bin_dir(),
+            work_dir: env.data_dir.clone(),
             last_assigned_port: 65431,
-            storage_cplane: storage_cplane,
+            pageserver: Arc::clone(pageserver),
             nodes: Vec::new(),
+            env: env.clone(),
         }
     }
 
@@ -296,24 +308,29 @@ impl ComputeControlPlane<'_> {
         let node_id = self.nodes.len() + 1;
         let node = PostgresNode {
             _node_id: node_id,
-            port: self.get_port(),
-            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
+            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.get_port()),
             pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
-            pg_bin_dir: self.pg_bin_dir.clone(),
+            env: self.env.clone(),
+            pageserver: Arc::clone(&self.pageserver),
         };
         self.nodes.push(Arc::new(node));
         let node = self.nodes.last().unwrap();
 
+        println!(
+            "Creating new postgres: path={} port={}",
+            node.pgdata.to_str().unwrap(),
+            node.address.port()
+        );
+
         // initialize data directory
         fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
         let initdb_path = self.pg_bin_dir.join("initdb");
-        println!("initdb_path: {}", initdb_path.to_str().unwrap());
         let initdb = Command::new(initdb_path)
             .args(&["-D", node.pgdata.to_str().unwrap()])
             .arg("-N")
             .arg("--no-instructions")
             .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
             .status()
             .expect("failed to execute initdb");
 
@@ -340,8 +357,8 @@ impl ComputeControlPlane<'_> {
                 listen_addresses = '{address}'\n\
                 port = {port}\n\
             ",
-                address = node.ip,
-                port = node.port
+                address = node.address.ip(),
+                port = node.address.port()
             )
             .as_str(),
         );
@@ -357,10 +374,10 @@ impl ComputeControlPlane<'_> {
         let node_id = self.nodes.len() + 1;
         let node = PostgresNode {
             _node_id: node_id,
-            port: self.get_port(),
-            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
+            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.get_port()),
             pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
-            pg_bin_dir: self.pg_bin_dir.clone(),
+            env: self.env.clone(),
+            pageserver: Arc::clone(&self.pageserver),
         };
         self.nodes.push(Arc::new(node));
         let node = self.nodes.last().unwrap();
@@ -375,7 +392,7 @@ impl ComputeControlPlane<'_> {
             .arg("--no-instructions")
             .arg("--compute-node")
             .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
             .status()
             .expect("failed to execute initdb");
 
@@ -398,8 +415,8 @@ impl ComputeControlPlane<'_> {
                 port = {port}\n\
                 computenode_mode = true\n\
             ",
-                address = node.ip,
-                port = node.port
+                address = node.address.ip(),
+                port = node.address.port()
             )
             .as_str(),
         );
@@ -408,20 +425,18 @@ impl ComputeControlPlane<'_> {
     }
 
     pub fn new_node(&mut self) -> Arc<PostgresNode> {
-        let storage_cplane = self.storage_cplane;
+        let addr = self.pageserver.address().clone();
         let node = self.new_vanilla_node();
-        let pserver = storage_cplane.page_server_addr();
-
         // Configure that node to take pages from pageserver
         node.append_conf(
             "postgresql.conf",
             format!(
                 "\
-                page_server_connstring = 'host={} port={}'\n\
-            ",
-            pserver.ip(),
-            pserver.port()
+            page_server_connstring = 'host={} port={}'\n\
+            ",
+            addr.ip(),
+            addr.port()
             )
             .as_str(),
         );
@@ -434,8 +449,7 @@ impl ComputeControlPlane<'_> {
 
         node.append_conf(
             "postgresql.conf",
-            "synchronous_standby_names = 'safekeeper_proxy'\n\
-            ",
+            "synchronous_standby_names = 'safekeeper_proxy'\n",
         );
         node.clone()
     }
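new_master_node() pairs with start_proxy(): the compute node is created with computenode_mode and synchronous_standby_names set, and safekeeper_proxy then ships its WAL to the acceptors. A small sketch of that flow, mirroring what the wal_acceptor tests later in this patch do (the SQL is illustrative only):

```rust
use control_plane::{ComputeControlPlane, TestStorageControlPlane};

#[test]
fn wal_acceptor_smoke() {
    // Three wal_acceptors plus a pageserver, as in the fault_tolerant() helper.
    let storage_cplane = TestStorageControlPlane::fault_tolerant(3);
    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);

    // The master node is configured for synchronous replication to 'safekeeper_proxy'.
    let node = compute_cplane.new_master_node();
    node.start();

    // The proxy pushes WAL to the acceptors listed in the connection-info string.
    let _proxy = node.start_proxy(storage_cplane.get_wal_acceptor_conn_info());

    node.safe_psql("postgres", "CREATE TABLE t(key int primary key)");
}
```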
@@ -470,11 +484,11 @@ impl Drop for WalProposerNode {
 ///////////////////////////////////////////////////////////////////////////////
 
 pub struct PostgresNode {
+    pub address: SocketAddr,
     _node_id: usize,
-    pub port: u16,
-    pub ip: IpAddr,
     pgdata: PathBuf,
-    pg_bin_dir: PathBuf,
+    pub env: LocalEnv,
+    pageserver: Arc<PageServerNode>,
 }
 
 impl PostgresNode {
@@ -488,7 +502,7 @@ impl PostgresNode {
     }
 
     fn pg_ctl(&self, args: &[&str], check_ok: bool) {
-        let pg_ctl_path = self.pg_bin_dir.join("pg_ctl");
+        let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
         let pg_ctl = Command::new(pg_ctl_path)
             .args(
                 [
@@ -503,7 +517,7 @@ impl PostgresNode {
                 ]
                 .concat(),
             )
             .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
             .status()
             .expect("failed to execute pg_ctl");
@@ -512,11 +526,10 @@ impl PostgresNode {
         }
     }
 
-    pub fn start(&self, storage_cplane: &StorageControlPlane) {
-        if storage_cplane.page_servers.len() != 0 {
-            let _res =
-                storage_cplane.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
-        }
+    pub fn start(&self) {
+        let _res = self
+            .pageserver
+            .page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
         println!("Starting postgres node at '{}'", self.connstr());
         self.pg_ctl(&["start"], true);
     }
@@ -530,7 +543,12 @@ impl PostgresNode {
     }
 
     pub fn connstr(&self) -> String {
-        format!("host={} port={} user={}", self.ip, self.port, self.whoami())
+        format!(
+            "host={} port={} user={}",
+            self.address.ip(),
+            self.address.port(),
+            self.whoami()
+        )
     }
 
     // XXX: cache that in control plane
@@ -549,8 +567,8 @@ impl PostgresNode {
     pub fn safe_psql(&self, db: &str, sql: &str) -> Vec {
         let connstring = format!(
             "host={} port={} dbname={} user={}",
-            self.ip,
-            self.port,
+            self.address.ip(),
+            self.address.port(),
             db,
             self.whoami()
         );
@@ -563,8 +581,8 @@ impl PostgresNode {
     pub fn open_psql(&self, db: &str) -> Client {
         let connstring = format!(
             "host={} port={} dbname={} user={}",
-            self.ip,
-            self.port,
+            self.address.ip(),
+            self.address.port(),
             db,
             self.whoami()
         );
@@ -583,7 +601,7 @@ impl PostgresNode {
             File::create(filepath).unwrap();
         }
 
-        let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
+        let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal");
 
         let pg_resetwal = Command::new(pg_resetwal_path)
             .args(&["-D", self.pgdata.to_str().unwrap()])
@@ -599,13 +617,13 @@ impl PostgresNode {
     }
 
     pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode {
-        let proxy_path = PG_BIN_DIR.join("safekeeper_proxy");
+        let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy");
         match Command::new(proxy_path.as_path())
             .args(&["-s", &wal_acceptors])
-            .args(&["-h", &self.ip.to_string()])
-            .args(&["-p", &self.port.to_string()])
+            .args(&["-h", &self.address.ip().to_string()])
+            .args(&["-p", &self.address.port().to_string()])
             .arg("-v")
-            .stderr(File::create(TEST_WORKDIR.join("safepkeeper_proxy.log")).unwrap())
+            .stderr(File::create(self.env.data_dir.join("safepkeeper_proxy.log")).unwrap())
             .spawn()
         {
             Ok(child) => WalProposerNode { pid: child.id() },
@@ -644,7 +662,7 @@ pub fn regress_check(pg: &PostgresNode) {
         .args(&[
             "--bindir=''",
             "--use-existing",
-            format!("--bindir={}", PG_BIN_DIR.to_str().unwrap()).as_str(),
+            format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
             format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
             format!(
                 "--schedule={}",
@@ -654,10 +672,10 @@ pub fn regress_check(pg: &PostgresNode) {
             format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
         ])
         .env_clear()
-        .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-        .env("PGPORT", pg.port.to_string())
+        .env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
+        .env("PGHOST", pg.address.ip().to_string())
+        .env("PGPORT", pg.address.port().to_string())
         .env("PGUSER", pg.whoami())
-        .env("PGHOST", pg.ip.to_string())
         .status()
         .expect("pg_regress failed");
 }
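PageServerNode::start() returns as soon as the daemonizing binary exits, and both the stop() loop above and the zenith CLI later in this patch leave "check it's actually started" as a TODO. A possible shape for that check, reusing the same TcpStream polling that stop() already relies on (the helper name and retry count are illustrative, not part of this patch):

```rust
use std::net::{SocketAddr, TcpStream};
use std::thread;
use std::time::Duration;

// Hypothetical helper: poll until something is listening on `addr`,
// or give up after `retries` one-second attempts.
pub fn wait_for_listen(addr: SocketAddr, retries: u32) -> bool {
    for _ in 0..retries {
        if TcpStream::connect(addr).is_ok() {
            return true;
        }
        thread::sleep(Duration::from_secs(1));
    }
    false
}

// Sketch of how PageServerNode::start could use it after spawning the binary:
// if !wait_for_listen(self.env.pageserver.listen_address, 5) {
//     panic!("pageserver did not start listening");
// }
```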
diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
new file mode 100644
index 0000000000..27f79a644c
--- /dev/null
+++ b/control_plane/src/local_env.rs
@@ -0,0 +1,210 @@
+//
+// This module is responsible for locating and loading paths in a local setup.
+//
+// For now it also provides an init method which acts as a stub for a proper
+// installation script which will use local paths.
+//
+use std::env;
+use std::error;
+use std::fs;
+use std::net::SocketAddr;
+use std::path::{Path, PathBuf};
+
+use home;
+use serde_derive::{Deserialize, Serialize};
+
+type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
+
+//
+// This data structure represents the deserialized zenith config, which should be
+// located in ~/.zenith
+//
+// TODO: should we also support ZENITH_CONF env var?
+//
+#[derive(Serialize, Deserialize, Clone)]
+pub struct LocalEnv {
+    // Here page server and compute nodes will create and store their data.
+    pub data_dir: PathBuf,
+
+    // Path to postgres distribution. It is expected that "bin", "include",
+    // "lib", "share" from postgres distribution will be there. If at some point
+    // in time we will be able to run against vanilla postgres we may split that
+    // to four separate paths and match OS-specific installation layout.
+    pub pg_distrib_dir: PathBuf,
+
+    // Path to the directory with zenith binaries (pageserver, wal_acceptor).
+    pub zenith_distrib_dir: PathBuf,
+
+    // PageServer-specific options.
+    pub pageserver: PageServerConf,
+}
+
+impl LocalEnv {
+    pub fn pg_bin_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("bin")
+    }
+    pub fn pg_lib_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("lib")
+    }
+
+    pub fn pageserver_data_dir(&self) -> PathBuf {
+        self.data_dir.join("pageserver")
+    }
+    pub fn pageserver_log(&self) -> PathBuf {
+        self.pageserver_data_dir().join("pageserver.log")
+    }
+    pub fn pageserver_pidfile(&self) -> PathBuf {
+        self.pageserver_data_dir().join("pageserver.pid")
+    }
+}
+
+#[derive(Serialize, Deserialize, Clone)]
+pub struct PageServerConf {
+    pub listen_address: SocketAddr,
+}
+
+//
+// The rust-lang repo has several discussions about the proper library for finding
+// the home directory in a cross-platform way. The current consensus seems to be
+// the home crate, which cargo itself uses.
+//
+fn get_home() -> Result<PathBuf> {
+    match home::home_dir() {
+        Some(path) => Ok(path),
+        None => {
+            return Err(Box::<dyn error::Error>::from(
+                "cannot determine home directory path",
+            ));
+        }
+    }
+}
+
+pub fn init() -> Result<()> {
+    let home_dir = get_home()?;
+
+    // check if config already exists
+    let cfg_path = home_dir.join(".zenith");
+    if cfg_path.exists() {
+        let err_msg = format!(
+            "{} already exists. Perhaps already initialized?",
+            cfg_path.to_str().unwrap()
+        );
+        return Err(Box::<dyn error::Error>::from(err_msg));
+    }
+
+    // For now we can run init only from the repo root, so check that the current
+    // dir is our repo. Use 'pageserver/Cargo.toml' existence as evidence.
+    let cargo_path = env::current_dir()?;
+    if !cargo_path.join("pageserver/Cargo.toml").exists() {
+        let err_msg = "Current directory does not look like a zenith repo. \
+            Please run 'init' from the zenith repo root.";
+        return Err(Box::<dyn error::Error>::from(err_msg));
+    }
+
+    // ok, now check that expected binaries are present
+
+    // check postgres
+    let pg_distrib_dir = cargo_path.join("tmp_install");
+    let pg_path = pg_distrib_dir.join("bin/postgres");
+    if !pg_path.exists() {
+        let err_msg = format!(
+            "Can't find postgres binary at {}. \
+             Perhaps './pgbuild.sh' is needed to build it first.",
+            pg_path.to_str().unwrap()
+        );
+        return Err(Box::<dyn error::Error>::from(err_msg));
+    }
+
+    // check pageserver
+    let zenith_distrib_dir = cargo_path.join("target/debug/");
+    let pageserver_path = zenith_distrib_dir.join("pageserver");
+    if !pageserver_path.exists() {
+        let err_msg = format!(
+            "Can't find pageserver binary at {}. Please build it.",
+            pageserver_path.to_str().unwrap()
+        );
+        return Err(Box::<dyn error::Error>::from(err_msg));
+    }
+
+    // ok, we are good to go
+
+    // create data dir
+    let data_dir = cargo_path.join("tmp_install");
+    match fs::create_dir(data_dir.clone()) {
+        Ok(_) => {}
+        Err(e) => match e.kind() {
+            std::io::ErrorKind::AlreadyExists => {}
+            _ => {
+                let err_msg = format!(
+                    "Failed to create data directory in '{}': {}",
+                    data_dir.to_str().unwrap(),
+                    e
+                );
+                return Err(Box::<dyn error::Error>::from(err_msg));
+            }
+        },
+    }
+
+    // write config
+    let conf = LocalEnv {
+        data_dir,
+        pg_distrib_dir,
+        zenith_distrib_dir,
+        pageserver: PageServerConf {
+            listen_address: "127.0.0.1:5430".parse().unwrap(),
+        },
+    };
+    let toml = toml::to_string(&conf)?;
+    fs::write(cfg_path, toml)?;
+
+    Ok(())
+}
+
+// check that config file is present
+pub fn load_config() -> Result<LocalEnv> {
+    // home
+    let home_dir = get_home()?;
+
+    // check file exists
+    let cfg_path = home_dir.join(".zenith");
+    if !cfg_path.exists() {
+        let err_msg = format!(
+            "Zenith config is not found in {}. You need to run 'zenith init' first",
+            cfg_path.to_str().unwrap()
+        );
+        return Err(Box::<dyn error::Error>::from(err_msg));
+    }
+
+    // load and parse file
+    let config = fs::read_to_string(cfg_path)?;
+    match toml::from_str(config.as_str()) {
+        Ok(cfg) => Ok(cfg),
+        Err(e) => Err(Box::<dyn error::Error>::from(e)),
+    }
+}
+
+// local env for tests
+pub fn test_env() -> LocalEnv {
+    let data_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check");
+    fs::create_dir_all(data_dir.clone()).unwrap();
+    LocalEnv {
+        data_dir,
+        pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
+        zenith_distrib_dir: cargo_bin_dir(),
+        pageserver: PageServerConf {
+            listen_address: "127.0.0.1:65200".parse().unwrap(),
+        },
+    }
+}
+
+// Find the directory where the binaries were put (i.e. target/debug/)
+pub fn cargo_bin_dir() -> PathBuf {
+    let mut pathbuf = std::env::current_exe().ok().unwrap();
+
+    pathbuf.pop();
+    if pathbuf.ends_with("deps") {
+        pathbuf.pop();
+    }
+
+    return pathbuf;
+}
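Since LocalEnv derives Serialize/Deserialize and init() writes it out with toml::to_string, the resulting ~/.zenith file is plain TOML with one table per nested struct. A sketch of what a freshly initialized config might look like and how load_config() reads it back; the concrete paths below are placeholders, not prescribed by this patch:

```rust
use control_plane::local_env::LocalEnv;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Example of the TOML that init() serializes into ~/.zenith; init() points
    // both data_dir and pg_distrib_dir at the repo's tmp_install directory.
    let example = r#"
        data_dir = "/home/me/zenith/tmp_install"
        pg_distrib_dir = "/home/me/zenith/tmp_install"
        zenith_distrib_dir = "/home/me/zenith/target/debug"

        [pageserver]
        listen_address = "127.0.0.1:5430"
    "#;

    // load_config() does essentially this after reading the file from disk.
    let env: LocalEnv = toml::from_str(example)?;
    println!("pageserver will listen on {}", env.pageserver.listen_address);
    Ok(())
}
```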
diff --git a/integration_tests/.gitignore b/integration_tests/.gitignore
new file mode 100644
index 0000000000..80006e4280
--- /dev/null
+++ b/integration_tests/.gitignore
@@ -0,0 +1 @@
+tmp_check/
\ No newline at end of file
diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml
index 6481bdbe29..ad7913cc5c 100644
--- a/integration_tests/Cargo.toml
+++ b/integration_tests/Cargo.toml
@@ -14,3 +14,4 @@ tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "r
 
 pageserver = { path = "../pageserver" }
 walkeeper = { path = "../walkeeper" }
+control_plane = { path = "../control_plane" }
diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs
index 8adacb3c54..824da68b1b 100644
--- a/integration_tests/tests/test_pageserver.rs
+++ b/integration_tests/tests/test_pageserver.rs
@@ -1,8 +1,7 @@
 #[allow(dead_code)]
-mod control_plane;
-
+// mod control_plane;
 use control_plane::ComputeControlPlane;
-use control_plane::StorageControlPlane;
+use control_plane::TestStorageControlPlane;
 
 // XXX: force all redo at the end
 // -- restart + seqscan won't read deleted stuff
@@ -12,12 +11,12 @@
 #[test]
 fn test_redo_cases() {
     // Start pageserver that reads WAL directly from that postgres
-    let storage_cplane = StorageControlPlane::one_page_server();
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::one_page_server();
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
 
     // start postgres
     let node = compute_cplane.new_node();
-    node.start(&storage_cplane);
+    node.start();
 
     // check basic work with table
     node.safe_psql(
@@ -49,14 +48,15 @@
 
 // Runs pg_regress on a compute node
 #[test]
+#[ignore]
 fn test_regress() {
     // Start pageserver that reads WAL directly from that postgres
-    let storage_cplane = StorageControlPlane::one_page_server();
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::one_page_server();
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
 
     // start postgres
     let node = compute_cplane.new_node();
-    node.start(&storage_cplane);
+    node.start();
 
     control_plane::regress_check(&node);
 }
@@ -65,14 +65,14 @@
 #[test]
 fn test_pageserver_multitenancy() {
     // Start pageserver that reads WAL directly from that postgres
-    let storage_cplane = StorageControlPlane::one_page_server();
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::one_page_server();
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
 
     // Allocate postgres instance, but don't start
     let node1 = compute_cplane.new_node();
     let node2 = compute_cplane.new_node();
-    node1.start(&storage_cplane);
-    node2.start(&storage_cplane);
+    node1.start();
+    node2.start();
 
     // check node1
     node1.safe_psql(
diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs
index df0d55413d..fbf474f27f 100644
--- a/integration_tests/tests/test_wal_acceptor.rs
+++ b/integration_tests/tests/test_wal_acceptor.rs
@@ -1,8 +1,6 @@
 // Restart acceptors one by one while compute is under the load.
-#[allow(dead_code)]
-mod control_plane;
 use control_plane::ComputeControlPlane;
-use control_plane::StorageControlPlane;
+use control_plane::TestStorageControlPlane;
 use rand::Rng;
 use std::sync::Arc;
 use std::{thread, time};
@@ -13,13 +11,13 @@
 fn test_acceptors_normal_work() {
     // Start pageserver that reads WAL directly from that postgres
     const REDUNDANCY: usize = 3;
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
 
-    // start postgre
+    // start postgres
     let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    node.start();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -50,14 +48,14 @@ fn test_acceptors_restarts() {
     const REDUNDANCY: usize = 3;
     const FAULT_PROBABILITY: f32 = 0.01;
 
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
     let mut rng = rand::thread_rng();
 
-    // start postgre
+    // start postgres
     let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    node.start();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -93,7 +91,7 @@ fn test_acceptors_restarts() {
     assert_eq!(count, 500500);
 }
 
-fn start_acceptor(cplane: &Arc<StorageControlPlane>, no: usize) {
+fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
     let cp = cplane.clone();
     thread::spawn(move || {
         thread::sleep(time::Duration::from_secs(1));
@@ -109,13 +107,13 @@ fn test_acceptors_unavalability() {
     // Start pageserver that reads WAL directly from that postgres
     const REDUNDANCY: usize = 2;
 
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
 
-    // start postgre
+    // start postgres
    let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    node.start();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -157,11 +155,11 @@ fn test_acceptors_unavalability() {
     assert_eq!(count, 15);
 }
 
-fn simulate_failures(cplane: &Arc<StorageControlPlane>) {
+fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
     let mut rng = rand::thread_rng();
     let n_acceptors = cplane.wal_acceptors.len();
     let failure_period = time::Duration::from_secs(1);
-    loop {
+    while cplane.is_running() {
         thread::sleep(failure_period);
         let mask: u32 = rng.gen_range(0..(1 << n_acceptors));
         for i in 0..n_acceptors {
@@ -184,13 +182,13 @@ fn test_race_conditions() {
     // Start pageserver that reads WAL directly from that postgres
     const REDUNDANCY: usize = 3;
 
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant(REDUNDANCY));
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
 
-    // start postgre
+    // start postgres
     let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    node.start();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -200,10 +198,10 @@ fn test_race_conditions() {
         "postgres",
         "CREATE TABLE t(key int primary key, value text)",
     );
-    let cplane = Arc::new(storage_cplane);
-    let cp = cplane.clone();
-    thread::spawn(move || {
-        simulate_failures(&cp);
+
+    let cp = storage_cplane.clone();
+    let failures_thread = thread::spawn(move || {
+        simulate_failures(cp);
     });
 
     let mut psql = node.open_psql("postgres");
@@ -218,5 +216,7 @@ fn test_race_conditions() {
         .get(0);
     println!("sum = {}", count);
     assert_eq!(count, 500500);
-    cplane.stop();
+
+    storage_cplane.stop();
+    failures_thread.join().unwrap();
 }
diff --git a/zenith/Cargo.toml b/zenith/Cargo.toml
new file mode 100644
index 0000000000..2d1f7c922c
--- /dev/null
+++ b/zenith/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "zenith"
+version = "0.1.0"
+authors = ["Stas Kelvich "]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+clap = "2.33.0"
+control_plane = { path = "../control_plane" }
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
new file mode 100644
index 0000000000..20b48ba5f1
--- /dev/null
+++ b/zenith/src/main.rs
@@ -0,0 +1,119 @@
+use clap::{App, SubCommand};
+use std::fs;
+use std::process::exit;
+use std::process::Command;
+
+use control_plane::local_env;
+
+fn main() {
+    let matches = App::new("zenith")
+        .subcommand(SubCommand::with_name("init"))
+        .subcommand(SubCommand::with_name("start"))
+        .subcommand(SubCommand::with_name("stop"))
+        .subcommand(SubCommand::with_name("status"))
+        .subcommand(
+            SubCommand::with_name("pg")
+                .about("Manage postgres instances")
+                .subcommand(SubCommand::with_name("create"))
+                .subcommand(SubCommand::with_name("start"))
+                .subcommand(SubCommand::with_name("stop"))
+                .subcommand(SubCommand::with_name("destroy")),
+        )
+        .subcommand(
+            SubCommand::with_name("snapshot")
+                .about("Manage database snapshots")
+                .subcommand(SubCommand::with_name("create"))
+                .subcommand(SubCommand::with_name("start"))
+                .subcommand(SubCommand::with_name("stop"))
+                .subcommand(SubCommand::with_name("destroy")),
+        )
+        .get_matches();
+
+    // handle init separately and exit
+    if let Some("init") = matches.subcommand_name() {
+        match local_env::init() {
+            Ok(_) => {
+                println!("Initialization complete! You may start zenith with 'zenith start' now.");
+                exit(0);
+            }
+            Err(e) => {
+                eprintln!("Error during init: {}", e);
+                exit(1);
+            }
+        }
+    }
+
+    // all other commands would need config
+    let conf = match local_env::load_config() {
+        Ok(conf) => conf,
+        Err(e) => {
+            eprintln!("Error loading config from ~/.zenith: {}", e);
+            exit(1);
+        }
+    };
+
+    match matches.subcommand() {
+        ("init", Some(_)) => {
+            panic!() /* init was handled before */
+        }
+
+        ("start", Some(_sub_m)) => {
+            println!(
+                "Starting pageserver at '{}'",
+                conf.pageserver.listen_address
+            );
+
+            let status = Command::new(conf.zenith_distrib_dir.join("pageserver"))
+                .args(&["-D", conf.data_dir.to_str().unwrap()])
+                .args(&["-l", conf.pageserver.listen_address.to_string().as_str()])
+                .arg("-d")
+                .arg("--skip-recovery")
+                .env_clear()
+                .env("PATH", conf.pg_bin_dir().to_str().unwrap()) // pageserver needs postgres-wal-redo binary
+                .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+                .status()
+                .expect("failed to start pageserver");
+
+            if !status.success() {
+                eprintln!(
+                    "Pageserver failed to start. See '{}' for details.",
+                    conf.pageserver_log().to_str().unwrap()
+                );
+                exit(1);
+            }
+
+            // TODO: check it's actually started, or run status
+
+            println!("Done!");
+        }
+
+        ("stop", Some(_sub_m)) => {
+            let pid = fs::read_to_string(conf.pageserver_pidfile()).unwrap();
+            let status = Command::new("kill")
+                .arg(pid)
+                .env_clear()
+                .status()
+                .expect("failed to execute kill");
+
+            if !status.success() {
+                eprintln!("Failed to kill pageserver");
+                exit(1);
+            }
+
+            println!("Done!");
+        }
+
+        ("status", Some(_sub_m)) => {}
+
+        ("pg", Some(pg_match)) => {
+            match pg_match.subcommand() {
+                ("start", Some(_sub_m)) => {
+                    println!("xxx: pg start");
+                    // Ok(())
+                }
+                _ => {}
+            }
+        }
+        _ => {}
+    }
+}
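The "status" arm above is still empty, but everything it needs is already exposed by LocalEnv: the pidfile path and the pageserver's listen address. A hedged sketch of a helper that arm could call, for example as `("status", Some(_sub_m)) => print_pageserver_status(&conf),`; the helper itself is not part of this patch:

```rust
use std::fs;
use std::net::TcpStream;

use control_plane::local_env::LocalEnv;

// Hypothetical helper: report whether the pageserver pidfile exists and
// whether anything answers on its configured port.
fn print_pageserver_status(conf: &LocalEnv) {
    match fs::read_to_string(conf.pageserver_pidfile()) {
        Ok(pid) => println!("pageserver pid: {}", pid.trim()),
        Err(_) => println!("no pageserver pidfile found; was it started?"),
    }

    match TcpStream::connect(conf.pageserver.listen_address) {
        Ok(_) => println!(
            "pageserver is listening on {}",
            conf.pageserver.listen_address
        ),
        Err(e) => println!(
            "cannot connect to {}: {}",
            conf.pageserver.listen_address, e
        ),
    }
}
```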