diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 119e0fd56d..2ee377e0fb 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -103,8 +103,7 @@ pub fn init(remote_pageserver: Option<&str>) -> Result<()> { let conf = if let Some(addr) = remote_pageserver { // check that addr is parsable - let _uri = Url::parse(addr) - .map_err(|e| anyhow!("{}: {}", addr, e))?; + let _uri = Url::parse(addr).map_err(|e| anyhow!("{}: {}", addr, e))?; LocalEnv { pageserver_connstring: format!("postgresql://{}/", addr), diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index fc45911bdd..92e9eafaba 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,18 +1,18 @@ +use std::collections::HashMap; use std::net::{SocketAddr, TcpStream}; use std::path::PathBuf; use std::process::Command; use std::thread; use std::time::Duration; -use std::collections::HashMap; use anyhow::{anyhow, bail, Result}; use nix::sys::signal::{kill, Signal}; use nix::unistd::Pid; use postgres::{Client, NoTls}; -use pageserver::branches::BranchInfo; use crate::local_env::LocalEnv; use crate::read_pidfile; +use pageserver::branches::BranchInfo; // // Control routines for pageserver. @@ -43,10 +43,14 @@ impl PageServerNode { pub fn init(&self) -> Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); - let status = cmd.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()]) + let status = cmd + .args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()]) .env_clear() .env("RUST_BACKTRACE", "1") - .env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap()) + .env( + "POSTGRES_DISTRIB_DIR", + self.env.pg_distrib_dir.to_str().unwrap(), + ) .env("ZENITH_REPO_DIR", self.repo_path()) .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) @@ -77,15 +81,23 @@ impl PageServerNode { ); let mut cmd = Command::new(self.env.pageserver_bin()?); - cmd.args(&["-l", self.address().to_string().as_str(), "-D", self.repo_path().to_str().unwrap()]) - .arg("-d") - .env_clear() - .env("RUST_BACKTRACE", "1") - .env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap()) - .env("ZENITH_REPO_DIR", self.repo_path()) - .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); + cmd.args(&[ + "-l", + self.address().to_string().as_str(), + "-D", + self.repo_path().to_str().unwrap(), + ]) + .arg("-d") + .env_clear() + .env("RUST_BACKTRACE", "1") + .env( + "POSTGRES_DISTRIB_DIR", + self.env.pg_distrib_dir.to_str().unwrap(), + ) + .env("ZENITH_REPO_DIR", self.repo_path()) + .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); if !cmd.status()?.success() { bail!( @@ -183,8 +195,13 @@ impl PageServerNode { .flatten() .ok_or_else(|| anyhow!("missing branch"))?; - let res: BranchInfo = serde_json::from_str(branch_json) - .map_err(|e| anyhow!("failed to parse branch_create response: {}: {}", branch_json, e))?; + let res: BranchInfo = serde_json::from_str(branch_json).map_err(|e| { + anyhow!( + "failed to parse branch_create response: {}: {}", + branch_json, + e + ) + })?; Ok(res) } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index c062e2dfb3..77cc05629f 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -30,7 +30,8 @@ pub fn create_test_env(testname: &str) -> LocalEnv { // Remove remnants of old test repo let _ = fs::remove_dir_all(&base_path); - fs::create_dir_all(&base_path).expect(format!("could not create directory for {}", base_path_str).as_str()); + fs::create_dir_all(&base_path) + .expect(format!("could not create directory for {}", base_path_str).as_str()); let pgdatadirs_path = base_path.join("pgdatadirs"); fs::create_dir(&pgdatadirs_path) @@ -107,7 +108,7 @@ impl TestStorageControlPlane { data_dir: datadir_base.join(format!("wal_acceptor_{}", i)), systemid, env: local_env.clone(), - pass_to_pageserver: i == 0 + pass_to_pageserver: i == 0, }; wal_acceptor.init(); wal_acceptor.start(); @@ -343,15 +344,21 @@ impl WalAcceptorNode { [].to_vec() }; - let status = Command::new(self.env.zenith_distrib_dir.as_ref().unwrap().join("wal_acceptor")) - .args(&["-D", self.data_dir.to_str().unwrap()]) - .args(&["-l", self.listen.to_string().as_str()]) - .args(&["--systemid", self.systemid.to_string().as_str()]) - .args(&ps_arg) - .arg("-d") - .arg("-n") - .status() - .expect("failed to start wal_acceptor"); + let status = Command::new( + self.env + .zenith_distrib_dir + .as_ref() + .unwrap() + .join("wal_acceptor"), + ) + .args(&["-D", self.data_dir.to_str().unwrap()]) + .args(&["-l", self.listen.to_string().as_str()]) + .args(&["--systemid", self.systemid.to_string().as_str()]) + .args(&ps_arg) + .arg("-d") + .arg("-n") + .status() + .expect("failed to start wal_acceptor"); if !status.success() { panic!("wal_acceptor start failed"); diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 16a94286e0..77d270b23a 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -1,8 +1,8 @@ use control_plane::compute::ComputeControlPlane; use integration_tests; -use integration_tests::TestStorageControlPlane; use integration_tests::PostgresNodeExt; +use integration_tests::TestStorageControlPlane; // XXX: force all redo at the end // -- restart + seqscan won't read deleted stuff diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 8b8a193dcb..469cd9e11c 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -6,8 +6,8 @@ use std::{thread, time}; use control_plane::compute::ComputeControlPlane; use integration_tests; -use integration_tests::TestStorageControlPlane; use integration_tests::PostgresNodeExt; +use integration_tests::TestStorageControlPlane; const DOWNTIME: u64 = 2; @@ -101,7 +101,10 @@ fn test_many_timelines() { for i in 1..N_TIMELINES { let branchname = format!("experimental{}", i); - storage_cplane.pageserver.branch_create(&branchname, "main").unwrap(); + storage_cplane + .pageserver + .branch_create(&branchname, "main") + .unwrap(); timelines.push(branchname); } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 431dcdb37f..1b5a3fd2e4 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -5,11 +5,11 @@ use log::*; use parse_duration::parse; use std::fs::{File, OpenOptions}; -use std::{env, path::PathBuf}; use std::io; use std::process::exit; use std::thread; use std::time::Duration; +use std::{env, path::PathBuf}; use anyhow::{Context, Result}; use clap::{App, Arg}; @@ -17,7 +17,7 @@ use daemonize::Daemonize; use slog::{Drain, FnValue}; -use pageserver::{page_cache, page_service, tui, PageServerConf, branches}; +use pageserver::{branches, page_cache, page_service, tui, PageServerConf}; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; const DEFAULT_GC_PERIOD_SEC: u64 = 10; @@ -199,7 +199,10 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { // change into the repository directory. In daemon mode, Daemonize // does this for us. std::env::set_current_dir(&conf.workdir)?; - info!("Changed current directory to repository in {:?}", &conf.workdir); + info!( + "Changed current directory to repository in {:?}", + &conf.workdir + ); } let mut threads = Vec::new(); diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 8c1973d2c0..e4562006c9 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -4,19 +4,25 @@ // TODO: move all paths construction to conf impl // -use anyhow::{Context, Result, anyhow}; +use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use fs::File; +use fs_extra; use postgres_ffi::xlog_utils; use rand::Rng; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr}; -use fs_extra; -use fs::File; -use std::io::Read; use std::env; +use std::io::Read; +use std::{ + collections::HashMap, + fs, + path::{Path, PathBuf}, + process::{Command, Stdio}, + str::FromStr, +}; use zenith_utils::lsn::Lsn; -use crate::{repository::Repository, ZTimelineId, PageServerConf}; +use crate::{repository::Repository, PageServerConf, ZTimelineId}; #[derive(Serialize, Deserialize, Clone)] pub struct BranchInfo { @@ -71,10 +77,7 @@ pub fn init_repo(conf: &PageServerConf) -> Result<()> { .arg("--no-instructions") .env_clear() .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) - .env( - "DYLD_LIBRARY_PATH", - conf.pg_lib_dir().to_str().unwrap(), - ) + .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap()) .stdout(Stdio::null()) .status() .with_context(|| "failed to execute initdb")?; @@ -176,7 +179,11 @@ pub(crate) fn get_system_id(conf: &PageServerConf) -> Result { Ok(controlfile.system_identifier) } -pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_str: &str) -> Result { +pub(crate) fn create_branch( + conf: &PageServerConf, + branchname: &str, + startpoint_str: &str, +) -> Result { if conf.branch_path(&branchname).exists() { anyhow::bail!("branch {} already exists", branchname); } @@ -208,7 +215,7 @@ pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_ &oldtimelinedir.join("wal"), &newtimelinedir.join("wal"), startpoint.lsn, - 16 * 1024 * 1024 // FIXME: assume default WAL segment size + 16 * 1024 * 1024, // FIXME: assume default WAL segment size )?; Ok(BranchInfo { @@ -243,8 +250,9 @@ fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result { let lsn: Option; if let Some(lsnstr) = strings.next() { - lsn = Some(Lsn::from_str(lsnstr) - .with_context(|| "invalid LSN in point-in-time specification")?); + lsn = Some( + Lsn::from_str(lsnstr).with_context(|| "invalid LSN in point-in-time specification")?, + ); } else { lsn = None } @@ -337,8 +345,7 @@ fn create_timeline(conf: &PageServerConf, ancestor: Option) -> Resu /// If the given LSN is in the middle of a segment, the last segment containing it /// is written out as .partial, and padded with zeros. /// -fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{ - +fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()> { let last_segno = upto.segment_number(wal_seg_size); let last_segoff = upto.segment_offset(wal_seg_size); @@ -349,7 +356,7 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Res // Check if the filename looks like an xlog file, or a .partial file. if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) { - continue + continue; } let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize); @@ -371,7 +378,10 @@ fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Res std::io::copy(&mut src_file.take(copylen), &mut dst_file)?; if copylen < wal_seg_size { - std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?; + std::io::copy( + &mut std::io::repeat(0).take(wal_seg_size - copylen), + &mut dst_file, + )?; } } } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 11cfd91b29..da5456aeb9 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -32,7 +32,6 @@ pub struct PageServerConf { } impl PageServerConf { - // // Repository paths, relative to workdir. // @@ -50,7 +49,9 @@ impl PageServerConf { } fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf { - std::path::Path::new("timelines").join(timelineid.to_string()).join("snapshots") + std::path::Path::new("timelines") + .join(timelineid.to_string()) + .join("snapshots") } // @@ -66,7 +67,6 @@ impl PageServerConf { } } - /// Zenith Timeline ID is a 128-bit random ID. /// /// Zenith timeline IDs are different from PostgreSQL timeline @@ -127,4 +127,3 @@ impl fmt::Display for ZTimelineId { f.write_str(&hex::encode(self.0)) } } - diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6ded2846a6..16dc4023ce 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -24,13 +24,13 @@ use std::time::Duration; use zenith_utils::lsn::Lsn; use crate::basebackup; +use crate::branches; use crate::page_cache; use crate::repository::{BufferTag, RelTag}; use crate::restore_local_repo; use crate::walreceiver; use crate::PageServerConf; use crate::ZTimelineId; -use crate::branches; #[derive(Debug)] enum FeMessage { @@ -691,7 +691,6 @@ impl Connection { self.write_message_noflush(&BeMessage::CommandComplete)?; self.write_message(&BeMessage::ReadyForQuery)?; - } else if query_string.starts_with(b"branch_create ") { let query_str = String::from_utf8(query_string.to_vec())?; let err = || anyhow!("invalid branch_create: '{}'", query_str); @@ -699,9 +698,7 @@ impl Connection { // branch_create // TODO lazy static let re = Regex::new(r"^branch_create (\w+) ([\w@\\]+)[\r\n\s]*;?$").unwrap(); - let caps = re - .captures(&query_str) - .ok_or_else(err)?; + let caps = re.captures(&query_str).ok_or_else(err)?; let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str()); let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str()); @@ -731,10 +728,12 @@ impl Connection { // on connect self.write_message_noflush(&BeMessage::CommandComplete)?; self.write_message(&BeMessage::ReadyForQuery)?; - } else if query_string.to_ascii_lowercase().starts_with(b"identify_system") { + } else if query_string + .to_ascii_lowercase() + .starts_with(b"identify_system") + { // TODO: match postgres response formarmat for 'identify_system' - let system_id = crate::branches::get_system_id(&self.conf)? - .to_string(); + let system_id = crate::branches::get_system_id(&self.conf)?.to_string(); self.write_message_noflush(&BeMessage::RowDescription)?; self.write_message_noflush(&BeMessage::DataRow(Bytes::from(system_id)))?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index aca2b7dd8f..5af2899893 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -287,11 +287,11 @@ mod tests { use super::*; use crate::walredo::{WalRedoError, WalRedoManager}; use crate::PageServerConf; + use std::env; use std::fs; use std::path::Path; use std::str::FromStr; use std::time::Duration; - use std::env; fn get_test_conf() -> PageServerConf { PageServerConf { diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs index 8bc59ca3e0..665803829a 100644 --- a/pageserver/src/repository/rocksdb.rs +++ b/pageserver/src/repository/rocksdb.rs @@ -156,10 +156,7 @@ impl CacheEntryContent { } impl RocksRepository { - pub fn new( - conf: &PageServerConf, - walredo_mgr: Arc, - ) -> RocksRepository { + pub fn new(conf: &PageServerConf, walredo_mgr: Arc) -> RocksRepository { RocksRepository { conf: conf.clone(), timelines: Mutex::new(HashMap::new()), @@ -185,8 +182,7 @@ impl Repository for RocksRepository { match timelines.get(&timelineid) { Some(timeline) => Ok(timeline.clone()), None => { - let timeline = - RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone()); + let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone()); restore_timeline(&self.conf, &timeline, timelineid)?; diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 10bb192359..a847ad888e 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -1,13 +1,13 @@ +use anyhow::Result; +use anyhow::{anyhow, Context}; +use clap::{App, Arg, ArgMatches, SubCommand}; use std::collections::HashMap; use std::process::exit; -use anyhow::{Context, anyhow}; -use anyhow::Result; -use clap::{App, Arg, ArgMatches, SubCommand}; -use control_plane::local_env; use control_plane::compute::ComputeControlPlane; +use control_plane::local_env; use control_plane::storage::PageServerNode; -use pageserver::{ZTimelineId, branches::BranchInfo}; +use pageserver::{branches::BranchInfo, ZTimelineId}; use zenith_utils::lsn::Lsn; // Main entry point for the 'zenith' CLI utility @@ -33,7 +33,7 @@ fn main() -> Result<()> { Arg::with_name("remote-pageserver") .long("remote-pageserver") .required(false) - .value_name("pageserver-url") + .value_name("pageserver-url"), ), ) .subcommand( @@ -66,8 +66,7 @@ fn main() -> Result<()> { // Create config file if let ("init", Some(sub_args)) = matches.subcommand() { let pageserver_uri = sub_args.value_of("pageserver-url"); - local_env::init(pageserver_uri) - .with_context(|| "Failed to create cofig file")?; + local_env::init(pageserver_uri).with_context(|| "Failed to create cofig file")?; } // all other commands would need config @@ -91,7 +90,11 @@ fn main() -> Result<()> { if let Some(branchname) = sub_args.value_of("branchname") { if let Some(startpoint_str) = sub_args.value_of("start-point") { let branch = pageserver.branch_create(branchname, startpoint_str)?; - println!("Created branch '{}' at {:?}", branch.name, branch.latest_valid_lsn.unwrap_or(Lsn(0))); + println!( + "Created branch '{}' at {:?}", + branch.name, + branch.latest_valid_lsn.unwrap_or(Lsn(0)) + ); } else { panic!("Missing start-point"); }