diff --git a/.gitignore b/.gitignore index cfa64b8f7c..20c5981274 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ /target -/tmp_install +/tmp_check diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index d8425e0abd..204573d00f 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -3,7 +3,7 @@ // use log::*; -use std::fs::File; +use std::{fs::File, str::FromStr}; use std::io; use std::path::PathBuf; use std::thread; @@ -35,7 +35,7 @@ fn main() -> Result<(), io::Error> { .short("w") .long("wal-producer") .takes_value(true) - .help("connect to the WAL sender (postgres or wal_acceptor) on ip:port (default: 127.0.0.1:65432)")) + .help("connect to the WAL sender (postgres or wal_acceptor) on connstr (default: 'host=127.0.0.1 port=65432 user=zenith')")) .arg(Arg::with_name("listen") .short("l") .long("listen") @@ -61,7 +61,7 @@ fn main() -> Result<(), io::Error> { data_dir: PathBuf::from("./"), daemonize: false, interactive: false, - wal_producer_addr: "127.0.0.1:65432".parse().unwrap(), + wal_producer_connstr: String::from_str("host=127.0.0.1 port=65432 user=zenith").unwrap(), listen_addr: "127.0.0.1:5430".parse().unwrap(), skip_recovery: false, }; @@ -90,7 +90,7 @@ fn main() -> Result<(), io::Error> { } if let Some(addr) = arg_matches.value_of("wal_producer") { - conf.wal_producer_addr = addr.parse().unwrap(); + conf.wal_producer_connstr = String::from_str(addr).unwrap(); } if let Some(addr) = arg_matches.value_of("listen") { diff --git a/src/control_plane.rs b/src/control_plane.rs index 432a61eecd..e85e854e11 100644 --- a/src/control_plane.rs +++ b/src/control_plane.rs @@ -17,6 +17,21 @@ use std::{ }; use postgres::{Client, NoTls}; +use lazy_static::lazy_static; + +lazy_static! { + // postgres would be there if it was build by 'make postgres' here in the repo + pub static ref PG_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("../tmp_install/bin"); + + pub static ref CARGO_BIN_DIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("./target/debug/"); + + pub static ref TEST_WORKDIR : PathBuf = Path::new(env!("CARGO_MANIFEST_DIR")) + .join("./tmp_check"); + + // XXX: drop dots +} // // I'm intendedly modelling storage and compute control planes as a separate entities @@ -29,36 +44,18 @@ pub struct StorageControlPlane { impl StorageControlPlane { // postgres <-> page_server - pub fn one_page_server(pg_addr: SocketAddr) -> StorageControlPlane { + pub fn one_page_server(pg_connstr: String) -> StorageControlPlane { let mut cplane = StorageControlPlane { wal_acceptors: Vec::new(), page_servers: Vec::new(), }; - let workdir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/pageserver1"); - let pg_install_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/bin"); - - fs::create_dir(workdir.clone()).unwrap(); - - // initialize data directory - // TODO: make wal-redo-postgres workable without data directory? - // XXX: common initdb method? - // XXX: shared paths - let initdb_path = pg_install_dir.join("initdb"); - let initdb = Command::new(initdb_path) - .args(&["-D", workdir.join("wal_redo_pgdata").to_str().unwrap()]) - .env_clear() - .status() - .expect("failed to execute initdb"); - if !initdb.success() { - panic!("initdb failed"); - } - let pserver = PageServerNode { page_service_addr: "127.0.0.1:65200".parse().unwrap(), - wal_producer_addr: pg_addr, - data_dir: workdir + wal_producer_connstr: pg_connstr, + data_dir: TEST_WORKDIR.join("pageserver") }; + pserver.init(); pserver.start(); cplane.page_servers.push(pserver); @@ -78,33 +75,39 @@ impl StorageControlPlane { pub struct PageServerNode { page_service_addr: SocketAddr, - wal_producer_addr: SocketAddr, + wal_producer_connstr: String, data_dir: PathBuf, } impl PageServerNode { // TODO: method to force redo on a specific relation - fn binary_path(&self) -> PathBuf { - Path::new(env!("CARGO_MANIFEST_DIR")).join("./target/debug/pageserver") - } + // TODO: make wal-redo-postgres workable without data directory? + pub fn init(&self) { + fs::create_dir(self.data_dir.clone()).unwrap(); - fn pg_install_path(&self) -> PathBuf { - Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/bin") + let initdb = Command::new(PG_BIN_DIR.join("initdb")) + .args(&["-D", self.data_dir.join("wal_redo_pgdata").to_str().unwrap()]) + .env_clear() + .status() + .expect("failed to execute initdb"); + if !initdb.success() { + panic!("initdb failed"); + } } pub fn start(&self) { - let wal_redo_pgdata = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/pageserver1/wal_redo_pgdata"); + println!("Starting pageserver at '{}', wal_producer='{}'", self.page_service_addr, self.wal_producer_connstr); - let status = Command::new(self.binary_path()) + let status = Command::new(CARGO_BIN_DIR.join("pageserver")) .args(&["-D", self.data_dir.to_str().unwrap()]) - .args(&["-w", self.wal_producer_addr.to_string().as_str()]) + .args(&["-w", self.wal_producer_connstr.as_str()]) .args(&["-l", self.page_service_addr.to_string().as_str()]) .arg("-d") .arg("--skip-recovery") .env_clear() - .env("PATH", self.pg_install_path()) // path to postres-wal-redo binary - .env("PGDATA", wal_redo_pgdata) // postres-wal-redo pgdata + .env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary + .env("PGDATA", self.data_dir.join("wal_redo_pgdata")) // postres-wal-redo pgdata .status() .expect("failed to execute initdb"); @@ -114,8 +117,8 @@ impl PageServerNode { } pub fn stop(&self) { - let pifile = self.data_dir.join("pageserver.pid"); - let pid = fs::read_to_string(pifile).unwrap(); + let pidfile = self.data_dir.join("pageserver.pid"); + let pid = fs::read_to_string(pidfile).unwrap(); let status = Command::new("kill") .arg(pid) .env_clear() @@ -148,7 +151,7 @@ impl WalAcceptorNode {} // ComputeControlPlane // pub struct ComputeControlPlane { - pg_install_dir: PathBuf, + pg_bin_dir: PathBuf, work_dir: PathBuf, last_assigned_port: u16, nodes: Vec, @@ -156,14 +159,9 @@ pub struct ComputeControlPlane { impl ComputeControlPlane { pub fn local() -> ComputeControlPlane { - // postgres configure and `make temp-install` are using this path - let pg_install_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/"); - - let work_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/"); - ComputeControlPlane { - pg_install_dir: pg_install_dir, - work_dir: work_dir, + pg_bin_dir: PG_BIN_DIR.to_path_buf(), + work_dir: TEST_WORKDIR.to_path_buf(), last_assigned_port: 65431, nodes: Vec::new(), } @@ -184,13 +182,13 @@ impl ComputeControlPlane { port: self.get_port(), ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), - pg_install_dir: self.pg_install_dir.clone(), + pg_bin_dir: self.pg_bin_dir.clone(), }; self.nodes.push(node); let node = self.nodes.last().unwrap(); // initialize data directory - let initdb_path = self.pg_install_dir.join("bin/initdb"); + let initdb_path = self.pg_bin_dir.join("initdb"); println!("initdb_path: {}", initdb_path.to_str().unwrap()); let initdb = Command::new(initdb_path) .args(&["-D", node.pgdata.to_str().unwrap()]) @@ -232,7 +230,7 @@ pub struct PostgresNode { port: u16, ip: IpAddr, pgdata: PathBuf, - pg_install_dir: PathBuf, + pg_bin_dir: PathBuf, } impl PostgresNode { @@ -246,7 +244,7 @@ impl PostgresNode { } fn pg_ctl(&self, action: &str, check_ok: bool) { - let pg_ctl_path = self.pg_install_dir.join("bin/pg_ctl"); + let pg_ctl_path = self.pg_bin_dir.join("pg_ctl"); let pg_ctl = Command::new(pg_ctl_path) .args(&[ "-D", @@ -265,6 +263,7 @@ impl PostgresNode { } pub fn start(&self) { + println!("Started postgres node at '{}'", self.connstr()); self.pg_ctl("start", true); } @@ -276,8 +275,8 @@ impl PostgresNode { self.pg_ctl("stop", true); } - pub fn addr(&self) -> SocketAddr { - SocketAddr::new(self.ip, self.port) + pub fn connstr(&self) -> String { + format!("user={} host={} port={}", self.whoami(), self.ip, self.port) } // XXX: cache that in control plane @@ -290,7 +289,7 @@ impl PostgresNode { panic!("whoami failed"); } - String::from_utf8(output.stdout).unwrap() + String::from_utf8(output.stdout).unwrap().trim().to_string() } pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { diff --git a/src/lib.rs b/src/lib.rs index a45dd484a4..91254bc82e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,7 +20,7 @@ pub struct PageServerConf { pub data_dir: PathBuf, pub daemonize: bool, pub interactive: bool, - pub wal_producer_addr: SocketAddr, + pub wal_producer_connstr: String, pub listen_addr: SocketAddr, pub skip_recovery: bool, } diff --git a/src/walreceiver.rs b/src/walreceiver.rs index dbef0297e2..5f4cd5d93f 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -45,10 +45,9 @@ pub fn thread_main(conf: PageServerConf) { async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // Connect to the database in replication mode. - let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_addr.ip(), conf.wal_producer_addr.port()); - debug!("connecting to {}...", conn_str); + debug!("connecting to {}...", conf.wal_producer_connstr); let (mut rclient, connection) = connect_replication( - conn_str.as_str(), + conf.wal_producer_connstr.as_str(), NoTls, ReplicationMode::Physical ).await?; @@ -67,12 +66,14 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // // Start streaming the WAL, from where we left off previously. // - let last_valid_lsn = page_cache::get_last_valid_lsn(); - if last_valid_lsn == 0 { + let mut startpoint = page_cache::get_last_valid_lsn(); + if startpoint == 0 { page_cache::init_valid_lsn(u64::from(_identify_system.xlogpos())); + startpoint = u64::from(_identify_system.xlogpos()); } - let startpoint = tokio_postgres::types::Lsn::from(last_valid_lsn); + let startpoint = tokio_postgres::types::Lsn::from(startpoint); + debug!("starting replication from {:?}...", startpoint); let mut physical_stream = rclient .start_physical_replication(None, startpoint, None) .await?; diff --git a/tests/test_pageserver.rs b/tests/test_pageserver.rs index be643c5f38..8a32ccfd5c 100644 --- a/tests/test_pageserver.rs +++ b/tests/test_pageserver.rs @@ -16,7 +16,7 @@ fn test_redo_cases() { let node = compute_cplane.new_vanilla_node(); // Start pageserver that reads WAL directly from that postgres - let storage_cplane = StorageControlPlane::one_page_server(node.addr()); + let storage_cplane = StorageControlPlane::one_page_server(node.connstr()); let pageserver_addr = storage_cplane.page_server_addr(); // Configure that node to take pages from pageserver @@ -46,6 +46,6 @@ fn test_redo_cases() { #[test] fn test_regress() {} -// Runs pg_regress on a compute node +// Runs recovery with minio #[test] fn test_pageserver_recovery() {}