diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 1efc44dedf..450b93d85a 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -4,6 +4,7 @@ on: [push] jobs: regression-check: + timeout-minutes: 10 name: run regression test suite runs-on: ubuntu-latest @@ -76,10 +77,7 @@ jobs: target key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - # That build is only to build dependencies and can be skipped if Cargo.lock - # wasn't changed. Next steps need their own build - - name: Install cargo deps - if: steps.cache_cargo.outputs.cache-hit != 'true' + - name: Build run: | cargo build diff --git a/.gitignore b/.gitignore index 2f22547efd..768b75b413 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target /tmp_check /tmp_install +/tmp_check_cli +.vscode diff --git a/Cargo.lock b/Cargo.lock index fb694ec2f6..be86750a77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -420,6 +420,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "control_plane" +version = "0.1.0" +dependencies = [ + "home", + "lazy_static", + "postgres", + "rand 0.8.3", + "regex", + "serde", + "serde_derive", + "tokio-postgres", + "toml", +] + [[package]] name = "core-foundation" version = "0.9.1" @@ -843,6 +858,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2456aef2e6b6a9784192ae780c0f15bc57df0e918585282325e8c8ac27737654" +dependencies = [ + "winapi", +] + [[package]] name = "http" version = "0.2.3" @@ -894,7 +918,7 @@ dependencies = [ "httpdate", "itoa", "pin-project", - "socket2 0.4.0", + "socket2", "tokio", "tower-service", "tracing", @@ -948,12 +972,11 @@ dependencies = [ name = "integration_tests" version = "0.1.0" dependencies = [ + "control_plane", "lazy_static", - "pageserver", "postgres", "rand 0.8.3", "tokio-postgres", - "walkeeper", ] [[package]] @@ -1146,7 +1169,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a19900e7eee95eb2b3c2e26d12a874cc80aaf750e31be6fcbe743ead369fa45d" dependencies = [ "libc", - "socket2 0.4.0", + "socket2", ] [[package]] @@ -1276,6 +1299,7 @@ dependencies = [ "log", "postgres", "postgres-protocol", + "postgres-types", "rand 0.8.3", "regex", "rocksdb", @@ -1286,10 +1310,12 @@ dependencies = [ "slog-stdlog", "slog-term", "termion", + "thiserror", "tokio", "tokio-postgres", "tokio-stream", "tui", + "walkdir", ] [[package]] @@ -1406,8 +1432,8 @@ dependencies = [ [[package]] name = "postgres" -version = "0.19.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.19.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "bytes", "fallible-iterator", @@ -1419,8 +1445,8 @@ dependencies = [ [[package]] name = "postgres-protocol" -version = "0.6.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.6.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "base64", "byteorder", @@ -1436,8 +1462,8 @@ dependencies = [ [[package]] name = "postgres-types" -version = 
"0.2.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.2.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "bytes", "fallible-iterator", @@ -1701,7 +1727,7 @@ dependencies = [ [[package]] name = "rust-s3" version = "0.27.0-beta1" -source = "git+https://github.com/hlinnaka/rust-s3#7f15a24ec7daa0a5d9516da706212745f9042818" +source = "git+https://github.com/hlinnaka/rust-s3?rev=7f15a24ec7daa0a5d9516da706212745f9042818#7f15a24ec7daa0a5d9516da706212745f9042818" dependencies = [ "async-std", "async-trait", @@ -1756,6 +1782,15 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.19" @@ -1967,17 +2002,6 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" -[[package]] -name = "socket2" -version = "0.3.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "winapi", -] - [[package]] name = "socket2" version = "0.4.0" @@ -2171,8 +2195,8 @@ dependencies = [ [[package]] name = "tokio-postgres" -version = "0.7.0" -source = "git+https://github.com/kelvich/rust-postgres?branch=replication_rebase#f3425d991f75cb7b464a37e6b3d5d05f8bf51c02" +version = "0.7.1" +source = "git+https://github.com/zenithdb/rust-postgres.git?rev=a0d067b66447951d1276a53fb09886539c3fa094#a0d067b66447951d1276a53fb09886539c3fa094" dependencies = [ "async-trait", "byteorder", @@ -2183,10 +2207,10 @@ dependencies = [ "parking_lot", "percent-encoding", "phf", - "pin-project", + "pin-project-lite", "postgres-protocol", "postgres-types", - "socket2 0.3.19", + "socket2", "tokio", "tokio-util", ] @@ -2216,6 +2240,15 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa" +dependencies = [ + "serde", +] + [[package]] name = "tower-service" version = "0.3.1" @@ -2354,6 +2387,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" +[[package]] +name = "walkdir" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +dependencies = [ + "same-file", + "winapi", + "winapi-util", +] + [[package]] name = "walkeeper" version = "0.1.0" @@ -2370,7 +2414,6 @@ dependencies = [ "futures", "lazy_static", "log", - "pageserver", "postgres", "postgres-protocol", "rand 0.8.3", @@ -2519,6 +2562,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" @@ -2539,3 +2591,11 @@ name = "xml-rs" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a" + +[[package]] +name = "zenith" +version = "0.1.0" +dependencies = [ + "clap", + "control_plane", +] diff --git a/Cargo.toml b/Cargo.toml index b6809e2ad7..f4d6314283 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,4 +3,6 @@ members = [ "integration_tests", "pageserver", "walkeeper", + "zenith", + "control_plane", ] diff --git a/README.md b/README.md index a9e6633f45..b7c745bcb8 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,54 @@ Zenith substitutes PostgreSQL storage layer and redistributes data across a cluster of nodes +## Running local installation + +1. Build zenith and patched postgres +```sh +git clone --recursive https://github.com/libzenith/zenith.git +cd zenith +./pgbuild.sh # builds postgres and installs it to ./tmp_install +cargo build +``` + +2. Start pageserver and postggres on top of it (should be called from repo root): +```sh +# Create ~/.zenith with proper paths to binaries and data +# Later that would be responsibility of a package install script +>./target/debug/zenith init + +# start pageserver +> ./target/debug/zenith start +Starting pageserver at '127.0.0.1:64000' + +# create and configure postgres data dir +> ./target/debug/zenith pg create +Creating new postgres: path=/Users/user/code/zenith/tmp_check_cli/compute/pg1 port=55432 +Database initialized + +# start it +> ./target/debug/zenith pg start pg1 + +# look up status and connection info +> ./target/debug/zenith pg list +NODE ADDRESS STATUS +pg1 127.0.0.1:55432 running +``` + +3. 
+
 ## Running tests
 
 ```sh
diff --git a/control_plane/.gitignore b/control_plane/.gitignore
new file mode 100644
index 0000000000..c1e54a6bcb
--- /dev/null
+++ b/control_plane/.gitignore
@@ -0,0 +1 @@
+tmp_check/
diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml
new file mode 100644
index 0000000000..7281595c18
--- /dev/null
+++ b/control_plane/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "control_plane"
+version = "0.1.0"
+authors = ["Stas Kelvich "]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+rand = "0.8.3"
+postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
+tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
+
+serde = "1.0"
+serde_derive = "1.0"
+toml = "0.5"
+home = "0.5.3"
+lazy_static = "1.4"
+regex = "1"
diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
new file mode 100644
index 0000000000..b39d901be7
--- /dev/null
+++ b/control_plane/src/compute.rs
@@ -0,0 +1,431 @@
+use std::error;
+use std::fs::File;
+use std::fs::{self, OpenOptions};
+use std::net::TcpStream;
+use std::process::{Command, Stdio};
+use std::sync::Arc;
+use std::time::Duration;
+use std::{collections::BTreeMap, path::PathBuf};
+use std::{io::Write, net::SocketAddr};
+
+use lazy_static::lazy_static;
+use postgres::{Client, NoTls};
+use regex::Regex;
+
+use crate::local_env::{self, LocalEnv};
+use crate::storage::{PageServerNode, WalProposerNode};
+
+type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
+
+//
+// ComputeControlPlane
+//
+pub struct ComputeControlPlane {
+    base_port: u16,
+    pageserver: Arc<PageServerNode>,
+    pub nodes: BTreeMap<String, Arc<PostgresNode>>,
+    env: LocalEnv,
+}
+
+impl ComputeControlPlane {
+    // Load current nodes with ports from data directories on disk
+    pub fn load(env: LocalEnv) -> Result<ComputeControlPlane> {
+        // TODO: since the pageserver does not have a config file yet, we assume
+        // here that it is running on the default port. Change this once the
+        // pageserver gets its own config.
+        let pageserver = Arc::new(PageServerNode::from_env(&env));
+
+        let nodes: Result<BTreeMap<String, Arc<PostgresNode>>> = fs::read_dir(env.compute_dir())
+            .map_err(|e| {
+                format!(
+                    "failed to list {}: {}",
+                    env.compute_dir().to_str().unwrap(),
+                    e
+                )
+            })?
+            .into_iter()
+            .map(|f| {
+                PostgresNode::from_dir_entry(f?, &env, &pageserver)
+                    .map(|node| (node.name.clone(), Arc::new(node)))
+            })
+            .collect();
+        let nodes = nodes?;
+
+        Ok(ComputeControlPlane {
+            base_port: 55431,
+            pageserver,
+            nodes,
+            env,
+        })
+    }
+
+    fn get_port(&mut self) -> u16 {
+        1 + self
+            .nodes
+            .iter()
+            .map(|(_name, node)| node.address.port())
+            .max()
+            .unwrap_or(self.base_port)
+    }
+
+    pub fn local(pageserver: &Arc<PageServerNode>) -> ComputeControlPlane {
+        let env = local_env::test_env();
+        ComputeControlPlane {
+            base_port: 65431,
+            pageserver: Arc::clone(pageserver),
+            nodes: BTreeMap::new(),
+            env,
+        }
+    }
+
+    fn new_vanilla_node(&mut self, is_test: bool) -> Result<Arc<PostgresNode>> {
+        // allocate new node entry with generated port
+        let node_id = self.nodes.len() as u32 + 1;
+        let node = Arc::new(PostgresNode {
+            name: format!("pg{}", node_id),
+            address: SocketAddr::new("127.0.0.1".parse().unwrap(), self.get_port()),
+            env: self.env.clone(),
+            pageserver: Arc::clone(&self.pageserver),
+            is_test,
+        });
+        node.init_vanilla()?;
+        self.nodes.insert(node.name.clone(), Arc::clone(&node));
+
+        Ok(node)
+    }
+
+    pub fn new_test_node(&mut self) -> Arc<PostgresNode> {
+        let addr = self.pageserver.address().clone();
+        let node = self.new_vanilla_node(true).unwrap();
+
+        // Configure the node to fetch pages from the pageserver
+        node.append_conf(
+            "postgresql.conf",
+            format!(
+                "page_server_connstring = 'host={} port={}'\n",
+                addr.ip(),
+                addr.port()
+            )
+            .as_str(),
+        );
+
+        node
+    }
+
+    pub fn new_test_master_node(&mut self) -> Arc<PostgresNode> {
+        let node = self.new_vanilla_node(true).unwrap();
+        println!("Create vanilla node at {:?}", node.address);
+        node.append_conf(
+            "postgresql.conf",
+            "synchronous_standby_names = 'safekeeper_proxy'\n",
+        );
+
+        node
+    }
+
+    pub fn new_node(&mut self) -> Result<Arc<PostgresNode>> {
+        let addr = self.pageserver.address().clone();
+        let node = self.new_vanilla_node(false)?;
+
+        // Configure the node to fetch pages from the pageserver
+        node.append_conf(
+            "postgresql.conf",
+            format!(
+                "page_server_connstring = 'host={} port={}'\n",
+                addr.ip(),
+                addr.port()
+            )
+            .as_str(),
+        );
+
+        Ok(node)
+    }
+}
+
+///////////////////////////////////////////////////////////////////////////////
+
+pub struct PostgresNode {
+    pub address: SocketAddr,
+    name: String,
+    pub env: LocalEnv,
+    pageserver: Arc<PageServerNode>,
+    is_test: bool,
+}
+
+impl PostgresNode {
+    fn from_dir_entry(
+        entry: std::fs::DirEntry,
+        env: &LocalEnv,
+        pageserver: &Arc<PageServerNode>,
+    ) -> Result<PostgresNode> {
+        if !entry.file_type()?.is_dir() {
+            let err_msg = format!(
+                "PostgresNode::from_dir_entry failed: '{}' is not a directory",
+                entry.path().to_str().unwrap()
+            );
+            return Err(err_msg.into());
+        }
+
+        // matches lines like "port = 55432" in postgresql.conf
+        lazy_static! {
+            static ref CONF_PORT_RE: Regex = Regex::new(r"(?m)^\s*port\s*=\s*(\d+)\s*$").unwrap();
+        }
+
+        // parse data directory name
+        let fname = entry.file_name();
+        let name = fname.to_str().unwrap().to_string();
+
+        // find out tcp port in config file
+        let cfg_path = entry.path().join("postgresql.conf");
+        let config = fs::read_to_string(cfg_path.clone()).map_err(|e| {
+            format!(
+                "failed to read config file in {}: {}",
+                cfg_path.to_str().unwrap(),
+                e
+            )
+        })?;
+
+        let err_msg = format!(
+            "failed to find port definition in config file {}",
+            cfg_path.to_str().unwrap()
+        );
+        // the numeric suffixes distinguish which step of the lookup failed
+        let port: u16 = CONF_PORT_RE
+            .captures(config.as_str())
+            .ok_or(err_msg.clone() + " 1")?
+            .iter()
+            .last()
+            .ok_or(err_msg.clone() + " 2")?
+            .ok_or(err_msg.clone() + " 3")?
+            .as_str()
+            .parse()
+            .map_err(|e| format!("{}: {}", err_msg, e))?;
+
+        // ok now
+        Ok(PostgresNode {
+            address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
+            name,
+            env: env.clone(),
+            pageserver: Arc::clone(pageserver),
+            is_test: false,
+        })
+    }
+
+    fn init_vanilla(&self) -> Result<()> {
+        println!(
+            "Creating new postgres: path={} port={}",
+            self.pgdata().to_str().unwrap(),
+            self.address.port()
+        );
+
+        // initialize data directory
+
+        if self.is_test {
+            fs::remove_dir_all(self.pgdata().to_str().unwrap()).ok();
+        }
+
+        fs::create_dir_all(self.pgdata().to_str().unwrap())?;
+
+        let initdb_path = self.env.pg_bin_dir().join("initdb");
+        let initdb = Command::new(initdb_path)
+            .args(&["-D", self.pgdata().to_str().unwrap()])
+            .arg("-N")
+            .arg("-A trust")
+            .arg("--no-instructions")
+            .env_clear()
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .stdout(Stdio::null())
+            .status()?;
+
+        if !initdb.success() {
+            return Err("initdb failed".into());
+        }
+
+        // listen on the selected port
+        self.append_conf(
+            "postgresql.conf",
+            format!(
+                "max_wal_senders = 10\n\
+                 max_replication_slots = 10\n\
+                 hot_standby = on\n\
+                 shared_buffers = 1MB\n\
+                 max_connections = 100\n\
+                 wal_level = replica\n\
+                 listen_addresses = '{address}'\n\
+                 port = {port}\n",
+                address = self.address.ip(),
+                port = self.address.port()
+            )
+            .as_str(),
+        );
+
+        println!("Database initialized");
+        Ok(())
+    }
+
+    pub fn pgdata(&self) -> PathBuf {
+        self.env.compute_dir().join(self.name.clone())
+    }
+
+    // Derive the node status from two observations: whether postmaster.pid
+    // exists and whether the node's TCP port accepts connections.
+    pub fn status(&self) -> &str {
+        let timeout = Duration::from_millis(300);
+        let has_pidfile = self.pgdata().join("postmaster.pid").exists();
+        let can_connect = TcpStream::connect_timeout(&self.address, timeout).is_ok();
+
+        match (has_pidfile, can_connect) {
+            (true, true) => "running",
+            (false, false) => "stopped",
+            (true, false) => "crashed",
+            (false, true) => "running, no pidfile",
+        }
+    }
+
+    pub fn append_conf(&self, config: &str, opts: &str) {
+        OpenOptions::new()
+            .append(true)
+            .open(self.pgdata().join(config).to_str().unwrap())
+            .unwrap()
+            .write_all(opts.as_bytes())
+            .unwrap();
+    }
+
+    fn pg_ctl(&self, args: &[&str]) -> Result<()> {
+        let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
+        let pg_ctl = Command::new(pg_ctl_path)
+            .args(
+                [
+                    &[
+                        "-D",
+                        self.pgdata().to_str().unwrap(),
+                        "-l",
+                        self.pgdata().join("log").to_str().unwrap(),
+                    ],
+                    args,
+                ]
+                .concat(),
+            )
+            .env_clear()
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .status()?;
+
+        if !pg_ctl.success() {
+            Err("pg_ctl failed".into())
+        } else {
+            Ok(())
+        }
+    }
+
+    pub fn start(&self) -> Result<()> {
+        // ask the pageserver to call us back to start WAL streaming (callmemaybe)
+        let _res = self
+            .pageserver
+            .page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
+        println!("Starting postgres node at '{}'", self.connstr());
+        self.pg_ctl(&["start"])
+    }
+
+    pub fn restart(&self) -> Result<()> {
+        self.pg_ctl(&["restart"])
+    }
+
+    pub fn stop(&self) -> Result<()> {
+        self.pg_ctl(&["-m", "immediate", "stop"])
+    }
+
+    pub fn connstr(&self) -> String {
+        format!(
+            "host={} port={} user={}",
+            self.address.ip(),
+            self.address.port(),
+            self.whoami()
+        )
+    }
+
+    // XXX: cache that in control plane
+    pub fn whoami(&self) -> String {
+        let output = Command::new("whoami")
+            .output()
+            .expect("failed to execute whoami");
+
+        if !output.status.success() {
+            panic!("whoami failed");
+        }
+
+        String::from_utf8(output.stdout).unwrap().trim().to_string()
+    }
+
+    pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
+        let connstring = format!(
"host={} port={} dbname={} user={}", + self.address.ip(), + self.address.port(), + db, + self.whoami() + ); + let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); + + println!("Running {}", sql); + client.query(sql, &[]).unwrap() + } + + pub fn open_psql(&self, db: &str) -> Client { + let connstring = format!( + "host={} port={} dbname={} user={}", + self.address.ip(), + self.address.port(), + db, + self.whoami() + ); + Client::connect(connstring.as_str(), NoTls).unwrap() + } + + /* Create stub controlfile and respective xlog to start computenode */ + pub fn setup_controlfile(&self) { + let filepath = format!("{}/global/pg_control", self.pgdata().to_str().unwrap()); + + { + File::create(filepath).unwrap(); + } + + let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal"); + + let pg_resetwal = Command::new(pg_resetwal_path) + .args(&["-D", self.pgdata().to_str().unwrap()]) + .arg("-f") + // TODO probably we will have to modify pg_resetwal + // .arg("--compute-node") + .status() + .expect("failed to execute pg_resetwal"); + + if !pg_resetwal.success() { + panic!("pg_resetwal failed"); + } + } + + pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode { + let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy"); + match Command::new(proxy_path.as_path()) + .args(&["-s", &wal_acceptors]) + .args(&["-h", &self.address.ip().to_string()]) + .args(&["-p", &self.address.port().to_string()]) + .arg("-v") + .stderr(OpenOptions::new() + .append(true) + .open(self.env.data_dir.join("safepkeeper_proxy.log")).unwrap()) + .spawn() + { + Ok(child) => WalProposerNode { pid: child.id() }, + Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e), + } + } + + // TODO + pub fn pg_bench() {} +} + +impl Drop for PostgresNode { + // destructor to clean up state after test is done + // XXX: we may detect failed test by setting some flag in catch_unwind() + // and checking it here. But let just clean datadirs on start. + fn drop(&mut self) { + if self.is_test { + let _ = self.stop(); + } + } +} diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs new file mode 100644 index 0000000000..a49d39150a --- /dev/null +++ b/control_plane/src/lib.rs @@ -0,0 +1,12 @@ +// +// Local control plane. +// +// Can start, cofigure and stop postgres instances running as a local processes. +// +// Intended to be used in integration tests and in CLI tools for +// local installations. +// + +pub mod compute; +pub mod local_env; +pub mod storage; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs new file mode 100644 index 0000000000..241fba2f62 --- /dev/null +++ b/control_plane/src/local_env.rs @@ -0,0 +1,187 @@ +// +// This module is responsible for locating and loading paths in a local setup. +// +// Now it also provides init method which acts like a stub for proper installation +// script which will use local paths. +// +use std::env; +use std::error; +use std::fs; +use std::path::{Path, PathBuf}; + +use serde_derive::{Deserialize, Serialize}; + +type Result = std::result::Result>; + +// +// This data structure represents deserialized zenith config, which should be +// located in ~/.zenith +// +// TODO: should we also support ZENITH_CONF env var? +// +#[derive(Serialize, Deserialize, Clone)] +pub struct LocalEnv { + // Here page server and compute nodes will create and store their data. + pub data_dir: PathBuf, + + // Path to postgres distribution. It expected that "bin", "include", + // "lib", "share" from postgres distribution will be there. 
+
+impl LocalEnv {
+    // postgres installation
+    pub fn pg_bin_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("bin")
+    }
+    pub fn pg_lib_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("lib")
+    }
+
+    // pageserver
+    pub fn pageserver_data_dir(&self) -> PathBuf {
+        self.data_dir.join("pageserver")
+    }
+    pub fn pageserver_log(&self) -> PathBuf {
+        self.pageserver_data_dir().join("pageserver.log")
+    }
+    pub fn pageserver_pidfile(&self) -> PathBuf {
+        self.pageserver_data_dir().join("pageserver.pid")
+    }
+
+    // compute nodes
+    pub fn compute_dir(&self) -> PathBuf {
+        self.data_dir.join("compute")
+    }
+}
+
+//
+// Several issues in the rust-lang repo discuss the proper library for
+// determining the home directory in a cross-platform way. The current
+// consensus seems to be the home crate, which cargo itself uses.
+//
+fn get_home() -> Result<PathBuf> {
+    home::home_dir().ok_or("can not determine home directory path".into())
+}
+
+pub fn init() -> Result<()> {
+    let home_dir = get_home()?;
+
+    // check if config already exists
+    let cfg_path = home_dir.join(".zenith");
+    if cfg_path.exists() {
+        let err_msg = format!(
+            "{} already exists. Perhaps already initialized?",
+            cfg_path.to_str().unwrap()
+        );
+        return Err(err_msg.into());
+    }
+
+    // For now we can run init only from the repo root, so check that the
+    // current dir is our repo. Use the existence of 'pageserver/Cargo.toml'
+    // as evidence.
+    let cargo_path = env::current_dir()?;
+    if !cargo_path.join("pageserver/Cargo.toml").exists() {
+        let err_msg = "Current directory does not look like a zenith repo. \
+            Please, run 'init' from the zenith repo root.";
+        return Err(err_msg.into());
+    }
+
+    // ok, now check that the expected binaries are present
+
+    // check postgres
+    let pg_distrib_dir = cargo_path.join("tmp_install");
+    let pg_path = pg_distrib_dir.join("bin/postgres");
+    if !pg_path.exists() {
+        let err_msg = format!(
+            "Can't find postgres binary at {}. \
+            Perhaps './pgbuild.sh' is needed to build it first.",
+            pg_path.to_str().unwrap()
+        );
+        return Err(err_msg.into());
+    }
+
+    // check pageserver
+    let zenith_distrib_dir = cargo_path.join("target/debug/");
+    let pageserver_path = zenith_distrib_dir.join("pageserver");
+    if !pageserver_path.exists() {
+        let err_msg = format!(
+            "Can't find pageserver binary at {}. Please build it.",
+            pageserver_path.to_str().unwrap()
+        );
+        return Err(err_msg.into());
+    }
+
+    // ok, we are good to go
+
+    // create dirs
+    let data_dir = cargo_path.join("tmp_check_cli");
+
+    for &dir in &["compute", "pageserver"] {
+        fs::create_dir_all(data_dir.join(dir)).map_err(|e| {
+            format!(
+                "Failed to create directory in '{}': {}",
+                data_dir.to_str().unwrap(),
+                e
+            )
+        })?;
+    }
+
+    // write config
+    let conf = LocalEnv {
+        data_dir,
+        pg_distrib_dir,
+        zenith_distrib_dir,
+    };
+    let toml = toml::to_string(&conf)?;
+    fs::write(cfg_path, toml)?;
+
+    Ok(())
+}
+
+// check that the config file is present and load it
+pub fn load_config() -> Result<LocalEnv> {
+    // home
+    let home_dir = get_home()?;
+
+    // check file exists
+    let cfg_path = home_dir.join(".zenith");
+    if !cfg_path.exists() {
+        let err_msg = format!(
+            "Zenith config is not found in {}. You need to run 'zenith init' first",
+            cfg_path.to_str().unwrap()
+        );
+        return Err(err_msg.into());
+    }
+
+    // load and parse file
+    let config = fs::read_to_string(cfg_path)?;
+    toml::from_str(config.as_str()).map_err(|e| e.into())
+}
+
+// local env for tests
+pub fn test_env() -> LocalEnv {
+    let data_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_check");
+    fs::create_dir_all(data_dir.clone()).unwrap();
+    LocalEnv {
+        data_dir,
+        pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"),
+        zenith_distrib_dir: cargo_bin_dir(),
+    }
+}
+
+// Find the directory where the binaries were put (i.e. target/debug/)
+pub fn cargo_bin_dir() -> PathBuf {
+    let mut pathbuf = std::env::current_exe().unwrap();
+
+    pathbuf.pop();
+    if pathbuf.ends_with("deps") {
+        pathbuf.pop();
+    }
+
+    pathbuf
+}
diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs
new file mode 100644
index 0000000000..eba2966849
--- /dev/null
+++ b/control_plane/src/storage.rs
@@ -0,0 +1,398 @@
+use std::error;
+use std::fs;
+use std::io;
+use std::net::SocketAddr;
+use std::net::TcpStream;
+use std::path::{Path, PathBuf};
+use std::process::Command;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+
+use postgres::{Client, NoTls};
+
+use crate::compute::PostgresNode;
+use crate::local_env::{self, LocalEnv};
+
+type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
+
+//
+// Collection of several example deployments useful for tests.
+//
+// I'm intentionally modelling the storage and compute control planes as
+// separate entities, as that is closer to the actual setup.
+//
+pub struct TestStorageControlPlane {
+    pub wal_acceptors: Vec<WalAcceptorNode>,
+    pub pageserver: Arc<PageServerNode>,
+    pub test_done: AtomicBool,
+}
+
+impl TestStorageControlPlane {
+    // postgres <-> page_server
+    pub fn one_page_server(pgdata_base_path: String) -> TestStorageControlPlane {
+        let env = local_env::test_env();
+
+        let pserver = Arc::new(PageServerNode {
+            env: env.clone(),
+            kill_on_exit: true,
+            listen_address: None,
+        });
+        pserver.init();
+
+        if pgdata_base_path.is_empty() {
+            pserver.start().unwrap();
+        } else {
+            pserver.start_fromdatadir(pgdata_base_path).unwrap();
+        }
+
+        TestStorageControlPlane {
+            wal_acceptors: Vec::new(),
+            pageserver: pserver,
+            test_done: AtomicBool::new(false),
+        }
+    }
+
+    pub fn one_page_server_no_start() -> TestStorageControlPlane {
+        let env = local_env::test_env();
+
+        let pserver = Arc::new(PageServerNode {
+            env,
+            kill_on_exit: true,
+            listen_address: None,
+        });
+        pserver.init();
+
+        TestStorageControlPlane {
+            wal_acceptors: Vec::new(),
+            pageserver: pserver,
+            test_done: AtomicBool::new(false),
+        }
+    }
+
+    // postgres <-> {wal_acceptor1, wal_acceptor2, ...}
+    pub fn fault_tolerant(redundancy: usize) -> TestStorageControlPlane {
+        let env = local_env::test_env();
+        let mut cplane = TestStorageControlPlane {
+            wal_acceptors: Vec::new(),
+            pageserver: Arc::new(PageServerNode {
+                env: env.clone(),
+                kill_on_exit: true,
+                listen_address: None,
+            }),
+            test_done: AtomicBool::new(false),
+        };
+        cplane.pageserver.init();
+        cplane.pageserver.start().unwrap();
+
+        // wal acceptors are placed on consecutive ports starting here
+        const WAL_ACCEPTOR_PORT: usize = 54321;
+
+        for i in 0..redundancy {
+            let wal_acceptor = WalAcceptorNode {
+                listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
+                    .parse()
+                    .unwrap(),
+                data_dir: env.data_dir.join(format!("wal_acceptor_{}", i)),
+                env: env.clone(),
+            };
+            wal_acceptor.init();
+            wal_acceptor.start();
+            cplane.wal_acceptors.push(wal_acceptor);
+        }
+        cplane
+    }
+
+    pub fn stop(&self) {
+        self.test_done.store(true, Ordering::Relaxed);
+    }
+
+    pub fn get_wal_acceptor_conn_info(&self) -> String {
+        self.wal_acceptors
+            .iter()
+            .map(|wa| wa.listen.to_string())
+            .collect::<Vec<String>>()
+            .join(",")
+    }
+
+    pub fn is_running(&self) -> bool {
+        !self.test_done.load(Ordering::Relaxed)
+    }
+}
+
+impl Drop for TestStorageControlPlane {
+    fn drop(&mut self) {
+        self.stop();
+    }
+}
+
+//
+// Control routines for the pageserver.
+//
+// Used in CLI and tests.
+//
+pub struct PageServerNode {
+    kill_on_exit: bool,
+    listen_address: Option<SocketAddr>,
+    pub env: LocalEnv,
+}
+
+impl PageServerNode {
+    pub fn from_env(env: &LocalEnv) -> PageServerNode {
+        PageServerNode {
+            kill_on_exit: false,
+            listen_address: None, // default
+            env: env.clone(),
+        }
+    }
+
+    pub fn address(&self) -> SocketAddr {
+        match self.listen_address {
+            Some(addr) => addr,
+            None => "127.0.0.1:64000".parse().unwrap(),
+        }
+    }
+
+    pub fn init(&self) {
+        fs::create_dir_all(self.env.pageserver_data_dir()).unwrap();
+    }
+
+    pub fn start(&self) -> Result<()> {
+        println!("Starting pageserver at '{}'", self.address());
+
+        let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
+            .args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
+            .args(&["-l", self.address().to_string().as_str()])
+            .arg("-d")
+            .env_clear()
+            .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs the postgres wal-redo binary
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .status()?;
+
+        if !status.success() {
+            return Err(Box::<dyn error::Error>::from(format!(
+                "Pageserver failed to start. See '{}' for details.",
+                self.env.pageserver_log().to_str().unwrap()
+            )));
+        } else {
+            return Ok(());
+        }
+    }
+
+    pub fn start_fromdatadir(&self, pgdata_base_path: String) -> Result<()> {
+        println!("Starting pageserver at '{}'", self.address());
+
+        let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method
+            .args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()])
+            .args(&["-l", self.address().to_string().as_str()])
+            .arg("-d")
+            .args(&["--restore-from", "local"])
+            .env_clear()
+            .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs the postgres wal-redo binary
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .env("PGDATA_BASE_PATH", pgdata_base_path)
+            .status()?;
+
+        if !status.success() {
+            return Err(Box::<dyn error::Error>::from(format!(
+                "Pageserver failed to start. See '{}' for details.",
+                self.env.pageserver_log().to_str().unwrap()
+            )));
+        } else {
+            return Ok(());
+        }
+    }
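+
+    // Stopping relies on the pidfile the pageserver writes on startup:
+    // read the pid from it, signal the process via kill(1), then poll the
+    // listen address until it stops accepting connections.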
See '{}' for details.", + self.env.pageserver_log().to_str().unwrap() + ))); + } else { + return Ok(()); + } + } + + pub fn stop(&self) -> Result<()> { + let pidfile = self.env.pageserver_pidfile(); + let pid = read_pidfile(&pidfile)?; + + let status = Command::new("kill") + .arg(&pid) + .env_clear() + .status() + .expect("failed to execute kill"); + + if !status.success() { + return Err(Box::::from(format!( + "Failed to kill pageserver with pid {}", + pid + ))); + } + + // await for pageserver stop + for _ in 0..5 { + let stream = TcpStream::connect(self.address()); + if let Err(_e) = stream { + return Ok(()); + } + println!("Stopping pageserver on {}", self.address()); + thread::sleep(Duration::from_secs(1)); + } + + // ok, we failed to stop pageserver, let's panic + if !status.success() { + return Err(Box::::from(format!( + "Failed to stop pageserver with pid {}", + pid + ))); + } else { + return Ok(()); + } + } + + pub fn page_server_psql(&self, sql: &str) -> Vec { + let connstring = format!( + "host={} port={} dbname={} user={}", + self.address().ip(), + self.address().port(), + "no_db", + "no_user", + ); + let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); + + println!("Pageserver query: '{}'", sql); + client.simple_query(sql).unwrap() + } +} + +impl Drop for PageServerNode { + fn drop(&mut self) { + if self.kill_on_exit { + let _ = self.stop(); + } + } +} + +// +// Control routines for WalAcceptor. +// +// Now used only in test setups. +// +pub struct WalAcceptorNode { + listen: SocketAddr, + data_dir: PathBuf, + env: LocalEnv, +} + +impl WalAcceptorNode { + pub fn init(&self) { + if self.data_dir.exists() { + fs::remove_dir_all(self.data_dir.clone()).unwrap(); + } + fs::create_dir_all(self.data_dir.clone()).unwrap(); + } + + pub fn start(&self) { + println!( + "Starting wal_acceptor in {} listening '{}'", + self.data_dir.to_str().unwrap(), + self.listen + ); + + let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor")) + .args(&["-D", self.data_dir.to_str().unwrap()]) + .args(&["-l", self.listen.to_string().as_str()]) + .arg("-d") + .arg("-n") + .status() + .expect("failed to start wal_acceptor"); + + if !status.success() { + panic!("wal_acceptor start failed"); + } + } + + pub fn stop(&self) -> std::result::Result<(), io::Error> { + println!("Stopping wal acceptor on {}", self.listen); + let pidfile = self.data_dir.join("wal_acceptor.pid"); + let pid = read_pidfile(&pidfile)?; + // Ignores any failures when running this command + let _status = Command::new("kill") + .arg(pid) + .env_clear() + .status() + .expect("failed to execute kill"); + + Ok(()) + } +} + +impl Drop for WalAcceptorNode { + fn drop(&mut self) { + self.stop().unwrap(); + } +} + +/////////////////////////////////////////////////////////////////////////////// + +pub struct WalProposerNode { + pub pid: u32, +} + +impl WalProposerNode { + pub fn stop(&self) { + let status = Command::new("kill") + .arg(self.pid.to_string()) + .env_clear() + .status() + .expect("failed to execute kill"); + + if !status.success() { + panic!("kill start failed"); + } + } +} + +impl Drop for WalProposerNode { + fn drop(&mut self) { + self.stop(); + } +} + +/////////////////////////////////////////////////////////////////////////////// + +pub fn regress_check(pg: &PostgresNode) { + pg.safe_psql("postgres", "CREATE DATABASE regression"); + + let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress"); + fs::create_dir_all(regress_run_path.clone()).unwrap(); + 
+    std::env::set_current_dir(regress_run_path).unwrap();
+
+    let regress_build_path =
+        Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress");
+    let regress_src_path =
+        Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress");
+
+    let _regress_check = Command::new(regress_build_path.join("pg_regress"))
+        .args(&[
+            "--bindir=''",
+            "--use-existing",
+            format!("--bindir={}", pg.env.pg_bin_dir().to_str().unwrap()).as_str(),
+            format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(),
+            format!(
+                "--schedule={}",
+                regress_src_path.join("parallel_schedule").to_str().unwrap()
+            )
+            .as_str(),
+            format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(),
+        ])
+        .env_clear()
+        .env("LD_LIBRARY_PATH", pg.env.pg_lib_dir().to_str().unwrap())
+        .env("PGHOST", pg.address.ip().to_string())
+        .env("PGPORT", pg.address.port().to_string())
+        .env("PGUSER", pg.whoami())
+        .status()
+        .expect("pg_regress failed");
+}
+
+/// Read a PID file
+///
+/// This should contain an unsigned integer, but we return it as a String
+/// because our callers only want to pass it back into a subcommand.
+fn read_pidfile(pidfile: &Path) -> std::result::Result<String, io::Error> {
+    fs::read_to_string(pidfile).map_err(|err| {
+        eprintln!("failed to read pidfile {:?}: {:?}", pidfile, err);
+        err
+    })
+}
diff --git a/integration_tests/.gitignore b/integration_tests/.gitignore
new file mode 100644
index 0000000000..80006e4280
--- /dev/null
+++ b/integration_tests/.gitignore
@@ -0,0 +1 @@
+tmp_check/
\ No newline at end of file
diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml
index 6481bdbe29..b201b1849e 100644
--- a/integration_tests/Cargo.toml
+++ b/integration_tests/Cargo.toml
@@ -9,8 +9,7 @@ edition = "2018"
 [dependencies]
 lazy_static = "1.4.0"
 rand = "0.8.3"
-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
-tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" }
+postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
+tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" }
 
-pageserver = { path = "../pageserver" }
-walkeeper = { path = "../walkeeper" }
+control_plane = { path = "../control_plane" }
diff --git a/integration_tests/tests/control_plane/mod.rs b/integration_tests/tests/control_plane/mod.rs
deleted file mode 100644
index 7ad9bbd320..0000000000
--- a/integration_tests/tests/control_plane/mod.rs
+++ /dev/null
@@ -1,692 +0,0 @@
-//
-// Local control plane.
-//
-// Can start, cofigure and stop postgres instances running as a local processes.
-//
-// Intended to be used in integration tests and in CLI tools for
-// local installations.
-//
-
-use std::fs::File;
-use std::fs::{self, OpenOptions};
-use std::path::{Path, PathBuf};
-use std::process::Command;
-use std::str;
-use std::sync::Arc;
-use std::{
-    io::Write,
-    net::{IpAddr, Ipv4Addr, SocketAddr},
-};
-
-use lazy_static::lazy_static;
-use postgres::{Client, NoTls};
-
-lazy_static! {
-    // postgres would be there if it was build by 'make postgres' here in the repo
-    pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
-        .join("../tmp_install/bin");
-    pub static ref PG_LIB_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
-        .join("../tmp_install/lib");
-
-    pub static ref BIN_DIR : PathBuf = cargo_bin_dir();
-
-    pub static ref TEST_WORKDIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR"))
-        .join("tmp_check");
-}
-
-// Find the directory where the binaries were put (i.e. target/debug/)
-pub fn cargo_bin_dir() -> PathBuf {
-    let mut pathbuf = std::env::current_exe().ok().unwrap();
-
-    pathbuf.pop();
-    if pathbuf.ends_with("deps") {
-        pathbuf.pop();
-    }
-
-    return pathbuf;
-}
-
-//
-// I'm intendedly modelling storage and compute control planes as a separate entities
-// as it is closer to the actual setup.
-//
-pub struct StorageControlPlane {
-    pub wal_acceptors: Vec<WalAcceptorNode>,
-    pub page_servers: Vec<PageServerNode>,
-}
-
-impl StorageControlPlane {
-    // postgres <-> page_server
-    pub fn one_page_server() -> StorageControlPlane {
-        let mut cplane = StorageControlPlane {
-            wal_acceptors: Vec::new(),
-            page_servers: Vec::new(),
-        };
-
-        let pserver = PageServerNode {
-            page_service_addr: "127.0.0.1:65200".parse().unwrap(),
-            data_dir: TEST_WORKDIR.join("pageserver"),
-        };
-        pserver.init();
-        pserver.start();
-
-        cplane.page_servers.push(pserver);
-        cplane
-    }
-
-    pub fn fault_tolerant(redundancy: usize) -> StorageControlPlane {
-        let mut cplane = StorageControlPlane {
-            wal_acceptors: Vec::new(),
-            page_servers: Vec::new(),
-        };
-        const WAL_ACCEPTOR_PORT: usize = 54321;
-
-        for i in 0..redundancy {
-            let wal_acceptor = WalAcceptorNode {
-                listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i)
-                    .parse()
-                    .unwrap(),
-                data_dir: TEST_WORKDIR.join(format!("wal_acceptor_{}", i)),
-            };
-            wal_acceptor.init();
-            wal_acceptor.start();
-            cplane.wal_acceptors.push(wal_acceptor);
-        }
-        cplane
-    }
-
-    pub fn stop(&self) {
-        for wa in self.wal_acceptors.iter() {
-            wa.stop();
-        }
-    }
-
-    // // postgres <-> wal_acceptor x3 <-> page_server
-    // fn local(&mut self) -> StorageControlPlane {
-    // }
-
-    pub fn page_server_addr(&self) -> &SocketAddr {
-        &self.page_servers[0].page_service_addr
-    }
-
-    pub fn get_wal_acceptor_conn_info(&self) -> String {
-        self.wal_acceptors
-            .iter()
-            .map(|wa| wa.listen.to_string().to_string())
-            .collect::<Vec<String>>()
-            .join(",")
-    }
-
-    pub fn page_server_psql(&self, sql: &str) -> Vec<postgres::SimpleQueryMessage> {
-        let addr = &self.page_servers[0].page_service_addr;
-
-        let connstring = format!(
-            "host={} port={} dbname={} user={}",
-            addr.ip(),
-            addr.port(),
-            "no_db",
-            "no_user",
-        );
-        let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
-
-        println!("Pageserver query: '{}'", sql);
-        client.simple_query(sql).unwrap()
-    }
-}
-
-impl Drop for StorageControlPlane {
-    fn drop(&mut self) {
-        self.stop();
-    }
-}
-
-pub struct PageServerNode {
-    page_service_addr: SocketAddr,
-    data_dir: PathBuf,
-}
-
-impl PageServerNode {
-    // TODO: method to force redo on a specific relation
-
-    // TODO: make wal-redo-postgres workable without data directory?
-    pub fn init(&self) {
-        fs::create_dir_all(self.data_dir.clone()).unwrap();
-
-        let datadir_path = self.data_dir.join("wal_redo_pgdata");
-        fs::remove_dir_all(datadir_path.to_str().unwrap()).ok();
-
-        let initdb = Command::new(PG_BIN_DIR.join("initdb"))
-            .args(&["-D", datadir_path.to_str().unwrap()])
-            .arg("-N")
-            .arg("--no-instructions")
-            .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-            .status()
-            .expect("failed to execute initdb");
-        if !initdb.success() {
-            panic!("initdb failed");
-        }
-    }
-
-    pub fn start(&self) {
-        println!("Starting pageserver at '{}'", self.page_service_addr);
-
-        let status = Command::new(BIN_DIR.join("pageserver"))
-            .args(&["-D", self.data_dir.to_str().unwrap()])
-            .args(&["-l", self.page_service_addr.to_string().as_str()])
-            .arg("-d")
-            .arg("--skip-recovery")
-            .env_clear()
-            .env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-            .status()
-            .expect("failed to start pageserver");
-
-        if !status.success() {
-            panic!("pageserver start failed");
-        }
-    }
-
-    pub fn stop(&self) {
-        let pidfile = self.data_dir.join("pageserver.pid");
-        let pid = fs::read_to_string(pidfile).unwrap();
-        let status = Command::new("kill")
-            .arg(pid)
-            .env_clear()
-            .status()
-            .expect("failed to execute kill");
-
-        if !status.success() {
-            panic!("kill start failed");
-        }
-    }
-}
-
-impl Drop for PageServerNode {
-    fn drop(&mut self) {
-        self.stop();
-        // fs::remove_dir_all(self.data_dir.clone()).unwrap();
-    }
-}
-
-pub struct WalAcceptorNode {
-    listen: SocketAddr,
-    data_dir: PathBuf,
-}
-
-impl WalAcceptorNode {
-    pub fn init(&self) {
-        if self.data_dir.exists() {
-            fs::remove_dir_all(self.data_dir.clone()).unwrap();
-        }
-        fs::create_dir_all(self.data_dir.clone()).unwrap();
-    }
-
-    pub fn start(&self) {
-        println!(
-            "Starting wal_acceptor in {} listening '{}'",
-            self.data_dir.to_str().unwrap(),
-            self.listen
-        );
-
-        let status = Command::new(BIN_DIR.join("wal_acceptor"))
-            .args(&["-D", self.data_dir.to_str().unwrap()])
-            .args(&["-l", self.listen.to_string().as_str()])
-            .arg("-d")
-            .arg("-n")
-            .status()
-            .expect("failed to start wal_acceptor");
-
-        if !status.success() {
-            panic!("wal_acceptor start failed");
-        }
-    }
-
-    pub fn stop(&self) {
-        let pidfile = self.data_dir.join("wal_acceptor.pid");
-        if let Ok(pid) = fs::read_to_string(pidfile) {
-            let _status = Command::new("kill")
-                .arg(pid)
-                .env_clear()
-                .status()
-                .expect("failed to execute kill");
-        }
-    }
-}
-
-impl Drop for WalAcceptorNode {
-    fn drop(&mut self) {
-        self.stop();
-        // fs::remove_dir_all(self.data_dir.clone()).unwrap();
-    }
-}
-
-///////////////////////////////////////////////////////////////////////////////
-
-//
-// ComputeControlPlane
-//
-pub struct ComputeControlPlane<'a> {
-    pg_bin_dir: PathBuf,
-    work_dir: PathBuf,
-    last_assigned_port: u16,
-    storage_cplane: &'a StorageControlPlane,
-    nodes: Vec<Arc<PostgresNode>>,
-}
-
-impl ComputeControlPlane<'_> {
-    pub fn local(storage_cplane: &StorageControlPlane) -> ComputeControlPlane {
-        ComputeControlPlane {
-            pg_bin_dir: PG_BIN_DIR.to_path_buf(),
-            work_dir: TEST_WORKDIR.to_path_buf(),
-            last_assigned_port: 65431,
-            storage_cplane: storage_cplane,
-            nodes: Vec::new(),
-        }
-    }
-
-    // TODO: check port availability and
-    fn get_port(&mut self) -> u16 {
-        let port = self.last_assigned_port + 1;
-        self.last_assigned_port += 1;
-        port
-    }
-
-    pub fn new_vanilla_node<'a>(&mut self) -> &Arc<PostgresNode> {
-        // allocate new node entry with generated port
-        let node_id = self.nodes.len() + 1;
-        let node = PostgresNode {
-            _node_id: node_id,
-            port: self.get_port(),
-            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
-            pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
-            pg_bin_dir: self.pg_bin_dir.clone(),
-        };
-        self.nodes.push(Arc::new(node));
-        let node = self.nodes.last().unwrap();
-
-        // initialize data directory
-        fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
-        let initdb_path = self.pg_bin_dir.join("initdb");
-        println!("initdb_path: {}", initdb_path.to_str().unwrap());
-        let initdb = Command::new(initdb_path)
-            .args(&["-D", node.pgdata.to_str().unwrap()])
-            .arg("-N")
-            .arg("--no-instructions")
-            .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-            .status()
-            .expect("failed to execute initdb");
-
-        if !initdb.success() {
-            panic!("initdb failed");
-        }
-
-        // // allow local replication connections
-        // node.append_conf("pg_hba.conf", format!("\
-        //     host replication all {}/32 sspi include_realm=1 map=regress\n\
-        // ", node.ip).as_str());
-
-        // listen for selected port
-        node.append_conf(
-            "postgresql.conf",
-            format!(
-                "\
-                max_wal_senders = 10\n\
-                max_replication_slots = 10\n\
-                hot_standby = on\n\
-                shared_buffers = 1MB\n\
-                fsync = off\n\
-                max_connections = 100\n\
-                wal_level = replica\n\
-                wal_sender_timeout = 0\n\
-                listen_addresses = '{address}'\n\
-                port = {port}\n\
-                ",
-                address = node.ip,
-                port = node.port
-            )
-            .as_str(),
-        );
-
-        node
-    }
-
-    // Init compute node without files, only datadir structure
-    // use initdb --compute-node flag and GUC 'computenode_mode'
-    // to distinguish the node
-    pub fn new_minimal_node(&mut self) -> &PostgresNode {
-        // allocate new node entry with generated port
-        let node_id = self.nodes.len() + 1;
-        let node = PostgresNode {
-            _node_id: node_id,
-            port: self.get_port(),
-            ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
-            pgdata: self.work_dir.join(format!("compute/pg{}", node_id)),
-            pg_bin_dir: self.pg_bin_dir.clone(),
-        };
-        self.nodes.push(Arc::new(node));
-        let node = self.nodes.last().unwrap();
-
-        // initialize data directory w/o files
-        fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok();
-        let initdb_path = self.pg_bin_dir.join("initdb");
-        println!("initdb_path: {}", initdb_path.to_str().unwrap());
-        let initdb = Command::new(initdb_path)
-            .args(&["-D", node.pgdata.to_str().unwrap()])
-            .arg("-N")
-            .arg("--no-instructions")
-            .arg("--compute-node")
-            .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-            .status()
-            .expect("failed to execute initdb");
-
-        if !initdb.success() {
-            panic!("initdb failed");
-        }
-
-        // listen for selected port
-        node.append_conf(
-            "postgresql.conf",
-            format!(
-                "\
-                max_wal_senders = 10\n\
-                max_replication_slots = 10\n\
-                hot_standby = on\n\
-                shared_buffers = 1MB\n\
-                fsync = off\n\
-                max_connections = 100\n\
-                wal_level = replica\n\
-                listen_addresses = '{address}'\n\
-                port = {port}\n\
-                computenode_mode = true\n\
-                ",
-                address = node.ip,
-                port = node.port
-            )
-            .as_str(),
-        );
-
-        node
-    }
-
-    pub fn new_node(&mut self) -> Arc<PostgresNode> {
-        let storage_cplane = self.storage_cplane;
-        let node = self.new_vanilla_node();
-
-        let pserver = storage_cplane.page_server_addr();
-
-        // Configure that node to take pages from pageserver
-        node.append_conf(
-            "postgresql.conf",
-            format!(
-                "\
-                page_server_connstring = 'host={} port={}'\n\
-                ",
-                pserver.ip(),
-                pserver.port()
-            )
-            .as_str(),
-        );
-
-        node.clone()
-    }
-
-    pub fn new_master_node(&mut self) -> Arc<PostgresNode> {
-        let node = self.new_vanilla_node();
-
-        node.append_conf(
-            "postgresql.conf",
-            "synchronous_standby_names = 'safekeeper_proxy'\n\
-            ",
-        );
-        node.clone()
-    }
-}
-
-///////////////////////////////////////////////////////////////////////////////
-
-pub struct WalProposerNode {
-    pid: u32,
-}
-
-impl WalProposerNode {
-    pub fn stop(&self) {
-        let status = Command::new("kill")
-            .arg(self.pid.to_string())
-            .env_clear()
-            .status()
-            .expect("failed to execute kill");
-
-        if !status.success() {
-            panic!("kill start failed");
-        }
-    }
-}
-
-impl Drop for WalProposerNode {
-    fn drop(&mut self) {
-        self.stop();
-    }
-}
-
-///////////////////////////////////////////////////////////////////////////////
-
-pub struct PostgresNode {
-    _node_id: usize,
-    pub port: u16,
-    pub ip: IpAddr,
-    pgdata: PathBuf,
-    pg_bin_dir: PathBuf,
-}
-
-impl PostgresNode {
-    pub fn append_conf(&self, config: &str, opts: &str) {
-        OpenOptions::new()
-            .append(true)
-            .open(self.pgdata.join(config).to_str().unwrap())
-            .unwrap()
-            .write_all(opts.as_bytes())
-            .unwrap();
-    }
-
-    fn pg_ctl(&self, args: &[&str], check_ok: bool) {
-        let pg_ctl_path = self.pg_bin_dir.join("pg_ctl");
-        let pg_ctl = Command::new(pg_ctl_path)
-            .args(
-                [
-                    &[
-                        "-D",
-                        self.pgdata.to_str().unwrap(),
-                        "-l",
-                        self.pgdata.join("log").to_str().unwrap(),
-                    ],
-                    args,
-                ]
-                .concat(),
-            )
-            .env_clear()
-            .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap())
-            .status()
-            .expect("failed to execute pg_ctl");
-
-        if check_ok && !pg_ctl.success() {
-            panic!("pg_ctl failed");
-        }
-    }
-
-    pub fn start(&self, storage_cplane: &StorageControlPlane) {
-        if storage_cplane.page_servers.len() != 0 {
-            let _res =
-                storage_cplane.page_server_psql(format!("callmemaybe {}", self.connstr()).as_str());
-        }
-        println!("Starting postgres node at '{}'", self.connstr());
-        self.pg_ctl(&["start"], true);
-    }
-
-    pub fn restart(&self) {
-        self.pg_ctl(&["restart"], true);
-    }
-
-    pub fn stop(&self) {
-        self.pg_ctl(&["-m", "immediate", "stop"], true);
-    }
-
-    pub fn connstr(&self) -> String {
-        format!("host={} port={} user={}", self.ip, self.port, self.whoami())
-    }
-
-    // XXX: cache that in control plane
-    pub fn whoami(&self) -> String {
-        let output = Command::new("whoami")
-            .output()
-            .expect("failed to execute whoami");
-
-        if !output.status.success() {
-            panic!("whoami failed");
-        }
-
-        String::from_utf8(output.stdout).unwrap().trim().to_string()
-    }
-
-    pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> {
-        let connstring = format!(
-            "host={} port={} dbname={} user={}",
-            self.ip,
-            self.port,
-            db,
-            self.whoami()
-        );
-        let mut client = Client::connect(connstring.as_str(), NoTls).unwrap();
-
-        println!("Running {}", sql);
-        client.query(sql, &[]).unwrap()
-    }
-
-    pub fn open_psql(&self, db: &str) -> Client {
-        let connstring = format!(
-            "host={} port={} dbname={} user={}",
-            self.ip,
-            self.port,
-            db,
-            self.whoami()
-        );
-        Client::connect(connstring.as_str(), NoTls).unwrap()
-    }
-
-    pub fn get_pgdata(&self) -> Option<&str> {
-        self.pgdata.to_str()
-    }
-
-    /* Create stub controlfile and respective xlog to start computenode */
-    pub fn setup_controlfile(&self) {
-        let filepath = format!("{}/global/pg_control", self.pgdata.to_str().unwrap());
-
-        {
-            File::create(filepath).unwrap();
-        }
-
-        let pg_resetwal_path = self.pg_bin_dir.join("pg_resetwal");
-
-        let pg_resetwal = Command::new(pg_resetwal_path)
-            .args(&["-D", self.pgdata.to_str().unwrap()])
-            .arg("-f")
-            // TODO probably we will have to modify pg_resetwal
-            // .arg("--compute-node")
-            .status()
-            .expect("failed to execute pg_resetwal");
- - if !pg_resetwal.success() { - panic!("pg_resetwal failed"); - } - } - - pub fn start_proxy(&self, wal_acceptors: String) -> WalProposerNode { - let proxy_path = PG_BIN_DIR.join("safekeeper_proxy"); - match Command::new(proxy_path.as_path()) - .args(&["-s", &wal_acceptors]) - .args(&["-h", &self.ip.to_string()]) - .args(&["-p", &self.port.to_string()]) - .arg("-v") - .stderr(File::create(TEST_WORKDIR.join("safepkeeper_proxy.log")).unwrap()) - .spawn() - { - Ok(child) => WalProposerNode { pid: child.id() }, - Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e), - } - } - - pub fn pg_regress(&self) { - self.safe_psql("postgres", "CREATE DATABASE regression"); - - let regress_run_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_check/regress"); - fs::create_dir_all(regress_run_path.clone()).unwrap(); - fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); - std::env::set_current_dir(regress_run_path).unwrap(); - - let regress_build_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); - let regress_src_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); - - let _regress_check = Command::new(regress_build_path.join("pg_regress")) - .args(&[ - "--bindir=''", - "--use-existing", - format!("--bindir={}", PG_BIN_DIR.to_str().unwrap()).as_str(), - format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), - format!( - "--schedule={}", - regress_src_path.join("parallel_schedule").to_str().unwrap() - ) - .as_str(), - format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), - ]) - .env_clear() - .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap()) - .env("PGPORT", self.port.to_string()) - .env("PGUSER", self.whoami()) - .env("PGHOST", self.ip.to_string()) - .status() - .expect("pg_regress failed"); - } - - pub fn pg_bench(&self, clients: u32, seconds: u32) { - let port = self.port.to_string(); - let clients = clients.to_string(); - let seconds = seconds.to_string(); - let _pg_bench_init = Command::new(PG_BIN_DIR.join("pgbench")) - .args(&["-i", "-p", port.as_str(), "postgres"]) - .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap()) - .status() - .expect("pgbench -i"); - let _pg_bench_run = Command::new(PG_BIN_DIR.join("pgbench")) - .args(&[ - "-p", - port.as_str(), - "-T", - seconds.as_str(), - "-P", - "1", - "-c", - clients.as_str(), - "-M", - "prepared", - "postgres", - ]) - .env("LD_LIBRARY_PATH", PG_LIB_DIR.to_str().unwrap()) - .status() - .expect("pgbench run"); - } -} - -impl Drop for PostgresNode { - // destructor to clean up state after test is done - // XXX: we may detect failed test by setting some flag in catch_unwind() - // and checking it here. But let just clean datadirs on start. 
-    fn drop(&mut self) {
-        self.stop();
-        // fs::remove_dir_all(self.pgdata.clone()).unwrap();
-    }
-}
diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs
index 93fa1db861..a7e389455e 100644
--- a/integration_tests/tests/test_pageserver.rs
+++ b/integration_tests/tests/test_pageserver.rs
@@ -1,8 +1,9 @@
-#[allow(dead_code)]
-mod control_plane;
+// mod control_plane;
+use control_plane::compute::ComputeControlPlane;
+use control_plane::storage::TestStorageControlPlane;
 
-use control_plane::ComputeControlPlane;
-use control_plane::StorageControlPlane;
+use std::thread::sleep;
+use std::time::Duration;
 
 // XXX: force all redo at the end
 // -- restart + seqscan won't read deleted stuff
@@ -12,12 +13,12 @@
 #[test]
 fn test_redo_cases() {
     // Start pageserver that reads WAL directly from that postgres
-    let storage_cplane = StorageControlPlane::one_page_server();
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
 
     // start postgres
-    let node = compute_cplane.new_node();
-    node.start(&storage_cplane);
+    let node = compute_cplane.new_test_node();
+    node.start().unwrap();
 
     // check basic work with table
     node.safe_psql(
@@ -49,15 +50,17 @@
 // Runs pg_regress on a compute node
 #[test]
+#[ignore]
 fn test_regress() {
     // Start pageserver that reads WAL directly from that postgres
-    let storage_cplane = StorageControlPlane::one_page_server();
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
 
     // start postgres
-    let node = compute_cplane.new_node();
-    node.start(&storage_cplane);
+    let node = compute_cplane.new_test_node();
+    node.start().unwrap();
 
-    node.pg_regress();
+    control_plane::storage::regress_check(&node);
 }
 
@@ -73,20 +76,23 @@
 // Run two postgres instances on one pageserver
 #[test]
 fn test_pageserver_multitenancy() {
     // Start pageserver that reads WAL directly from that postgres
-    let storage_cplane = StorageControlPlane::one_page_server();
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::one_page_server(String::new());
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
 
     // Allocate postgres instance, but don't start
-    let node1 = compute_cplane.new_node();
-    let node2 = compute_cplane.new_node();
-    node1.start(&storage_cplane);
-    node2.start(&storage_cplane);
+    let node1 = compute_cplane.new_test_node();
+    let node2 = compute_cplane.new_test_node();
+    node1.start().unwrap();
+    node2.start().unwrap();
 
     // check node1
     node1.safe_psql(
@@ -122,3 +128,36 @@
     println!("sum = {}", count);
     assert_eq!(count, 15000150000);
 }
+
+#[test]
+fn test_upload_pageserver_local() {
+    // Init pageserver that reads WAL directly from that postgres
+    // Don't start yet
+
+    let storage_cplane = TestStorageControlPlane::one_page_server_no_start();
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
+
+    // init postgres node
+    let node = compute_cplane.new_test_node();
+
+    storage_cplane
+        .pageserver
+        .start_fromdatadir(node.pgdata().to_str().unwrap().to_string())
+        .unwrap();
+
+    sleep(Duration::from_secs(10));
+
+    // start postgres node
+    node.start().unwrap();
+
+    // check basic work with table
+    node.safe_psql(
+        "postgres",
+        "CREATE TABLE t(key int primary key, value text)",
+    );
+    node.safe_psql(
+        "postgres",
+        "INSERT INTO t SELECT generate_series(1,100000), 'payload'",
+    );
+}
diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs
index df0d55413d..f4f7675b07 100644
--- a/integration_tests/tests/test_wal_acceptor.rs
+++ b/integration_tests/tests/test_wal_acceptor.rs
@@ -1,8 +1,6 @@
 // Restart acceptors one by one while compute is under load.
-#[allow(dead_code)]
-mod control_plane;
-use control_plane::ComputeControlPlane;
-use control_plane::StorageControlPlane;
+use control_plane::compute::ComputeControlPlane;
+use control_plane::storage::TestStorageControlPlane;
 
 use rand::Rng;
 use std::sync::Arc;
@@ -13,13 +11,13 @@ use std::{thread, time};
 fn test_acceptors_normal_work() {
     // Start pageserver that reads WAL directly from that postgres
     const REDUNDANCY: usize = 3;
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
 
-    // start postgre
-    let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    // start postgres
+    let node = compute_cplane.new_test_master_node();
+    node.start().unwrap();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -43,6 +41,53 @@
     // check wal files equality
 }
 
+#[test]
+fn test_multitenancy() {
+    // Start pageserver that reads WAL directly from that postgres
+    const REDUNDANCY: usize = 3;
+    const N_NODES: usize = 5;
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
+    let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
+
+    // start postgres
+    let mut nodes = Vec::new();
+    let mut proxies = Vec::new();
+    for _ in 0..N_NODES {
+        let node = compute_cplane.new_test_master_node();
+        nodes.push(node);
+        nodes.last().unwrap().start().unwrap();
+        proxies.push(nodes.last().unwrap().start_proxy(wal_acceptors.clone()));
+    }
+
+    // create schema
+    for node in &nodes {
+        node.safe_psql(
+            "postgres",
+            "CREATE TABLE t(key int primary key, value text)",
+        );
+    }
+
+    // Populate data
+    for node in &nodes {
+        node.safe_psql(
+            "postgres",
+            "INSERT INTO t SELECT generate_series(1,100000), 'payload'",
+        );
+    }
+
+    // Check data
+    for node in &nodes {
+        let count: i64 = node
+            .safe_psql("postgres", "SELECT sum(key) FROM t")
+            .first()
+            .unwrap()
+            .get(0);
+        println!("sum = {}", count);
+        assert_eq!(count, 5000050000);
+    }
+}
+
 // Majority is always alive
 #[test]
 fn test_acceptors_restarts() {
@@ -50,14 +95,14 @@
     const REDUNDANCY: usize = 3;
     const FAULT_PROBABILITY: f32 = 0.01;
 
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
     let mut rng = rand::thread_rng();
 
-    // start postgre
-    let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    // start postgres
+    let node = compute_cplane.new_test_master_node();
+    node.start().unwrap();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -80,7 +125,7 @@
             } else {
                 let node: usize = rng.gen_range(0..REDUNDANCY);
                 failed_node = Some(node);
-                storage_cplane.wal_acceptors[node].stop();
+                storage_cplane.wal_acceptors[node].stop().unwrap();
             }
         }
     }
@@ -93,7 +138,7 @@
     assert_eq!(count, 500500);
 }
 
-fn start_acceptor(cplane: &Arc<StorageControlPlane>, no: usize) {
+fn start_acceptor(cplane: &Arc<TestStorageControlPlane>, no: usize) {
     let cp = cplane.clone();
     thread::spawn(move || {
         thread::sleep(time::Duration::from_secs(1));
@@ -109,13 +154,13 @@
 fn test_acceptors_unavalability() {
     // Start pageserver that reads WAL directly from that postgres
     const REDUNDANCY: usize = 2;
 
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = TestStorageControlPlane::fault_tolerant(REDUNDANCY);
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
 
-    // start postgre
-    let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    // start postgres
+    let node = compute_cplane.new_test_master_node();
+    node.start().unwrap();
 
     // start proxy
     let _proxy = node.start_proxy(wal_acceptors);
@@ -129,7 +174,7 @@
     psql.execute("INSERT INTO t values (1, 'payload')", &[])
         .unwrap();
 
-    storage_cplane.wal_acceptors[0].stop();
+    storage_cplane.wal_acceptors[0].stop().unwrap();
     let cp = Arc::new(storage_cplane);
     start_acceptor(&cp, 0);
     let now = SystemTime::now();
@@ -139,7 +184,7 @@
     psql.execute("INSERT INTO t values (3, 'payload')", &[])
         .unwrap();
 
-    cp.wal_acceptors[1].stop();
+    cp.wal_acceptors[1].stop().unwrap();
     start_acceptor(&cp, 1);
     psql.execute("INSERT INTO t values (4, 'payload')", &[])
         .unwrap();
@@ -157,16 +202,16 @@
     assert_eq!(count, 15);
 }
 
-fn simulate_failures(cplane: &Arc<StorageControlPlane>) {
+fn simulate_failures(cplane: Arc<TestStorageControlPlane>) {
     let mut rng = rand::thread_rng();
     let n_acceptors = cplane.wal_acceptors.len();
     let failure_period = time::Duration::from_secs(1);
-    loop {
+    while cplane.is_running() {
         thread::sleep(failure_period);
         let mask: u32 = rng.gen_range(0..(1 << n_acceptors));
         for i in 0..n_acceptors {
             if (mask & (1 << i)) != 0 {
-                cplane.wal_acceptors[i].stop();
+                cplane.wal_acceptors[i].stop().unwrap();
             }
         }
         thread::sleep(failure_period);
@@ -184,13 +229,13 @@
 fn test_race_conditions() {
     // Start pageserver that reads WAL directly from that postgres
     const REDUNDANCY: usize = 3;
 
-    let storage_cplane = StorageControlPlane::fault_tolerant(REDUNDANCY);
-    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane);
+    let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant(REDUNDANCY));
+    let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver);
     let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info();
 
-    // start postgre
-    let node = compute_cplane.new_master_node();
-    node.start(&storage_cplane);
+    // start postgres
+    let node = compute_cplane.new_test_master_node();
+
node.start().unwrap(); // start proxy let _proxy = node.start_proxy(wal_acceptors); @@ -200,10 +245,10 @@ fn test_race_conditions() { "postgres", "CREATE TABLE t(key int primary key, value text)", ); - let cplane = Arc::new(storage_cplane); - let cp = cplane.clone(); - thread::spawn(move || { - simulate_failures(&cp); + + let cp = storage_cplane.clone(); + let failures_thread = thread::spawn(move || { + simulate_failures(cp); }); let mut psql = node.open_psql("postgres"); @@ -218,5 +263,7 @@ fn test_race_conditions() { .get(0); println!("sum = {}", count); assert_eq!(count, 500500); - cplane.stop(); + + storage_cplane.stop(); + failures_thread.join().unwrap(); } diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 8d629deabc..17a5f48d18 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -26,7 +26,7 @@ clap = "2.33.0" termion = "1.5.6" tui = "0.14.0" daemonize = "0.4.1" -rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] } +rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } @@ -35,3 +35,5 @@ postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replica rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb.git" } anyhow = "1.0" crc32c = "0.6.0" +walkdir = "2" +thiserror = "1.0" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index bb920d46de..9e46e3d0a9 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -6,24 +6,24 @@ use log::*; use std::fs; use std::io; use std::path::PathBuf; +use std::process::exit; use std::thread; -use std::{fs::OpenOptions, str::FromStr}; +use std::fs::OpenOptions; +use anyhow::{Context, Result}; use clap::{App, Arg}; use daemonize::Daemonize; -use slog; use slog::Drain; -use slog_scope; -use slog_stdlog; use pageserver::page_service; +use pageserver::restore_datadir; use pageserver::restore_s3; use pageserver::tui; use pageserver::walreceiver; use pageserver::PageServerConf; -fn main() -> Result<(), io::Error> { +fn main() -> Result<()> { let arg_matches = App::new("Zenith page server") .about("Materializes WAL stream to pages and serves them to the postgres") .arg(Arg::with_name("datadir") @@ -51,10 +51,10 @@ fn main() -> Result<(), io::Error> { .long("daemonize") .takes_value(false) .help("Run in the background")) - .arg(Arg::with_name("skip_recovery") - .long("skip-recovery") - .takes_value(false) - .help("Skip S3 recovery procedy and start empty")) + .arg(Arg::with_name("restore_from") + .long("restore-from") + .takes_value(true) + .help("Upload data from s3 or datadir")) .get_matches(); let mut conf = PageServerConf { @@ -63,7 +63,7 @@ fn main() -> Result<(), io::Error> { interactive: false, wal_producer_connstr: None, listen_addr: "127.0.0.1:5430".parse().unwrap(), - skip_recovery: false, + restore_from: String::new(), }; if let Some(dir) = arg_matches.value_of("datadir") { @@ -79,31 +79,29 @@ fn main() -> Result<(), io::Error> { } if conf.daemonize && conf.interactive { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "--daemonize is not allowed with --interactive: choose one", - )); + eprintln!("--daemonize is not allowed with --interactive: choose one"); + exit(1); } - if arg_matches.is_present("skip_recovery") { - 
conf.skip_recovery = true; + if let Some(restore_from) = arg_matches.value_of("restore_from") { + conf.restore_from = String::from(restore_from); } if let Some(addr) = arg_matches.value_of("wal_producer") { - conf.wal_producer_connstr = Some(String::from_str(addr).unwrap()); + conf.wal_producer_connstr = Some(String::from(addr)); } if let Some(addr) = arg_matches.value_of("listen") { - conf.listen_addr = addr.parse().unwrap(); + conf.listen_addr = addr.parse()?; } - start_pageserver(conf) + start_pageserver(&conf) } -fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { +fn start_pageserver(conf: &PageServerConf) -> Result<()> { // Initialize logger - let _scope_guard = init_logging(&conf); - let _log_guard = slog_stdlog::init().unwrap(); + let _scope_guard = init_logging(&conf)?; + let _log_guard = slog_stdlog::init()?; // Note: this `info!(...)` macro comes from `log` crate info!("standard logging redirected to slog"); @@ -127,18 +125,15 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { if conf.daemonize { info!("daemonizing..."); - // There should'n be any logging to stdin/stdout. Redirect it to the main log so - // that we will see any accidental manual fpritf's or backtraces. + // There shouldn't be any logging to stdin/stdout. Redirect it to the main log so + // that we will see any accidental manual fprintf's or backtraces. + let log_filename = conf.data_dir.join("pageserver.log"); let stdout = OpenOptions::new() .create(true) .append(true) - .open(conf.data_dir.join("pageserver-stdout.log")) - .unwrap(); - let stderr = OpenOptions::new() - .create(true) - .append(true) - .open(conf.data_dir.join("pageserver-stderr.log")) - .unwrap(); + .open(&log_filename) + .with_context(|| format!("failed to open {:?}", log_filename))?; + let stderr = stdout.try_clone()?; let daemonize = Daemonize::new() .pid_file(conf.data_dir.join("pageserver.pid")) @@ -154,13 +149,17 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { let mut threads = Vec::new(); - info!("starting..."); + info!("starting... {}", conf.restore_from); // Before opening up for connections, restore the latest base backup from S3. // (We don't persist anything to local disk at the moment, so we need to do // this at every startup) - if !conf.skip_recovery { + if conf.restore_from.eq("s3") { + info!("restore-from s3..."); restore_s3::restore_main(&conf); + } else if conf.restore_from.eq("local") { + info!("restore-from local..."); + restore_datadir::restore_main(&conf); } // Create directory for wal-redo datadirs @@ -169,7 +168,7 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { Err(e) => match e.kind() { io::ErrorKind::AlreadyExists => {} _ => { - panic!("Failed to create wal-redo data directory: {}", e); + anyhow::bail!("Failed to create wal-redo data directory: {}", e); } }, } @@ -181,13 +180,13 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { // // All other wal receivers are started on demand by "callmemaybe" command // sent to pageserver. 
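// [Editor's aside — illustration, not part of the patch] The "callmemaybe"
// path mentioned just above is the on-demand counterpart of the static
// receiver spawned below: the compute node sends its own connection string,
// and the pageserver starts a WAL receiver that dials back. The handler name
// here is hypothetical; the spawn pattern mirrors page_service.rs later in
// this diff.
fn handle_callmemaybe(conf: &PageServerConf, connstr: String) {
    let conf_copy = conf.clone();
    let _walreceiver_thread = thread::Builder::new()
        .name("WAL receiver thread".into())
        .spawn(move || {
            walreceiver::thread_main(&conf_copy, &connstr);
        })
        .unwrap();
}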
-    let conf_copy = conf.clone();
-    if let Some(wal_producer) = conf.wal_producer_connstr {
-        let conf = conf_copy.clone();
+    if let Some(wal_producer) = &conf.wal_producer_connstr {
+        let conf_copy = conf.clone();
+        let wal_producer = wal_producer.clone();
         let walreceiver_thread = thread::Builder::new()
             .name("static WAL receiver thread".into())
             .spawn(move || {
-                walreceiver::thread_main(conf, &wal_producer);
+                walreceiver::thread_main(&conf_copy, &wal_producer);
             })
             .unwrap();
         threads.push(walreceiver_thread);
@@ -195,12 +194,12 @@
     // GetPage@LSN requests are served by another thread. (It uses async I/O,
     // but the code in page_service sets up its own thread pool for that)
-    let conf = conf_copy.clone();
+    let conf_copy = conf.clone();
     let page_server_thread = thread::Builder::new()
         .name("Page Service thread".into())
-        .spawn(|| {
+        .spawn(move || {
             // thread code
-            page_service::thread_main(conf);
+            page_service::thread_main(&conf_copy);
         })
         .unwrap();
     threads.push(page_server_thread);
@@ -217,16 +216,20 @@
     Ok(())
 }
 
-fn init_logging(conf: &PageServerConf) -> slog_scope::GlobalLoggerGuard {
+fn init_logging(conf: &PageServerConf) -> Result<slog_scope::GlobalLoggerGuard> {
     if conf.interactive {
-        tui::init_logging()
+        Ok(tui::init_logging())
     } else if conf.daemonize {
         let log = conf.data_dir.join("pageserver.log");
         let log_file = OpenOptions::new()
             .create(true)
             .append(true)
-            .open(log)
-            .unwrap_or_else(|_| panic!("Could not create log file"));
+            .open(&log)
+            .map_err(|err| {
+                eprintln!("Could not create log file {:?}: {}", log, err);
+                err
+            })?;
+
         let decorator = slog_term::PlainSyncDecorator::new(log_file);
         let drain = slog_term::CompactFormat::new(decorator).build();
         let drain = slog::Filter::new(drain, |record: &slog::Record| {
@@ -237,7 +240,7 @@
         });
         let drain = std::sync::Mutex::new(drain).fuse();
         let logger = slog::Logger::root(drain, slog::o!());
-        slog_scope::set_global_logger(logger)
+        Ok(slog_scope::set_global_logger(logger))
     } else {
         let decorator = slog_term::TermDecorator::new().build();
         let drain = slog_term::FullFormat::new(decorator).build().fuse();
@@ -255,6 +258,6 @@
         })
         .fuse();
         let logger = slog::Logger::root(drain, slog::o!());
-        slog_scope::set_global_logger(logger)
+        Ok(slog_scope::set_global_logger(logger))
     }
 }
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index b504308e6b..b3f07f608c 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -3,6 +3,8 @@ use std::path::PathBuf;
 
 pub mod page_cache;
 pub mod page_service;
+pub mod pg_constants;
+pub mod restore_datadir;
 pub mod restore_s3;
 pub mod tui;
 pub mod tui_event;
@@ -11,7 +13,6 @@
 pub mod waldecoder;
 pub mod walreceiver;
 pub mod walredo;
 
-#[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub struct PageServerConf {
     pub data_dir: PathBuf,
@@ -19,5 +20,5 @@
     pub interactive: bool,
     pub wal_producer_connstr: Option<String>,
     pub listen_addr: SocketAddr,
-    pub skip_recovery: bool,
+    pub restore_from: String,
 }
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 536678d194..9d39554721 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -6,24 +6,24 @@
 // per-entry mutex.
 //
+use crate::{walredo, PageServerConf};
+use anyhow::bail;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-use std::error::Error;
+use core::ops::Bound::Included;
+use crossbeam_channel::unbounded;
+use crossbeam_channel::{Receiver, Sender};
+use lazy_static::lazy_static;
+use log::*;
+use rand::Rng;
+use std::collections::{BTreeMap, HashMap};
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::{Arc, Condvar, Mutex};
 use std::thread;
 use std::time::Duration;
 use std::{convert::TryInto, ops::AddAssign};
-// use tokio::sync::RwLock;
-use lazy_static::lazy_static;
-use log::*;
 use rocksdb::*;
-use std::collections::HashMap;
-
-use crate::{walredo, PageServerConf};
-
-use crossbeam_channel::unbounded;
-use crossbeam_channel::{Receiver, Sender};
 
 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
 static TIMEOUT: Duration = Duration::from_secs(60);
@@ -112,7 +112,7 @@
 lazy_static! {
     pub static ref PAGECACHES: Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new());
 }
 
-pub fn get_pagecache(conf: PageServerConf, sys_id: u64) -> Arc<PageCache> {
+pub fn get_pagecache(conf: &PageServerConf, sys_id: u64) -> Arc<PageCache> {
     let mut pcaches = PAGECACHES.lock().unwrap();
 
     if !pcaches.contains_key(&sys_id) {
@@ -123,10 +123,11 @@
         // Now join_handle is not saved anywhere and we won't try to restart the thread
         // if it is dead. We may later stop those threads after some inactivity period
         // and restart them on demand.
+        let conf = conf.clone();
         let _walredo_thread = thread::Builder::new()
             .name("WAL redo thread".into())
             .spawn(move || {
-                walredo::wal_redo_main(conf, sys_id);
+                walredo::wal_redo_main(&conf, sys_id);
             })
             .unwrap();
     }
@@ -252,6 +253,19 @@
     }
 }
 
+impl CacheEntry {
+    fn new(key: CacheKey) -> CacheEntry {
+        CacheEntry {
+            key,
+            content: Mutex::new(CacheEntryContent {
+                page_image: None,
+                wal_record: None,
+                apply_pending: false,
+            }),
+        }
+    }
+}
+
 impl CacheEntry {
     fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry {
         CacheEntry {
@@ -345,7 +359,7 @@ impl PageCache {
     //
     // Returns an 8k page image
     //
-    pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
+    pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
         self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
 
         // Look up cache entry. If it's a page image, return that.
If it's a WAL record, @@ -372,15 +386,11 @@ impl PageCache { shared = wait_result.0; if wait_result.1.timed_out() { - error!( - "Timed out while waiting for WAL record at LSN {} to arrive", - lsn - ); - return Err(format!( + bail!( "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", lsn >> 32, lsn & 0xffff_ffff - ))?; + ); } } if waited { @@ -388,11 +398,23 @@ impl PageCache { } if lsn < shared.first_valid_lsn { - return Err(format!( + bail!( "LSN {:X}/{:X} has already been removed", lsn >> 32, lsn & 0xffff_ffff - ))?; + ); + } + + let pagecache = &shared.pagecache; + + let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); + + let entry_opt = entries.next_back(); + + if entry_opt.is_none() { + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + return Ok(Bytes::from_static(&ZERO_PAGE)); + /* return Err("could not find page image")?; */ } } let mut buf = BytesMut::new(); @@ -440,14 +462,16 @@ impl PageCache { page_img = match &entry_content.page_image { Some(p) => p.clone(), None => { - error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); - return Err("could not apply WAL to reconstruct page image".into()); + error!( + "could not apply WAL to reconstruct page image for GetPage@LSN request" + ); + bail!("could not apply WAL to reconstruct page image"); } }; self.put_page_image(tag, lsn, page_img.clone()); } else { // No base image, and no WAL record. Huh? - panic!("no page image or WAL record for requested page"); + bail!("no page image or WAL record for requested page"); } // FIXME: assumes little-endian. Only used for the debugging log though diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b23c65e44a..84e155c940 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -215,7 +215,7 @@ impl FeMessage { /////////////////////////////////////////////////////////////////////////////// -pub fn thread_main(conf: PageServerConf) { +pub fn thread_main(conf: &PageServerConf) { // Create a new thread pool // // FIXME: keep it single-threaded for now, make it easier to debug with gdb, @@ -260,7 +260,7 @@ impl Connection { stream: BufWriter::new(socket), buffer: BytesMut::with_capacity(10 * 1024), init_done: false, - conf: conf, + conf, } } @@ -459,7 +459,7 @@ impl Connection { let _walreceiver_thread = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(move || { - walreceiver::thread_main(conf_copy, &connstr); + walreceiver::thread_main(&conf_copy, &connstr); }) .unwrap(); @@ -504,7 +504,7 @@ impl Connection { self.stream.write_i16(0).await?; /* numAttributes */ self.stream.flush().await?; - let pcache = page_cache::get_pagecache(self.conf.clone(), sysid); + let pcache = page_cache::get_pagecache(&self.conf, sysid); loop { let message = self.read_message().await?; @@ -561,7 +561,7 @@ impl Connection { self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { ok: true, - n_blocks: n_blocks, + n_blocks, })) .await? } diff --git a/pageserver/src/pg_constants.rs b/pageserver/src/pg_constants.rs new file mode 100644 index 0000000000..b59ddb5396 --- /dev/null +++ b/pageserver/src/pg_constants.rs @@ -0,0 +1,11 @@ +// From pg_tablespace_d.h +// +pub const DEFAULTTABLESPACE_OID: u32 = 1663; +pub const GLOBALTABLESPACE_OID: u32 = 1664; +//Special values for non-rel files' tags +//TODO maybe use enum? 
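// [Editor's aside — illustration, not part of the patch] The enum hinted at
// by the TODO above could look like this; the discriminants mirror the
// constants that follow.
#[repr(u32)]
pub enum NonRelForkNum {
    PgControlFile = 42,
    PgFilenodeMap = 43,
    PgXact = 44,
    PgMxactOffsets = 45,
    PgMxactMembers = 46,
}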
+pub const PG_CONTROLFILE_FORKNUM: u32 = 42;
+pub const PG_FILENODEMAP_FORKNUM: u32 = 43;
+pub const PG_XACT_FORKNUM: u32 = 44;
+pub const PG_MXACT_OFFSETS_FORKNUM: u32 = 45;
+pub const PG_MXACT_MEMBERS_FORKNUM: u32 = 46;
diff --git a/pageserver/src/restore_datadir.rs b/pageserver/src/restore_datadir.rs
new file mode 100644
index 0000000000..985f5e3905
--- /dev/null
+++ b/pageserver/src/restore_datadir.rs
@@ -0,0 +1,339 @@
+//
+// Restore a base backup from a local data directory
+//
+// This runs once at Page Server startup. It loads all the "base images" from
+// a local PostgreSQL data directory into the in-memory page cache. It also
+// initializes the "last valid LSN" in the page cache to the LSN of the base
+// image, so that when the WAL receiver is started, it starts streaming from
+// that LSN.
+//
+
+use bytes::{Buf, BytesMut};
+use log::*;
+use regex::Regex;
+use std::env;
+use std::fmt;
+
+use tokio::runtime;
+
+use futures::future;
+
+use crate::{page_cache, pg_constants, PageServerConf};
+use std::fs;
+use walkdir::WalkDir;
+
+pub fn restore_main(conf: &PageServerConf) {
+    // Create a new thread pool
+    let runtime = runtime::Runtime::new().unwrap();
+
+    runtime.block_on(async {
+        let result = restore_chunk(conf).await;
+
+        match result {
+            Ok(_) => {
+                return;
+            }
+            Err(err) => {
+                error!("error: {}", err);
+                return;
+            }
+        }
+    });
+}
+
+async fn restore_chunk(conf: &PageServerConf) -> Result<(), FilePathError> {
+    let pgdata_base_path = env::var("PGDATA_BASE_PATH").unwrap();
+    info!("Restoring from local dir...");
+
+    let sys_id: u64 = 42;
+    let control_lsn = 0; //TODO get it from sysid
+    let mut slurp_futures: Vec<_> = Vec::new();
+
+    for e in WalkDir::new(pgdata_base_path.clone()) {
+        let entry = e.unwrap();
+
+        if !entry.path().is_dir() {
+            let path = entry.path().to_str().unwrap();
+
+            let relpath = path
+                .strip_prefix(&format!("{}/", pgdata_base_path))
+                .unwrap();
+            info!(
+                "Restoring file {} relpath {}",
+                entry.path().display(),
+                relpath
+            );
+
+            let parsed = parse_rel_file_path(&relpath);
+
+            match parsed {
+                Ok(mut p) => {
+                    p.lsn = control_lsn;
+
+                    let f = slurp_base_file(conf, sys_id, path.to_string(), p);
+
+                    slurp_futures.push(f);
+                }
+                Err(e) => {
+                    warn!("unrecognized file: {} ({})", relpath, e);
+                }
+            };
+        }
+    }
+
+    let pcache = page_cache::get_pagecache(conf, sys_id);
+    pcache.init_valid_lsn(control_lsn);
+
+    info!("{} files to restore...", slurp_futures.len());
+
+    future::join_all(slurp_futures).await;
+    info!("restored!");
+    Ok(())
+}
+
+#[derive(Debug)]
+struct FilePathError {
+    msg: String,
+}
+
+impl FilePathError {
+    fn new(msg: &str) -> FilePathError {
+        FilePathError {
+            msg: msg.to_string(),
+        }
+    }
+}
+
+impl From<core::num::ParseIntError> for FilePathError {
+    fn from(e: core::num::ParseIntError) -> Self {
+        return FilePathError {
+            msg: format!("invalid filename: {}", e),
+        };
+    }
+}
+
+impl fmt::Display for FilePathError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", self.msg)
+    }
+}
+
+fn forkname_to_forknum(forkname: Option<&str>) -> Result<u32, FilePathError> {
+    match forkname {
+        // "main" is not in filenames, it's implicit if the fork name is not present
+        None => Ok(0),
+        Some("fsm") => Ok(1),
+        Some("vm") => Ok(2),
+        Some("init") => Ok(3),
+        Some(_) => Err(FilePathError::new("invalid forkname")),
+    }
+}
+
+#[derive(Debug)]
+struct ParsedBaseImageFileName {
+    pub spcnode: u32,
+    pub dbnode: u32,
+    pub relnode: u32,
+    pub forknum: u32,
+    pub segno: u32,
+
+    pub lsn: u64,
+}
+
+// formats:
+// <relnode>
+// <relnode>_<forkname>
+// <relnode>.<segno>
+// <relnode>_<forkname>.<segno>
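// [Editor's aside — illustration, not part of the patch] What parse_filename
// below is expected to produce for each of the formats above, using invented
// relfilenode names; the tuple is (relnode, forknum, segno, lsn) and the fork
// numbers come from forkname_to_forknum (main=0, fsm=1, vm=2, init=3).
#[test]
fn parse_filename_examples() {
    assert_eq!(parse_filename("16384").unwrap(), (16384, 0, 0, 0)); // main fork
    assert_eq!(parse_filename("16384_vm").unwrap(), (16384, 2, 0, 0)); // visibility map
    assert_eq!(parse_filename("16384.1").unwrap(), (16384, 0, 1, 0)); // second 1 GB segment
    assert_eq!(parse_filename("16384_fsm.2").unwrap(), (16384, 1, 2, 0)); // fsm, third segment
}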
+fn parse_filename(fname: &str) -> Result<(u32, u32, u32, u64), FilePathError> {
+    let re = Regex::new(r"^(?P<relnode>\d+)(_(?P<forkname>[a-z]+))?(\.(?P<segno>\d+))?$").unwrap();
+
+    let caps = re
+        .captures(fname)
+        .ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
+
+    let relnode_str = caps.name("relnode").unwrap().as_str();
+    let relnode = u32::from_str_radix(relnode_str, 10)?;
+
+    let forkname_match = caps.name("forkname");
+    let forkname = if forkname_match.is_none() {
+        None
+    } else {
+        Some(forkname_match.unwrap().as_str())
+    };
+    let forknum = forkname_to_forknum(forkname)?;
+
+    let segno_match = caps.name("segno");
+    let segno = if segno_match.is_none() {
+        0
+    } else {
+        u32::from_str_radix(segno_match.unwrap().as_str(), 10)?
+    };
+
+    return Ok((relnode, forknum, segno, 0));
+}
+
+fn parse_rel_file_path(path: &str) -> Result<ParsedBaseImageFileName, FilePathError> {
+    /*
+     * Relation data files can be in one of the following directories:
+     *
+     * global/
+     *      shared relations
+     *
+     * base/<dbnode>/
+     *      regular relations, default tablespace
+     *
+     * pg_tblspc/<spcnode>/<version>/
+     *      within a non-default tablespace (the name of the directory
+     *      depends on version)
+     *
+     * And the relation data files themselves have a filename like:
+     *
+     * <relnode>.<segno>
+     */
+    if let Some(fname) = path.strip_prefix("global/") {
+        if fname.contains("pg_control") {
+            return Ok(ParsedBaseImageFileName {
+                spcnode: pg_constants::GLOBALTABLESPACE_OID,
+                dbnode: 0,
+                relnode: 0,
+                forknum: pg_constants::PG_CONTROLFILE_FORKNUM,
+                segno: 0,
+                lsn: 0,
+            });
+        }
+
+        if fname.contains("pg_filenode") {
+            return Ok(ParsedBaseImageFileName {
+                spcnode: pg_constants::GLOBALTABLESPACE_OID,
+                dbnode: 0,
+                relnode: 0,
+                forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
+                segno: 0,
+                lsn: 0,
+            });
+        }
+
+        let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
+
+        return Ok(ParsedBaseImageFileName {
+            spcnode: pg_constants::GLOBALTABLESPACE_OID,
+            dbnode: 0,
+            relnode,
+            forknum,
+            segno,
+            lsn,
+        });
+    } else if let Some(dbpath) = path.strip_prefix("base/") {
+        let mut s = dbpath.split("/");
+        let dbnode_str = s
+            .next()
+            .ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
+        let dbnode = u32::from_str_radix(dbnode_str, 10)?;
+        let fname = s
+            .next()
+            .ok_or_else(|| FilePathError::new("invalid relation data file name"))?;
+        if s.next().is_some() {
+            return Err(FilePathError::new("invalid relation data file name"));
+        };
+
+        if fname.contains("pg_filenode") {
+            return Ok(ParsedBaseImageFileName {
+                spcnode: pg_constants::DEFAULTTABLESPACE_OID,
+                dbnode,
+                relnode: 0,
+                forknum: pg_constants::PG_FILENODEMAP_FORKNUM,
+                segno: 0,
+                lsn: 0,
+            });
+        }
+
+        let (relnode, forknum, segno, lsn) = parse_filename(fname)?;
+
+        return Ok(ParsedBaseImageFileName {
+            spcnode: pg_constants::DEFAULTTABLESPACE_OID,
+            dbnode,
+            relnode,
+            forknum,
+            segno,
+            lsn,
+        });
+    } else if let Some(fname) = path.strip_prefix("pg_xact/") {
+        return Ok(ParsedBaseImageFileName {
+            spcnode: 0,
+            dbnode: 0,
+            relnode: 0,
+            forknum: pg_constants::PG_XACT_FORKNUM,
+            segno: u32::from_str_radix(fname, 10).unwrap(),
+            lsn: 0,
+        });
+    } else if let Some(fname) = path.strip_prefix("pg_multixact/members/") {
+        return Ok(ParsedBaseImageFileName {
+            spcnode: 0,
+            dbnode: 0,
+            relnode: 0,
+            forknum: pg_constants::PG_MXACT_MEMBERS_FORKNUM,
+            segno: u32::from_str_radix(fname, 10).unwrap(),
+            lsn: 0,
+        });
+    } else if let Some(fname) = path.strip_prefix("pg_multixact/offsets/") {
+        return Ok(ParsedBaseImageFileName {
+            spcnode: 0,
+            dbnode: 0,
+            relnode: 0,
+            forknum: pg_constants::PG_MXACT_OFFSETS_FORKNUM,
+
segno: u32::from_str_radix(fname, 10).unwrap(), + lsn: 0, + }); + } else if let Some(_) = path.strip_prefix("pg_tblspc/") { + // TODO + return Err(FilePathError::new("tablespaces not supported")); + } else { + return Err(FilePathError::new("invalid relation data file name")); + } +} + +async fn slurp_base_file( + conf: &PageServerConf, + sys_id: u64, + file_path: String, + parsed: ParsedBaseImageFileName, +) { + info!("slurp_base_file local path {}", file_path); + + let mut data = fs::read(file_path).unwrap(); + + // pg_filenode.map has non-standard size - 512 bytes + // enlarge it to treat as a regular page + if parsed.forknum == pg_constants::PG_FILENODEMAP_FORKNUM { + data.resize(8192, 0); + } + + let data_bytes: &[u8] = &data; + let mut bytes = BytesMut::from(data_bytes).freeze(); + + // FIXME: use constants (BLCKSZ) + let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); + + let pcache = page_cache::get_pagecache(conf, sys_id); + + let reltag = page_cache::RelTag { + spcnode: parsed.spcnode, + dbnode: parsed.dbnode, + relnode: parsed.relnode, + forknum: parsed.forknum as u8, + }; + + while bytes.remaining() >= 8192 { + let tag = page_cache::BufferTag { + spcnode: parsed.spcnode, + dbnode: parsed.dbnode, + relnode: parsed.relnode, + forknum: parsed.forknum as u8, + blknum: blknum, + }; + + pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); + + pcache.relsize_inc(&reltag, Some(blknum)); + blknum += 1; + } +} diff --git a/pageserver/src/restore_s3.rs b/pageserver/src/restore_s3.rs index 0884f17453..253e6e589b 100644 --- a/pageserver/src/restore_s3.rs +++ b/pageserver/src/restore_s3.rs @@ -60,8 +60,8 @@ pub fn restore_main(conf: &PageServerConf) { async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { let backend = Storage { region: Region::Custom { - region: env::var("S3_REGION").unwrap().into(), - endpoint: env::var("S3_ENDPOINT").unwrap().into(), + region: env::var("S3_REGION").unwrap(), + endpoint: env::var("S3_ENDPOINT").unwrap(), }, credentials: Credentials::new( Some(&env::var("S3_ACCESSKEY").unwrap()), @@ -119,7 +119,7 @@ async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { panic!("no base backup found"); } - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); pcache.init_valid_lsn(oldest_lsn); info!("{} files to restore...", slurp_futures.len()); @@ -305,7 +305,7 @@ async fn slurp_base_file( // FIXME: use constants (BLCKSZ) let mut blknum: u32 = parsed.segno * (1024 * 1024 * 1024 / 8192); - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); while bytes.remaining() >= 8192 { let tag = page_cache::BufferTag { @@ -315,7 +315,7 @@ async fn slurp_base_file( relnode: parsed.relnode, forknum: parsed.forknum as u8, }, - blknum: blknum, + blknum, }; pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); diff --git a/pageserver/src/tui.rs b/pageserver/src/tui.rs index 653600b82e..4e35cc76c9 100644 --- a/pageserver/src/tui.rs +++ b/pageserver/src/tui.rs @@ -14,7 +14,6 @@ use tui::text::{Span, Spans, Text}; use tui::widgets::{Block, BorderType, Borders, Paragraph, Widget}; use tui::Terminal; -use slog; use slog::Drain; lazy_static! 
{
@@ -188,6 +187,7 @@ pub fn ui_main<'b>() -> Result<(), Box<dyn Error>> {
     Ok(())
 }
 
+#[allow(dead_code)]
 struct LogWidget<'a> {
     logger: &'a TuiLogger,
     title: &'a str,
diff --git a/pageserver/src/tui_event.rs b/pageserver/src/tui_event.rs
index c0e25da864..5546b680ee 100644
--- a/pageserver/src/tui_event.rs
+++ b/pageserver/src/tui_event.rs
@@ -10,7 +10,6 @@ use std::time::Duration;
 use termion::event::Key;
 use termion::input::TermRead;
 
-#[allow(dead_code)]
 pub enum Event<I> {
     Input(I),
     Tick,
diff --git a/pageserver/src/tui_logger.rs b/pageserver/src/tui_logger.rs
index 0b49dcc388..e59ce15a56 100644
--- a/pageserver/src/tui_logger.rs
+++ b/pageserver/src/tui_logger.rs
@@ -10,7 +10,6 @@
 // use chrono::offset::Local;
 use chrono::DateTime;
-use slog;
 use slog::{Drain, Level, OwnedKVList, Record};
 use slog_async::AsyncRecord;
 use std::collections::VecDeque;
@@ -81,7 +80,7 @@ impl<'b> TuiLoggerWidget<'b> {
             style_trace: None,
             style_info: None,
             show_module: true,
-            logger: logger,
+            logger,
         }
     }
 }
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index fa79403563..33d8f09693 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -44,6 +44,7 @@ struct XLogLongPageHeaderData {
 #[allow(non_upper_case_globals)]
 const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4;
 
+#[allow(dead_code)]
 pub struct WalStreamDecoder {
     lsn: u64,
@@ -63,7 +64,7 @@ pub struct WalStreamDecoder {
 impl WalStreamDecoder {
     pub fn new(lsn: u64) -> WalStreamDecoder {
         WalStreamDecoder {
-            lsn: lsn,
+            lsn,
             startlsn: 0,
             contlen: 0,
@@ -253,6 +254,7 @@
 const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
 const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
 const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
 
+#[allow(dead_code)]
 pub struct DecodedBkpBlock {
     /* Is this block ref in use? */
     //in_use: bool,
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 129a80dd32..c11a00fc78 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -1,28 +1,30 @@
-//
-// WAL receiver
-//
-// The WAL receiver connects to the WAL safekeeper service, and streams WAL.
-// For each WAL record, it decodes the record to figure out which data blocks
-// the record affects, and adds the records to the page cache.
-//
-use log::*;
-
-use tokio::runtime;
-use tokio::time::{sleep, Duration};
-use tokio_stream::StreamExt;
+//!
+//! WAL receiver
+//!
+//! The WAL receiver connects to the WAL safekeeper service, and streams WAL.
+//! For each WAL record, it decodes the record to figure out which data blocks
+//! the record affects, and adds the records to the page cache.
+//!
 
 use crate::page_cache;
 use crate::page_cache::{BufferTag, RelTag};
-use crate::waldecoder::*;
+use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
 use crate::PageServerConf;
-
+use anyhow::Error;
+use log::*;
 use postgres_protocol::message::backend::ReplicationMessage;
-use tokio_postgres::{connect_replication, Error, NoTls, ReplicationMode};
+use postgres_types::PgLsn;
+use std::str::FromStr;
+use tokio::runtime;
+use tokio::time::{sleep, Duration};
+use tokio_postgres::replication::{PgTimestamp, ReplicationStream};
+use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
+use tokio_stream::StreamExt;
 
 //
 // This is the entry point for the WAL receiver thread.
 //
-pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) {
+pub fn thread_main(conf: &PageServerConf, wal_producer_connstr: &str) {
     info!("WAL receiver thread started: '{}'", wal_producer_connstr);
 
     let runtime = runtime::Builder::new_current_thread()
@@ -32,31 +34,25 @@
     runtime.block_on(async {
         loop {
-            let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await;
+            let res = walreceiver_main(conf, wal_producer_connstr).await;
 
-            // TODO: print/log the error
-            info!(
-                "WAL streaming connection failed, retrying in 1 second...: {:?}",
-                _res
-            );
-            sleep(Duration::from_secs(1)).await;
+            if let Err(e) = res {
+                info!(
+                    "WAL streaming connection failed ({}), retrying in 1 second",
+                    e
+                );
+                sleep(Duration::from_secs(1)).await;
+            }
         }
     });
 }
 
-async fn walreceiver_main(
-    conf: PageServerConf,
-    wal_producer_connstr: &String,
-) -> Result<(), Error> {
+async fn walreceiver_main(conf: &PageServerConf, wal_producer_connstr: &str) -> Result<(), Error> {
     // Connect to the database in replication mode.
-    debug!("connecting to {}...", wal_producer_connstr);
-    let (mut rclient, connection) = connect_replication(
-        wal_producer_connstr.as_str(),
-        NoTls,
-        ReplicationMode::Physical,
-    )
-    .await?;
-    debug!("connected!");
+    info!("connecting to {:?}", wal_producer_connstr);
+    let connect_cfg = format!("{} replication=true", wal_producer_connstr);
+    let (rclient, connection) = tokio_postgres::connect(&connect_cfg, NoTls).await?;
+    info!("connected!");
 
     // The connection object performs the actual communication with the database,
     // so spawn it off to run on its own.
@@ -66,28 +62,28 @@
         }
     });
 
-    let identify_system = rclient.identify_system().await?;
-    let end_of_wal = u64::from(identify_system.xlogpos());
+    let identify = identify_system(&rclient).await?;
+    info!("{:?}", identify);
+    let end_of_wal = u64::from(identify.xlogpos);
     let mut caught_up = false;
 
-    let sysid: u64 = identify_system.systemid().parse().unwrap();
-    let pcache = page_cache::get_pagecache(conf, sysid);
+    let pcache = page_cache::get_pagecache(conf, identify.systemid);
 
     //
     // Start streaming the WAL, from where we left off previously.
     //
     let mut startpoint = pcache.get_last_valid_lsn();
     if startpoint == 0 {
-        // If we start here with identify_system.xlogpos() we will have a race condition with
+        // If we start here with identify.xlogpos we will have a race condition with
         // postgres start: insert into postgres may request a page that was modified with lsn
-        // smaller than identify_system.xlogpos().
+        // smaller than identify.xlogpos.
         //
         // Current procedure for starting postgres will anyway be changed to something
         // different like having 'initdb' method on a pageserver (or importing some shared
        // empty database snapshot), so for now I just put start of first segment which
         // seems to be a valid record.
         pcache.init_valid_lsn(0x_1_000_000_u64);
-        startpoint = u64::from(0x_1_000_000_u64);
+        startpoint = 0x_1_000_000_u64;
     } else {
         // There might be some padding after the last full record, skip it.
 //
@@ -105,10 +101,14 @@
         (end_of_wal >> 32),
         (end_of_wal & 0xffffffff)
     );
-    let startpoint = tokio_postgres::types::Lsn::from(startpoint);
-    let mut physical_stream = rclient
-        .start_physical_replication(None, startpoint, None)
-        .await?;
+
+    let startpoint = PgLsn::from(startpoint);
+    let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
+    let copy_stream = rclient.copy_both_simple::<bytes::Bytes>(&query).await?;
+
+    let physical_stream = ReplicationStream::new(copy_stream);
+    tokio::pin!(physical_stream);
+
     let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint));
 
     while let Some(replication_message) = physical_stream.next().await {
@@ -132,8 +132,7 @@
             loop {
                 if let Some((lsn, recdata)) = waldecoder.poll_decode() {
-                    let decoded =
-                        crate::waldecoder::decode_wal_record(startlsn, recdata.clone());
+                    let decoded = decode_wal_record(startlsn, recdata.clone());
 
                     // Put the WAL record to the page cache. We make a separate copy of
                     // it for every block it modifies. (The actual WAL record is kept in
@@ -151,7 +150,7 @@
                     };
 
                     let rec = page_cache::WALRecord {
-                        lsn: lsn,
+                        lsn,
                         will_init: blk.will_init || blk.apply_image,
                         truncate: false,
                         rec: recdata.clone(),
@@ -209,12 +208,81 @@
                 }
             }
 
-            ReplicationMessage::PrimaryKeepAlive(_keepalive) => {
-                trace!("received PrimaryKeepAlive");
-                // FIXME: Reply, or the connection will time out
+            ReplicationMessage::PrimaryKeepAlive(keepalive) => {
+                let wal_end = keepalive.wal_end();
+                let timestamp = keepalive.timestamp();
+                let reply_requested: bool = keepalive.reply() != 0;
+
+                trace!(
+                    "received PrimaryKeepAlive(wal_end: {}, timestamp: {} reply: {})",
+                    wal_end,
+                    timestamp,
+                    reply_requested,
+                );
+                if reply_requested {
+                    // TODO: More thought should go into what values are sent here.
+                    let last_lsn = PgLsn::from(pcache.get_last_valid_lsn());
+                    let write_lsn = last_lsn;
+                    let flush_lsn = last_lsn;
+                    let apply_lsn = PgLsn::INVALID;
+                    let ts = PgTimestamp::now()?;
+                    const NO_REPLY: u8 = 0u8;
+
+                    physical_stream
+                        .as_mut()
+                        .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)
+                        .await?;
+                }
             }
             _ => (),
         }
     }
     return Ok(());
 }
+
+/// Data returned from the postgres `IDENTIFY_SYSTEM` command
+///
+/// See the [postgres docs] for more details.
+///
+/// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
+#[derive(Debug)]
+pub struct IdentifySystem {
+    systemid: u64,
+    timeline: u32,
+    xlogpos: PgLsn,
+    dbname: Option<String>,
+}
+
+/// There was a problem parsing the response to
+/// a postgres IDENTIFY_SYSTEM command.
+#[derive(Debug, thiserror::Error)]
+#[error("IDENTIFY_SYSTEM parse error")]
+pub struct IdentifyError;
+
+/// Run the postgres `IDENTIFY_SYSTEM` command
+pub async fn identify_system(client: &tokio_postgres::Client) -> Result<IdentifySystem, Error> {
+    let query_str = "IDENTIFY_SYSTEM";
+    let response = client.simple_query(query_str).await?;
+
+    // get(N) from row, then parse it as some destination type.
+    fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
+    where
+        T: FromStr,
+    {
+        let val = row.get(idx).ok_or(IdentifyError)?;
+        val.parse::<T>().or(Err(IdentifyError))
+    }
+
+    // extract the row contents into an IdentifySystem struct.
+    // written as a closure so I can use ? for Option here.
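// [Editor's aside — illustration, not part of the patch] IDENTIFY_SYSTEM
// returns a single row with four columns, for example (values invented):
//
//   systemid            | timeline | xlogpos   | dbname
//   --------------------+----------+-----------+--------
//   6994437149771611633 | 1        | 0/16C9E58 | (null)
//
// get_parse above converts each column into the corresponding field of
// IdentifySystem; dbname is only present on database-specific replication
// connections, hence the Option.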
+ if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) { + Ok(IdentifySystem { + systemid: get_parse(first_row, 0)?, + timeline: get_parse(first_row, 1)?, + xlogpos: get_parse(first_row, 2)?, + dbname: get_parse(first_row, 3).ok(), + }) + } else { + Err(IdentifyError)? + } +} diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 713efd9045..b80af79862 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -43,7 +43,7 @@ static TIMEOUT: Duration = Duration::from_secs(20); // // Main entry point for the WAL applicator thread. // -pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { +pub fn wal_redo_main(conf: &PageServerConf, sys_id: u64) { info!("WAL redo thread started {}", sys_id); // We block on waiting for requests on the walredo request channel, but @@ -54,7 +54,7 @@ pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { .build() .unwrap(); - let pcache = page_cache::get_pagecache(conf.clone(), sys_id); + let pcache = page_cache::get_pagecache(conf, sys_id); // Loop forever, handling requests as they come. let walredo_channel_receiver = &pcache.walredo_receiver; @@ -215,7 +215,7 @@ impl WalRedoProcess { tokio::spawn(f_stderr); Ok(WalRedoProcess { - child: child, + child, stdin: RefCell::new(stdin), stdout: RefCell::new(stdout), }) diff --git a/pgbuild.sh b/pgbuild.sh index 0514a5868a..9d4c0baa65 100755 --- a/pgbuild.sh +++ b/pgbuild.sh @@ -10,6 +10,10 @@ # # 2) installs postgres to REPO_ROOT/tmp_install/ # + +# Halt immediately if any command fails +set -e + REPO_ROOT=$(dirname "$0") REPO_ROOT="`( cd \"$REPO_ROOT\" && pwd )`" diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index 76dcd12582..98c63c434f 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -26,13 +26,11 @@ clap = "2.33.0" termion = "1.5.6" tui = "0.14.0" daemonize = "0.4.1" -rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] } +rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", rev="7f15a24ec7daa0a5d9516da706212745f9042818", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } -tokio-postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres-protocol = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } -postgres = { git = "https://github.com/kelvich/rust-postgres", branch = "replication_rebase" } +tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } +postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b66447951d1276a53fb09886539c3fa094" } anyhow = "1.0" crc32c = "0.6.0" - -pageserver = { path = "../pageserver" } diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index 75466bc328..d50467ba49 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -11,10 +11,7 @@ use std::{fs::File, fs::OpenOptions}; use clap::{App, Arg}; -use slog; use slog::Drain; -use slog_scope; -use slog_stdlog; use walkeeper::wal_service; use walkeeper::WalAcceptorConf; @@ -92,7 +89,7 @@ fn main() -> Result<(), io::Error> { fn start_wal_acceptor(conf: WalAcceptorConf) -> Result<(), io::Error> { // Initialize logger - let _scope_guard = init_logging(&conf); + let _scope_guard = init_logging(&conf)?; let 
_log_guard = slog_stdlog::init().unwrap();
 
     // Note: this `info!(...)` macro comes from `log` crate
     info!("standard logging redirected to slog");
@@ -141,20 +138,24 @@
     Ok(())
 }
 
-fn init_logging(conf: &WalAcceptorConf) -> slog_scope::GlobalLoggerGuard {
+fn init_logging(conf: &WalAcceptorConf) -> Result<slog_scope::GlobalLoggerGuard, io::Error> {
     if conf.daemonize {
         let log = conf.data_dir.join("wal_acceptor.log");
-        let log_file = File::create(log).unwrap_or_else(|_| panic!("Could not create log file"));
+        let log_file = File::create(&log).map_err(|err| {
+            // We failed to initialize logging, so we can't log this message with error!
+            eprintln!("Could not create log file {:?}: {}", log, err);
+            err
+        })?;
         let decorator = slog_term::PlainSyncDecorator::new(log_file);
         let drain = slog_term::CompactFormat::new(decorator).build();
         let drain = std::sync::Mutex::new(drain).fuse();
         let logger = slog::Logger::root(drain, slog::o!());
-        slog_scope::set_global_logger(logger)
+        Ok(slog_scope::set_global_logger(logger))
     } else {
         let decorator = slog_term::TermDecorator::new().build();
         let drain = slog_term::FullFormat::new(decorator).build().fuse();
         let drain = slog_async::Async::new(drain).chan_size(1000).build().fuse();
         let logger = slog::Logger::root(drain, slog::o!());
-        return slog_scope::set_global_logger(logger);
+        Ok(slog_scope::set_global_logger(logger))
     }
 }
diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index 7dbb0b17f3..7e890cf98a 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -6,7 +6,6 @@ mod pq_protocol;
 pub mod wal_service;
 pub mod xlog_utils;
 
-#[allow(dead_code)]
 #[derive(Debug, Clone)]
 pub struct WalAcceptorConf {
     pub data_dir: PathBuf,
diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs
index d70c10ce22..5570781123 100644
--- a/walkeeper/src/wal_service.rs
+++ b/walkeeper/src/wal_service.rs
@@ -402,7 +402,7 @@ impl System {
             },
         };
         System {
-            id: id,
+            id,
             mutex: Mutex::new(shared_state),
             cond: Notify::new(),
         }
@@ -989,7 +989,7 @@
             },
             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
             Err(e) => {
-                return Err(e.into());
+                return Err(e);
             }
         }
@@ -1018,12 +1018,19 @@
                     Ok(opened_file) => file = opened_file,
                     Err(e) => {
                         error!("Failed to open log file {:?}: {}", &wal_file_path, e);
-                        return Err(e.into());
+                        return Err(e);
                     }
                 }
             }
         }
-        let send_size = min((end_pos - start_pos) as usize, MAX_SEND_SIZE);
+        let xlogoff = XLogSegmentOffset(start_pos, wal_seg_size) as usize;
+
+        // How much to read and send in message? We cannot cross the WAL file
+        // boundary, and we don't want to send more than MAX_SEND_SIZE.
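// [Editor's aside — worked example with invented numbers, not part of the
// patch] With a 16 MiB segment (wal_seg_size = 16777216) and start_pos
// sitting 8192 bytes before a segment boundary (xlogoff = 16769024), an
// outstanding 1 MiB of WAL (end_pos - start_pos = 1048576) is first clamped
// to 16777216 - 16769024 = 8192 by the boundary; MAX_SEND_SIZE (assumed to be
// larger than 8192) then leaves it unchanged.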
+        let send_size = (end_pos - start_pos) as usize;
+        let send_size = min(send_size, wal_seg_size - xlogoff);
+        let send_size = min(send_size, MAX_SEND_SIZE);
+
         let msg_size = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE + send_size;
         let data_start = LIBPQ_HDR_SIZE + XLOG_HDR_SIZE;
         let data_end = data_start + send_size;
@@ -1130,7 +1137,7 @@
                 }
                 Err(e) => {
                     error!("Failed to open log file {:?}: {}", &wal_file_path, e);
-                    return Err(e.into());
+                    return Err(e);
                 }
             }
         }
diff --git a/zenith/Cargo.toml b/zenith/Cargo.toml
new file mode 100644
index 0000000000..2d1f7c922c
--- /dev/null
+++ b/zenith/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "zenith"
+version = "0.1.0"
+authors = ["Stas Kelvich "]
+edition = "2018"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+clap = "2.33.0"
+control_plane = { path = "../control_plane" }
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
new file mode 100644
index 0000000000..f6690dd8d7
--- /dev/null
+++ b/zenith/src/main.rs
@@ -0,0 +1,134 @@
+use clap::{App, Arg, ArgMatches, SubCommand};
+use std::error;
+use std::process::exit;
+
+use control_plane::{compute::ComputeControlPlane, local_env, storage};
+
+type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
+
+fn main() {
+    let name_arg = Arg::with_name("NAME")
+        .short("n")
+        .index(1)
+        .help("name of this postgres instance")
+        .required(true);
+    let matches = App::new("zenith")
+        .subcommand(SubCommand::with_name("init"))
+        .subcommand(SubCommand::with_name("start"))
+        .subcommand(SubCommand::with_name("stop"))
+        .subcommand(SubCommand::with_name("status"))
+        .subcommand(
+            SubCommand::with_name("pg")
+                .about("Manage postgres instances")
+                .subcommand(
+                    SubCommand::with_name("create"), // .arg(name_arg.clone()
+                                                     //     .required(false)
+                                                     //     .help("name of this postgres instance (will be pgN if omitted)"))
+                )
+                .subcommand(SubCommand::with_name("list"))
+                .subcommand(SubCommand::with_name("start").arg(name_arg.clone()))
+                .subcommand(SubCommand::with_name("stop").arg(name_arg.clone()))
+                .subcommand(SubCommand::with_name("destroy").arg(name_arg.clone())),
+        )
+        .subcommand(
+            SubCommand::with_name("snapshot")
+                .about("Manage database snapshots")
+                .subcommand(SubCommand::with_name("create"))
+                .subcommand(SubCommand::with_name("start"))
+                .subcommand(SubCommand::with_name("stop"))
+                .subcommand(SubCommand::with_name("destroy")),
+        )
+        .get_matches();
+
+    // handle init separately and exit
+    if let Some("init") = matches.subcommand_name() {
+        match local_env::init() {
+            Ok(_) => {
+                println!("Initialization complete! You may start zenith with 'zenith start' now.");
+                exit(0);
+            }
+            Err(e) => {
+                eprintln!("Error during init: {}", e);
+                exit(1);
+            }
+        }
+    }
+
+    // all other commands would need config
+    let env = match local_env::load_config() {
+        Ok(conf) => conf,
+        Err(e) => {
+            eprintln!("Error loading config from ~/.zenith: {}", e);
+            exit(1);
+        }
+    };
+
+    match matches.subcommand() {
+        ("init", Some(_)) => {
+            panic!() /* Should not happen.
Init was handled before */ + } + + ("start", Some(_sub_m)) => { + let pageserver = storage::PageServerNode::from_env(&env); + + if let Err(e) = pageserver.start() { + eprintln!("pageserver start: {}", e); + exit(1); + } + } + + ("stop", Some(_sub_m)) => { + let pageserver = storage::PageServerNode::from_env(&env); + if let Err(e) = pageserver.stop() { + eprintln!("pageserver stop: {}", e); + exit(1); + } + } + + ("status", Some(_sub_m)) => {} + + ("pg", Some(pg_match)) => { + if let Err(e) = handle_pg(pg_match, &env) { + eprintln!("pg operation failed: {}", e); + exit(1); + } + } + _ => {} + } +} + +fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let mut cplane = ComputeControlPlane::load(env.clone())?; + + match pg_match.subcommand() { + ("create", Some(_sub_m)) => { + cplane.new_node()?; + } + ("list", Some(_sub_m)) => { + println!("NODE\tADDRESS\tSTATUS"); + for (node_name, node) in cplane.nodes.iter() { + println!("{}\t{}\t{}", node_name, node.address, node.status()); + } + } + ("start", Some(sub_m)) => { + let name = sub_m.value_of("NAME").unwrap(); + let node = cplane + .nodes + .get(name) + .ok_or(format!("postgres {} is not found", name))?; + node.start()?; + } + ("stop", Some(sub_m)) => { + let name = sub_m.value_of("NAME").unwrap(); + let node = cplane + .nodes + .get(name) + .ok_or(format!("postgres {} is not found", name))?; + node.stop()?; + } + + _ => {} + } + + Ok(()) +}
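For orientation, a plausible first session with the new zenith binary, pieced together from the subcommands wired up above (the node name "pg1" and its address/status line are invented; the "Initialization complete!" message and the NODE/ADDRESS/STATUS header come from the code itself):

    $ zenith init
    Initialization complete! You may start zenith with 'zenith start' now.
    $ zenith start
    $ zenith pg create
    $ zenith pg list
    NODE	ADDRESS	STATUS
    pg1	127.0.0.1:55432	running
    $ zenith pg stop pg1
    $ zenith stop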