From 92e4f4b3b650c05df615922e66c090a165355ac4 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Tue, 20 Apr 2021 17:59:56 -0700 Subject: [PATCH] cargo fmt --- control_plane/src/compute.rs | 92 ++++++++----- control_plane/src/local_env.rs | 133 ++++++++++++------- control_plane/src/storage.rs | 32 +++-- integration_tests/tests/test_pageserver.rs | 16 ++- integration_tests/tests/test_wal_acceptor.rs | 29 ++-- pageserver/src/basebackup.rs | 25 ++-- pageserver/src/bin/pageserver.rs | 46 ++++--- pageserver/src/lib.rs | 8 +- pageserver/src/page_cache.rs | 29 ++-- pageserver/src/page_service.rs | 50 +++---- pageserver/src/restore_local_repo.rs | 91 +++++++++---- pageserver/src/waldecoder.rs | 18 ++- pageserver/src/walreceiver.rs | 62 ++++++--- pageserver/src/walredo.rs | 4 +- postgres_ffi/build.rs | 15 +-- postgres_ffi/src/lib.rs | 23 ++-- walkeeper/src/bin/wal_acceptor.rs | 2 +- walkeeper/src/wal_service.rs | 37 ++++-- zenith/src/main.rs | 67 +++++----- zenith_utils/src/lib.rs | 1 - 20 files changed, 480 insertions(+), 300 deletions(-) diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 91ad2ba805..df59bf439a 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,17 +1,17 @@ use std::fs::{self, OpenOptions}; -use std::os::unix::fs::PermissionsExt; +use std::io::{Read, Write}; +use std::net::SocketAddr; use std::net::TcpStream; +use std::os::unix::fs::PermissionsExt; use std::process::Command; use std::sync::Arc; use std::time::Duration; use std::{collections::BTreeMap, path::PathBuf}; -use std::io::{Read, Write}; -use std::net::SocketAddr; -use regex::Regex; -use lazy_static::lazy_static; -use tar; use anyhow::{Context, Result}; +use lazy_static::lazy_static; +use regex::Regex; +use tar; use postgres::{Client, NoTls}; @@ -75,7 +75,11 @@ impl ComputeControlPlane { /// Connect to a page server, get base backup, and untar it to initialize a /// new data directory - pub fn new_from_page_server(&mut self, is_test: bool, timelineid: ZTimelineId) -> Result> { + pub fn new_from_page_server( + &mut self, + is_test: bool, + timelineid: ZTimelineId, + ) -> Result> { let node_id = self.nodes.len() as u32 + 1; let node = Arc::new(PostgresNode { @@ -84,7 +88,7 @@ impl ComputeControlPlane { env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), is_test, - timelineid + timelineid, }); node.init_from_page_server()?; @@ -157,8 +161,10 @@ impl PostgresNode { pageserver: &Arc, ) -> Result { if !entry.file_type()?.is_dir() { - anyhow::bail!("PostgresNode::from_dir_entry failed: '{}' is not a directory", - entry.path().display()); + anyhow::bail!( + "PostgresNode::from_dir_entry failed: '{}' is not a directory", + entry.path().display() + ); } lazy_static! { @@ -171,9 +177,12 @@ impl PostgresNode { // find out tcp port in config file let cfg_path = entry.path().join("postgresql.conf"); - let config = fs::read_to_string(cfg_path.clone()) - .with_context(|| format!("failed to read config file in {}", - cfg_path.to_str().unwrap()))?; + let config = fs::read_to_string(cfg_path.clone()).with_context(|| { + format!( + "failed to read config file in {}", + cfg_path.to_str().unwrap() + ) + })?; let err_msg = format!( "failed to find port definition in config file {}", @@ -203,14 +212,13 @@ impl PostgresNode { env: env.clone(), pageserver: Arc::clone(pageserver), is_test: false, - timelineid + timelineid, }) } // Connect to a page server, get base backup, and untar it to initialize a // new data directory pub fn init_from_page_server(&self) -> Result<()> { - let pgdata = self.pgdata(); println!( @@ -225,26 +233,37 @@ impl PostgresNode { } let sql = format!("basebackup {}", self.timelineid); - let mut client = self.pageserver.page_server_psql_client().with_context(|| "connecting to page erver failed")?; + let mut client = self + .pageserver + .page_server_psql_client() + .with_context(|| "connecting to page erver failed")?; fs::create_dir_all(&pgdata) .with_context(|| format!("could not create data directory {}", pgdata.display()))?; - fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700)) - .with_context(|| format!("could not set permissions in data directory {}", pgdata.display()))?; + fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700)).with_context( + || { + format!( + "could not set permissions in data directory {}", + pgdata.display() + ) + }, + )?; // FIXME: The compute node should be able to stream the WAL it needs from the WAL safekeepers or archive. // But that's not implemented yet. For now, 'pg_wal' is included in the base backup tarball that // we receive from the Page Server, so we don't need to create the empty 'pg_wal' directory here. //fs::create_dir_all(pgdata.join("pg_wal"))?; - let mut copyreader = client.copy_out(sql.as_str()) + let mut copyreader = client + .copy_out(sql.as_str()) .with_context(|| "page server 'basebackup' command failed")?; // FIXME: Currently, we slurp the whole tarball into memory, and then extract it, // but we really should do this: //let mut ar = tar::Archive::new(copyreader); let mut buf = vec![]; - copyreader.read_to_end(&mut buf) + copyreader + .read_to_end(&mut buf) .with_context(|| "reading base backup from page server failed")?; let mut ar = tar::Archive::new(buf.as_slice()); ar.unpack(&pgdata) @@ -264,25 +283,28 @@ impl PostgresNode { port = {port}\n", address = self.address.ip(), port = self.address.port() - )); + ), + ); // Never clean up old WAL. TODO: We should use a replication // slot or something proper, to prevent the compute node // from removing WAL that hasn't been streamed to the safekeepr or // page server yet. But this will do for now. - self.append_conf("postgresql.conf", - &format!("wal_keep_size='10TB'\n")); + self.append_conf("postgresql.conf", &format!("wal_keep_size='10TB'\n")); // Connect it to the page server. // Configure that node to take pages from pageserver - self.append_conf("postgresql.conf", - &format!("page_server_connstring = 'host={} port={}'\n\ + self.append_conf( + "postgresql.conf", + &format!( + "page_server_connstring = 'host={} port={}'\n\ zenith_timeline='{}'\n", - self.pageserver.address().ip(), - self.pageserver.address().port(), - self.timelineid - )); + self.pageserver.address().ip(), + self.pageserver.address().port(), + self.timelineid + ), + ); Ok(()) } @@ -331,7 +353,8 @@ impl PostgresNode { ) .env_clear() .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status().with_context(|| "pg_ctl failed")?; + .status() + .with_context(|| "pg_ctl failed")?; if !pg_ctl.success() { anyhow::bail!("pg_ctl failed"); } @@ -406,10 +429,13 @@ impl PostgresNode { .args(&["-h", &self.address.ip().to_string()]) .args(&["-p", &self.address.port().to_string()]) .arg("-v") - .stderr(OpenOptions::new() + .stderr( + OpenOptions::new() .create(true) - .append(true) - .open(self.pgdata().join("safekeeper_proxy.log")).unwrap()) + .append(true) + .open(self.pgdata().join("safekeeper_proxy.log")) + .unwrap(), + ) .spawn() { Ok(child) => WalProposerNode { pid: child.id() }, diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index e2c310f733..adf5d6164c 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,19 +4,19 @@ // Now it also provides init method which acts like a stub for proper installation // script which will use local paths. // +use anyhow::Context; +use bytes::Bytes; +use rand::Rng; use std::env; use std::fs; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; -use bytes::Bytes; -use rand::Rng; -use anyhow::Context; -use serde_derive::{Deserialize, Serialize}; use anyhow::Result; +use serde_derive::{Deserialize, Serialize}; -use walkeeper::xlog_utils; use pageserver::ZTimelineId; +use walkeeper::xlog_utils; // // This data structure represents deserialized zenith config, which should be @@ -67,16 +67,20 @@ pub fn init() -> Result<()> { // check if config already exists let repo_path = zenith_repo_dir(); if repo_path.exists() { - anyhow::bail!("{} already exists. Perhaps already initialized?", - repo_path.to_str().unwrap()); + anyhow::bail!( + "{} already exists. Perhaps already initialized?", + repo_path.to_str().unwrap() + ); } // Now we can run init only from crate directory, so check that current dir is our crate. // Use 'pageserver/Cargo.toml' existence as evidendce. let cargo_path = env::current_dir()?; if !cargo_path.join("pageserver/Cargo.toml").exists() { - anyhow::bail!("Current dirrectory does not look like a zenith repo. \ - Please, run 'init' from zenith repo root."); + anyhow::bail!( + "Current dirrectory does not look like a zenith repo. \ + Please, run 'init' from zenith repo root." + ); } // ok, now check that expected binaries are present @@ -85,17 +89,21 @@ pub fn init() -> Result<()> { let pg_distrib_dir = cargo_path.join("tmp_install"); let pg_path = pg_distrib_dir.join("bin/postgres"); if !pg_path.exists() { - anyhow::bail!("Can't find postres binary at {}. \ + anyhow::bail!( + "Can't find postres binary at {}. \ Perhaps './pgbuild.sh' is needed to build it first.", - pg_path.to_str().unwrap()); + pg_path.to_str().unwrap() + ); } // check pageserver let zenith_distrib_dir = cargo_path.join("target/debug/"); let pageserver_path = zenith_distrib_dir.join("pageserver"); if !pageserver_path.exists() { - anyhow::bail!("Can't find pageserver binary at {}. Please build it.", - pageserver_path.to_str().unwrap()); + anyhow::bail!( + "Can't find pageserver binary at {}. Please build it.", + pageserver_path.to_str().unwrap() + ); } // ok, we are good to go @@ -110,10 +118,10 @@ pub fn init() -> Result<()> { Ok(()) } -pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> -{ +pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> { let repopath = &local_env.repo_path; - fs::create_dir(&repopath).with_context(|| format!("could not create directory {}", repopath.display()))?; + fs::create_dir(&repopath) + .with_context(|| format!("could not create directory {}", repopath.display()))?; fs::create_dir(repopath.join("pgdatadirs"))?; fs::create_dir(repopath.join("timelines"))?; fs::create_dir(repopath.join("refs"))?; @@ -132,25 +140,30 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> // the repository. Use "tempdir()" or something? Or just create it directly // in the repo? let initdb_path = local_env.pg_bin_dir().join("initdb"); - let _initdb = - Command::new(initdb_path) + let _initdb = Command::new(initdb_path) .args(&["-D", "tmp"]) - .arg("--no-instructions") - .env_clear() - .env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap()) - .stdout(Stdio::null()) + .arg("--no-instructions") + .env_clear() + .env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap()) + .stdout(Stdio::null()) .status() .with_context(|| "failed to execute initdb")?; println!("initdb succeeded"); // Read control file to extract the LSN and system id - let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read("tmp/global/pg_control")?))?; + let controlfile = + postgres_ffi::decode_pg_control(Bytes::from(fs::read("tmp/global/pg_control")?))?; let systemid = controlfile.system_identifier; let lsn = controlfile.checkPoint; let lsnstr = format!("{:016X}", lsn); // Move the initial WAL file - fs::rename("tmp/pg_wal/000000010000000000000001", timelinedir.join("wal").join("000000010000000000000001.partial"))?; + fs::rename( + "tmp/pg_wal/000000010000000000000001", + timelinedir + .join("wal") + .join("000000010000000000000001.partial"), + )?; println!("moved initial WAL file"); // Remove pg_wal @@ -176,12 +189,14 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> let toml = toml::to_string(&local_env)?; fs::write(repopath.join("config"), toml)?; - println!("new zenith repository was created in {}", repopath.display()); + println!( + "new zenith repository was created in {}", + repopath.display() + ); Ok(()) } - // If control file says the cluster was shut down cleanly, modify it, to mark // it as crashed. That forces crash recovery when you start the cluster. // @@ -192,16 +207,17 @@ pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> // Or better yet, use a less hacky way of putting the cluster into recovery. // Perhaps create a backup label file in the data directory when it's restored. fn force_crash_recovery(datadir: &Path) -> Result<()> { - // Read in the control file let controlfilepath = datadir.to_path_buf().join("global").join("pg_control"); - let mut controlfile = postgres_ffi::decode_pg_control( - Bytes::from(fs::read(controlfilepath.as_path())?))?; + let mut controlfile = + postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?; controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION; - fs::write(controlfilepath.as_path(), - postgres_ffi::encode_pg_control(controlfile))?; + fs::write( + controlfilepath.as_path(), + postgres_ffi::encode_pg_control(controlfile), + )?; Ok(()) } @@ -209,8 +225,10 @@ fn force_crash_recovery(datadir: &Path) -> Result<()> { // check that config file is present pub fn load_config(repopath: &Path) -> Result { if !repopath.exists() { - anyhow::bail!("Zenith config is not found in {}. You need to run 'zenith init' first", - repopath.to_str().unwrap()); + anyhow::bail!( + "Zenith config is not found in {}. You need to run 'zenith init' first", + repopath.to_str().unwrap() + ); } // load and parse file @@ -222,7 +240,9 @@ pub fn load_config(repopath: &Path) -> Result { pub fn test_env(testname: &str) -> LocalEnv { fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check"); - let repo_path = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_check/").join(testname); + let repo_path = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../tmp_check/") + .join(testname); // Remove remnants of old test repo let _ = fs::remove_dir_all(&repo_path); @@ -252,7 +272,7 @@ pub fn cargo_bin_dir() -> PathBuf { #[derive(Debug, Clone, Copy)] pub struct PointInTime { pub timelineid: ZTimelineId, - pub lsn: u64 + pub lsn: u64, } fn create_timeline(local_env: &LocalEnv, ancestor: Option) -> Result { @@ -270,10 +290,12 @@ fn create_timeline(local_env: &LocalEnv, ancestor: Option) -> Resul fs::create_dir(&timelinedir.join("wal"))?; if let Some(ancestor) = ancestor { - let data = format!("{}@{:X}/{:X}", - ancestor.timelineid, - ancestor.lsn >> 32, - ancestor.lsn & 0xffffffff); + let data = format!( + "{}@{:X}/{:X}", + ancestor.timelineid, + ancestor.lsn >> 32, + ancestor.lsn & 0xffffffff + ); fs::write(timelinedir.join("ancestor"), data)?; } @@ -289,7 +311,11 @@ fn parse_lsn(s: &str) -> std::result::Result { } // Create a new branch in the repository (for the "zenith branch" subcommand) -pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointInTime) -> Result<()> { +pub fn create_branch( + local_env: &LocalEnv, + branchname: &str, + startpoint: PointInTime, +) -> Result<()> { let repopath = &local_env.repo_path; // create a new timeline for it @@ -297,7 +323,10 @@ pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointIn let newtimelinedir = repopath.join("timelines").join(newtli.to_string()); let data = newtli.to_string(); - fs::write(repopath.join("refs").join("branches").join(branchname), data)?; + fs::write( + repopath.join("refs").join("branches").join(branchname), + data, + )?; // Copy the latest snapshot (TODO: before the startpoint) and all WAL // TODO: be smarter and avoid the copying... @@ -305,12 +334,16 @@ pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointIn let copy_opts = fs_extra::dir::CopyOptions::new(); fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?; - let oldtimelinedir = repopath.join("timelines").join(startpoint.timelineid.to_string()); + let oldtimelinedir = repopath + .join("timelines") + .join(startpoint.timelineid.to_string()); let mut copy_opts = fs_extra::dir::CopyOptions::new(); copy_opts.content_only = true; - fs_extra::dir::copy(oldtimelinedir.join("wal"), - newtimelinedir.join("wal"), - ©_opts)?; + fs_extra::dir::copy( + oldtimelinedir.join("wal"), + newtimelinedir.join("wal"), + ©_opts, + )?; Ok(()) } @@ -318,7 +351,10 @@ pub fn create_branch(local_env: &LocalEnv, branchname: &str, startpoint: PointIn // Find the end of valid WAL in a wal directory pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result { let repopath = &local_env.repo_path; - let waldir = repopath.join("timelines").join(timeline.to_string()).join("wal"); + let waldir = repopath + .join("timelines") + .join(timeline.to_string()) + .join("wal"); let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true); @@ -329,7 +365,10 @@ pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result Result<(u64, PathBuf)> { let repopath = &local_env.repo_path; - let snapshotsdir = repopath.join("timelines").join(timeline.to_string()).join("snapshots"); + let snapshotsdir = repopath + .join("timelines") + .join(timeline.to_string()) + .join("snapshots"); let paths = fs::read_dir(&snapshotsdir)?; let mut maxsnapshot: u64 = 0; let mut snapshotdir: Option = None; diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 5e5e2bff51..ab6a6d021d 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use std::fs; use std::io; use std::net::SocketAddr; @@ -9,12 +10,11 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; -use anyhow::Result; use postgres::{Client, NoTls}; -use crate::local_env::LocalEnv; use crate::compute::PostgresNode; +use crate::local_env::LocalEnv; use pageserver::ZTimelineId; // @@ -31,10 +31,8 @@ pub struct TestStorageControlPlane { } impl TestStorageControlPlane { - // Peek into the repository, to grab the timeline ID of given branch pub fn get_branch_timeline(&self, branchname: &str) -> ZTimelineId { - let branchpath = self.repopath.join("refs/branches/".to_owned() + branchname); ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap() @@ -171,10 +169,14 @@ impl PageServerNode { } pub fn start(&self) -> Result<()> { - println!("Starting pageserver at '{}' in {}", self.address(), self.repo_path().display()); + println!( + "Starting pageserver at '{}' in {}", + self.address(), + self.repo_path().display() + ); let mut cmd = Command::new(self.env.zenith_distrib_dir.join("pageserver")); - cmd .args(&["-l", self.address().to_string().as_str()]) + cmd.args(&["-l", self.address().to_string().as_str()]) .arg("-d") .env_clear() .env("RUST_BACKTRACE", "1") @@ -183,8 +185,10 @@ impl PageServerNode { .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); if !cmd.status()?.success() { - anyhow::bail!("Pageserver failed to start. See '{}' for details.", - self.repo_path().join("pageserver.log").display()); + anyhow::bail!( + "Pageserver failed to start. See '{}' for details.", + self.repo_path().join("pageserver.log").display() + ); } // It takes a while for the page server to start up. Wait until it is @@ -247,7 +251,9 @@ impl PageServerNode { client.simple_query(sql).unwrap() } - pub fn page_server_psql_client(&self) -> std::result::Result { + pub fn page_server_psql_client( + &self, + ) -> std::result::Result { let connstring = format!( "host={} port={} dbname={} user={}", self.address().ip(), @@ -297,10 +303,10 @@ impl WalAcceptorNode { .args(&["-D", self.data_dir.to_str().unwrap()]) .args(&["-l", self.listen.to_string().as_str()]) .args(&["--systemid", &self.env.systemid.to_string()]) - // Tell page server it can receive WAL from this WAL safekeeper - // FIXME: If there are multiple safekeepers, they will all inform - // the page server. Only the last "notification" will stay in effect. - // So it's pretty random which safekeeper the page server will connect to + // Tell page server it can receive WAL from this WAL safekeeper + // FIXME: If there are multiple safekeepers, they will all inform + // the page server. Only the last "notification" will stay in effect. + // So it's pretty random which safekeeper the page server will connect to .args(&["--pageserver", "127.0.0.1:64000"]) .arg("-d") .arg("-n") diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index 14c328be0e..d595a6a50f 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -1,8 +1,8 @@ // mod control_plane; use control_plane::compute::ComputeControlPlane; -use control_plane::storage::TestStorageControlPlane; use control_plane::local_env; use control_plane::local_env::PointInTime; +use control_plane::storage::TestStorageControlPlane; // XXX: force all redo at the end // -- restart + seqscan won't read deleted stuff @@ -77,12 +77,18 @@ fn test_pageserver_two_timelines() { let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); let maintli = storage_cplane.get_branch_timeline("main"); - + // Create new branch at the end of 'main' let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap(); - local_env::create_branch(&local_env, "experimental", - PointInTime { timelineid: maintli, - lsn: startpoint }).unwrap(); + local_env::create_branch( + &local_env, + "experimental", + PointInTime { + timelineid: maintli, + lsn: startpoint, + }, + ) + .unwrap(); let experimentaltli = storage_cplane.get_branch_timeline("experimental"); // Launch postgres instances on both branches diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 04ca933d74..939648b2ea 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -1,8 +1,8 @@ // Restart acceptors one by one while compute is under the load. use control_plane::compute::ComputeControlPlane; -use control_plane::storage::TestStorageControlPlane; use control_plane::local_env; use control_plane::local_env::PointInTime; +use control_plane::storage::TestStorageControlPlane; use pageserver::ZTimelineId; use rand::Rng; @@ -63,11 +63,18 @@ fn test_many_timelines() { let maintli = storage_cplane.get_branch_timeline("main"); // main branch timelines.push(maintli); let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap(); - for i in 1..N_TIMELINES { // additional branches + for i in 1..N_TIMELINES { + // additional branches let branchname = format!("experimental{}", i); - local_env::create_branch(&local_env, &branchname, - PointInTime { timelineid: maintli, - lsn: startpoint }).unwrap(); + local_env::create_branch( + &local_env, + &branchname, + PointInTime { + timelineid: maintli, + lsn: startpoint, + }, + ) + .unwrap(); let tli = storage_cplane.get_branch_timeline(&branchname); timelines.push(tli); } @@ -75,10 +82,10 @@ fn test_many_timelines() { // start postgres on each timeline let mut nodes = Vec::new(); for tli in timelines { - let node = compute_cplane.new_test_node(tli); - nodes.push(node.clone()); - node.start().unwrap(); - node.start_proxy(&wal_acceptors); + let node = compute_cplane.new_test_node(tli); + nodes.push(node.clone()); + node.start().unwrap(); + node.start_proxy(&wal_acceptors); } // create schema @@ -258,7 +265,9 @@ fn test_race_conditions() { // Start pageserver that reads WAL directly from that postgres const REDUNDANCY: usize = 3; - let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY)); + let storage_cplane = Arc::new(TestStorageControlPlane::fault_tolerant( + &local_env, REDUNDANCY, + )); let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 76ca3c3377..d8ed5183a8 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -1,14 +1,17 @@ use log::*; -use tar::{Builder}; +use regex::Regex; use std::fmt; use std::io::Write; +use tar::Builder; use walkdir::WalkDir; -use regex::Regex; use crate::ZTimelineId; - -pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, snapshotlsn: u64) -> Result<(), std::io::Error> { +pub fn send_snapshot_tarball( + write: &mut dyn Write, + timelineid: ZTimelineId, + snapshotlsn: u64, +) -> Result<(), std::io::Error> { let mut ar = Builder::new(write); let snappath = format!("timelines/{}/snapshots/{:016X}", timelineid, snapshotlsn); @@ -27,12 +30,15 @@ pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, sna } if entry.file_type().is_dir() { - trace!("sending dir {} as {}", fullpath.display(), relpath.display()); + trace!( + "sending dir {} as {}", + fullpath.display(), + relpath.display() + ); ar.append_dir(relpath, fullpath)?; } else if entry.file_type().is_symlink() { error!("ignoring symlink in snapshot dir"); } else if entry.file_type().is_file() { - // Shared catalogs are exempt if relpath.starts_with("global/") { trace!("sending shared catalog {}", relpath.display()); @@ -61,7 +67,9 @@ pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, sna } let archive_fname = relpath.to_str().unwrap().clone(); - let archive_fname = archive_fname.strip_suffix(".partial").unwrap_or(&archive_fname); + let archive_fname = archive_fname + .strip_suffix(".partial") + .unwrap_or(&archive_fname); let archive_path = "pg_wal/".to_owned() + archive_fname; ar.append_path_with_name(fullpath, archive_path)?; } @@ -71,14 +79,12 @@ pub fn send_snapshot_tarball(write: &mut dyn Write, timelineid: ZTimelineId, sna Ok(()) } - // formats: // // _ // . // _. - #[derive(Debug)] struct FilePathError { msg: String, @@ -145,7 +151,6 @@ fn parse_filename(fname: &str) -> Result<(u32, u32, u32), FilePathError> { return Ok((relnode, forknum, segno)); } - fn parse_rel_file_path(path: &str) -> Result<(), FilePathError> { /* * Relation data files can be in one of the following directories: diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 10336d84f5..f8dfc32c5e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -4,11 +4,11 @@ use log::*; use std::fs; +use std::fs::{File, OpenOptions}; use std::io; +use std::path::PathBuf; use std::process::exit; use std::thread; -use std::fs::{File, OpenOptions}; -use std::path::PathBuf; use anyhow::{Context, Result}; use clap::{App, Arg}; @@ -32,27 +32,33 @@ fn zenith_repo_dir() -> String { fn main() -> Result<()> { let arg_matches = App::new("Zenith page server") .about("Materializes WAL stream to pages and serves them to the postgres") - .arg(Arg::with_name("listen") - .short("l") - .long("listen") - .takes_value(true) - .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)")) - .arg(Arg::with_name("interactive") - .short("i") - .long("interactive") - .takes_value(false) - .help("Interactive mode")) - .arg(Arg::with_name("daemonize") - .short("d") - .long("daemonize") - .takes_value(false) - .help("Run in the background")) + .arg( + Arg::with_name("listen") + .short("l") + .long("listen") + .takes_value(true) + .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)"), + ) + .arg( + Arg::with_name("interactive") + .short("i") + .long("interactive") + .takes_value(false) + .help("Interactive mode"), + ) + .arg( + Arg::with_name("daemonize") + .short("d") + .long("daemonize") + .takes_value(false) + .help("Run in the background"), + ) .get_matches(); let mut conf = PageServerConf { daemonize: false, interactive: false, - listen_addr: "127.0.0.1:5430".parse().unwrap() + listen_addr: "127.0.0.1:5430".parse().unwrap(), }; if arg_matches.is_present("daemonize") { @@ -128,9 +134,7 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { Ok(_) => info!("Success, daemonized"), Err(e) => error!("Error, {}", e), } - } - else - { + } else { // change into the repository directory. In daemon mode, Daemonize // does this for us. let repodir = zenith_repo_dir(); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index c738f90f41..4c344749f5 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,7 +1,8 @@ +use std::fmt; use std::net::SocketAddr; use std::str::FromStr; -use std::fmt; +pub mod basebackup; pub mod page_cache; pub mod page_service; pub mod pg_constants; @@ -12,7 +13,6 @@ mod tui_logger; pub mod waldecoder; pub mod walreceiver; pub mod walredo; -pub mod basebackup; #[derive(Debug, Clone)] pub struct PageServerConf { @@ -35,7 +35,6 @@ impl FromStr for ZTimelineId { buf.copy_from_slice(timelineid.as_slice()); Ok(ZTimelineId(buf)) } - } impl ZTimelineId { @@ -55,8 +54,7 @@ impl ZTimelineId { } impl fmt::Display for ZTimelineId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&hex::encode(self.0)) + f.write_str(&hex::encode(self.0)) } } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index edeba3b21f..c25e5cadb5 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -6,9 +6,9 @@ // per-entry mutex. // -use crate::{walredo, PageServerConf}; use crate::restore_local_repo::restore_timeline; use crate::ZTimelineId; +use crate::{walredo, PageServerConf}; use anyhow::bail; use bytes::Bytes; use core::ops::Bound::Included; @@ -109,7 +109,8 @@ struct PageCacheShared { } lazy_static! { - pub static ref PAGECACHES: Mutex>> = Mutex::new(HashMap::new()); + pub static ref PAGECACHES: Mutex>> = + Mutex::new(HashMap::new()); } // Get Page Cache for given timeline. It is assumed to already exist. @@ -118,11 +119,14 @@ pub fn get_pagecache(_conf: &PageServerConf, timelineid: ZTimelineId) -> Option< match pcaches.get(&timelineid) { Some(pcache) => Some(pcache.clone()), - None => None + None => None, } } -pub fn get_or_restore_pagecache(conf: &PageServerConf, timelineid: ZTimelineId) -> anyhow::Result> { +pub fn get_or_restore_pagecache( + conf: &PageServerConf, + timelineid: ZTimelineId, +) -> anyhow::Result> { let mut pcaches = PAGECACHES.lock().unwrap(); match pcaches.get(&timelineid) { @@ -475,8 +479,11 @@ impl PageCache { self.num_entries.fetch_add(1, Ordering::Relaxed); if !oldentry.is_none() { - error!("overwriting WAL record with LSN {:X}/{:X} in page cache", - lsn >> 32, lsn & 0xffffffff); + error!( + "overwriting WAL record with LSN {:X}/{:X} in page cache", + lsn >> 32, + lsn & 0xffffffff + ); } self.num_wal_records.fetch_add(1, Ordering::Relaxed); @@ -511,14 +518,18 @@ impl PageCache { // Can't move backwards. let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { - shared.last_valid_lsn = lsn; self.valid_lsn_condvar.notify_all(); self.last_valid_lsn.store(lsn, Ordering::Relaxed); } else { - warn!("attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", - oldlsn >> 32, oldlsn & 0xffffffff, lsn >> 32, lsn & 0xffffffff); + warn!( + "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", + oldlsn >> 32, + oldlsn & 0xffffffff, + lsn >> 32, + lsn & 0xffffffff + ); } } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 6dac213be6..c23537233d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -13,26 +13,25 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use log::*; +use regex::Regex; use std::io; -use std::thread; use std::str::FromStr; use std::sync::Arc; -use regex::Regex; +use std::thread; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufWriter}; use tokio::net::{TcpListener, TcpStream}; use tokio::runtime; use tokio::runtime::Runtime; -use tokio::task; use tokio::sync::mpsc; +use tokio::task; +use crate::basebackup; use crate::page_cache; use crate::restore_local_repo; -use crate::basebackup; use crate::walreceiver; use crate::PageServerConf; use crate::ZTimelineId; - type Result = std::result::Result; #[derive(Debug)] @@ -172,8 +171,7 @@ struct FeParseMessage { query_string: Bytes, } -fn read_null_terminated(buf: &mut Bytes) -> Result -{ +fn read_null_terminated(buf: &mut Bytes) -> Result { let mut result = BytesMut::new(); loop { @@ -221,15 +219,14 @@ impl FeParseMessage { )); } - - Ok(FeMessage::Parse(FeParseMessage {query_string})) + Ok(FeMessage::Parse(FeParseMessage { query_string })) } } #[derive(Debug)] struct FeDescribeMessage { - kind: u8, // 'S' to describe a prepared statement; or 'P' to describe a portal. - // we only support unnamed prepared stmt or portal + kind: u8, // 'S' to describe a prepared statement; or 'P' to describe a portal. + // we only support unnamed prepared stmt or portal } impl FeDescribeMessage { @@ -255,7 +252,7 @@ impl FeDescribeMessage { )); } - Ok(FeMessage::Describe(FeDescribeMessage {kind})) + Ok(FeMessage::Describe(FeDescribeMessage { kind })) } } @@ -263,7 +260,7 @@ impl FeDescribeMessage { #[derive(Debug)] struct FeExecuteMessage { /// max # of rows - maxrows: i32 + maxrows: i32, } impl FeExecuteMessage { @@ -286,14 +283,13 @@ impl FeExecuteMessage { )); } - Ok(FeMessage::Execute(FeExecuteMessage {maxrows})) + Ok(FeMessage::Execute(FeExecuteMessage { maxrows })) } } // we only support unnamed prepared stmt and portal #[derive(Debug)] -struct FeBindMessage { -} +struct FeBindMessage {} impl FeBindMessage { pub fn parse(body: Bytes) -> Result { @@ -324,8 +320,7 @@ impl FeBindMessage { // we only support unnamed prepared stmt and portal #[derive(Debug)] -struct FeCloseMessage { -} +struct FeCloseMessage {} impl FeCloseMessage { pub fn parse(body: Bytes) -> Result { @@ -370,9 +365,7 @@ impl FeMessage { let mut body = body.freeze(); match tag { - b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { - body: body, - }))), + b'Q' => Ok(Some(FeMessage::Query(FeQueryMessage { body: body }))), b'P' => Ok(Some(FeParseMessage::parse(body)?)), b'D' => Ok(Some(FeDescribeMessage::parse(body)?)), b'E' => Ok(Some(FeExecuteMessage::parse(body)?)), @@ -634,7 +627,6 @@ impl Connection { } async fn run(&mut self) -> Result<()> { - let mut unnamed_query_string = Bytes::new(); loop { let msg = self.read_message().await?; @@ -666,7 +658,8 @@ impl Connection { self.write_message(&BeMessage::ParseComplete).await?; } Some(FeMessage::Describe(_)) => { - self.write_message_noflush(&BeMessage::ParameterDescription).await?; + self.write_message_noflush(&BeMessage::ParameterDescription) + .await?; self.write_message(&BeMessage::NoData).await?; } Some(FeMessage::Bind(_)) => { @@ -724,10 +717,13 @@ impl Connection { // Check that the timeline exists self.handle_basebackup_request(timelineid).await?; - self.write_message_noflush(&BeMessage::CommandComplete).await?; + self.write_message_noflush(&BeMessage::CommandComplete) + .await?; self.write_message(&BeMessage::ReadyForQuery).await } else if query_string.starts_with(b"callmemaybe ") { - let query_str = String::from_utf8(query_string.to_vec()).unwrap().to_string(); + let query_str = String::from_utf8(query_string.to_vec()) + .unwrap() + .to_string(); // callmemaybe let re = Regex::new(r"^callmemaybe ([[:xdigit:]]+) (.*)$").unwrap(); @@ -777,7 +773,6 @@ impl Connection { } async fn handle_pagerequests(&mut self, timelineid: ZTimelineId) -> Result<()> { - // Check that the timeline exists let pcache = page_cache::get_or_restore_pagecache(&self.conf, timelineid); if pcache.is_err() { @@ -954,7 +949,7 @@ impl Connection { if joinres.is_err() { return Err(io::Error::new( io::ErrorKind::InvalidData, - joinres.unwrap_err() + joinres.unwrap_err(), )); } return joinres.unwrap(); @@ -1002,7 +997,6 @@ struct CopyDataSink(mpsc::Sender); impl std::io::Write for CopyDataSink { fn write(&mut self, data: &[u8]) -> std::result::Result { - let buf = Bytes::copy_from_slice(data); if let Err(e) = self.0.blocking_send(buf) { diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 279f13f848..262479a556 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -14,6 +14,7 @@ use log::*; use regex::Regex; use std::fmt; +use std::cmp::max; use std::error::Error; use std::fs; use std::fs::File; @@ -21,19 +22,17 @@ use std::io::Read; use std::io::Seek; use std::io::SeekFrom; use std::path::{Path, PathBuf}; -use std::cmp::max; use anyhow::Result; use bytes::Bytes; use crate::page_cache; -use crate::page_cache::PageCache; -use crate:: PageServerConf; use crate::page_cache::BufferTag; +use crate::page_cache::PageCache; use crate::waldecoder::WalStreamDecoder; +use crate::PageServerConf; use crate::ZTimelineId; - // From pg_tablespace_d.h // // FIXME: we'll probably need these elsewhere too, move to some common location @@ -43,8 +42,11 @@ const GLOBALTABLESPACE_OID: u32 = 1664; // // Load it all into the page cache. // -pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId) -> Result<()> { - +pub fn restore_timeline( + conf: &PageServerConf, + pcache: &PageCache, + timeline: ZTimelineId, +) -> Result<()> { let timelinepath = PathBuf::from("timelines").join(timeline.to_string()); if !timelinepath.exists() { @@ -52,7 +54,9 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi } // Scan .zenith/timelines//snapshots - let snapshotspath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots"); + let snapshotspath = PathBuf::from("timelines") + .join(timeline.to_string()) + .join("snapshots"); let mut last_snapshot_lsn: u64 = 0; @@ -68,7 +72,10 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi } if last_snapshot_lsn == 0 { - error!("could not find valid snapshot in {}", snapshotspath.display()); + error!( + "could not find valid snapshot in {}", + snapshotspath.display() + ); // TODO return error? } pcache.init_valid_lsn(last_snapshot_lsn); @@ -79,7 +86,6 @@ pub fn restore_timeline(conf: &PageServerConf, pcache: &PageCache, timeline: ZTi } pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Result { - let snapshotspath = format!("timelines/{}/snapshots", timeline); let mut last_snapshot_lsn = 0; @@ -97,9 +103,16 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re Ok(last_snapshot_lsn) } -fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, snapshot: &str) -> Result<()> { - - let snapshotpath = PathBuf::from("timelines").join(timeline.to_string()).join("snapshots").join(snapshot); +fn restore_snapshot( + conf: &PageServerConf, + pcache: &PageCache, + timeline: ZTimelineId, + snapshot: &str, +) -> Result<()> { + let snapshotpath = PathBuf::from("timelines") + .join(timeline.to_string()) + .join("snapshots") + .join(snapshot); // Scan 'global' for direntry in fs::read_dir(snapshotpath.join("global"))? { @@ -112,7 +125,15 @@ fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimeli Some("pg_filenode.map") => continue, // Load any relation files into the page server - _ => restore_relfile(conf, pcache, timeline, snapshot, GLOBALTABLESPACE_OID, 0, &direntry.path())?, + _ => restore_relfile( + conf, + pcache, + timeline, + snapshot, + GLOBALTABLESPACE_OID, + 0, + &direntry.path(), + )?, } } @@ -133,7 +154,15 @@ fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimeli Some("pg_filenode.map") => continue, // Load any relation files into the page server - _ => restore_relfile(conf, pcache, timeline, snapshot, DEFAULTTABLESPACE_OID, dboid, &direntry.path())?, + _ => restore_relfile( + conf, + pcache, + timeline, + snapshot, + DEFAULTTABLESPACE_OID, + dboid, + &direntry.path(), + )?, } } } @@ -143,8 +172,15 @@ fn restore_snapshot(conf: &PageServerConf, pcache: &PageCache, timeline: ZTimeli Ok(()) } -fn restore_relfile(_conf: &PageServerConf, pcache: &PageCache, _timeline: ZTimelineId, snapshot: &str, spcoid: u32, dboid: u32, path: &Path) -> Result<()> { - +fn restore_relfile( + _conf: &PageServerConf, + pcache: &PageCache, + _timeline: ZTimelineId, + snapshot: &str, + spcoid: u32, + dboid: u32, + path: &Path, +) -> Result<()> { let lsn = u64::from_str_radix(snapshot, 16)?; // Does it look like a relation file? @@ -187,12 +223,12 @@ fn restore_relfile(_conf: &PageServerConf, pcache: &PageCache, _timeline: ZTimel // reached EOF. That's expected. // FIXME: maybe check that we read the full length of the file? break; - }, + } _ => { error!("error reading file: {:?} ({})", path, e); break; } - } + }, }; blknum += 1; } @@ -210,7 +246,12 @@ fn restore_relfile(_conf: &PageServerConf, pcache: &PageCache, _timeline: ZTimel // Scan WAL on a timeline, starting from gien LSN, and load all the records // into the page cache. -fn restore_wal(_conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId, startpoint: u64) -> Result<()> { +fn restore_wal( + _conf: &PageServerConf, + pcache: &PageCache, + timeline: ZTimelineId, + startpoint: u64, +) -> Result<()> { let walpath = format!("timelines/{}/wal", timeline); let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); @@ -259,8 +300,7 @@ fn restore_wal(_conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId break; } if let Some((lsn, recdata)) = rec.unwrap() { - let decoded = - crate::waldecoder::decode_wal_record(recdata.clone()); + let decoded = crate::waldecoder::decode_wal_record(recdata.clone()); // Put the WAL record to the page cache. We make a separate copy of // it for every block it modifies. (The actual WAL record is kept in @@ -299,7 +339,11 @@ fn restore_wal(_conf: &PageServerConf, pcache: &PageCache, timeline: ZTimelineId segno += 1; offset = 0; } - info!("reached end of WAL at {:X}/{:X}", last_lsn >> 32, last_lsn & 0xffffffff); + info!( + "reached end of WAL at {:X}/{:X}", + last_lsn >> 32, + last_lsn & 0xffffffff + ); Ok(()) } @@ -320,7 +364,6 @@ pub fn XLByteToSeg(xlogptr: XLogRecPtr, wal_segsz_bytes: usize) -> XLogSegNo { return xlogptr / wal_segsz_bytes as u64; } - #[allow(non_snake_case)] pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize) -> String { return format!( @@ -358,7 +401,6 @@ pub fn IsPartialXLogFileName(fname: &str) -> bool { } } - #[derive(Debug, Clone)] struct FilePathError { msg: String, @@ -446,4 +488,3 @@ fn parse_relfilename(fname: &str) -> Result<(u32, u32, u32), FilePathError> { return Ok((relnode, forknum, segno)); } - diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 8172b875da..b1daeaceae 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -50,12 +50,11 @@ pub struct WalStreamDecoder { recordbuf: BytesMut, } - #[derive(Error, Debug, Clone)] #[error("{msg} at {lsn}")] pub struct WalDecodeError { msg: String, - lsn: u64 + lsn: u64, } // @@ -100,7 +99,10 @@ impl WalStreamDecoder { let hdr = self.decode_XLogLongPageHeaderData(); if hdr.std.xlp_pageaddr != self.lsn { - return Err(WalDecodeError { msg: "invalid xlog segment header".into(), lsn: self.lsn }); + return Err(WalDecodeError { + msg: "invalid xlog segment header".into(), + lsn: self.lsn, + }); } // TODO: verify the remaining fields in the header @@ -115,7 +117,10 @@ impl WalStreamDecoder { let hdr = self.decode_XLogPageHeaderData(); if hdr.xlp_pageaddr != self.lsn { - return Err(WalDecodeError { msg: "invalid xlog page header".into(), lsn: self.lsn }); + return Err(WalDecodeError { + msg: "invalid xlog page header".into(), + lsn: self.lsn, + }); } // TODO: verify the remaining fields in the header @@ -141,7 +146,10 @@ impl WalStreamDecoder { self.startlsn = self.lsn; let xl_tot_len = self.inputbuf.get_u32_le(); if xl_tot_len < SizeOfXLogRecord { - return Err(WalDecodeError {msg: format!("invalid xl_tot_len {}", xl_tot_len), lsn: self.lsn }); + return Err(WalDecodeError { + msg: format!("invalid xl_tot_len {}", xl_tot_len), + lsn: self.lsn, + }); } self.lsn += 4; diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 23af8c2ee3..99c4142232 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -19,7 +19,7 @@ use postgres_types::PgLsn; use std::collections::HashMap; use std::fs; use std::fs::{File, OpenOptions}; -use std::io::{Write, Seek, SeekFrom}; +use std::io::{Seek, SeekFrom, Write}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; @@ -38,11 +38,16 @@ struct WalReceiverEntry { } lazy_static! { - static ref WAL_RECEIVERS: Mutex> = Mutex::new(HashMap::new()); + static ref WAL_RECEIVERS: Mutex> = + Mutex::new(HashMap::new()); } // Launch a new WAL receiver, or tell one that's running about change in connection string -pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str) { +pub fn launch_wal_receiver( + conf: &PageServerConf, + timelineid: ZTimelineId, + wal_producer_connstr: &str, +) { let mut receivers = WAL_RECEIVERS.lock().unwrap(); match receivers.get_mut(&timelineid) { @@ -50,7 +55,9 @@ pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_p receiver.wal_producer_connstr = wal_producer_connstr.into(); } None => { - let receiver = WalReceiverEntry { wal_producer_connstr: wal_producer_connstr.into() }; + let receiver = WalReceiverEntry { + wal_producer_connstr: wal_producer_connstr.into(), + }; receivers.insert(timelineid, receiver); // Also launch a new thread to handle this connection @@ -59,7 +66,8 @@ pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_p .name("WAL receiver thread".into()) .spawn(move || { thread_main(&conf_copy, timelineid); - }).unwrap(); + }) + .unwrap(); } }; } @@ -68,14 +76,21 @@ pub fn launch_wal_receiver(conf: &PageServerConf, timelineid: ZTimelineId, wal_p fn get_wal_producer_connstr(timelineid: ZTimelineId) -> String { let receivers = WAL_RECEIVERS.lock().unwrap(); - receivers.get(&timelineid).unwrap().wal_producer_connstr.clone() + receivers + .get(&timelineid) + .unwrap() + .wal_producer_connstr + .clone() } // // This is the entry point for the WAL receiver thread. // fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { - info!("WAL receiver thread started for timeline : '{}'", timelineid); + info!( + "WAL receiver thread started for timeline : '{}'", + timelineid + ); let runtime = runtime::Builder::new_current_thread() .enable_all() @@ -100,7 +115,11 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { }); } -async fn walreceiver_main(conf: &PageServerConf, timelineid: ZTimelineId, wal_producer_connstr: &str) -> Result<(), Error> { +async fn walreceiver_main( + conf: &PageServerConf, + timelineid: ZTimelineId, + wal_producer_connstr: &str, +) -> Result<(), Error> { // Connect to the database in replication mode. info!("connecting to {:?}", wal_producer_connstr); let connect_cfg = format!("{} replication=true", wal_producer_connstr); @@ -174,10 +193,12 @@ async fn walreceiver_main(conf: &PageServerConf, timelineid: ZTimelineId, wal_pr let startlsn = xlog_data.wal_start(); let endlsn = startlsn + data.len() as u64; - write_wal_file(startlsn, - timelineid, - 16 * 1024 * 1024, // FIXME - data)?; + write_wal_file( + startlsn, + timelineid, + 16 * 1024 * 1024, // FIXME + data, + )?; trace!( "received XLogData between {:X}/{:X} and {:X}/{:X}", @@ -376,7 +397,6 @@ pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLin return (log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli); } - fn write_wal_file( startpos: XLogRecPtr, timeline: ZTimelineId, @@ -409,12 +429,13 @@ fn write_wal_file( /* Open file */ let segno = XLByteToSeg(start_pos, wal_seg_size); - let wal_file_name = XLogFileName(1, // FIXME: always use Postgres timeline 1 - segno, wal_seg_size); - let wal_file_path = wal_dir - .join(wal_file_name.clone()); - let wal_file_partial_path = wal_dir - .join(wal_file_name.clone() + ".partial"); + let wal_file_name = XLogFileName( + 1, // FIXME: always use Postgres timeline 1 + segno, + wal_seg_size, + ); + let wal_file_path = wal_dir.join(wal_file_name.clone()); + let wal_file_partial_path = wal_dir.join(wal_file_name.clone() + ".partial"); { let mut wal_file: File; @@ -422,8 +443,7 @@ fn write_wal_file( if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { wal_file = file; partial = false; - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) - { + } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { /* Try to open existed partial file */ wal_file = file; partial = true; diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 9b0010a1be..06ac25286b 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -19,10 +19,10 @@ use std::assert; use std::cell::RefCell; use std::fs; use std::io::Error; +use std::process::Stdio; use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use std::process::Stdio; use tokio::io::AsyncBufReadExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; @@ -34,8 +34,8 @@ use bytes::{BufMut, Bytes, BytesMut}; use crate::page_cache; use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; -use crate::{page_cache::BufferTag, PageServerConf}; use crate::ZTimelineId; +use crate::{page_cache::BufferTag, PageServerConf}; static TIMEOUT: Duration = Duration::from_secs(20); diff --git a/postgres_ffi/build.rs b/postgres_ffi/build.rs index 97b3392b3e..dc3e1509c0 100644 --- a/postgres_ffi/build.rs +++ b/postgres_ffi/build.rs @@ -17,21 +17,18 @@ fn main() { // Tell cargo to invalidate the built crate whenever any of the // included header files changed. .parse_callbacks(Box::new(bindgen::CargoCallbacks)) - .whitelist_type("ControlFileData") .whitelist_var("PG_CONTROL_FILE_SIZE") .whitelist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC") .whitelist_type("DBState") - - // Path the server include dir. It is in tmp_install/include/server, if you did - // "configure --prefix=". But if you used "configure --prefix=/", - // and used DESTDIR to move it into tmp_install, then it's in - // tmp_install/include/postgres/server (that's how the pgbuild.sh script does it). - // 'pg_config --includedir-server' would perhaps be the more proper way to find it, - // but this will do for now. + // Path the server include dir. It is in tmp_install/include/server, if you did + // "configure --prefix=". But if you used "configure --prefix=/", + // and used DESTDIR to move it into tmp_install, then it's in + // tmp_install/include/postgres/server (that's how the pgbuild.sh script does it). + // 'pg_config --includedir-server' would perhaps be the more proper way to find it, + // but this will do for now. .clang_arg("-I../tmp_install/include/server") .clang_arg("-I../tmp_install/include/postgresql/server") - // Finish the builder and generate the bindings. .generate() // Unwrap the Result and panic on failure. diff --git a/postgres_ffi/src/lib.rs b/postgres_ffi/src/lib.rs index b62114ea7d..b6cf6bdb2b 100644 --- a/postgres_ffi/src/lib.rs +++ b/postgres_ffi/src/lib.rs @@ -10,22 +10,19 @@ const SIZEOF_CONTROLDATA: usize = std::mem::size_of::(); const OFFSETOF_CRC: usize = PG_CONTROLFILEDATA_OFFSETOF_CRC as usize; impl ControlFileData { - // Initialize an all-zeros ControlFileData struct pub fn new() -> ControlFileData { let controlfile: ControlFileData; let b = [0u8; SIZEOF_CONTROLDATA]; - controlfile = unsafe { - std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) - }; + controlfile = + unsafe { std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) }; return controlfile; } } pub fn decode_pg_control(buf: Bytes) -> Result { - let mut b: [u8; SIZEOF_CONTROLDATA] = [0u8; SIZEOF_CONTROLDATA]; buf.clone().copy_to_slice(&mut b); @@ -36,25 +33,23 @@ pub fn decode_pg_control(buf: Bytes) -> Result { data_without_crc.copy_from_slice(&b[0..OFFSETOF_CRC]); let expectedcrc = crc32c::crc32c(&data_without_crc); - controlfile = unsafe { - std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) - }; + controlfile = unsafe { std::mem::transmute::<[u8; SIZEOF_CONTROLDATA], ControlFileData>(b) }; if expectedcrc != controlfile.crc { - anyhow::bail!("invalid CRC in control file: expected {:08X}, was {:08X}", - expectedcrc, controlfile.crc); + anyhow::bail!( + "invalid CRC in control file: expected {:08X}, was {:08X}", + expectedcrc, + controlfile.crc + ); } Ok(controlfile) } pub fn encode_pg_control(controlfile: ControlFileData) -> Bytes { - let b: [u8; SIZEOF_CONTROLDATA]; - b = unsafe { - std::mem::transmute::(controlfile) - }; + b = unsafe { std::mem::transmute::(controlfile) }; // Recompute the CRC let mut data_without_crc: [u8; OFFSETOF_CRC] = [0u8; OFFSETOF_CRC]; diff --git a/walkeeper/src/bin/wal_acceptor.rs b/walkeeper/src/bin/wal_acceptor.rs index fcc475826d..8dfa31e23b 100644 --- a/walkeeper/src/bin/wal_acceptor.rs +++ b/walkeeper/src/bin/wal_acceptor.rs @@ -9,8 +9,8 @@ use std::path::PathBuf; use std::thread; use std::{fs::File, fs::OpenOptions}; -use clap::{App, Arg}; use anyhow::Result; +use clap::{App, Arg}; use slog::Drain; diff --git a/walkeeper/src/wal_service.rs b/walkeeper/src/wal_service.rs index 54e28cda5b..74e0f1d3b7 100644 --- a/walkeeper/src/wal_service.rs +++ b/walkeeper/src/wal_service.rs @@ -354,7 +354,8 @@ impl Serializer for SafeKeeperResponse { } lazy_static! { - pub static ref TIMELINES: Mutex>> = Mutex::new(HashMap::new()); + pub static ref TIMELINES: Mutex>> = + Mutex::new(HashMap::new()); } pub fn thread_main(conf: WalAcceptorConf) { @@ -450,11 +451,13 @@ impl Timeline { // Load and lock control file (prevent running more than one instance of safekeeper) fn load_control_file(&self, conf: &WalAcceptorConf) -> Result<()> { - let mut shared_state = self.mutex.lock().unwrap(); if shared_state.control_file.is_some() { - info!("control file for timeline {} is already open", self.timelineid); + info!( + "control file for timeline {} is already open", + self.timelineid + ); return Ok(()); } @@ -476,7 +479,8 @@ impl Timeline { Err(e) => { io_error!( "Control file {:?} is locked by some other process: {}", - &control_file_path, e + &control_file_path, + e ); } } @@ -501,7 +505,8 @@ impl Timeline { if my_info.format_version != SK_FORMAT_VERSION { io_error!( "Incompatible format version: {} vs. {}", - my_info.format_version, SK_FORMAT_VERSION + my_info.format_version, + SK_FORMAT_VERSION ); } shared_state.info = my_info; @@ -583,7 +588,10 @@ impl Connection { self.conf.listen_addr.port(), self.timeline().timelineid ); - info!("requesting page server to connect to us: start {} {}", ps_connstr, callme); + info!( + "requesting page server to connect to us: start {} {}", + ps_connstr, callme + ); let (client, connection) = connect(&ps_connstr, NoTls).await?; // The connection object performs the actual communication with the database, @@ -716,8 +724,14 @@ impl Connection { let rec_size = (end_pos - start_pos) as usize; assert!(rec_size <= MAX_SEND_SIZE); - debug!("received for {} bytes between {:X}/{:X} and {:X}/{:X}", - rec_size, start_pos >> 32, start_pos & 0xffffffff, end_pos >> 32, end_pos & 0xffffffff); + debug!( + "received for {} bytes between {:X}/{:X} and {:X}/{:X}", + rec_size, + start_pos >> 32, + start_pos & 0xffffffff, + end_pos >> 32, + end_pos & 0xffffffff + ); /* Receive message body */ self.inbuf.resize(rec_size, 0u8); @@ -1054,8 +1068,11 @@ impl Connection { self.stream.write_all(&self.outbuf[0..msg_size]).await?; start_pos += send_size as u64; - debug!("Sent WAL to page server up to {:X}/{:>08X}", - (end_pos>>32) as u32, end_pos as u32); + debug!( + "Sent WAL to page server up to {:X}/{:>08X}", + (end_pos >> 32) as u32, + end_pos as u32 + ); if XLogSegmentOffset(start_pos, wal_seg_size) != 0 { wal_file = Some(file); diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 1d0b5b73d4..53d1528a6b 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -3,13 +3,13 @@ use std::path::{Path, PathBuf}; use std::process::exit; use std::str::FromStr; -use clap::{App, Arg, ArgMatches, SubCommand}; use anyhow::Result; use anyhow::{anyhow, bail}; +use clap::{App, Arg, ArgMatches, SubCommand}; -use control_plane::{compute::ComputeControlPlane, local_env, storage}; use control_plane::local_env::LocalEnv; use control_plane::storage::PageServerNode; +use control_plane::{compute::ComputeControlPlane, local_env, storage}; use pageserver::ZTimelineId; @@ -34,34 +34,32 @@ fn main() -> Result<()> { .required(true); let matches = App::new("zenith") .about("Zenith CLI") - .subcommand(SubCommand::with_name("init") - .about("Initialize a new Zenith repository in current directory")) - .subcommand(SubCommand::with_name("branch") - .about("Create a new branch") - .arg(Arg::with_name("branchname") - .required(false) - .index(1)) - .arg(Arg::with_name("start-point") - .required(false) - .index(2))) + .subcommand( + SubCommand::with_name("init") + .about("Initialize a new Zenith repository in current directory"), + ) + .subcommand( + SubCommand::with_name("branch") + .about("Create a new branch") + .arg(Arg::with_name("branchname").required(false).index(1)) + .arg(Arg::with_name("start-point").required(false).index(2)), + ) .subcommand( SubCommand::with_name("pageserver") .about("Manage pageserver instance") .subcommand(SubCommand::with_name("status")) .subcommand(SubCommand::with_name("start")) - .subcommand(SubCommand::with_name("stop")) + .subcommand(SubCommand::with_name("stop")), ) .subcommand( SubCommand::with_name("pg") .about("Manage postgres instances") .subcommand( SubCommand::with_name("create") - // .arg(name_arg.clone() - // .required(false) - // .help("name of this postgres instance (will be pgN if omitted)")) - .arg(Arg::with_name("timeline") - .required(false) - .index(1)) + // .arg(name_arg.clone() + // .required(false) + // .help("name of this postgres instance (will be pgN if omitted)")) + .arg(Arg::with_name("timeline").required(false).index(1)), ) .subcommand(SubCommand::with_name("list")) .subcommand(SubCommand::with_name("start").arg(name_arg.clone())) @@ -80,9 +78,11 @@ fn main() -> Result<()> { let repopath = PathBuf::from(zenith_repo_dir()); if !repopath.exists() { - bail!("Zenith repository does not exists in {}.\n\ + bail!( + "Zenith repository does not exists in {}.\n\ Set ZENITH_REPO_DIR or initialize a new repository with 'zenith init'", - repopath.display()); + repopath.display() + ); } // TODO: check that it looks like a zenith repository let env = match local_env::load_config(&repopath) { @@ -204,7 +204,6 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { Ok(()) } - // "zenith init" - Initialize a new Zenith repository in current dir fn run_init_cmd(_args: ArgMatches) -> Result<()> { local_env::init()?; @@ -221,20 +220,22 @@ fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> { } if let Some(startpoint_str) = args.value_of("start-point") { - let mut startpoint = parse_point_in_time(startpoint_str)?; if startpoint.lsn == 0 { // Find end of WAL on the old timeline let end_of_wal = local_env::find_end_of_wal(local_env, startpoint.timelineid)?; - println!("branching at end of WAL: {:X}/{:X}", end_of_wal >> 32, end_of_wal & 0xffffffff); + println!( + "branching at end of WAL: {:X}/{:X}", + end_of_wal >> 32, + end_of_wal & 0xffffffff + ); startpoint.lsn = end_of_wal; } return local_env::create_branch(local_env, branchname, startpoint); - } else { panic!("Missing start-point"); } @@ -276,18 +277,22 @@ fn list_branches() -> Result<()> { // // fn parse_point_in_time(s: &str) -> Result { - let mut strings = s.split("@"); let name = strings.next().unwrap(); let lsn: Option; if let Some(lsnstr) = strings.next() { let mut s = lsnstr.split("/"); - let lsn_hi: u64 = s.next().ok_or(anyhow!("invalid LSN in point-in-time specification"))?.parse()?; - let lsn_lo: u64 = s.next().ok_or(anyhow!("invalid LSN in point-in-time specification"))?.parse()?; + let lsn_hi: u64 = s + .next() + .ok_or(anyhow!("invalid LSN in point-in-time specification"))? + .parse()?; + let lsn_lo: u64 = s + .next() + .ok_or(anyhow!("invalid LSN in point-in-time specification"))? + .parse()?; lsn = Some(lsn_hi << 32 | lsn_lo); - } - else { + } else { lsn = None } @@ -321,7 +326,7 @@ fn parse_point_in_time(s: &str) -> Result { if tlipath.exists() { let result = local_env::PointInTime { timelineid: ZTimelineId::from_str(name)?, - lsn: lsn.unwrap_or(0) + lsn: lsn.unwrap_or(0), }; return Ok(result); diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 3b833f4c2a..2d86ad041f 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -1,3 +1,2 @@ //! zenith_utils is intended to be a place to put code that is shared //! between other crates in this repository. -