diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index 1efc44dedf..2f8acc4385 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -4,6 +4,7 @@ on: [push] jobs: regression-check: + timeout-minutes: 10 name: run regression test suite runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 04d3842934..921924bed7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,9 +383,11 @@ name = "control_plane" version = "0.1.0" dependencies = [ "home", + "lazy_static", "pageserver", "postgres", "rand 0.8.3", + "regex", "serde", "serde_derive", "tokio-postgres", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index dac78ea356..4699a4da40 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -15,6 +15,8 @@ serde = "" serde_derive = "" toml = "" home = "0.5.3" +lazy_static = "" +regex = "1" pageserver = { path = "../pageserver" } walkeeper = { path = "../walkeeper" } diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index bb7a177893..9de4a3b8c6 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,199 +1,133 @@ -use std::path::PathBuf; +use std::{collections::BTreeMap, path::PathBuf}; use std::sync::Arc; use std::fs::{self, OpenOptions}; -use std::process::Command; +use std::process::{Command, Stdio}; use std::fs::File; -use std::{ - io::Write, - net::{IpAddr, Ipv4Addr, SocketAddr}, -}; +use std::time::Duration; +use std::{io::Write, net::SocketAddr}; +use std::error; +use std::net::TcpStream; use postgres::{Client, NoTls}; +use lazy_static::lazy_static; +use regex::Regex; use crate::local_env::{self, LocalEnv}; use crate::storage::{PageServerNode, WalProposerNode}; +type Result = std::result::Result>; + // // ComputeControlPlane // pub struct ComputeControlPlane { - pg_bin_dir: PathBuf, - work_dir: PathBuf, - last_assigned_port: u16, + base_port: u16, pageserver: Arc, - nodes: Vec>, + pub nodes: BTreeMap>, env: LocalEnv, } impl ComputeControlPlane { + + // Load current nodes with ports from data directories on disk + pub fn load(env: LocalEnv) -> Result { + // TODO: since pageserver do not have config file yet we believe here that + // it is running on default port. Change that when pageserver will have config. + let pageserver = Arc::new(PageServerNode::from_env(&env)); + + let nodes: Result> = fs::read_dir(env.compute_dir()) + .map_err(|e| format!("failed to list {}: {}", env.compute_dir().to_str().unwrap(), e))? + .into_iter() + .map(|f| { + PostgresNode::from_dir_entry(f?, &env, &pageserver) + .map(|node| (node.name.clone(), Arc::new(node)) ) + }) + .collect(); + let nodes = nodes?; + + Ok(ComputeControlPlane { + base_port: 55431, + pageserver, + nodes, + env + }) + } + + fn get_port(&mut self) -> u16 { + 1 + self.nodes + .iter() + .map(|(_name, node)| node.address.port()) + .max() + .unwrap_or(self.base_port) + } + pub fn local(pageserver: &Arc) -> ComputeControlPlane { let env = local_env::test_env(); ComputeControlPlane { - pg_bin_dir: env.pg_bin_dir(), - work_dir: env.data_dir.clone(), - last_assigned_port: 65431, + base_port: 65431, pageserver: Arc::clone(pageserver), - nodes: Vec::new(), + nodes: BTreeMap::new(), env: env.clone(), } } - // TODO: check port availability and - fn get_port(&mut self) -> u16 { - let port = self.last_assigned_port + 1; - self.last_assigned_port += 1; - port - } - - pub fn new_vanilla_node<'a>(&mut self) -> &Arc { + fn new_vanilla_node(&mut self, is_test: bool) -> Result> { // allocate new node entry with generated port - let node_id = self.nodes.len() + 1; - let node = PostgresNode { - _node_id: node_id, - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.get_port()), - pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), + let node_id = self.nodes.len() as u32 + 1; + let node = Arc::new(PostgresNode { + name: format!("pg{}", node_id), + address: SocketAddr::new("127.0.0.1".parse().unwrap(), self.get_port()), env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), - }; - self.nodes.push(Arc::new(node)); - let node = self.nodes.last().unwrap(); + is_test + }); + node.init_vanilla()?; + self.nodes.insert(node.name.clone(), Arc::clone(&node)); - println!( - "Creating new postgres: path={} port={}", - node.pgdata.to_str().unwrap(), - node.address.port() - ); - - // initialize data directory - fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok(); - let initdb_path = self.pg_bin_dir.join("initdb"); - let initdb = Command::new(initdb_path) - .args(&["-D", node.pgdata.to_str().unwrap()]) - .arg("-N") - .arg("--no-instructions") - .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("failed to execute initdb"); - - if !initdb.success() { - panic!("initdb failed"); - } - - // // allow local replication connections - // node.append_conf("pg_hba.conf", format!("\ - // host replication all {}/32 sspi include_realm=1 map=regress\n\ - // ", node.ip).as_str()); - - // listen for selected port - node.append_conf( - "postgresql.conf", - format!( - "\ - max_wal_senders = 10\n\ - max_replication_slots = 10\n\ - hot_standby = on\n\ - shared_buffers = 1MB\n\ - max_connections = 100\n\ - wal_level = replica\n\ - listen_addresses = '{address}'\n\ - port = {port}\n\ - ", - address = node.address.ip(), - port = node.address.port() - ) - .as_str(), - ); - - node + Ok(node) } - // Init compute node without files, only datadir structure - // use initdb --compute-node flag and GUC 'computenode_mode' - // to distinguish the node - pub fn new_minimal_node(&mut self) -> &PostgresNode { - // allocate new node entry with generated port - let node_id = self.nodes.len() + 1; - let node = PostgresNode { - _node_id: node_id, - address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), self.get_port()), - pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), - env: self.env.clone(), - pageserver: Arc::clone(&self.pageserver), - }; - self.nodes.push(Arc::new(node)); - let node = self.nodes.last().unwrap(); - - // initialize data directory w/o files - fs::remove_dir_all(node.pgdata.to_str().unwrap()).ok(); - let initdb_path = self.pg_bin_dir.join("initdb"); - println!("initdb_path: {}", initdb_path.to_str().unwrap()); - let initdb = Command::new(initdb_path) - .args(&["-D", node.pgdata.to_str().unwrap()]) - .arg("-N") - .arg("--no-instructions") - .arg("--compute-node") - .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("failed to execute initdb"); - - if !initdb.success() { - panic!("initdb failed"); - } - - // listen for selected port - node.append_conf( - "postgresql.conf", - format!( - "\ - max_wal_senders = 10\n\ - max_replication_slots = 10\n\ - hot_standby = on\n\ - shared_buffers = 1MB\n\ - max_connections = 100\n\ - wal_level = replica\n\ - listen_addresses = '{address}'\n\ - port = {port}\n\ - computenode_mode = true\n\ - ", - address = node.address.ip(), - port = node.address.port() - ) - .as_str(), - ); - - node - } - - pub fn new_node(&mut self) -> Arc { + pub fn new_test_node(&mut self) -> Arc { let addr = self.pageserver.address().clone(); - let node = self.new_vanilla_node(); + let node = self.new_vanilla_node(true).unwrap(); // Configure that node to take pages from pageserver - node.append_conf( - "postgresql.conf", - format!( - "\ - page_server_connstring = 'host={} port={}'\n\ - ", + node.append_conf("postgresql.conf", + format!("page_server_connstring = 'host={} port={}'\n", addr.ip(), addr.port() ) .as_str(), ); - node.clone() + node } - pub fn new_master_node(&mut self) -> Arc { - let node = self.new_vanilla_node(); + pub fn new_test_master_node(&mut self) -> Arc { + let node = self.new_vanilla_node(true).unwrap(); node.append_conf( "postgresql.conf", "synchronous_standby_names = 'safekeeper_proxy'\n", ); - node.clone() + + node + } + + pub fn new_node(&mut self) -> Result> { + let addr = self.pageserver.address().clone(); + let node = self.new_vanilla_node(false)?; + + // Configure that node to take pages from pageserver + node.append_conf("postgresql.conf", + format!("page_server_connstring = 'host={} port={}'\n", + addr.ip(), + addr.port() + ) + .as_str(), + ); + + Ok(node) } } @@ -201,32 +135,144 @@ impl ComputeControlPlane { pub struct PostgresNode { pub address: SocketAddr, - _node_id: usize, - pgdata: PathBuf, + name: String, pub env: LocalEnv, pageserver: Arc, + is_test: bool, } impl PostgresNode { + fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv, pageserver: &Arc) -> Result { + if !entry.file_type()?.is_dir() { + let err_msg = format!( + "PostgresNode::from_dir_entry failed: '{}' is not a directory", + entry.path().to_str().unwrap() + ); + return Err(err_msg.into()); + } + + lazy_static! { + static ref CONF_PORT_RE: Regex = Regex::new(r"(?m)^\s*port\s*=\s*(\d+)\s*$").unwrap(); + } + + // parse data directory name + let fname = entry.file_name(); + let name = fname.to_str().unwrap().to_string(); + + // find out tcp port in config file + let cfg_path = entry.path().join("postgresql.conf"); + let config = fs::read_to_string(cfg_path.clone()) + .map_err(|e| { + format!("failed to read config file in {}: {}", cfg_path.to_str().unwrap(), e) + })?; + + let err_msg = format!("failed to find port definition in config file {}", cfg_path.to_str().unwrap()); + let port: u16 = CONF_PORT_RE + .captures(config.as_str()) + .ok_or(err_msg.clone() + " 1")? + .iter() + .last() + .ok_or(err_msg.clone() + " 3")? + .ok_or(err_msg.clone() + " 3")? + .as_str() + .parse() + .map_err(|e| format!("{}: {}", err_msg, e))?; + + // ok now + Ok(PostgresNode { + address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), + name, + env: env.clone(), + pageserver: Arc::clone(pageserver), + is_test: false + }) + } + + fn init_vanilla(&self) -> Result<()> { + println!( + "Creating new postgres: path={} port={}", + self.pgdata().to_str().unwrap(), + self.address.port() + ); + + // initialize data directory + + if self.is_test { + fs::remove_dir_all(self.pgdata().to_str().unwrap())?; + } + + let initdb_path = self.env.pg_bin_dir().join("initdb"); + let initdb = Command::new(initdb_path) + .args(&["-D", self.pgdata().to_str().unwrap()]) + .arg("-N") + .arg("-A trust") + .arg("--no-instructions") + .env_clear() + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .stdout(Stdio::null()) + .status()?; + + if !initdb.success() { + return Err("initdb failed".into()); + } + + // listen for selected port + self.append_conf( + "postgresql.conf", + format!("max_wal_senders = 10\n\ + max_replication_slots = 10\n\ + hot_standby = on\n\ + shared_buffers = 1MB\n\ + max_connections = 100\n\ + wal_level = replica\n\ + listen_addresses = '{address}'\n\ + port = {port}\n", + address = self.address.ip(), + port = self.address.port() + ) + .as_str(), + ); + + println!("Database initialized"); + Ok(()) + } + + fn pgdata(&self) -> PathBuf { + self.env.compute_dir().join(self.name.clone()) + } + + pub fn status(&self) -> &str { + let timeout = Duration::from_millis(300); + let has_pidfile = self.pgdata().join("postmaster.pid").exists(); + let can_connect = TcpStream::connect_timeout(&self.address, timeout).is_ok(); + + match (has_pidfile, can_connect) { + (true, true) => "running", + (false, false) => "stopped", + (true, false) => "crashed", + (false, true) => "running, no pidfile", + } + } + pub fn append_conf(&self, config: &str, opts: &str) { OpenOptions::new() .append(true) - .open(self.pgdata.join(config).to_str().unwrap()) + .open(self.pgdata().join(config).to_str().unwrap()) .unwrap() .write_all(opts.as_bytes()) .unwrap(); } - fn pg_ctl(&self, args: &[&str], check_ok: bool) { + fn pg_ctl(&self, args: &[&str]) -> Result<()> { let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl"); let pg_ctl = Command::new(pg_ctl_path) .args( [ &[ "-D", - self.pgdata.to_str().unwrap(), + self.pgdata().to_str().unwrap(), "-l", - self.pgdata.join("log").to_str().unwrap(), + self.pgdata().join("log").to_str().unwrap(), ], args, ] @@ -234,28 +280,29 @@ impl PostgresNode { ) .env_clear() .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("failed to execute pg_ctl"); + .status()?; - if check_ok && !pg_ctl.success() { - panic!("pg_ctl failed"); + if !pg_ctl.success() { + Err("pg_ctl failed".into()) + } else { + Ok(()) } } - pub fn start(&self) { + pub fn start(&self) -> Result<()> { let _res = self .pageserver .page_server_psql(format!("callmemaybe {}", self.connstr()).as_str()); println!("Starting postgres node at '{}'", self.connstr()); - self.pg_ctl(&["start"], true); + self.pg_ctl(&["start"]) } - pub fn restart(&self) { - self.pg_ctl(&["restart"], true); + pub fn restart(&self) -> Result<()> { + self.pg_ctl(&["restart"]) } - pub fn stop(&self) { - self.pg_ctl(&["-m", "immediate", "stop"], true); + pub fn stop(&self) -> Result<()> { + self.pg_ctl(&["-m", "immediate", "stop"]) } pub fn connstr(&self) -> String { @@ -305,13 +352,9 @@ impl PostgresNode { Client::connect(connstring.as_str(), NoTls).unwrap() } - pub fn get_pgdata(&self) -> Option<&str> { - self.pgdata.to_str() - } - /* Create stub controlfile and respective xlog to start computenode */ pub fn setup_controlfile(&self) { - let filepath = format!("{}/global/pg_control", self.pgdata.to_str().unwrap()); + let filepath = format!("{}/global/pg_control", self.pgdata().to_str().unwrap()); { File::create(filepath).unwrap(); @@ -320,7 +363,7 @@ impl PostgresNode { let pg_resetwal_path = self.env.pg_bin_dir().join("pg_resetwal"); let pg_resetwal = Command::new(pg_resetwal_path) - .args(&["-D", self.pgdata.to_str().unwrap()]) + .args(&["-D", self.pgdata().to_str().unwrap()]) .arg("-f") // TODO probably we will have to modify pg_resetwal // .arg("--compute-node") @@ -349,7 +392,6 @@ impl PostgresNode { // TODO pub fn pg_bench() {} - pub fn pg_regress() {} } impl Drop for PostgresNode { @@ -357,7 +399,8 @@ impl Drop for PostgresNode { // XXX: we may detect failed test by setting some flag in catch_unwind() // and checking it here. But let just clean datadirs on start. fn drop(&mut self) { - self.stop(); - // fs::remove_dir_all(self.pgdata.clone()).unwrap(); + if self.is_test { + let _ = self.stop(); + } } } diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index aea884026a..4ec4a82a50 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -36,6 +36,7 @@ pub struct LocalEnv { } impl LocalEnv { + // postgres installation pub fn pg_bin_dir(&self) -> PathBuf { self.pg_distrib_dir.join("bin") } @@ -43,6 +44,7 @@ impl LocalEnv { self.pg_distrib_dir.join("lib") } + // pageserver pub fn pageserver_data_dir(&self) -> PathBuf { self.data_dir.join("pageserver") } @@ -52,6 +54,11 @@ impl LocalEnv { pub fn pageserver_pidfile(&self) -> PathBuf { self.pageserver_data_dir().join("pageserver.pid") } + + // compute nodes + pub fn compute_dir(&self) -> PathBuf { + self.data_dir.join("compute") + } } // @@ -60,14 +67,7 @@ impl LocalEnv { // home crate and cargo uses it. // fn get_home() -> Result { - match home::home_dir() { - Some(path) => Ok(path), - None => { - return Err(Box::::from( - "can not determine home directory path", - )); - } - } + home::home_dir().ok_or("can not determine home directory path".into()) } pub fn init() -> Result<()> { @@ -80,7 +80,7 @@ pub fn init() -> Result<()> { "{} already exists. Perhaps already initialized?", cfg_path.to_str().unwrap() ); - return Err(Box::::from(err_msg)); + return Err(err_msg.into()); } // Now we can run init only from crate directory, so check that current dir is our crate. @@ -89,7 +89,7 @@ pub fn init() -> Result<()> { if !cargo_path.join("pageserver/Cargo.toml").exists() { let err_msg = "Current dirrectory does not look like a zenith repo. \ Please, run 'init' from zenith repo root."; - return Err(Box::::from(err_msg)); + return Err(err_msg.into()); } // ok, now check that expected binaries are present @@ -103,7 +103,7 @@ pub fn init() -> Result<()> { Perhaps './pgbuild.sh' is needed to build it first.", pg_path.to_str().unwrap() ); - return Err(Box::::from(err_msg)); + return Err(err_msg.into()); } // check pageserver @@ -114,26 +114,23 @@ pub fn init() -> Result<()> { "Can't find pageserver binary at {}. Please build it.", pageserver_path.to_str().unwrap() ); - return Err(Box::::from(err_msg)); + return Err(err_msg.into()); } // ok, we are good to go - // create data dir - let data_dir = cargo_path.join("tmp_install"); - match fs::create_dir(data_dir.clone()) { - Ok(_) => {} - Err(e) => match e.kind() { - std::io::ErrorKind::AlreadyExists => {} - _ => { - let err_msg = format!( - "Failed to create data directory in '{}': {}", + // create dirs + let data_dir = cargo_path.join("tmp_check"); + + for &dir in &["compute", "pageserver"] { + fs::create_dir_all(data_dir.join(dir)) + .map_err(|e| { + format!( + "Failed to create directory in '{}': {}", data_dir.to_str().unwrap(), e - ); - return Err(Box::::from(err_msg)); - } - }, + ) + })?; } // write config @@ -160,15 +157,13 @@ pub fn load_config() -> Result { "Zenith config is not found in {}. You need to run 'zenith init' first", cfg_path.to_str().unwrap() ); - return Err(Box::::from(err_msg)); + return Err(err_msg.into()); } // load and parse file let config = fs::read_to_string(cfg_path)?; - match toml::from_str(config.as_str()) { - Ok(cfg) => Ok(cfg), - Err(e) => Err(Box::::from(e)), - } + toml::from_str(config.as_str()) + .map_err(|e| e.into()) } // local env for tests @@ -184,7 +179,7 @@ pub fn test_env() -> LocalEnv { // Find the directory where the binaries were put (i.e. target/debug/) pub fn cargo_bin_dir() -> PathBuf { - let mut pathbuf = std::env::current_exe().ok().unwrap(); + let mut pathbuf = std::env::current_exe().unwrap(); pathbuf.pop(); if pathbuf.ends_with("deps") { diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 2322f4d61e..51f9ba9ddb 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -15,8 +15,8 @@ fn test_redo_cases() { let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // start postgres - let node = compute_cplane.new_node(); - node.start(); + let node = compute_cplane.new_test_node(); + node.start().unwrap(); // check basic work with table node.safe_psql( @@ -55,8 +55,8 @@ fn test_regress() { let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // start postgres - let node = compute_cplane.new_node(); - node.start(); + let node = compute_cplane.new_test_node(); + node.start().unwrap(); control_plane::storage::regress_check(&node); } @@ -69,10 +69,10 @@ fn test_pageserver_multitenancy() { let mut compute_cplane = ComputeControlPlane::local(&storage_cplane.pageserver); // Allocate postgres instance, but don't start - let node1 = compute_cplane.new_node(); - let node2 = compute_cplane.new_node(); - node1.start(); - node2.start(); + let node1 = compute_cplane.new_test_node(); + let node2 = compute_cplane.new_test_node(); + node1.start().unwrap(); + node2.start().unwrap(); // check node1 node1.safe_psql( diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index c0fe6c44b9..b826bd32ea 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -16,8 +16,8 @@ fn test_acceptors_normal_work() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_master_node(); - node.start(); + let node = compute_cplane.new_test_master_node(); + node.start().unwrap(); // start proxy let _proxy = node.start_proxy(wal_acceptors); @@ -54,8 +54,8 @@ fn test_acceptors_restarts() { let mut rng = rand::thread_rng(); // start postgres - let node = compute_cplane.new_master_node(); - node.start(); + let node = compute_cplane.new_test_master_node(); + node.start().unwrap(); // start proxy let _proxy = node.start_proxy(wal_acceptors); @@ -112,8 +112,8 @@ fn test_acceptors_unavalability() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_master_node(); - node.start(); + let node = compute_cplane.new_test_master_node(); + node.start().unwrap(); // start proxy let _proxy = node.start_proxy(wal_acceptors); @@ -187,8 +187,8 @@ fn test_race_conditions() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let node = compute_cplane.new_master_node(); - node.start(); + let node = compute_cplane.new_test_master_node(); + node.start().unwrap(); // start proxy let _proxy = node.start_proxy(wal_acceptors); diff --git a/zenith/src/main.rs b/zenith/src/main.rs index d646a1ac29..3c3785b748 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -1,9 +1,17 @@ -use clap::{App, SubCommand}; +use clap::{App, ArgMatches, SubCommand, Arg}; use std::process::exit; +use std::error; -use control_plane::{local_env, storage}; +use control_plane::{compute::ComputeControlPlane, local_env, storage}; + +type Result = std::result::Result>; fn main() { + let name_arg = Arg::with_name("NAME") + .short("n") + .index(1) + .help("name of this postgres instance") + .required(true); let matches = App::new("zenith") .subcommand(SubCommand::with_name("init")) .subcommand(SubCommand::with_name("start")) @@ -12,10 +20,18 @@ fn main() { .subcommand( SubCommand::with_name("pg") .about("Manage postgres instances") - .subcommand(SubCommand::with_name("create")) - .subcommand(SubCommand::with_name("start")) - .subcommand(SubCommand::with_name("stop")) - .subcommand(SubCommand::with_name("destroy")), + .subcommand(SubCommand::with_name("create") + // .arg(name_arg.clone() + // .required(false) + // .help("name of this postgres instance (will be pgN if omitted)")) + ) + .subcommand(SubCommand::with_name("list")) + .subcommand(SubCommand::with_name("start") + .arg(name_arg.clone())) + .subcommand(SubCommand::with_name("stop") + .arg(name_arg.clone())) + .subcommand(SubCommand::with_name("destroy") + .arg(name_arg.clone())) ) .subcommand( SubCommand::with_name("snapshot") @@ -52,43 +68,70 @@ fn main() { match matches.subcommand() { ("init", Some(_)) => { - panic!() /* init was handled before */ + panic!() /* Should not happen. Init was handled before */ } ("start", Some(_sub_m)) => { let pageserver = storage::PageServerNode::from_env(&env); if let Err(e) = pageserver.start() { - eprintln!("start: {}", e); + eprintln!("pageserver start: {}", e); exit(1); } - - // TODO: check and await actual start - println!("Done!"); } ("stop", Some(_sub_m)) => { let pageserver = storage::PageServerNode::from_env(&env); - if let Err(e) = pageserver.stop() { - eprintln!("stop: {}", e); + eprintln!("pageserver stop: {}", e); exit(1); } - - println!("Done!"); } ("status", Some(_sub_m)) => {} ("pg", Some(pg_match)) => { - match pg_match.subcommand() { - ("start", Some(_sub_m)) => { - println!("xxx: pg start"); - // Ok(()) - } - _ => {} + if let Err(e) = handle_pg(pg_match, &env){ + eprintln!("pg operation failed: {}", e); + exit(1); } } _ => {} } } + +fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { + let mut cplane = ComputeControlPlane::load(env.clone())?; + + match pg_match.subcommand() { + ("create", Some(_sub_m)) => { + cplane.new_node()?; + } + ("list", Some(_sub_m)) => { + println!("NODE\tADDRESS\tSTATUS"); + for (node_name, node) in cplane.nodes.iter() { + println!("{}\t{}\t{}", node_name, node.address, node.status()); + } + } + ("start", Some(sub_m)) => { + let name = sub_m.value_of("NAME").unwrap(); + let node = cplane + .nodes + .get(name) + .ok_or(format!("postgres {} is not found", name))?; + node.start()?; + } + ("stop", Some(sub_m)) => { + let name = sub_m.value_of("NAME").unwrap(); + let node = cplane + .nodes + .get(name) + .ok_or(format!("postgres {} is not found", name))?; + node.stop()?; + } + + _ => {} + } + + Ok(()) +}