diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 10e75ec5cb..205a1889e2 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,15 +1,15 @@ -use std::{collections::BTreeMap, path::PathBuf}; -use std::sync::Arc; -use std::fs::{self, OpenOptions}; -use std::process::{Command, Stdio}; -use std::fs::File; -use std::time::Duration; -use std::{io::Write, net::SocketAddr}; use std::error; +use std::fs::File; +use std::fs::{self, OpenOptions}; use std::net::TcpStream; +use std::process::{Command, Stdio}; +use std::sync::Arc; +use std::time::Duration; +use std::{collections::BTreeMap, path::PathBuf}; +use std::{io::Write, net::SocketAddr}; -use postgres::{Client, NoTls}; use lazy_static::lazy_static; +use postgres::{Client, NoTls}; use regex::Regex; use crate::local_env::{self, LocalEnv}; @@ -28,7 +28,6 @@ pub struct ComputeControlPlane { } impl ComputeControlPlane { - // Load current nodes with ports from data directories on disk pub fn load(env: LocalEnv) -> Result { // TODO: since pageserver do not have config file yet we believe here that @@ -36,11 +35,17 @@ impl ComputeControlPlane { let pageserver = Arc::new(PageServerNode::from_env(&env)); let nodes: Result> = fs::read_dir(env.compute_dir()) - .map_err(|e| format!("failed to list {}: {}", env.compute_dir().to_str().unwrap(), e))? + .map_err(|e| { + format!( + "failed to list {}: {}", + env.compute_dir().to_str().unwrap(), + e + ) + })? .into_iter() .map(|f| { PostgresNode::from_dir_entry(f?, &env, &pageserver) - .map(|node| (node.name.clone(), Arc::new(node)) ) + .map(|node| (node.name.clone(), Arc::new(node))) }) .collect(); let nodes = nodes?; @@ -49,12 +54,13 @@ impl ComputeControlPlane { base_port: 55431, pageserver, nodes, - env + env, }) } fn get_port(&mut self) -> u16 { - 1 + self.nodes + 1 + self + .nodes .iter() .map(|(_name, node)| node.address.port()) .max() @@ -79,7 +85,7 @@ impl ComputeControlPlane { address: SocketAddr::new("127.0.0.1".parse().unwrap(), self.get_port()), env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), - is_test + is_test, }); node.init_vanilla()?; self.nodes.insert(node.name.clone(), Arc::clone(&node)); @@ -92,8 +98,10 @@ impl ComputeControlPlane { let node = self.new_vanilla_node(true).unwrap(); // Configure that node to take pages from pageserver - node.append_conf("postgresql.conf", - format!("page_server_connstring = 'host={} port={}'\n", + node.append_conf( + "postgresql.conf", + format!( + "page_server_connstring = 'host={} port={}'\n", addr.ip(), addr.port() ) @@ -119,8 +127,10 @@ impl ComputeControlPlane { let node = self.new_vanilla_node(false)?; // Configure that node to take pages from pageserver - node.append_conf("postgresql.conf", - format!("page_server_connstring = 'host={} port={}'\n", + node.append_conf( + "postgresql.conf", + format!( + "page_server_connstring = 'host={} port={}'\n", addr.ip(), addr.port() ) @@ -142,7 +152,11 @@ pub struct PostgresNode { } impl PostgresNode { - fn from_dir_entry(entry: std::fs::DirEntry, env: &LocalEnv, pageserver: &Arc) -> Result { + fn from_dir_entry( + entry: std::fs::DirEntry, + env: &LocalEnv, + pageserver: &Arc, + ) -> Result { if !entry.file_type()?.is_dir() { let err_msg = format!( "PostgresNode::from_dir_entry failed: '{}' is not a directory", @@ -161,12 +175,18 @@ impl PostgresNode { // find out tcp port in config file let cfg_path = entry.path().join("postgresql.conf"); - let config = fs::read_to_string(cfg_path.clone()) - .map_err(|e| { - format!("failed to read config file in {}: {}", cfg_path.to_str().unwrap(), e) - })?; + let config = fs::read_to_string(cfg_path.clone()).map_err(|e| { + format!( + "failed to read config file in {}: {}", + cfg_path.to_str().unwrap(), + e + ) + })?; - let err_msg = format!("failed to find port definition in config file {}", cfg_path.to_str().unwrap()); + let err_msg = format!( + "failed to find port definition in config file {}", + cfg_path.to_str().unwrap() + ); let port: u16 = CONF_PORT_RE .captures(config.as_str()) .ok_or(err_msg.clone() + " 1")? @@ -184,7 +204,7 @@ impl PostgresNode { name, env: env.clone(), pageserver: Arc::clone(pageserver), - is_test: false + is_test: false, }) } @@ -221,7 +241,8 @@ impl PostgresNode { // listen for selected port self.append_conf( "postgresql.conf", - format!("max_wal_senders = 10\n\ + format!( + "max_wal_senders = 10\n\ max_replication_slots = 10\n\ hot_standby = on\n\ shared_buffers = 1MB\n\ diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 2c2857cb64..a49d39150a 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -7,6 +7,6 @@ // local installations. // -pub mod local_env; pub mod compute; +pub mod local_env; pub mod storage; diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 265d28629e..03cf982aa8 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -123,14 +123,13 @@ pub fn init() -> Result<()> { let data_dir = cargo_path.join("tmp_check_cli"); for &dir in &["compute", "pageserver"] { - fs::create_dir_all(data_dir.join(dir)) - .map_err(|e| { - format!( - "Failed to create directory in '{}': {}", - data_dir.to_str().unwrap(), - e - ) - })?; + fs::create_dir_all(data_dir.join(dir)).map_err(|e| { + format!( + "Failed to create directory in '{}': {}", + data_dir.to_str().unwrap(), + e + ) + })?; } // write config @@ -162,8 +161,7 @@ pub fn load_config() -> Result { // load and parse file let config = fs::read_to_string(cfg_path)?; - toml::from_str(config.as_str()) - .map_err(|e| e.into()) + toml::from_str(config.as_str()).map_err(|e| e.into()) } // local env for tests diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 3cbd3945ab..4dc214840c 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,19 +1,19 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::error; use std::fs; use std::io; -use std::process::Command; +use std::net::SocketAddr; use std::net::TcpStream; +use std::path::{Path, PathBuf}; +use std::process::Command; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::thread; use std::time::Duration; -use std::path::{Path, PathBuf}; -use std::net::SocketAddr; -use std::error; use postgres::{Client, NoTls}; +use crate::compute::PostgresNode; use crate::local_env::{self, LocalEnv}; -use crate::compute::{PostgresNode}; type Result = std::result::Result>; @@ -127,7 +127,7 @@ impl PageServerNode { pub fn address(&self) -> SocketAddr { match self.listen_address { Some(addr) => addr, - None => "127.0.0.1:64000".parse().unwrap() + None => "127.0.0.1:64000".parse().unwrap(), } } @@ -140,10 +140,7 @@ impl PageServerNode { let status = Command::new(self.env.zenith_distrib_dir.join("pageserver")) // XXX -> method .args(&["-D", self.env.pageserver_data_dir().to_str().unwrap()]) - .args(&[ - "-l", - self.address().to_string().as_str(), - ]) + .args(&["-l", self.address().to_string().as_str()]) .arg("-d") .arg("--skip-recovery") .env_clear() @@ -152,10 +149,10 @@ impl PageServerNode { .status()?; if !status.success() { - return Err(Box::::from( - format!("Pageserver failed to start. See '{}' for details.", - self.env.pageserver_log().to_str().unwrap()) - )); + return Err(Box::::from(format!( + "Pageserver failed to start. See '{}' for details.", + self.env.pageserver_log().to_str().unwrap() + ))); } else { return Ok(()); } @@ -172,8 +169,8 @@ impl PageServerNode { .expect("failed to execute kill"); if !status.success() { - return Err(Box::::from( - format!("Failed to kill pageserver with pid {}", + return Err(Box::::from(format!( + "Failed to kill pageserver with pid {}", pid ))); } @@ -190,8 +187,8 @@ impl PageServerNode { // ok, we failed to stop pageserver, let's panic if !status.success() { - return Err(Box::::from( - format!("Failed to stop pageserver with pid {}", + return Err(Box::::from(format!( + "Failed to stop pageserver with pid {}", pid ))); } else { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8df35ea534..c514292340 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -277,7 +277,8 @@ impl PageCache { if wait_result.1.timed_out() { return Err(format!( "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, lsn & 0xffff_ffff + lsn >> 32, + lsn & 0xffff_ffff ))?; } } @@ -286,8 +287,11 @@ impl PageCache { } if lsn < shared.first_valid_lsn { - return Err(format!("LSN {:X}/{:X} has already been removed", - lsn >> 32, lsn & 0xffff_ffff))?; + return Err(format!( + "LSN {:X}/{:X} has already been removed", + lsn >> 32, + lsn & 0xffff_ffff + ))?; } let pagecache = &shared.pagecache; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 2eb1ea3a57..685e771f4a 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -160,9 +160,11 @@ impl WalRedoProcess { .expect("failed to execute initdb"); if !initdb.status.success() { - panic!("initdb failed: {}\nstderr:\n{}", - std::str::from_utf8(&initdb.stdout).unwrap(), - std::str::from_utf8(&initdb.stderr).unwrap()); + panic!( + "initdb failed: {}\nstderr:\n{}", + std::str::from_utf8(&initdb.stdout).unwrap(), + std::str::from_utf8(&initdb.stderr).unwrap() + ); } // Start postgres itself diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 3c3785b748..f6690dd8d7 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -1,6 +1,6 @@ -use clap::{App, ArgMatches, SubCommand, Arg}; -use std::process::exit; +use clap::{App, Arg, ArgMatches, SubCommand}; use std::error; +use std::process::exit; use control_plane::{compute::ComputeControlPlane, local_env, storage}; @@ -20,18 +20,15 @@ fn main() { .subcommand( SubCommand::with_name("pg") .about("Manage postgres instances") - .subcommand(SubCommand::with_name("create") - // .arg(name_arg.clone() - // .required(false) - // .help("name of this postgres instance (will be pgN if omitted)")) - ) + .subcommand( + SubCommand::with_name("create"), // .arg(name_arg.clone() + // .required(false) + // .help("name of this postgres instance (will be pgN if omitted)")) + ) .subcommand(SubCommand::with_name("list")) - .subcommand(SubCommand::with_name("start") - .arg(name_arg.clone())) - .subcommand(SubCommand::with_name("stop") - .arg(name_arg.clone())) - .subcommand(SubCommand::with_name("destroy") - .arg(name_arg.clone())) + .subcommand(SubCommand::with_name("start").arg(name_arg.clone())) + .subcommand(SubCommand::with_name("stop").arg(name_arg.clone())) + .subcommand(SubCommand::with_name("destroy").arg(name_arg.clone())), ) .subcommand( SubCommand::with_name("snapshot") @@ -91,7 +88,7 @@ fn main() { ("status", Some(_sub_m)) => {} ("pg", Some(pg_match)) => { - if let Err(e) = handle_pg(pg_match, &env){ + if let Err(e) = handle_pg(pg_match, &env) { eprintln!("pg operation failed: {}", e); exit(1); }