diff --git a/.circleci/config.yml b/.circleci/config.yml index 6010afb28f..73fee09c27 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -141,7 +141,7 @@ jobs: working_directory: test_runner environment: - ZENITH_BIN: /tmp/zenith/bin - - POSTGRES_BIN: /tmp/zenith/pg_install + - POSTGRES_DISTRIB_DIR: /tmp/zenith/pg_install - TEST_OUTPUT: /tmp/test_output command: | TEST_FILE="<< parameters.test_file >>" diff --git a/Cargo.lock b/Cargo.lock index 5d4a25dc14..134f1b0c1b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -263,8 +263,6 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", - "fs_extra", - "hex", "lazy_static", "nix", "pageserver", @@ -273,9 +271,10 @@ dependencies = [ "rand", "regex", "serde", + "serde_json", "tar", - "thiserror", "toml", + "url", "walkeeper", "workspace_hack", "zenith_utils", @@ -788,8 +787,10 @@ dependencies = [ name = "integration_tests" version = "0.1.0" dependencies = [ + "anyhow", "control_plane", "lazy_static", + "nix", "pageserver", "postgres", "rand", @@ -1169,6 +1170,7 @@ dependencies = [ "clap", "crc32c", "daemonize", + "fs_extra", "futures", "hex", "lazy_static", @@ -2190,9 +2192,9 @@ checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" [[package]] name = "url" -version = "2.2.1" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b" +checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" dependencies = [ "form_urlencoded", "idna", diff --git a/control_plane/Cargo.toml b/control_plane/Cargo.toml index 14cb41e2b7..e82045dfe5 100644 --- a/control_plane/Cargo.toml +++ b/control_plane/Cargo.toml @@ -11,15 +11,17 @@ rand = "0.8.3" tar = "0.4.33" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } serde = { version = "1.0", features = ["derive"] } +serde_json = "1" toml = "0.5" lazy_static = "1.4" regex = "1" anyhow = "1.0" -hex = "0.4.3" +# hex = "0.4.3" bytes = "1.0.1" -fs_extra = "1.2.0" +# fs_extra = "1.2.0" nix = "0.20" -thiserror = "1" +# thiserror = "1" +url = "2.2.2" pageserver = { path = "../pageserver" } walkeeper = { path = "../walkeeper" } diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 4497ae2543..65fbe8b72d 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -1,23 +1,24 @@ -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Write}; +use std::io::Write; use std::net::SocketAddr; use std::net::TcpStream; use std::os::unix::fs::PermissionsExt; -use std::path::Path; -use std::process::{Command, ExitStatus}; +use std::process::Command; use std::sync::Arc; use std::time::Duration; use std::{collections::BTreeMap, path::PathBuf}; +use std::{ + fs::{self, OpenOptions}, + io::Read, +}; use anyhow::{Context, Result}; use lazy_static::lazy_static; use regex::Regex; -use postgres::{Client, NoTls}; - use crate::local_env::LocalEnv; -use crate::storage::{PageServerNode, WalProposerNode}; -use pageserver::{zenith_repo_dir, ZTimelineId}; +use pageserver::ZTimelineId; + +use crate::storage::PageServerNode; // // ComputeControlPlane @@ -36,8 +37,8 @@ impl ComputeControlPlane { // it is running on default port. Change that when pageserver will have config. 
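// (Elsewhere in this patch that default becomes 127.0.0.1:64000 -- see the
// `listen_addr` default in pageserver.rs further down in this diff.)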
let pageserver = Arc::new(PageServerNode::from_env(&env)); - let pgdatadirspath = env.repo_path.join("pgdatadirs"); - let nodes: Result> = fs::read_dir(&pgdatadirspath) + let pgdatadirspath = &env.pg_data_dirs_path(); + let nodes: Result> = fs::read_dir(pgdatadirspath) .with_context(|| format!("failed to list {}", pgdatadirspath.display()))? .into_iter() .map(|f| { @@ -97,8 +98,14 @@ impl ComputeControlPlane { Ok(node) } - pub fn new_test_node(&mut self, timelineid: ZTimelineId) -> Arc { - let node = self.new_from_page_server(true, timelineid); + pub fn new_test_node(&mut self, branch_name: &str) -> Arc { + let timeline_id = self + .pageserver + .branch_get_by_name(branch_name) + .expect("failed to get timeline_id") + .timeline_id; + + let node = self.new_from_page_server(true, timeline_id); let node = node.unwrap(); // Configure the node to stream WAL directly to the pageserver @@ -115,8 +122,14 @@ impl ComputeControlPlane { node } - pub fn new_test_master_node(&mut self, timelineid: ZTimelineId) -> Arc { - let node = self.new_from_page_server(true, timelineid).unwrap(); + pub fn new_test_master_node(&mut self, branch_name: &str) -> Arc { + let timeline_id = self + .pageserver + .branch_get_by_name(branch_name) + .expect("failed to get timeline_id") + .timeline_id; + + let node = self.new_from_page_server(true, timeline_id).unwrap(); node.append_conf( "postgresql.conf", @@ -126,8 +139,14 @@ impl ComputeControlPlane { node } - pub fn new_node(&mut self, timelineid: ZTimelineId) -> Result> { - let node = self.new_from_page_server(false, timelineid).unwrap(); + pub fn new_node(&mut self, branch_name: &str) -> Result> { + let timeline_id = self + .pageserver + .branch_get_by_name(branch_name) + .expect("failed to get timeline_id") + .timeline_id; + + let node = self.new_from_page_server(false, timeline_id).unwrap(); // Configure the node to stream WAL directly to the pageserver node.append_conf( @@ -291,9 +310,9 @@ impl PostgresNode { max_replication_slots = 10\n\ hot_standby = on\n\ shared_buffers = 1MB\n\ - fsync = off\n\ + fsync = off\n\ max_connections = 100\n\ - wal_sender_timeout = 0\n\ + wal_sender_timeout = 0\n\ wal_level = replica\n\ listen_addresses = '{address}'\n\ port = {port}\n", @@ -326,8 +345,8 @@ impl PostgresNode { Ok(()) } - fn pgdata(&self) -> PathBuf { - self.env.repo_path.join("pgdatadirs").join(&self.name) + pub fn pgdata(&self) -> PathBuf { + self.env.pg_data_dir(&self.name) } pub fn status(&self) -> &str { @@ -413,152 +432,6 @@ impl PostgresNode { String::from_utf8(output.stdout).unwrap().trim().to_string() } - - fn dump_log_file(&self) { - if let Ok(mut file) = File::open(self.env.repo_path.join("pageserver.log")) { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("--------------- pageserver.log:\n{}", buffer); - } - } - - pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { - let connstring = format!( - "host={} port={} dbname={} user={}", - self.address.ip(), - self.address.port(), - db, - self.whoami() - ); - let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); - - println!("Running {}", sql); - let result = client.query(sql, &[]); - if result.is_err() { - self.dump_log_file(); - } - result.unwrap() - } - - pub fn open_psql(&self, db: &str) -> Client { - let connstring = format!( - "host={} port={} dbname={} user={}", - self.address.ip(), - self.address.port(), - db, - self.whoami() - ); - Client::connect(connstring.as_str(), NoTls).unwrap() - } - - pub fn start_proxy(&self, wal_acceptors: &str) -> 
WalProposerNode { - let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy"); - match Command::new(proxy_path.as_path()) - .args(&["--ztimelineid", &self.timelineid.to_string()]) - .args(&["-s", wal_acceptors]) - .args(&["-h", &self.address.ip().to_string()]) - .args(&["-p", &self.address.port().to_string()]) - .arg("-v") - .stderr( - OpenOptions::new() - .create(true) - .append(true) - .open(self.pgdata().join("safekeeper_proxy.log")) - .unwrap(), - ) - .spawn() - { - Ok(child) => WalProposerNode { pid: child.id() }, - Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e), - } - } - - pub fn pg_regress(&self) -> ExitStatus { - self.safe_psql("postgres", "CREATE DATABASE regression"); - let data_dir = zenith_repo_dir(); - let regress_run_path = data_dir.join("regress"); - fs::create_dir_all(®ress_run_path).unwrap(); - fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); - std::env::set_current_dir(regress_run_path).unwrap(); - - let regress_build_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); - let regress_src_path = - Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); - - let regress_check = Command::new(regress_build_path.join("pg_regress")) - .args(&[ - "--bindir=''", - "--use-existing", - format!("--bindir={}", self.env.pg_bin_dir().to_str().unwrap()).as_str(), - format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), - format!( - "--schedule={}", - regress_src_path.join("parallel_schedule").to_str().unwrap() - ) - .as_str(), - format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), - ]) - .env_clear() - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("PGPORT", self.address.port().to_string()) - .env("PGUSER", self.whoami()) - .env("PGHOST", self.address.ip().to_string()) - .status() - .expect("pg_regress failed"); - if !regress_check.success() { - if let Ok(mut file) = File::open("regression.diffs") { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("--------------- regression.diffs:\n{}", buffer); - } - self.dump_log_file(); - if let Ok(mut file) = File::open( - self.env - .repo_path - .join("pgdatadirs") - .join("pg1") - .join("log"), - ) { - let mut buffer = String::new(); - file.read_to_string(&mut buffer).unwrap(); - println!("--------------- pgdatadirs/pg1/log:\n{}", buffer); - } - } - regress_check - } - - pub fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus { - let port = self.address.port().to_string(); - let clients = clients.to_string(); - let seconds = seconds.to_string(); - let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench")) - .args(&["-i", "-p", port.as_str(), "postgres"]) - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("pgbench -i"); - let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench")) - .args(&[ - "-p", - port.as_str(), - "-T", - seconds.as_str(), - "-P", - "1", - "-c", - clients.as_str(), - "-M", - "prepared", - "postgres", - ]) - .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) - .status() - .expect("pgbench run"); - pg_bench_run - } } impl Drop for PostgresNode { diff --git a/control_plane/src/lib.rs b/control_plane/src/lib.rs index 
a49d39150a..7ce1cffb9c 100644 --- a/control_plane/src/lib.rs +++ b/control_plane/src/lib.rs @@ -6,7 +6,26 @@ // Intended to be used in integration tests and in CLI tools for // local installations. // +use anyhow::{anyhow, bail, Context, Result}; +use std::fs; +use std::path::Path; pub mod compute; pub mod local_env; pub mod storage; + +/// Read a PID file +/// +/// We expect a file that contains a single integer. +/// We return an i32 for compatibility with libc and nix. +pub fn read_pidfile(pidfile: &Path) -> Result { + let pid_str = fs::read_to_string(pidfile) + .with_context(|| format!("failed to read pidfile {:?}", pidfile))?; + let pid: i32 = pid_str + .parse() + .map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?; + if pid < 1 { + bail!("pidfile {:?} contained bad value '{}'", pidfile, pid); + } + Ok(pid) +} diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 44cad16954..119e0fd56d 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -4,37 +4,23 @@ // Now it also provides init method which acts like a stub for proper installation // script which will use local paths. // -use anyhow::Context; -use bytes::Bytes; -use rand::Rng; +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; use std::env; use std::fs; -use std::fs::File; -use std::io::Read; -use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; - -use anyhow::Result; -use serde::{Deserialize, Serialize}; - -use pageserver::zenith_repo_dir; -use pageserver::ZTimelineId; -use postgres_ffi::xlog_utils; -use zenith_utils::lsn::Lsn; +use std::path::PathBuf; +use url::Url; // -// This data structure represents deserialized zenith config, which should be -// located in ~/.zenith -// -// TODO: should we also support ZENITH_CONF env var? +// This data structures represent deserialized zenith CLI config // #[derive(Serialize, Deserialize, Clone)] pub struct LocalEnv { - // Path to the Repository. Here page server and compute nodes will create and store their data. - pub repo_path: PathBuf, + // Pageserver connection strings + pub pageserver_connstring: String, - // System identifier, from the PostgreSQL control file - pub systemid: u64, + // Base directory for both pageserver and compute nodes + pub base_data_dir: PathBuf, // Path to postgres distribution. It's expected that "bin", "include", // "lib", "share" from postgres distribution are there. If at some point @@ -42,38 +28,66 @@ pub struct LocalEnv { // to four separate paths and match OS-specific installation layout. pub pg_distrib_dir: PathBuf, - // Path to pageserver binary. - pub zenith_distrib_dir: PathBuf, + // Path to pageserver binary. Empty for remote pageserver. + pub zenith_distrib_dir: Option, } impl LocalEnv { - // postgres installation + // postgres installation paths pub fn pg_bin_dir(&self) -> PathBuf { self.pg_distrib_dir.join("bin") } pub fn pg_lib_dir(&self) -> PathBuf { self.pg_distrib_dir.join("lib") } + + pub fn pageserver_bin(&self) -> Result { + Ok(self + .zenith_distrib_dir + .as_ref() + .ok_or(anyhow!("Can not manage remote pageserver"))? 
+ .join("pageserver")) + } + + pub fn pg_data_dirs_path(&self) -> PathBuf { + self.base_data_dir.join("pgdatadirs") + } + + pub fn pg_data_dir(&self, name: &str) -> PathBuf { + self.pg_data_dirs_path().join(name) + } + + // TODO: move pageserver files into ./pageserver + pub fn pageserver_data_dir(&self) -> PathBuf { + self.base_data_dir.clone() + } +} + +fn base_path() -> PathBuf { + match std::env::var_os("ZENITH_REPO_DIR") { + Some(val) => PathBuf::from(val.to_str().unwrap()), + None => ".zenith".into(), + } } // // Initialize a new Zenith repository // -pub fn init() -> Result<()> { +pub fn init(remote_pageserver: Option<&str>) -> Result<()> { // check if config already exists - let repo_path = zenith_repo_dir(); - if repo_path.exists() { + let base_path = base_path(); + if base_path.exists() { anyhow::bail!( "{} already exists. Perhaps already initialized?", - repo_path.to_str().unwrap() + base_path.to_str().unwrap() ); } // ok, now check that expected binaries are present - // Find postgres binaries. Follow POSTGRES_BIN if set, otherwise look in "tmp_install". + // Find postgres binaries. Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "tmp_install". let pg_distrib_dir: PathBuf = { - if let Some(postgres_bin) = env::var_os("POSTGRES_BIN") { + if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { postgres_bin.into() } else { let cwd = env::current_dir()?; @@ -84,137 +98,45 @@ pub fn init() -> Result<()> { anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); } - // Find zenith binaries. - let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); - if !zenith_distrib_dir.join("pageserver").exists() { - anyhow::bail!("Can't find pageserver binary.",); - } + fs::create_dir(&base_path)?; + fs::create_dir(base_path.join("pgdatadirs"))?; - // ok, we are good to go - let mut conf = LocalEnv { - repo_path, - pg_distrib_dir, - zenith_distrib_dir, - systemid: 0, + let conf = if let Some(addr) = remote_pageserver { + // check that addr is parsable + let _uri = Url::parse(addr) + .map_err(|e| anyhow!("{}: {}", addr, e))?; + + LocalEnv { + pageserver_connstring: format!("postgresql://{}/", addr), + pg_distrib_dir, + zenith_distrib_dir: None, + base_data_dir: base_path, + } + } else { + // Find zenith binaries. 
+ let zenith_distrib_dir = env::current_exe()?.parent().unwrap().to_owned(); + if !zenith_distrib_dir.join("pageserver").exists() { + anyhow::bail!("Can't find pageserver binary.",); + } + + LocalEnv { + pageserver_connstring: "postgresql://127.0.0.1:6400".to_string(), + pg_distrib_dir, + zenith_distrib_dir: Some(zenith_distrib_dir), + base_data_dir: base_path, + } }; - init_repo(&mut conf)?; + + let toml = toml::to_string(&conf)?; + fs::write(conf.base_data_dir.join("config"), toml)?; Ok(()) } -pub fn init_repo(local_env: &mut LocalEnv) -> Result<()> { - let repopath = &local_env.repo_path; - fs::create_dir(&repopath) - .with_context(|| format!("could not create directory {}", repopath.display()))?; - fs::create_dir(repopath.join("pgdatadirs"))?; - fs::create_dir(repopath.join("timelines"))?; - fs::create_dir(repopath.join("refs"))?; - fs::create_dir(repopath.join("refs").join("branches"))?; - fs::create_dir(repopath.join("refs").join("tags"))?; - println!("created directory structure in {}", repopath.display()); +// Locate and load config +pub fn load_config() -> Result { + let repopath = base_path(); - // Create initial timeline - let tli = create_timeline(&local_env, None)?; - let timelinedir = repopath.join("timelines").join(tli.to_string()); - println!("created initial timeline {}", timelinedir.display()); - - // Run initdb - // - // We create the cluster temporarily in a "tmp" directory inside the repository, - // and move it to the right location from there. - let tmppath = repopath.join("tmp"); - - let initdb_path = local_env.pg_bin_dir().join("initdb"); - let initdb = Command::new(initdb_path) - .args(&["-D", tmppath.to_str().unwrap()]) - .arg("--no-instructions") - .env_clear() - .env("LD_LIBRARY_PATH", local_env.pg_lib_dir().to_str().unwrap()) - .env( - "DYLD_LIBRARY_PATH", - local_env.pg_lib_dir().to_str().unwrap(), - ) - .stdout(Stdio::null()) - .status() - .with_context(|| "failed to execute initdb")?; - if !initdb.success() { - anyhow::bail!("initdb failed"); - } - println!("initdb succeeded"); - - // Read control file to extract the LSN and system id - let controlfile_path = tmppath.join("global").join("pg_control"); - let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?; - let systemid = controlfile.system_identifier; - let lsn = controlfile.checkPoint; - let lsnstr = format!("{:016X}", lsn); - - // Move the initial WAL file - fs::rename( - tmppath.join("pg_wal").join("000000010000000000000001"), - timelinedir - .join("wal") - .join("000000010000000000000001.partial"), - )?; - println!("moved initial WAL file"); - - // Remove pg_wal - fs::remove_dir_all(tmppath.join("pg_wal"))?; - - force_crash_recovery(&tmppath)?; - println!("updated pg_control"); - - let target = timelinedir.join("snapshots").join(&lsnstr); - fs::rename(tmppath, &target)?; - println!("moved 'tmp' to {}", target.display()); - - // Create 'main' branch to refer to the initial timeline - let data = tli.to_string(); - fs::write(repopath.join("refs").join("branches").join("main"), data)?; - println!("created main branch"); - - // Also update the system id in the LocalEnv - local_env.systemid = systemid; - - // write config - let toml = toml::to_string(&local_env)?; - fs::write(repopath.join("config"), toml)?; - - println!( - "new zenith repository was created in {}", - repopath.display() - ); - - Ok(()) -} - -// If control file says the cluster was shut down cleanly, modify it, to mark -// it as crashed. That forces crash recovery when you start the cluster. 
-// -// FIXME: -// We currently do this to the initial snapshot in "zenith init". It would -// be more natural to do this when the snapshot is restored instead, but we -// currently don't have any code to create new snapshots, so it doesn't matter -// Or better yet, use a less hacky way of putting the cluster into recovery. -// Perhaps create a backup label file in the data directory when it's restored. -fn force_crash_recovery(datadir: &Path) -> Result<()> { - // Read in the control file - let controlfilepath = datadir.to_path_buf().join("global").join("pg_control"); - let mut controlfile = - postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?; - - controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION; - - fs::write( - controlfilepath.as_path(), - postgres_ffi::encode_pg_control(controlfile), - )?; - - Ok(()) -} - -// check that config file is present -pub fn load_config(repopath: &Path) -> Result { if !repopath.exists() { anyhow::bail!( "Zenith config is not found in {}. You need to run 'zenith init' first", @@ -222,32 +144,13 @@ pub fn load_config(repopath: &Path) -> Result { ); } + // TODO: check that it looks like a zenith repository + // load and parse file let config = fs::read_to_string(repopath.join("config"))?; toml::from_str(config.as_str()).map_err(|e| e.into()) } -// local env for tests -pub fn test_env(testname: &str) -> LocalEnv { - fs::create_dir_all("../tmp_check").expect("could not create directory ../tmp_check"); - - let repo_path = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../tmp_check/") - .join(testname); - - // Remove remnants of old test repo - let _ = fs::remove_dir_all(&repo_path); - - let mut local_env = LocalEnv { - repo_path, - pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"), - zenith_distrib_dir: cargo_bin_dir(), - systemid: 0, - }; - init_repo(&mut local_env).expect("could not initialize zenith repository"); - local_env -} - // Find the directory where the binaries were put (i.e. 
target/debug/) pub fn cargo_bin_dir() -> PathBuf { let mut pathbuf = std::env::current_exe().unwrap(); @@ -259,155 +162,3 @@ pub fn cargo_bin_dir() -> PathBuf { pathbuf } - -#[derive(Debug, Clone, Copy)] -pub struct PointInTime { - pub timelineid: ZTimelineId, - pub lsn: Lsn, -} - -fn create_timeline(local_env: &LocalEnv, ancestor: Option) -> Result { - let repopath = &local_env.repo_path; - - // Create initial timeline - let mut tli_buf = [0u8; 16]; - rand::thread_rng().fill(&mut tli_buf); - let timelineid = ZTimelineId::from(tli_buf); - - let timelinedir = repopath.join("timelines").join(timelineid.to_string()); - - fs::create_dir(&timelinedir)?; - fs::create_dir(&timelinedir.join("snapshots"))?; - fs::create_dir(&timelinedir.join("wal"))?; - - if let Some(ancestor) = ancestor { - let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn); - fs::write(timelinedir.join("ancestor"), data)?; - } - - Ok(timelineid) -} - -// Create a new branch in the repository (for the "zenith branch" subcommand) -pub fn create_branch( - local_env: &LocalEnv, - branchname: &str, - startpoint: PointInTime, -) -> Result<()> { - let repopath = &local_env.repo_path; - - // create a new timeline for it - let newtli = create_timeline(local_env, Some(startpoint))?; - let newtimelinedir = repopath.join("timelines").join(newtli.to_string()); - - let data = newtli.to_string(); - fs::write( - repopath.join("refs").join("branches").join(branchname), - data, - )?; - - // Copy the latest snapshot (TODO: before the startpoint) and all WAL - // TODO: be smarter and avoid the copying... - let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(local_env, startpoint.timelineid)?; - let copy_opts = fs_extra::dir::CopyOptions::new(); - fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), ©_opts)?; - - let oldtimelinedir = repopath - .join("timelines") - .join(startpoint.timelineid.to_string()); - copy_wal( - &oldtimelinedir.join("wal"), - &newtimelinedir.join("wal"), - startpoint.lsn, - 16 * 1024 * 1024 // FIXME: assume default WAL segment size - )?; - - Ok(()) -} - -/// -/// Copy all WAL segments from one directory to another, up to given LSN. -/// -/// If the given LSN is in the middle of a segment, the last segment containing it -/// is written out as .partial, and padded with zeros. -/// -fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{ - - let last_segno = upto.segment_number(wal_seg_size); - let last_segoff = upto.segment_offset(wal_seg_size); - - for entry in fs::read_dir(src_dir).unwrap() { - if let Ok(entry) = entry { - let entry_name = entry.file_name(); - let fname = entry_name.to_str().unwrap(); - - // Check if the filename looks like an xlog file, or a .partial file. 
- if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) { - continue - } - let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize); - - let copylen; - let mut dst_fname = PathBuf::from(fname); - if segno > last_segno { - // future segment, skip - continue; - } else if segno < last_segno { - copylen = wal_seg_size; - dst_fname.set_extension(""); - } else { - copylen = last_segoff; - dst_fname.set_extension("partial"); - } - - let src_file = File::open(entry.path())?; - let mut dst_file = File::create(dst_dir.join(&dst_fname))?; - std::io::copy(&mut src_file.take(copylen), &mut dst_file)?; - - if copylen < wal_seg_size { - std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?; - } - } - } - Ok(()) -} - -// Find the end of valid WAL in a wal directory -pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result { - let repopath = &local_env.repo_path; - let waldir = repopath - .join("timelines") - .join(timeline.to_string()) - .join("wal"); - - let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true); - - Ok(Lsn(lsn)) -} - -// Find the latest snapshot for a timeline -fn find_latest_snapshot(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> { - let repopath = &local_env.repo_path; - - let snapshotsdir = repopath - .join("timelines") - .join(timeline.to_string()) - .join("snapshots"); - let paths = fs::read_dir(&snapshotsdir)?; - let mut maxsnapshot = Lsn(0); - let mut snapshotdir: Option = None; - for path in paths { - let path = path?; - let filename = path.file_name().to_str().unwrap().to_owned(); - if let Ok(lsn) = Lsn::from_hex(&filename) { - maxsnapshot = std::cmp::max(lsn, maxsnapshot); - snapshotdir = Some(path.path()); - } - } - if maxsnapshot == Lsn(0) { - // TODO: check ancestor timeline - anyhow::bail!("no snapshot found in {}", snapshotsdir.display()); - } - - Ok((maxsnapshot, snapshotdir.unwrap())) -} diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index 0af74ace0d..fc45911bdd 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -1,123 +1,18 @@ -use anyhow::{anyhow, bail, Context, Result}; -use nix::sys::signal::{kill, Signal}; -use nix::unistd::Pid; -use std::convert::TryInto; -use std::fs; use std::net::{SocketAddr, TcpStream}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::process::Command; -use std::str::FromStr; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use std::thread; use std::time::Duration; +use std::collections::HashMap; +use anyhow::{anyhow, bail, Result}; +use nix::sys::signal::{kill, Signal}; +use nix::unistd::Pid; use postgres::{Client, NoTls}; +use pageserver::branches::BranchInfo; use crate::local_env::LocalEnv; -use pageserver::ZTimelineId; - -// -// Collection of several example deployments useful for tests. -// -// I'm intendedly modelling storage and compute control planes as a separate entities -// as it is closer to the actual setup. 
-// -pub struct TestStorageControlPlane { - pub wal_acceptors: Vec, - pub pageserver: Arc, - pub test_done: AtomicBool, - pub repopath: PathBuf, -} - -impl TestStorageControlPlane { - // Peek into the repository, to grab the timeline ID of given branch - pub fn get_branch_timeline(&self, branchname: &str) -> ZTimelineId { - let branchpath = self.repopath.join("refs/branches/".to_owned() + branchname); - - ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap() - } - - // postgres <-> page_server - // - // Initialize a new repository and configure a page server to run in it - // - pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane { - let repopath = local_env.repo_path.clone(); - - let pserver = Arc::new(PageServerNode { - env: local_env.clone(), - kill_on_exit: true, - listen_address: None, - }); - pserver.start().unwrap(); - - TestStorageControlPlane { - wal_acceptors: Vec::new(), - pageserver: pserver, - test_done: AtomicBool::new(false), - repopath, - } - } - - // postgres <-> {wal_acceptor1, wal_acceptor2, ...} - pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane { - let repopath = local_env.repo_path.clone(); - - let mut cplane = TestStorageControlPlane { - wal_acceptors: Vec::new(), - pageserver: Arc::new(PageServerNode { - env: local_env.clone(), - kill_on_exit: true, - listen_address: None, - }), - test_done: AtomicBool::new(false), - repopath, - }; - cplane.pageserver.start().unwrap(); - - const WAL_ACCEPTOR_PORT: usize = 54321; - - for i in 0..redundancy { - let wal_acceptor = WalAcceptorNode { - listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i) - .parse() - .unwrap(), - data_dir: local_env.repo_path.join(format!("wal_acceptor_{}", i)), - env: local_env.clone(), - }; - wal_acceptor.init(); - wal_acceptor.start(); - cplane.wal_acceptors.push(wal_acceptor); - } - cplane - } - - pub fn stop(&self) { - for wa in self.wal_acceptors.iter() { - let _ = wa.stop(); - } - self.test_done.store(true, Ordering::Relaxed); - } - - pub fn get_wal_acceptor_conn_info(&self) -> String { - self.wal_acceptors - .iter() - .map(|wa| wa.listen.to_string()) - .collect::>() - .join(",") - } - - pub fn is_running(&self) -> bool { - self.test_done.load(Ordering::Relaxed) - } -} - -impl Drop for TestStorageControlPlane { - fn drop(&mut self) { - self.stop(); - } -} +use crate::read_pidfile; // // Control routines for pageserver. @@ -125,8 +20,8 @@ impl Drop for TestStorageControlPlane { // Used in CLI and tests. 
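// The pageserver is managed over an ordinary libpq connection: management
// commands ("pg_list", "branch_create <name> <startpoint>",
// "identify_system") are issued as simple queries, and the helpers added in
// the hunks below parse their single-row JSON or text responses. A hedged
// usage sketch, assuming a running pageserver (names are illustrative):
//
//   let pageserver = PageServerNode::from_env(&env);
//   let main = pageserver.branch_get_by_name("main")?;
//   let experimental = pageserver.branch_create("experimental", "main")?;
//   for branch in pageserver.branches_list()? {
//       println!("{} -> {}", branch.name, branch.timeline_id);
//   }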
// pub struct PageServerNode { - kill_on_exit: bool, - listen_address: Option, + pub kill_on_exit: bool, + pub listen_address: Option, pub env: LocalEnv, } @@ -146,12 +41,32 @@ impl PageServerNode { } } + pub fn init(&self) -> Result<()> { + let mut cmd = Command::new(self.env.pageserver_bin()?); + let status = cmd.args(&["--init", "-D", self.env.base_data_dir.to_str().unwrap()]) + .env_clear() + .env("RUST_BACKTRACE", "1") + .env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap()) + .env("ZENITH_REPO_DIR", self.repo_path()) + .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pageserver init failed"); + + if status.success() { + Ok(()) + } else { + Err(anyhow!("pageserver init failed")) + } + } + pub fn repo_path(&self) -> PathBuf { - self.env.repo_path.clone() + self.env.pageserver_data_dir() } pub fn pid_file(&self) -> PathBuf { - self.env.repo_path.join("pageserver.pid") + self.repo_path().join("pageserver.pid") } pub fn start(&self) -> Result<()> { @@ -161,11 +76,12 @@ impl PageServerNode { self.repo_path().display() ); - let mut cmd = Command::new(self.env.zenith_distrib_dir.join("pageserver")); - cmd.args(&["-l", self.address().to_string().as_str()]) + let mut cmd = Command::new(self.env.pageserver_bin()?); + cmd.args(&["-l", self.address().to_string().as_str(), "-D", self.repo_path().to_str().unwrap()]) .arg("-d") .env_clear() .env("RUST_BACKTRACE", "1") + .env("POSTGRES_DISTRIB_DIR", self.env.pg_distrib_dir.to_str().unwrap()) .env("ZENITH_REPO_DIR", self.repo_path()) .env("PATH", self.env.pg_bin_dir().to_str().unwrap()) // needs postres-wal-redo binary .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) @@ -226,9 +142,7 @@ impl PageServerNode { client.simple_query(sql).unwrap() } - pub fn page_server_psql_client( - &self, - ) -> std::result::Result { + pub fn page_server_psql_client(&self) -> Result { let connstring = format!( "host={} port={} dbname={} user={}", self.address().ip(), @@ -238,6 +152,74 @@ impl PageServerNode { ); Client::connect(connstring.as_str(), NoTls) } + + pub fn branches_list(&self) -> Result> { + let mut client = self.page_server_psql_client()?; + let query_result = client.simple_query("pg_list")?; + let branches_json = query_result + .first() + .map(|msg| match msg { + postgres::SimpleQueryMessage::Row(row) => row.get(0), + _ => None, + }) + .flatten() + .ok_or_else(|| anyhow!("missing branches"))?; + + let res: Vec = serde_json::from_str(branches_json)?; + Ok(res) + } + + pub fn branch_create(&self, name: &str, startpoint: &str) -> Result { + let mut client = self.page_server_psql_client()?; + let query_result = + client.simple_query(format!("branch_create {} {}", name, startpoint).as_str())?; + + let branch_json = query_result + .first() + .map(|msg| match msg { + postgres::SimpleQueryMessage::Row(row) => row.get(0), + _ => None, + }) + .flatten() + .ok_or_else(|| anyhow!("missing branch"))?; + + let res: BranchInfo = serde_json::from_str(branch_json) + .map_err(|e| anyhow!("failed to parse branch_create response: {}: {}", branch_json, e))?; + + Ok(res) + } + + // TODO: make this a separate request type and avoid loading all the branches + pub fn branch_get_by_name(&self, name: &str) -> Result { + let branch_infos = self.branches_list()?; + let branche_by_name: Result> = branch_infos + .into_iter() + .map(|branch_info| 
Ok((branch_info.name.clone(), branch_info))) + .collect(); + let branche_by_name = branche_by_name?; + + let branch = branche_by_name + .get(name) + .ok_or_else(|| anyhow!("Branch {} not found", name))?; + + Ok(branch.clone()) + } + + pub fn system_id_get(&self) -> Result { + let mut client = self.page_server_psql_client()?; + let query_result = client + .simple_query("identify_system")? + .first() + .map(|msg| match msg { + postgres::SimpleQueryMessage::Row(row) => row.get(0), + _ => None, + }) + .flatten() + .ok_or_else(|| anyhow!("failed to get system_id"))? + .parse::()?; + + Ok(query_result) + } } impl Drop for PageServerNode { @@ -247,104 +229,3 @@ impl Drop for PageServerNode { } } } - -// -// Control routines for WalAcceptor. -// -// Now used only in test setups. -// -pub struct WalAcceptorNode { - listen: SocketAddr, - data_dir: PathBuf, - env: LocalEnv, -} - -impl WalAcceptorNode { - pub fn init(&self) { - if self.data_dir.exists() { - fs::remove_dir_all(self.data_dir.clone()).unwrap(); - } - fs::create_dir_all(self.data_dir.clone()).unwrap(); - } - - pub fn start(&self) { - println!( - "Starting wal_acceptor in {} listening '{}'", - self.data_dir.to_str().unwrap(), - self.listen - ); - - let status = Command::new(self.env.zenith_distrib_dir.join("wal_acceptor")) - .args(&["-D", self.data_dir.to_str().unwrap()]) - .args(&["-l", self.listen.to_string().as_str()]) - .args(&["--systemid", &self.env.systemid.to_string()]) - // Tell page server it can receive WAL from this WAL safekeeper - // FIXME: If there are multiple safekeepers, they will all inform - // the page server. Only the last "notification" will stay in effect. - // So it's pretty random which safekeeper the page server will connect to - .args(&["--pageserver", "127.0.0.1:64000"]) - .arg("-d") - .arg("-n") - .status() - .expect("failed to start wal_acceptor"); - - if !status.success() { - panic!("wal_acceptor start failed"); - } - } - - pub fn stop(&self) -> Result<()> { - println!("Stopping wal acceptor on {}", self.listen); - let pidfile = self.data_dir.join("wal_acceptor.pid"); - let pid = read_pidfile(&pidfile)?; - let pid = Pid::from_raw(pid); - if kill(pid, Signal::SIGTERM).is_err() { - bail!("Failed to kill wal_acceptor with pid {}", pid); - } - Ok(()) - } -} - -impl Drop for WalAcceptorNode { - fn drop(&mut self) { - // Ignore errors. - let _ = self.stop(); - } -} - -/////////////////////////////////////////////////////////////////////////////// - -pub struct WalProposerNode { - pub pid: u32, -} - -impl WalProposerNode { - pub fn stop(&self) { - // std::process::Child::id() returns u32, we need i32. - let pid: i32 = self.pid.try_into().unwrap(); - let pid = Pid::from_raw(pid); - kill(pid, Signal::SIGTERM).expect("failed to execute kill"); - } -} - -impl Drop for WalProposerNode { - fn drop(&mut self) { - self.stop(); - } -} - -/// Read a PID file -/// -/// We expect a file that contains a single integer. -/// We return an i32 for compatibility with libc and nix. 
-fn read_pidfile(pidfile: &Path) -> Result { - let pid_str = fs::read_to_string(pidfile) - .with_context(|| format!("failed to read pidfile {:?}", pidfile))?; - let pid: i32 = pid_str - .parse() - .map_err(|_| anyhow!("failed to parse pidfile {:?}", pidfile))?; - if pid < 1 { - bail!("pidfile {:?} contained bad value '{}'", pidfile, pid); - } - Ok(pid) -} diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index 76d78aab0f..53247ac5cb 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -9,7 +9,10 @@ edition = "2018" [dependencies] lazy_static = "1.4.0" rand = "0.8.3" +anyhow = "1.0" +nix = "0.20" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } + pageserver = { path = "../pageserver" } walkeeper = { path = "../walkeeper" } control_plane = { path = "../control_plane" } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs new file mode 100644 index 0000000000..c062e2dfb3 --- /dev/null +++ b/integration_tests/src/lib.rs @@ -0,0 +1,403 @@ +use anyhow::{bail, Result}; +use std::sync::{atomic::AtomicBool, Arc}; +use std::{ + convert::TryInto, + fs::{self, File, OpenOptions}, + io::Read, + net::SocketAddr, + path::{Path, PathBuf}, + process::{Command, ExitStatus}, + sync::atomic::Ordering, +}; + +use control_plane::compute::PostgresNode; +use control_plane::local_env; +use control_plane::read_pidfile; +use control_plane::{local_env::LocalEnv, storage::PageServerNode}; +use nix::sys::signal::{kill, Signal}; +use nix::unistd::Pid; + +use postgres; + +// local compute env for tests +pub fn create_test_env(testname: &str) -> LocalEnv { + let base_path = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../tmp_check/") + .join(testname); + + let base_path_str = base_path.to_str().unwrap(); + + // Remove remnants of old test repo + let _ = fs::remove_dir_all(&base_path); + + fs::create_dir_all(&base_path).expect(format!("could not create directory for {}", base_path_str).as_str()); + + let pgdatadirs_path = base_path.join("pgdatadirs"); + fs::create_dir(&pgdatadirs_path) + .expect(format!("could not create directory {:?}", pgdatadirs_path).as_str()); + + LocalEnv { + pageserver_connstring: "postgresql://127.0.0.1:64000".to_string(), + pg_distrib_dir: Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install"), + zenith_distrib_dir: Some(local_env::cargo_bin_dir()), + base_data_dir: base_path, + } +} + +// +// Collection of several example deployments useful for tests. +// +// I'm intendedly modelling storage and compute control planes as a separate entities +// as it is closer to the actual setup. 
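// Two canonical shapes are provided: one_page_server() starts just a
// pageserver, while fault_tolerant(redundancy) also starts `redundancy`
// wal_acceptors, wiring only the first one back to the pageserver
// (pass_to_pageserver is true only for i == 0).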
+// +pub struct TestStorageControlPlane { + pub wal_acceptors: Vec, + pub pageserver: Arc, + pub test_done: AtomicBool, +} + +impl TestStorageControlPlane { + // postgres <-> page_server + // + // Initialize a new repository and configure a page server to run in it + // + pub fn one_page_server(local_env: &LocalEnv) -> TestStorageControlPlane { + let pserver = Arc::new(PageServerNode { + env: local_env.clone(), + kill_on_exit: true, + listen_address: None, + }); + pserver.init().unwrap(); + pserver.start().unwrap(); + + TestStorageControlPlane { + wal_acceptors: Vec::new(), + pageserver: pserver, + test_done: AtomicBool::new(false), + } + } + + // postgres <-> {wal_acceptor1, wal_acceptor2, ...} + pub fn fault_tolerant(local_env: &LocalEnv, redundancy: usize) -> TestStorageControlPlane { + let mut cplane = TestStorageControlPlane { + wal_acceptors: Vec::new(), + pageserver: Arc::new(PageServerNode { + env: local_env.clone(), + kill_on_exit: true, + listen_address: None, + }), + test_done: AtomicBool::new(false), + // repopath, + }; + cplane.pageserver.init().unwrap(); + cplane.pageserver.start().unwrap(); + + let systemid = cplane.pageserver.system_id_get().unwrap(); + + const WAL_ACCEPTOR_PORT: usize = 54321; + + let datadir_base = local_env.base_data_dir.join("safekeepers"); + fs::create_dir_all(&datadir_base).unwrap(); + + for i in 0..redundancy { + let wal_acceptor = WalAcceptorNode { + listen: format!("127.0.0.1:{}", WAL_ACCEPTOR_PORT + i) + .parse() + .unwrap(), + data_dir: datadir_base.join(format!("wal_acceptor_{}", i)), + systemid, + env: local_env.clone(), + pass_to_pageserver: i == 0 + }; + wal_acceptor.init(); + wal_acceptor.start(); + cplane.wal_acceptors.push(wal_acceptor); + } + cplane + } + + pub fn stop(&self) { + for wa in self.wal_acceptors.iter() { + let _ = wa.stop(); + } + self.test_done.store(true, Ordering::Relaxed); + } + + pub fn get_wal_acceptor_conn_info(&self) -> String { + self.wal_acceptors + .iter() + .map(|wa| wa.listen.to_string()) + .collect::>() + .join(",") + } + + pub fn is_running(&self) -> bool { + self.test_done.load(Ordering::Relaxed) + } +} + +impl Drop for TestStorageControlPlane { + fn drop(&mut self) { + self.stop(); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// +// PostgresNodeExt +// +/////////////////////////////////////////////////////////////////////////////// + +/// +/// Testing utilities for PostgresNode type +/// +pub trait PostgresNodeExt { + fn pg_regress(&self) -> ExitStatus; + fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus; + fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode; + fn open_psql(&self, db: &str) -> postgres::Client; + fn dump_log_file(&self); + fn safe_psql(&self, db: &str, sql: &str) -> Vec; +} + +impl PostgresNodeExt for PostgresNode { + fn pg_regress(&self) -> ExitStatus { + self.safe_psql("postgres", "CREATE DATABASE regression"); + + let regress_run_path = self.env.base_data_dir.join("regress"); + fs::create_dir_all(®ress_run_path).unwrap(); + fs::create_dir_all(regress_run_path.join("testtablespace")).unwrap(); + std::env::set_current_dir(regress_run_path).unwrap(); + + let regress_build_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/build/src/test/regress"); + let regress_src_path = + Path::new(env!("CARGO_MANIFEST_DIR")).join("../vendor/postgres/src/test/regress"); + + let regress_check = Command::new(regress_build_path.join("pg_regress")) + .args(&[ + "--bindir=''", + "--use-existing", + format!("--bindir={}", 
self.env.pg_bin_dir().to_str().unwrap()).as_str(), + format!("--dlpath={}", regress_build_path.to_str().unwrap()).as_str(), + format!( + "--schedule={}", + regress_src_path.join("parallel_schedule").to_str().unwrap() + ) + .as_str(), + format!("--inputdir={}", regress_src_path.to_str().unwrap()).as_str(), + ]) + .env_clear() + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("PGPORT", self.address.port().to_string()) + .env("PGUSER", self.whoami()) + .env("PGHOST", self.address.ip().to_string()) + .status() + .expect("pg_regress failed"); + if !regress_check.success() { + if let Ok(mut file) = File::open("regression.diffs") { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("--------------- regression.diffs:\n{}", buffer); + } + // self.dump_log_file(); + if let Ok(mut file) = File::open(self.env.pg_data_dir("pg1").join("log")) { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("--------------- pgdatadirs/pg1/log:\n{}", buffer); + } + } + regress_check + } + + fn pg_bench(&self, clients: u32, seconds: u32) -> ExitStatus { + let port = self.address.port().to_string(); + let clients = clients.to_string(); + let seconds = seconds.to_string(); + let _pg_bench_init = Command::new(self.env.pg_bin_dir().join("pgbench")) + .args(&["-i", "-p", port.as_str(), "postgres"]) + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pgbench -i"); + let pg_bench_run = Command::new(self.env.pg_bin_dir().join("pgbench")) + .args(&[ + "-p", + port.as_str(), + "-T", + seconds.as_str(), + "-P", + "1", + "-c", + clients.as_str(), + "-M", + "prepared", + "postgres", + ]) + .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) + .status() + .expect("pgbench run"); + pg_bench_run + } + + fn start_proxy(&self, wal_acceptors: &str) -> WalProposerNode { + let proxy_path = self.env.pg_bin_dir().join("safekeeper_proxy"); + match Command::new(proxy_path.as_path()) + .args(&["--ztimelineid", &self.timelineid.to_string()]) + .args(&["-s", wal_acceptors]) + .args(&["-h", &self.address.ip().to_string()]) + .args(&["-p", &self.address.port().to_string()]) + .arg("-v") + .stderr( + OpenOptions::new() + .create(true) + .append(true) + .open(self.pgdata().join("safekeeper_proxy.log")) + .unwrap(), + ) + .spawn() + { + Ok(child) => WalProposerNode { pid: child.id() }, + Err(e) => panic!("Failed to launch {:?}: {}", proxy_path, e), + } + } + + fn dump_log_file(&self) { + if let Ok(mut file) = File::open(self.env.pageserver_data_dir().join("pageserver.log")) { + let mut buffer = String::new(); + file.read_to_string(&mut buffer).unwrap(); + println!("--------------- pageserver.log:\n{}", buffer); + } + } + + fn safe_psql(&self, db: &str, sql: &str) -> Vec { + let connstring = format!( + "host={} port={} dbname={} user={}", + self.address.ip(), + self.address.port(), + db, + self.whoami() + ); + let mut client = postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap(); + + println!("Running {}", sql); + let result = client.query(sql, &[]); + if result.is_err() { + // self.dump_log_file(); + } + result.unwrap() + } + + fn open_psql(&self, db: &str) -> postgres::Client { + let connstring = format!( + "host={} port={} dbname={} user={}", + self.address.ip(), + 
self.address.port(), + db, + self.whoami() + ); + postgres::Client::connect(connstring.as_str(), postgres::NoTls).unwrap() + } +} + +/////////////////////////////////////////////////////////////////////////////// +// +// WalAcceptorNode +// +/////////////////////////////////////////////////////////////////////////////// + +// +// Control routines for WalAcceptor. +// +// Now used only in test setups. +// +pub struct WalAcceptorNode { + listen: SocketAddr, + data_dir: PathBuf, + systemid: u64, + env: LocalEnv, + pass_to_pageserver: bool, +} + +impl WalAcceptorNode { + pub fn init(&self) { + if self.data_dir.exists() { + fs::remove_dir_all(self.data_dir.clone()).unwrap(); + } + fs::create_dir_all(self.data_dir.clone()).unwrap(); + } + + pub fn start(&self) { + println!( + "Starting wal_acceptor in {} listening '{}'", + self.data_dir.to_str().unwrap(), + self.listen + ); + + let ps_arg = if self.pass_to_pageserver { + // Tell page server it can receive WAL from this WAL safekeeper + ["--pageserver", "127.0.0.1:64000"].to_vec() + } else { + [].to_vec() + }; + + let status = Command::new(self.env.zenith_distrib_dir.as_ref().unwrap().join("wal_acceptor")) + .args(&["-D", self.data_dir.to_str().unwrap()]) + .args(&["-l", self.listen.to_string().as_str()]) + .args(&["--systemid", self.systemid.to_string().as_str()]) + .args(&ps_arg) + .arg("-d") + .arg("-n") + .status() + .expect("failed to start wal_acceptor"); + + if !status.success() { + panic!("wal_acceptor start failed"); + } + } + + pub fn stop(&self) -> Result<()> { + println!("Stopping wal acceptor on {}", self.listen); + let pidfile = self.data_dir.join("wal_acceptor.pid"); + let pid = read_pidfile(&pidfile)?; + let pid = Pid::from_raw(pid); + if kill(pid, Signal::SIGTERM).is_err() { + bail!("Failed to kill wal_acceptor with pid {}", pid); + } + Ok(()) + } +} + +impl Drop for WalAcceptorNode { + fn drop(&mut self) { + // Ignore errors. + let _ = self.stop(); + } +} + +/////////////////////////////////////////////////////////////////////////////// +// +// WalProposerNode +// +/////////////////////////////////////////////////////////////////////////////// + +pub struct WalProposerNode { + pub pid: u32, +} + +impl WalProposerNode { + pub fn stop(&self) { + // std::process::Child::id() returns u32, we need i32. 
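// (On Unix targets pid_t is a signed 32-bit integer, so the try_into()
// below can only fail for a value above i32::MAX, which a real process id
// never is; hence the unwrap.)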
+ let pid: i32 = self.pid.try_into().unwrap(); + let pid = Pid::from_raw(pid); + kill(pid, Signal::SIGTERM).expect("failed to execute kill"); + } +} + +impl Drop for WalProposerNode { + fn drop(&mut self) { + self.stop(); + } +} diff --git a/integration_tests/tests/test_compute.rs b/integration_tests/tests/test_compute.rs deleted file mode 100644 index f4cf38432e..0000000000 --- a/integration_tests/tests/test_compute.rs +++ /dev/null @@ -1,11 +0,0 @@ -// test node resettlement to an empty datadir - -// TODO -/* -#[test] -fn test_resettlement() {} - -// test seq scan of everythin after restart -#[test] -fn test_cold_seqscan() {} -*/ diff --git a/integration_tests/tests/test_control_plane.rs b/integration_tests/tests/test_control_plane.rs deleted file mode 100644 index 8724d5fda1..0000000000 --- a/integration_tests/tests/test_control_plane.rs +++ /dev/null @@ -1,8 +0,0 @@ -// TODO -/* -#[test] -fn test_actions() {} - -#[test] -fn test_regress() {} -*/ diff --git a/integration_tests/tests/test_pageserver.rs b/integration_tests/tests/test_pageserver.rs index c90bb72b6c..16a94286e0 100644 --- a/integration_tests/tests/test_pageserver.rs +++ b/integration_tests/tests/test_pageserver.rs @@ -1,23 +1,22 @@ -// mod control_plane; use control_plane::compute::ComputeControlPlane; -use control_plane::local_env; -use control_plane::local_env::PointInTime; -use control_plane::storage::TestStorageControlPlane; + +use integration_tests; +use integration_tests::TestStorageControlPlane; +use integration_tests::PostgresNodeExt; // XXX: force all redo at the end // -- restart + seqscan won't read deleted stuff // -- pageserver api endpoint to check all rels #[test] fn test_redo_cases() { - let local_env = local_env::test_env("test_redo_cases"); + let local_env = integration_tests::create_test_env("test_redo_cases"); // Start pageserver that reads WAL directly from that postgres let storage_cplane = TestStorageControlPlane::one_page_server(&local_env); let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_node(maintli); + let node = compute_cplane.new_test_node("main"); node.start().unwrap(); // check basic work with table @@ -51,15 +50,14 @@ fn test_redo_cases() { // Runs pg_regress on a compute node #[test] fn test_regress() { - let local_env = local_env::test_env("test_regress"); + let local_env = integration_tests::create_test_env("test_regress"); // Start pageserver that reads WAL directly from that postgres let storage_cplane = TestStorageControlPlane::one_page_server(&local_env); let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_node(maintli); + let node = compute_cplane.new_test_node("main"); node.start().unwrap(); let status = node.pg_regress(); @@ -69,15 +67,14 @@ fn test_regress() { // Runs pg_bench on a compute node #[test] fn pgbench() { - let local_env = local_env::test_env("pgbench"); + let local_env = integration_tests::create_test_env("pgbench"); // Start pageserver that reads WAL directly from that postgres let storage_cplane = TestStorageControlPlane::one_page_server(&local_env); let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = 
compute_cplane.new_test_node(maintli); + let node = compute_cplane.new_test_node("main"); node.start().unwrap(); let status = node.pg_bench(10, 5); @@ -87,30 +84,21 @@ fn pgbench() { // Run two postgres instances on one pageserver, on different timelines #[test] fn test_pageserver_two_timelines() { - let local_env = local_env::test_env("test_pageserver_two_timelines"); + let local_env = integration_tests::create_test_env("test_pageserver_two_timelines"); // Start pageserver that reads WAL directly from that postgres let storage_cplane = TestStorageControlPlane::one_page_server(&local_env); let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); - let maintli = storage_cplane.get_branch_timeline("main"); - // Create new branch at the end of 'main' - let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap(); - local_env::create_branch( - &local_env, - "experimental", - PointInTime { - timelineid: maintli, - lsn: startpoint, - }, - ) - .unwrap(); - let experimentaltli = storage_cplane.get_branch_timeline("experimental"); + storage_cplane + .pageserver + .branch_create("experimental", "main") + .unwrap(); // Launch postgres instances on both branches - let node1 = compute_cplane.new_test_node(maintli); - let node2 = compute_cplane.new_test_node(experimentaltli); + let node1 = compute_cplane.new_test_node("main"); + let node2 = compute_cplane.new_test_node("experimental"); node1.start().unwrap(); node2.start().unwrap(); diff --git a/integration_tests/tests/test_wal_acceptor.rs b/integration_tests/tests/test_wal_acceptor.rs index 5a4a2074d2..8b8a193dcb 100644 --- a/integration_tests/tests/test_wal_acceptor.rs +++ b/integration_tests/tests/test_wal_acceptor.rs @@ -1,21 +1,20 @@ -// Restart acceptors one by one while compute is under the load. 
-use control_plane::compute::ComputeControlPlane; -use control_plane::local_env; -use control_plane::local_env::PointInTime; -use control_plane::storage::TestStorageControlPlane; -use pageserver::ZTimelineId; - use rand::Rng; use std::sync::Arc; use std::time::SystemTime; use std::{thread, time}; +use control_plane::compute::ComputeControlPlane; + +use integration_tests; +use integration_tests::TestStorageControlPlane; +use integration_tests::PostgresNodeExt; + const DOWNTIME: u64 = 2; #[test] //#[ignore] fn test_embedded_wal_proposer() { - let local_env = local_env::test_env("test_embedded_wal_proposer"); + let local_env = integration_tests::create_test_env("test_embedded_wal_proposer"); const REDUNDANCY: usize = 3; let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); @@ -23,8 +22,7 @@ fn test_embedded_wal_proposer() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_master_node(maintli); + let node = compute_cplane.new_test_master_node("main"); node.append_conf( "postgresql.conf", &format!("wal_acceptors='{}'\n", wal_acceptors), @@ -52,7 +50,7 @@ fn test_embedded_wal_proposer() { #[test] fn test_acceptors_normal_work() { - let local_env = local_env::test_env("test_acceptors_normal_work"); + let local_env = integration_tests::create_test_env("test_acceptors_normal_work"); const REDUNDANCY: usize = 3; let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); @@ -60,8 +58,7 @@ fn test_acceptors_normal_work() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_master_node(maintli); + let node = compute_cplane.new_test_master_node("main"); node.start().unwrap(); // start proxy @@ -93,36 +90,25 @@ fn test_many_timelines() { // Initialize a new repository, and set up WAL safekeepers and page server. 
const REDUNDANCY: usize = 3; const N_TIMELINES: usize = 5; - let local_env = local_env::test_env("test_many_timelines"); + let local_env = integration_tests::create_test_env("test_many_timelines"); let storage_cplane = TestStorageControlPlane::fault_tolerant(&local_env, REDUNDANCY); let mut compute_cplane = ComputeControlPlane::local(&local_env, &storage_cplane.pageserver); let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // Create branches - let mut timelines: Vec = Vec::new(); - let maintli = storage_cplane.get_branch_timeline("main"); // main branch - timelines.push(maintli); - let startpoint = local_env::find_end_of_wal(&local_env, maintli).unwrap(); + let mut timelines: Vec = Vec::new(); + timelines.push("main".to_string()); + for i in 1..N_TIMELINES { - // additional branches let branchname = format!("experimental{}", i); - local_env::create_branch( - &local_env, - &branchname, - PointInTime { - timelineid: maintli, - lsn: startpoint, - }, - ) - .unwrap(); - let tli = storage_cplane.get_branch_timeline(&branchname); - timelines.push(tli); + storage_cplane.pageserver.branch_create(&branchname, "main").unwrap(); + timelines.push(branchname); } // start postgres on each timeline let mut nodes = Vec::new(); - for tli in timelines { - let node = compute_cplane.new_test_node(tli); + for tli_name in timelines { + let node = compute_cplane.new_test_node(&tli_name); nodes.push(node.clone()); node.start().unwrap(); node.start_proxy(&wal_acceptors); @@ -159,7 +145,7 @@ fn test_many_timelines() { // Majority is always alive #[test] fn test_acceptors_restarts() { - let local_env = local_env::test_env("test_acceptors_restarts"); + let local_env = integration_tests::create_test_env("test_acceptors_restarts"); // Start pageserver that reads WAL directly from that postgres const REDUNDANCY: usize = 3; @@ -171,8 +157,7 @@ fn test_acceptors_restarts() { let mut rng = rand::thread_rng(); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_master_node(maintli); + let node = compute_cplane.new_test_master_node("main"); node.start().unwrap(); // start proxy @@ -222,7 +207,7 @@ fn start_acceptor(cplane: &Arc, no: usize) { // N_CRASHES env var #[test] fn test_acceptors_unavailability() { - let local_env = local_env::test_env("test_acceptors_unavailability"); + let local_env = integration_tests::create_test_env("test_acceptors_unavailability"); // Start pageserver that reads WAL directly from that postgres const REDUNDANCY: usize = 2; @@ -232,8 +217,7 @@ fn test_acceptors_unavailability() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_master_node(maintli); + let node = compute_cplane.new_test_master_node("main"); node.start().unwrap(); // start proxy @@ -307,7 +291,7 @@ fn simulate_failures(cplane: Arc) { // Race condition test #[test] fn test_race_conditions() { - let local_env = local_env::test_env("test_race_conditions"); + let local_env = integration_tests::create_test_env("test_race_conditions"); // Start pageserver that reads WAL directly from that postgres const REDUNDANCY: usize = 3; @@ -319,8 +303,7 @@ fn test_race_conditions() { let wal_acceptors = storage_cplane.get_wal_acceptor_conn_info(); // start postgres - let maintli = storage_cplane.get_branch_timeline("main"); - let node = compute_cplane.new_test_master_node(maintli); + let node = compute_cplane.new_test_master_node("main"); 
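// new_test_master_node() now takes a branch name and resolves the timeline
// id itself through PageServerNode::branch_get_by_name (see compute.rs
// above), so tests no longer pass ZTimelineId values around by hand.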
node.start().unwrap(); // start proxy diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 8250818c00..cf073ad4d6 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -40,6 +40,7 @@ tar = "0.4.33" parse_duration = "2.1.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1" +fs_extra = "1.2.0" postgres_ffi = { path = "../postgres_ffi" } zenith_utils = { path = "../zenith_utils" } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index ed8ad22cd9..431dcdb37f 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -4,7 +4,8 @@ use log::*; use parse_duration::parse; -use std::fs::{self, File, OpenOptions}; +use std::fs::{File, OpenOptions}; +use std::{env, path::PathBuf}; use std::io; use std::process::exit; use std::thread; @@ -16,7 +17,7 @@ use daemonize::Daemonize; use slog::{Drain, FnValue}; -use pageserver::{page_cache, page_service, tui, zenith_repo_dir, PageServerConf}; +use pageserver::{page_cache, page_service, tui, PageServerConf, branches}; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; const DEFAULT_GC_PERIOD_SEC: u64 = 10; @@ -47,6 +48,12 @@ fn main() -> Result<()> { .takes_value(false) .help("Run in the background"), ) + .arg( + Arg::with_name("init") + .long("init") + .takes_value(false) + .help("Initialize pageserver repo"), + ) .arg( Arg::with_name("gc_horizon") .long("gc_horizon") @@ -59,16 +66,55 @@ fn main() -> Result<()> { .takes_value(true) .help("Interval between garbage collector iterations"), ) + .arg( + Arg::with_name("workdir") + .short("D") + .long("workdir") + .takes_value(true) + .help("Working directory for the pageserver"), + ) .get_matches(); + let workdir = if let Some(workdir_arg) = arg_matches.value_of("workdir") { + PathBuf::from(workdir_arg) + } else if let Some(workdir_arg) = std::env::var_os("ZENITH_REPO_DIR") { + PathBuf::from(workdir_arg.to_str().unwrap()) + } else { + PathBuf::from(".zenith") + }; + + let pg_distrib_dir: PathBuf = { + if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { + postgres_bin.into() + } else { + let cwd = env::current_dir()?; + cwd.join("tmp_install") + } + }; + + if !pg_distrib_dir.join("bin/postgres").exists() { + anyhow::bail!("Can't find postgres binary at {:?}", pg_distrib_dir); + } + let mut conf = PageServerConf { daemonize: false, interactive: false, gc_horizon: DEFAULT_GC_HORIZON, gc_period: Duration::from_secs(DEFAULT_GC_PERIOD_SEC), - listen_addr: "127.0.0.1:5430".parse().unwrap(), + listen_addr: "127.0.0.1:64000".parse().unwrap(), + workdir, + pg_distrib_dir, }; + // Create repo and exit if init was requested + if arg_matches.is_present("init") { + branches::init_repo(&conf)?; + return Ok(()); + } + + // Set CWD to workdir for non-daemon modes + env::set_current_dir(&conf.workdir)?; + if arg_matches.is_present("daemonize") { conf.daemonize = true; } @@ -98,8 +144,7 @@ fn main() -> Result<()> { } fn start_pageserver(conf: &PageServerConf) -> Result<()> { - let repodir = zenith_repo_dir(); - let log_filename = repodir.join("pageserver.log"); + let log_filename = "pageserver.log"; // Don't open the same file for output multiple times; // the different fds could overwrite each other's output. 
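The two lookup rules introduced in bin/pageserver.rs above, condensed into a runnable sketch (the paths and fallbacks match the hunk; everything else is scaffolding):

```rust
use std::env;
use std::path::PathBuf;

// Workdir: the -D/--workdir flag wins, then ZENITH_REPO_DIR, then ".zenith".
fn resolve_workdir(cli_arg: Option<&str>) -> PathBuf {
    cli_arg
        .map(PathBuf::from)
        .or_else(|| env::var_os("ZENITH_REPO_DIR").map(PathBuf::from))
        .unwrap_or_else(|| PathBuf::from(".zenith"))
}

// Postgres distribution: POSTGRES_DISTRIB_DIR, else ./tmp_install, and
// bin/postgres must exist under it or startup fails early.
fn resolve_pg_distrib_dir() -> anyhow::Result<PathBuf> {
    let dir = match env::var_os("POSTGRES_DISTRIB_DIR") {
        Some(d) => PathBuf::from(d),
        None => env::current_dir()?.join("tmp_install"),
    };
    anyhow::ensure!(dir.join("bin/postgres").exists(), "Can't find postgres binary at {:?}", dir);
    Ok(dir)
}

fn main() -> anyhow::Result<()> {
    println!("workdir: {}", resolve_workdir(None).display());
    println!("pg distrib: {}", resolve_pg_distrib_dir()?.display());
    Ok(())
}
```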
let log_file = OpenOptions::new() @@ -141,8 +186,8 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { let stderr = log_file; let daemonize = Daemonize::new() - .pid_file(repodir.join("pageserver.pid")) - .working_directory(repodir) + .pid_file("pageserver.pid") + .working_directory(".") .stdout(stdout) .stderr(stderr); @@ -153,26 +198,14 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { } else { // change into the repository directory. In daemon mode, Daemonize // does this for us. - let repodir = zenith_repo_dir(); - std::env::set_current_dir(&repodir)?; - info!("Changed current directory to repository in {:?}", &repodir); + std::env::set_current_dir(&conf.workdir)?; + info!("Changed current directory to repository in {:?}", &conf.workdir); } let mut threads = Vec::new(); // TODO: Check that it looks like a valid repository before going further - // Create directory for wal-redo datadirs - match fs::create_dir("wal-redo") { - Ok(_) => {} - Err(e) => match e.kind() { - io::ErrorKind::AlreadyExists => {} - _ => { - anyhow::bail!("Failed to create wal-redo data directory: {}", e); - } - }, - } - page_cache::init(conf); // GetPage@LSN requests are served by another thread. (It uses async I/O, diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 178fbe4075..8c1973d2c0 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -1,17 +1,135 @@ -use anyhow::Result; -use serde::{Deserialize, Serialize}; +// +// Branch management code +// +// TODO: move all paths construction to conf impl +// +use anyhow::{Context, Result, anyhow}; +use bytes::Bytes; +use postgres_ffi::xlog_utils; +use rand::Rng; +use serde::{Deserialize, Serialize}; +use std::{collections::HashMap, fs, path::{Path, PathBuf}, process::{Command, Stdio}, str::FromStr}; +use fs_extra; +use fs::File; +use std::io::Read; +use std::env; use zenith_utils::lsn::Lsn; -use crate::{repository::Repository, ZTimelineId}; +use crate::{repository::Repository, ZTimelineId, PageServerConf}; -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct BranchInfo { pub name: String, - pub timeline_id: String, + pub timeline_id: ZTimelineId, pub latest_valid_lsn: Option, } +// impl BranchInfo { +// pub fn lsn_string(&self) -> String { +// let lsn_string_opt = self.latest_valid_lsn.map(|lsn| lsn.to_string()); +// let lsn_str = lsn_string_opt.as_deref().unwrap_or("?"); +// format!("{}@{}", self.name, lsn_str) +// } +// } + +#[derive(Debug, Clone, Copy)] +pub struct PointInTime { + pub timelineid: ZTimelineId, + pub lsn: Lsn, +} + +pub fn init_repo(conf: &PageServerConf) -> Result<()> { + // top-level dir may exist if we are creating it through CLI + fs::create_dir_all(&conf.workdir) + .with_context(|| format!("could not create directory {}", &conf.workdir.display()))?; + + env::set_current_dir(&conf.workdir)?; + + fs::create_dir(std::path::Path::new("timelines"))?; + fs::create_dir(std::path::Path::new("refs"))?; + fs::create_dir(std::path::Path::new("refs").join("branches"))?; + fs::create_dir(std::path::Path::new("refs").join("tags"))?; + fs::create_dir(std::path::Path::new("wal-redo"))?; + + println!("created directory structure in {}", &conf.workdir.display()); + + // Create initial timeline + let tli = create_timeline(conf, None)?; + let timelinedir = conf.timeline_path(tli); + println!("created initial timeline {}", timelinedir.display()); + + // Run initdb + // + // We create the cluster temporarily in a "tmp" directory inside the repository, + // and move it 
to the right location from there.
+    let tmppath = std::path::Path::new("tmp");
+
+    let initdb_path = conf.pg_bin_dir().join("initdb");
+    let initdb = Command::new(initdb_path)
+        .args(&["-D", tmppath.to_str().unwrap()])
+        .arg("--no-instructions")
+        .env_clear()
+        .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
+        .env(
+            "DYLD_LIBRARY_PATH",
+            conf.pg_lib_dir().to_str().unwrap(),
+        )
+        .stdout(Stdio::null())
+        .status()
+        .with_context(|| "failed to execute initdb")?;
+    if !initdb.success() {
+        anyhow::bail!("initdb failed");
+    }
+    println!("initdb succeeded");
+
+    // Read control file to extract the LSN and system id
+    let controlfile_path = tmppath.join("global").join("pg_control");
+    let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
+    // let systemid = controlfile.system_identifier;
+    let lsn = controlfile.checkPoint;
+    let lsnstr = format!("{:016X}", lsn);
+
+    // Move the initial WAL file
+    fs::rename(
+        tmppath.join("pg_wal").join("000000010000000000000001"),
+        timelinedir
+            .join("wal")
+            .join("000000010000000000000001.partial"),
+    )?;
+    println!("moved initial WAL file");
+
+    // Remove pg_wal
+    fs::remove_dir_all(tmppath.join("pg_wal"))?;
+
+    force_crash_recovery(&tmppath)?;
+    println!("updated pg_control");
+
+    let target = timelinedir.join("snapshots").join(&lsnstr);
+    fs::rename(tmppath, &target)?;
+    println!("moved 'tmp' to {}", target.display());
+
+    // Create 'main' branch to refer to the initial timeline
+    let data = tli.to_string();
+    fs::write(conf.branch_path("main"), data)?;
+    println!("created main branch");
+
+    // XXX: do we need that now? -- yep, for test only
+
+    // // Also update the system id in the LocalEnv
+    // local_env.systemid = systemid;
+    // // write config
+    // let toml = toml::to_string(&local_env)?;
+    // fs::write(repopath.join("config"), toml)?;
+
+    println!(
+        "new zenith repository was created in {}",
+        conf.workdir.display()
+    );
+
+    Ok(())
+}
+
 pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo>> {
     // adapted from CLI code
     let branches_dir = std::path::Path::new("refs").join("branches");
@@ -28,9 +146,263 @@ pub(crate) fn get_branches(repository: &dyn Repository) -> Result<Vec<BranchInfo>>
     Ok(branch_infos?)
 }

+pub(crate) fn get_system_id(conf: &PageServerConf) -> Result<u64> {
+    // let branches = get_branches();
+
+    let branches_dir = std::path::Path::new("refs").join("branches");
+    let branches = std::fs::read_dir(&branches_dir)?
+        .map(|dir_entry_res| {
+            let dir_entry = dir_entry_res?;
+            let name = dir_entry.file_name().to_str().unwrap().to_string();
+            let timeline_id = std::fs::read_to_string(dir_entry.path())?.parse::<ZTimelineId>()?;
+            Ok((name, timeline_id))
+        })
+        .collect::<Result<HashMap<String, ZTimelineId>>>()?;
+
+    let main_tli = branches
+        .get("main")
+        .ok_or_else(|| anyhow!("Branch main not found"))?;
+
+    let (_, main_snap_dir) = find_latest_snapshot(conf, *main_tli)?;
+    let controlfile_path = main_snap_dir.join("global").join("pg_control");
+    let controlfile = postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfile_path)?))?;
+    Ok(controlfile.system_identifier)
+}
+
+pub(crate) fn create_branch(conf: &PageServerConf, branchname: &str, startpoint_str: &str) -> Result<BranchInfo> {
+    if conf.branch_path(&branchname).exists() {
+        anyhow::bail!("branch {} already exists", branchname);
+    }
+
+    let mut startpoint = parse_point_in_time(conf, startpoint_str)?;
+
+    if startpoint.lsn == Lsn(0) {
+        // Find end of WAL on the old timeline
+        let end_of_wal = find_end_of_wal(conf, startpoint.timelineid)?;
+        println!("branching at end of WAL: {}", end_of_wal);
+        startpoint.lsn = end_of_wal;
+    }
+
+    // create a new timeline for it
+    let newtli = create_timeline(conf, Some(startpoint))?;
+    let newtimelinedir = conf.timeline_path(newtli);
+
+    let data = newtli.to_string();
+    fs::write(conf.branch_path(&branchname), data)?;
+
+    // Copy the latest snapshot (TODO: before the startpoint) and all WAL
+    // TODO: be smarter and avoid the copying...
+    let (_maxsnapshot, oldsnapshotdir) = find_latest_snapshot(conf, startpoint.timelineid)?;
+    let copy_opts = fs_extra::dir::CopyOptions::new();
+    fs_extra::dir::copy(oldsnapshotdir, newtimelinedir.join("snapshots"), &copy_opts)?;
+
+    let oldtimelinedir = conf.timeline_path(startpoint.timelineid);
+    copy_wal(
+        &oldtimelinedir.join("wal"),
+        &newtimelinedir.join("wal"),
+        startpoint.lsn,
+        16 * 1024 * 1024, // FIXME: assume default WAL segment size
+    )?;
+
+    Ok(BranchInfo {
+        name: branchname.to_string(),
+        timeline_id: newtli,
+        latest_valid_lsn: Some(startpoint.lsn),
+    })
+}
+
+//
+// Parse user-given string that represents a point-in-time.
+//
+// We support multiple variants:
+//
+// Raw timeline id in hex, meaning the end of that timeline:
+//    bc62e7d612d0e6fe8f99a6dd2f281f9d
+//
+// A specific LSN on a timeline:
+//    bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8
+//
+// Same, with a human-friendly branch name:
+//    main
+//    main@2/15D3DD8
+//
+// Human-friendly tag name:
+//    mytag
+//
+//
+fn parse_point_in_time(conf: &PageServerConf, s: &str) -> Result<PointInTime> {
+    let mut strings = s.split('@');
+    let name = strings.next().unwrap();
+
+    let lsn: Option<Lsn>;
+    if let Some(lsnstr) = strings.next() {
+        lsn = Some(Lsn::from_str(lsnstr)
+            .with_context(|| "invalid LSN in point-in-time specification")?);
+    } else {
+        lsn = None
+    }
+
+    // Check if it's a tag
+    if lsn.is_none() {
+        let tagpath = conf.tag_path(name);
+        if tagpath.exists() {
+            let pointstr = fs::read_to_string(tagpath)?;
+
+            return parse_point_in_time(conf, &pointstr);
+        }
+    }
+
+    // Check if it's a branch
+    // Check if it's branch @ LSN
+    let branchpath = conf.branch_path(name);
+    if branchpath.exists() {
+        let pointstr = fs::read_to_string(branchpath)?;
+
+        let mut result = parse_point_in_time(conf, &pointstr)?;
+
+        result.lsn = lsn.unwrap_or(Lsn(0));
+        return Ok(result);
+    }
+
+    // Check if it's a timelineid
+    // Check if it's timelineid @ LSN
+    if let Ok(timelineid) = ZTimelineId::from_str(name) {
+        let tlipath = conf.timeline_path(timelineid);
+        if tlipath.exists() {
+            return Ok(PointInTime {
+                timelineid,
+                lsn: lsn.unwrap_or(Lsn(0)),
+            });
+        }
+    }
+
+    panic!("could not parse point-in-time {}", s);
+}
+
+// If control file says the cluster was shut down cleanly, modify it, to mark
+// it as crashed. That forces crash recovery when you start the cluster.
+//
+// FIXME:
+// We currently do this to the initial snapshot in "zenith init". It would
+// be more natural to do this when the snapshot is restored instead, but we
+// currently don't have any code to create new snapshots, so it doesn't matter.
+// Or better yet, use a less hacky way of putting the cluster into recovery.
+// Perhaps create a backup label file in the data directory when it's restored.
+fn force_crash_recovery(datadir: &Path) -> Result<()> {
+    // Read in the control file
+    let controlfilepath = datadir.to_path_buf().join("global").join("pg_control");
+    let mut controlfile =
+        postgres_ffi::decode_pg_control(Bytes::from(fs::read(controlfilepath.as_path())?))?;
+
+    controlfile.state = postgres_ffi::DBState_DB_IN_PRODUCTION;
+
+    fs::write(
+        controlfilepath.as_path(),
+        postgres_ffi::encode_pg_control(controlfile),
+    )?;
+
+    Ok(())
+}
+
+fn create_timeline(conf: &PageServerConf, ancestor: Option<PointInTime>) -> Result<ZTimelineId> {
+    // Create initial timeline
+    let mut tli_buf = [0u8; 16];
+    rand::thread_rng().fill(&mut tli_buf);
+    let timelineid = ZTimelineId::from(tli_buf);
+
+    let timelinedir = conf.timeline_path(timelineid);
+
+    fs::create_dir(&timelinedir)?;
+    fs::create_dir(&timelinedir.join("snapshots"))?;
+    fs::create_dir(&timelinedir.join("wal"))?;
+
+    if let Some(ancestor) = ancestor {
+        let data = format!("{}@{}", ancestor.timelineid, ancestor.lsn);
+        fs::write(timelinedir.join("ancestor"), data)?;
+    }
+
+    Ok(timelineid)
+}
+
+///
+/// Copy all WAL segments from one directory to another, up to given LSN.
+///
+/// If the given LSN is in the middle of a segment, the last segment containing it
+/// is written out as .partial, and padded with zeros.
+///
+fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()> {
+    let last_segno = upto.segment_number(wal_seg_size);
+    let last_segoff = upto.segment_offset(wal_seg_size);
+
+    for entry in fs::read_dir(src_dir).unwrap() {
+        if let Ok(entry) = entry {
+            let entry_name = entry.file_name();
+            let fname = entry_name.to_str().unwrap();
+
+            // Check if the filename looks like an xlog file, or a .partial file.
+            if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
+                continue;
+            }
+            let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
+
+            let copylen;
+            let mut dst_fname = PathBuf::from(fname);
+            if segno > last_segno {
+                // future segment, skip
+                continue;
+            } else if segno < last_segno {
+                copylen = wal_seg_size;
+                dst_fname.set_extension("");
+            } else {
+                copylen = last_segoff;
+                dst_fname.set_extension("partial");
+            }
+
+            let src_file = File::open(entry.path())?;
+            let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
+            std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
+
+            if copylen < wal_seg_size {
+                std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
+            }
+        }
+    }
+    Ok(())
+}
+
+// Find the end of valid WAL in a wal directory
+pub fn find_end_of_wal(conf: &PageServerConf, timeline: ZTimelineId) -> Result<Lsn> {
+    let waldir = conf.timeline_path(timeline).join("wal");
+    let (lsn, _tli) = xlog_utils::find_end_of_wal(&waldir, 16 * 1024 * 1024, true);
+    Ok(Lsn(lsn))
+}
+
+// Find the latest snapshot for a timeline
+fn find_latest_snapshot(conf: &PageServerConf, timeline: ZTimelineId) -> Result<(Lsn, PathBuf)> {
+    let snapshotsdir = conf.snapshots_path(timeline);
+    let paths = fs::read_dir(&snapshotsdir)?;
+    let mut maxsnapshot = Lsn(0);
+    let mut snapshotdir: Option<PathBuf> = None;
+    for path in paths {
+        let path = path?;
+        let filename = path.file_name().to_str().unwrap().to_owned();
+        if let Ok(lsn) = Lsn::from_hex(&filename) {
+            maxsnapshot = std::cmp::max(lsn, maxsnapshot);
+            snapshotdir = Some(path.path());
+        }
+    }
+    if maxsnapshot == Lsn(0) {
+        // TODO: check ancestor timeline
+        anyhow::bail!("no snapshot found in {}", snapshotsdir.display());
+    }
+
+    Ok((maxsnapshot, snapshotdir.unwrap()))
+}
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 9cdf169b28..11cfd91b29 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -26,8 +26,47 @@ pub struct PageServerConf {
     pub listen_addr: SocketAddr,
     pub gc_horizon: u64,
     pub gc_period: Duration,
+    pub workdir: PathBuf,
+
+    pub pg_distrib_dir: PathBuf,
 }

+impl PageServerConf {
+    //
+    // Repository paths, relative to workdir.
+    //
+
+    fn tag_path(&self, name: &str) -> PathBuf {
+        std::path::Path::new("refs").join("tags").join(name)
+    }
+
+    fn branch_path(&self, name: &str) -> PathBuf {
+        std::path::Path::new("refs").join("branches").join(name)
+    }
+
+    fn timeline_path(&self, timelineid: ZTimelineId) -> PathBuf {
+        std::path::Path::new("timelines").join(timelineid.to_string())
+    }
+
+    fn snapshots_path(&self, timelineid: ZTimelineId) -> PathBuf {
+        std::path::Path::new("timelines").join(timelineid.to_string()).join("snapshots")
+    }
+
+    //
+    // Postgres distribution paths
+    //
+
+    pub fn pg_bin_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("bin")
+    }
+
+    pub fn pg_lib_dir(&self) -> PathBuf {
+        self.pg_distrib_dir.join("lib")
+    }
+}
+
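A worked example of the segment arithmetic that copy_wal relies on: Lsn wraps a u64, and segment_number/segment_offset are plain division and remainder by the segment size (re-derived here on bare u64 so the sketch stands alone):

```rust
// Default WAL segment size assumed throughout this change.
const WAL_SEG_SIZE: u64 = 16 * 1024 * 1024;

fn segment_number(lsn: u64, seg_size: u64) -> u64 { lsn / seg_size }
fn segment_offset(lsn: u64, seg_size: u64) -> u64 { lsn % seg_size }

fn main() {
    // An LSN of 0/169C3C8 falls in segment 1...
    let upto: u64 = 0x0169_C3C8;
    assert_eq!(segment_number(upto, WAL_SEG_SIZE), 1);
    // ...so segment 0 is copied whole, while segment 1 is truncated at this
    // offset, renamed to ".partial", and zero-padded up to WAL_SEG_SIZE.
    assert_eq!(segment_offset(upto, WAL_SEG_SIZE), 0x0069_C3C8);
}
```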
 /// Zenith Timeline ID is a 128-bit random ID.
 ///
 /// Zenith timeline IDs are different from PostgreSQL timeline
@@ -89,10 +128,3 @@ impl fmt::Display for ZTimelineId {
     }
 }

-pub fn zenith_repo_dir() -> PathBuf {
-    // Find repository path
-    match std::env::var_os("ZENITH_REPO_DIR") {
-        Some(val) => PathBuf::from(val.to_str().unwrap()),
-        None => ".zenith".into(),
-    }
-}
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 2a238b5bc0..6cbc5e1181 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -8,7 +8,6 @@ use crate::repository::Repository;
 use crate::walredo::PostgresRedoManager;
 use crate::PageServerConf;
 use lazy_static::lazy_static;
-use std::path::Path;
 use std::sync::{Arc, Mutex};

 lazy_static! {
@@ -22,7 +21,7 @@ pub fn init(conf: &PageServerConf) {
     let walredo_mgr = PostgresRedoManager::new(conf);

     // we have already changed current dir to the repository.
-    let repo = RocksRepository::new(conf, Path::new("."), Arc::new(walredo_mgr));
+    let repo = RocksRepository::new(conf, Arc::new(walredo_mgr));

     *m = Some(Arc::new(repo));
 }
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index c1e159d3bb..6ded2846a6 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -30,6 +30,7 @@ use crate::restore_local_repo;
 use crate::walreceiver;
 use crate::PageServerConf;
 use crate::ZTimelineId;
+use crate::branches;

 #[derive(Debug)]
 enum FeMessage {
@@ -690,6 +691,28 @@ impl Connection {
             self.write_message_noflush(&BeMessage::CommandComplete)?;
             self.write_message(&BeMessage::ReadyForQuery)?;
+
+        } else if query_string.starts_with(b"branch_create ") {
+            let query_str = String::from_utf8(query_string.to_vec())?;
+            let err = || anyhow!("invalid branch_create: '{}'", query_str);
+
+            // branch_create <branchname> <startpoint>
+            // TODO lazy static
+            let re = Regex::new(r"^branch_create (\w+) ([\w@\\]+)[\r\n\s]*;?$").unwrap();
+            let caps = re
+                .captures(&query_str)
+                .ok_or_else(err)?;
+
+            let branchname: String = String::from(caps.get(1).ok_or_else(err)?.as_str());
+            let startpoint_str: String = String::from(caps.get(2).ok_or_else(err)?.as_str());
+
+            let branch = branches::create_branch(&self.conf, &branchname, &startpoint_str)?;
+            let branch = serde_json::to_vec(&branch)?;
+
+            self.write_message_noflush(&BeMessage::RowDescription)?;
+            self.write_message_noflush(&BeMessage::DataRow(Bytes::from(branch)))?;
+            self.write_message_noflush(&BeMessage::CommandComplete)?;
+            self.write_message(&BeMessage::ReadyForQuery)?;
         } else if query_string.starts_with(b"pg_list") {
             let branches = crate::branches::get_branches(&*page_cache::get_repository())?;
             let branches_buf = serde_json::to_vec(&branches)?;
@@ -708,6 +731,15 @@ impl Connection {
             // on connect
             self.write_message_noflush(&BeMessage::CommandComplete)?;
             self.write_message(&BeMessage::ReadyForQuery)?;
+        } else if query_string.to_ascii_lowercase().starts_with(b"identify_system") {
+            // TODO: match postgres response format for 'identify_system'
+            let system_id = crate::branches::get_system_id(&self.conf)?
+                .to_string();
+
+            self.write_message_noflush(&BeMessage::RowDescription)?;
+            self.write_message_noflush(&BeMessage::DataRow(Bytes::from(system_id)))?;
+            self.write_message_noflush(&BeMessage::CommandComplete)?;
+            self.write_message(&BeMessage::ReadyForQuery)?;
         } else {
             self.write_message_noflush(&BeMessage::RowDescription)?;
             self.write_message_noflush(&HELLO_WORLD_ROW)?;
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 4b78651a6c..aca2b7dd8f 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -291,6 +291,7 @@ mod tests {
     use std::path::Path;
     use std::str::FromStr;
     use std::time::Duration;
+    use std::env;

     fn get_test_conf() -> PageServerConf {
         PageServerConf {
@@ -299,6 +300,8 @@
             gc_horizon: 64 * 1024 * 1024,
             gc_period: Duration::from_secs(10),
             listen_addr: "127.0.0.1:5430".parse().unwrap(),
+            workdir: "".into(),
+            pg_distrib_dir: "".into(),
         }
     }

@@ -345,7 +348,9 @@
         let repo_dir = Path::new("../tmp_check/test_relsize_repo");
         let _ = fs::remove_dir_all(repo_dir);
         fs::create_dir_all(repo_dir)?;
-        let repo = rocksdb::RocksRepository::new(&get_test_conf(), repo_dir, Arc::new(walredo_mgr));
+        env::set_current_dir(repo_dir)?;
+
+        let repo = rocksdb::RocksRepository::new(&get_test_conf(), Arc::new(walredo_mgr));

         // get_timeline() with non-existent timeline id should fail
         //repo.get_timeline("11223344556677881122334455667788");
diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs
index e638c6a771..8bc59ca3e0 100644
--- a/pageserver/src/repository/rocksdb.rs
+++ b/pageserver/src/repository/rocksdb.rs
@@ -11,6 +11,8 @@ use crate::waldecoder::{Oid, TransactionId};
 use crate::walredo::WalRedoManager;
 use crate::PageServerConf;
 use crate::ZTimelineId;
+// use crate::PageServerConf;
+// use crate::branches;
 use anyhow::{bail, Context, Result};
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use log::*;
@@ -18,7 +20,6 @@ use postgres_ffi::pg_constants;
 use std::cmp::min;
 use std::collections::HashMap;
 use std::convert::TryInto;
-use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 use std::sync::{Arc, Mutex};
@@ -31,7 +32,6 @@
 use zenith_utils::seqwait::SeqWait;

 static TIMEOUT: Duration = Duration::from_secs(60);

 pub struct RocksRepository {
-    repo_dir: PathBuf,
     conf: PageServerConf,
     timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>,

@@ -158,11 +158,9 @@ impl CacheEntryContent {
 impl RocksRepository {
     pub fn new(
         conf: &PageServerConf,
-        repo_dir: &Path,
         walredo_mgr: Arc<dyn WalRedoManager>,
     ) -> RocksRepository {
         RocksRepository {
-            repo_dir: PathBuf::from(repo_dir),
             conf: conf.clone(),
             timelines: Mutex::new(HashMap::new()),
             walredo_mgr,
@@ -188,7 +186,7 @@ impl Repository for RocksRepository {
             Some(timeline) => Ok(timeline.clone()),
             None => {
                 let timeline =
-                    RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
+                    RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());

                 restore_timeline(&self.conf, &timeline, timelineid)?;

@@ -216,7 +214,7 @@ impl Repository for RocksRepository {
     fn create_empty_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<dyn Timeline>> {
         let mut timelines = self.timelines.lock().unwrap();

-        let timeline = RocksTimeline::new(&self.repo_dir, timelineid, self.walredo_mgr.clone());
+        let timeline = RocksTimeline::new(&self.conf, timelineid, self.walredo_mgr.clone());

         let timeline_rc = Arc::new(timeline);
         let r = timelines.insert(timelineid, timeline_rc.clone());
@@ -229,8 +227,8 @@
 }

 impl RocksTimeline {
-    fn open_rocksdb(repo_dir: &Path, timelineid: ZTimelineId) -> rocksdb::DB {
-        let path = repo_dir.join("timelines").join(timelineid.to_string());
+    fn open_rocksdb(conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB {
+        let path = conf.timeline_path(timelineid);
         let mut opts = rocksdb::Options::default();
         opts.create_if_missing(true);
         opts.set_use_fsync(true);
@@ -246,12 +244,12 @@
     }

     fn new(
-        repo_dir: &Path,
+        conf: &PageServerConf,
         timelineid: ZTimelineId,
         walredo_mgr: Arc<dyn WalRedoManager>,
     ) -> RocksTimeline {
         RocksTimeline {
-            db: RocksTimeline::open_rocksdb(repo_dir, timelineid),
+            db: RocksTimeline::open_rocksdb(conf, timelineid),
             walredo_mgr,
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 02167e5123..dc4f012b5b 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -457,7 +457,9 @@ impl PostgresRedoProcess {
     // Start postgres binary in special WAL redo mode.
     //
     // Tests who run pageserver binary are setting proper PG_BIN_DIR
-    // and PG_LIB_DIR so that WalRedo would start right postgres. We may later
+    // and PG_LIB_DIR so that WalRedo would start right postgres.
+    //
+    // do that: We may later
     // switch to setting same things in pageserver config file.
     async fn launch(datadir: &str) -> Result<PostgresRedoProcess> {
         // Create empty data directory for wal-redo postgres deleting old one.
diff --git a/test_runner/README.md b/test_runner/README.md
index 21d3ae6711..98535ada58 100644
--- a/test_runner/README.md
+++ b/test_runner/README.md
@@ -41,7 +41,7 @@ If you want to run all tests that have the string "bench" in their names:
 Useful environment variables:

 `ZENITH_BIN`: The directory where zenith binaries can be found.
-`POSTGRES_BIN`: The directory where postgres binaries can be found.
+`POSTGRES_DISTRIB_DIR`: The directory where the postgres distribution can be found.
 `TEST_OUTPUT`: Set the directory where test state and test output files
 should go.
 `TEST_SHARED_FIXTURES`: Try to re-use a single postgres and pageserver
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 42c2d02040..00da2e19a0 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -16,7 +16,7 @@ A fixture is created with the decorator @zenfixture, which is a wrapper
 around the standard pytest.fixture with some extra behavior.

 There are several environment variables that can control the running of tests:
-ZENITH_BIN, POSTGRES_BIN, etc. See README.md for more information.
+ZENITH_BIN, POSTGRES_DISTRIB_DIR, etc. See README.md for more information.

 To use fixtures in a test file, add this line of code:

@@ -78,7 +78,7 @@ class ZenithCli:
         self.bin_zenith = os.path.join(binpath, 'zenith')
         self.env = os.environ.copy()
         self.env['ZENITH_REPO_DIR'] = repo_dir
-        self.env['POSTGRES_BIN'] = pg_distrib_dir
+        self.env['POSTGRES_DISTRIB_DIR'] = pg_distrib_dir

     def run(self, arguments):
         """ Run "zenith" with the specified arguments.
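Taken together, the new page_service commands are plain simple-query strings, so any libpq client can drive them. A hedged sketch using the rust-postgres crate that control_plane already depends on; the connection string and user are assumptions, and port 64000 is the new default from this change:

```rust
use postgres::{Client, NoTls};

fn main() -> Result<(), postgres::Error> {
    // Connection details are illustrative; the pageserver speaks enough of
    // the postgres wire protocol to answer simple queries.
    let mut client = Client::connect("host=127.0.0.1 port=64000 user=zenith", NoTls)?;

    // Create a branch off main. The server replies with a single data row
    // holding a JSON-serialized BranchInfo.
    let rows = client.simple_query("branch_create experimental1 main")?;
    println!("{:?}", rows);

    // Ask for the repository's system identifier, which the server reads
    // from the latest main-branch snapshot's pg_control.
    let rows = client.simple_query("identify_system")?;
    println!("{:?}", rows);
    Ok(())
}
```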
@@ -108,11 +108,11 @@ class ZenithPageserver:
         self.running = False

     def start(self):
-        self.zenith_cli.run(['pageserver', 'start'])
+        self.zenith_cli.run(['start'])
         self.running = True

     def stop(self):
-        self.zenith_cli.run(['pageserver', 'stop'])
+        self.zenith_cli.run(['stop'])
         self.running = True

@@ -316,7 +316,7 @@ def zenith_binpath(base_dir):
 @zenfixture
 def pg_distrib_dir(base_dir):
     """ find the postgress install """
-    env_postgres_bin = os.environ.get('POSTGRES_BIN')
+    env_postgres_bin = os.environ.get('POSTGRES_DISTRIB_DIR')
     if env_postgres_bin:
         pg_dir = env_postgres_bin
     else:
diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs
index 0770deafbb..ab473b569e 100644
--- a/walkeeper/src/lib.rs
+++ b/walkeeper/src/lib.rs
@@ -3,7 +3,7 @@ use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::time::Duration;

-mod pq_protocol;
+pub mod pq_protocol;
 pub mod s3_offload;
 pub mod wal_service;
diff --git a/zenith/Cargo.toml b/zenith/Cargo.toml
index 78ab673fe2..eb48af882d 100644
--- a/zenith/Cargo.toml
+++ b/zenith/Cargo.toml
@@ -10,6 +10,18 @@ edition = "2018"
 clap = "2.33.0"
 anyhow = "1.0"
 serde_json = "1"
+# rand = "0.8.3"
+# tar = "0.4.33"
+# serde = { version = "1.0", features = ["derive"] }
+# toml = "0.5"
+# lazy_static = "1.4"
+# regex = "1"
+# # hex = "0.4.3"
+# bytes = "1.0.1"
+# # fs_extra = "1.2.0"
+# nix = "0.20"
+# # thiserror = "1"
+# url = "2.2.2"
 postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" }
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index 42c7ebc830..10bb192359 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -1,55 +1,51 @@
-use std::path::{Path, PathBuf};
+use std::collections::HashMap;
 use std::process::exit;
-use std::str::FromStr;
-use std::{collections::HashMap, fs};
-
-use anyhow::{Result, Context};
-use anyhow::{anyhow, bail};
+use anyhow::{Context, anyhow};
+use anyhow::Result;
 use clap::{App, Arg, ArgMatches, SubCommand};

-use control_plane::local_env::LocalEnv;
+use control_plane::local_env;
+use control_plane::compute::ComputeControlPlane;
 use control_plane::storage::PageServerNode;
-use control_plane::{compute::ComputeControlPlane, local_env, storage};
-
-use pageserver::{branches::BranchInfo, ZTimelineId};
-
+use pageserver::{ZTimelineId, branches::BranchInfo};
 use zenith_utils::lsn::Lsn;

-fn zenith_repo_dir() -> PathBuf {
-    // Find repository path
-    match std::env::var_os("ZENITH_REPO_DIR") {
-        Some(val) => PathBuf::from(val.to_str().unwrap()),
-        None => ".zenith".into(),
-    }
-}
-
 // Main entry point for the 'zenith' CLI utility
 //
-// This utility can used to work with a local zenith repository.
-// In order to run queries in it, you need to launch the page server,
-// and a compute node against the page server
+// This utility helps to manage a zenith installation. That includes the following:
+// * Management of local postgres installations running on top of the
+//   pageserver.
+// * Providing a CLI API to the pageserver (local or remote)
+// * TODO: export/import to/from usual postgres
 fn main() -> Result<()> {
     let name_arg = Arg::with_name("NAME")
         .short("n")
         .index(1)
         .help("name of this postgres instance")
         .required(true);
+
     let matches = App::new("zenith")
         .about("Zenith CLI")
-        .subcommand(SubCommand::with_name("init").about("Initialize a new Zenith repository"))
+        .subcommand(
+            SubCommand::with_name("init")
+                .about("Initialize a new Zenith repository")
+                .arg(
+                    Arg::with_name("remote-pageserver")
+                        .long("remote-pageserver")
+                        .required(false)
+                        .value_name("pageserver-url")
+                ),
+        )
         .subcommand(
             SubCommand::with_name("branch")
                 .about("Create a new branch")
                 .arg(Arg::with_name("branchname").required(false).index(1))
                 .arg(Arg::with_name("start-point").required(false).index(2)),
         )
-        .subcommand(
-            SubCommand::with_name("pageserver")
-                .about("Manage pageserver instance")
-                .subcommand(SubCommand::with_name("status"))
-                .subcommand(SubCommand::with_name("start"))
-                .subcommand(SubCommand::with_name("stop")),
-        )
+        .subcommand(SubCommand::with_name("status"))
+        .subcommand(SubCommand::with_name("start"))
+        .subcommand(SubCommand::with_name("stop"))
+        .subcommand(SubCommand::with_name("restart"))
         .subcommand(
             SubCommand::with_name("pg")
                 .about("Manage postgres instances")
@@ -67,52 +63,74 @@ fn main() -> Result<()> {
         )
         .get_matches();

-    // handle init separately and exit
+    // Create config file
     if let ("init", Some(sub_args)) = matches.subcommand() {
-        run_init_cmd(sub_args.clone())?;
-        exit(0);
+        let pageserver_uri = sub_args.value_of("pageserver-url");
+        local_env::init(pageserver_uri)
+            .with_context(|| "Failed to create config file")?;
     }

     // all other commands would need config
-
-    let repopath = zenith_repo_dir();
-    if !repopath.exists() {
-        bail!(
-            "Zenith repository does not exist in {}.\n\
-            Set ZENITH_REPO_DIR or initialize a new repository with 'zenith init'",
-            repopath.display()
-        );
-    }
-    // TODO: check that it looks like a zenith repository
-    let env = match local_env::load_config(&repopath) {
+    let env = match local_env::load_config() {
         Ok(conf) => conf,
         Err(e) => {
-            eprintln!("Error loading config from {}: {}", repopath.display(), e);
+            eprintln!("Error loading config: {}", e);
             exit(1);
         }
     };

     match matches.subcommand() {
         ("init", Some(_)) => {
-            panic!() /* Should not happen. Init was handled before */
+            let pageserver = PageServerNode::from_env(&env);
+            pageserver.init()?;
         }
-        ("branch", Some(sub_args)) => run_branch_cmd(&env, sub_args.clone())?,
-        ("pageserver", Some(sub_args)) => run_pageserver_cmd(&env, sub_args.clone())?,
+        ("branch", Some(sub_args)) => {
+            let pageserver = PageServerNode::from_env(&env);
+
+            if let Some(branchname) = sub_args.value_of("branchname") {
+                if let Some(startpoint_str) = sub_args.value_of("start-point") {
+                    let branch = pageserver.branch_create(branchname, startpoint_str)?;
+                    println!("Created branch '{}' at {:?}", branch.name, branch.latest_valid_lsn.unwrap_or(Lsn(0)));
+                } else {
+                    panic!("Missing start-point");
+                }
+            } else {
+                // No arguments, list branches
+                for branch in pageserver.branches_list()? {
+                    println!(" {}", branch.name);
+                }
+            }
+        }
         ("start", Some(_sub_m)) => {
-            let pageserver = storage::PageServerNode::from_env(&env);
+            let pageserver = PageServerNode::from_env(&env);
             if let Err(e) = pageserver.start() {
-                eprintln!("pageserver start: {}", e);
+                eprintln!("pageserver start failed: {}", e);
                 exit(1);
             }
         }
         ("stop", Some(_sub_m)) => {
-            let pageserver = storage::PageServerNode::from_env(&env);
+            let pageserver = PageServerNode::from_env(&env);
+
             if let Err(e) = pageserver.stop() {
-                eprintln!("pageserver stop: {}", e);
+                eprintln!("pageserver stop failed: {}", e);
+                exit(1);
+            }
+        }
+
+        ("restart", Some(_sub_m)) => {
+            let pageserver = PageServerNode::from_env(&env);
+
+            if let Err(e) = pageserver.stop() {
+                eprintln!("pageserver stop failed: {}", e);
+                exit(1);
+            }
+
+            if let Err(e) = pageserver.start() {
+                eprintln!("pageserver start failed: {}", e);
                 exit(1);
             }
         }
@@ -131,38 +149,10 @@
     Ok(())
 }

-fn run_pageserver_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> {
-    match args.subcommand() {
-        ("status", Some(_sub_m)) => {
-            todo!();
-        }
-        ("start", Some(_sub_m)) => {
-            let psnode = PageServerNode::from_env(local_env);
-            psnode.start()?;
-            println!("Page server started");
-        }
-        ("stop", Some(_sub_m)) => {
-            let psnode = PageServerNode::from_env(local_env);
-            psnode.stop()?;
-            println!("Page server stopped");
-        }
-        _ => unreachable!(),
-    };
-
-    Ok(())
-}
-
-// Peek into the repository, to grab the timeline ID of given branch
-pub fn get_branch_timeline(repopath: &Path, branchname: &str) -> ZTimelineId {
-    let branchpath = repopath.join("refs/branches/".to_owned() + branchname);
-
-    ZTimelineId::from_str(&(fs::read_to_string(&branchpath).unwrap())).unwrap()
-}
-
 /// Returns a map of timeline IDs to branch_name@lsn strings.
 /// Connects to the pageserver to query this information.
-fn get_branch_infos(env: &LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
-    let page_server = storage::PageServerNode::from_env(env);
+fn get_branch_infos(env: &local_env::LocalEnv) -> Result<HashMap<ZTimelineId, String>> {
+    let page_server = PageServerNode::from_env(env);
     let mut client = page_server.page_server_psql_client()?;

     let branches_msgs = client.simple_query("pg_list")?;
@@ -179,11 +169,10 @@
     let branch_infos: Result<HashMap<ZTimelineId, String>> = branch_infos
         .into_iter()
         .map(|branch_info| {
-            let timeline_id = ZTimelineId::from_str(&branch_info.timeline_id)?;
             let lsn_string_opt = branch_info.latest_valid_lsn.map(|lsn| lsn.to_string());
             let lsn_str = lsn_string_opt.as_deref().unwrap_or("?");
             let branch_lsn_string = format!("{}@{}", branch_info.name, lsn_str);
-            Ok((timeline_id, branch_lsn_string))
+            Ok((branch_info.timeline_id, branch_lsn_string))
         })
        .collect();

@@ -193,19 +182,11 @@
 fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
     let mut cplane = ComputeControlPlane::load(env.clone())?;

-    // FIXME: cheat and resolve the timeline by peeking into the
-    // repository. In reality, when you're launching a compute node
-    // against a possibly-remote page server, we wouldn't know what
-    // branches exist in the remote repository. Or would we require
-    // that you "zenith fetch" them into a local repository first?
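The `pg_list` and `branch_create` answers that get_branch_infos consumes above are JSON-serialized BranchInfo rows. A hedged sketch of decoding one, with simplified field types (the real struct carries a ZTimelineId and an Option<Lsn>; plain strings are used here so the example stands alone):

```rust
use serde::Deserialize;

// Simplified stand-in for pageserver::branches::BranchInfo.
#[derive(Deserialize, Debug)]
struct BranchInfo {
    name: String,
    timeline_id: String,
    latest_valid_lsn: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // Example payload shape; field values are illustrative.
    let json = r#"{"name":"experimental1",
                   "timeline_id":"bc62e7d612d0e6fe8f99a6dd2f281f9d",
                   "latest_valid_lsn":"0/169C3C8"}"#;
    let branch: BranchInfo = serde_json::from_str(json)?;
    println!("{} -> {}", branch.name, branch.timeline_id);
    Ok(())
}
```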
match pg_match.subcommand() { ("create", Some(sub_m)) => { let timeline_arg = sub_m.value_of("timeline").unwrap_or("main"); - let timeline = get_branch_timeline(&env.repo_path, timeline_arg); - - println!("Initializing Postgres on timeline {}...", timeline); - - cplane.new_node(timeline)?; + println!("Initializing Postgres on timeline {}...", timeline_arg); + cplane.new_node(timeline_arg)?; } ("list", Some(_sub_m)) => { let branch_infos = get_branch_infos(env).unwrap_or_else(|e| { @@ -249,121 +230,3 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { Ok(()) } - -// "zenith init" - Initialize a new Zenith repository in current dir -fn run_init_cmd(_args: ArgMatches) -> Result<()> { - local_env::init()?; - Ok(()) -} - -// handle "zenith branch" subcommand -fn run_branch_cmd(local_env: &LocalEnv, args: ArgMatches) -> Result<()> { - let repopath = local_env.repo_path.to_str().unwrap(); - - if let Some(branchname) = args.value_of("branchname") { - if PathBuf::from(format!("{}/refs/branches/{}", repopath, branchname)).exists() { - anyhow::bail!("branch {} already exists", branchname); - } - - if let Some(startpoint_str) = args.value_of("start-point") { - let mut startpoint = parse_point_in_time(startpoint_str)?; - - if startpoint.lsn == Lsn(0) { - // Find end of WAL on the old timeline - let end_of_wal = local_env::find_end_of_wal(local_env, startpoint.timelineid)?; - - println!("branching at end of WAL: {}", end_of_wal); - - startpoint.lsn = end_of_wal; - } - - return local_env::create_branch(local_env, branchname, startpoint); - } else { - panic!("Missing start-point"); - } - } else { - // No arguments, list branches - list_branches()?; - } - Ok(()) -} - -fn list_branches() -> Result<()> { - // list branches - let paths = fs::read_dir(zenith_repo_dir().join("refs").join("branches"))?; - - for path in paths { - println!(" {}", path?.file_name().to_str().unwrap()); - } - - Ok(()) -} - -// -// Parse user-given string that represents a point-in-time. -// -// We support multiple variants: -// -// Raw timeline id in hex, meaning the end of that timeline: -// bc62e7d612d0e6fe8f99a6dd2f281f9d -// -// A specific LSN on a timeline: -// bc62e7d612d0e6fe8f99a6dd2f281f9d@2/15D3DD8 -// -// Same, with a human-friendly branch name: -// main -// main@2/15D3DD8 -// -// Human-friendly tag name: -// mytag -// -// -fn parse_point_in_time(s: &str) -> Result { - let mut strings = s.split('@'); - let name = strings.next().unwrap(); - - let lsn: Option; - if let Some(lsnstr) = strings.next() { - lsn = Some( - Lsn::from_str(lsnstr) - .with_context(|| "invalid LSN in point-in-time specification")? 
- ); - } else { - lsn = None - } - - // Check if it's a tag - if lsn.is_none() { - let tagpath = zenith_repo_dir().join("refs").join("tags").join(name); - if tagpath.exists() { - let pointstr = fs::read_to_string(tagpath)?; - - return parse_point_in_time(&pointstr); - } - } - // Check if it's a branch - // Check if it's branch @ LSN - let branchpath = zenith_repo_dir().join("refs").join("branches").join(name); - if branchpath.exists() { - let pointstr = fs::read_to_string(branchpath)?; - - let mut result = parse_point_in_time(&pointstr)?; - - result.lsn = lsn.unwrap_or(Lsn(0)); - return Ok(result); - } - - // Check if it's a timelineid - // Check if it's timelineid @ LSN - let tlipath = zenith_repo_dir().join("timelines").join(name); - if tlipath.exists() { - let result = local_env::PointInTime { - timelineid: ZTimelineId::from_str(name)?, - lsn: lsn.unwrap_or(Lsn(0)), - }; - - return Ok(result); - } - - panic!("could not parse point-in-time {}", s); -}