diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index 94e5df7784..469c8f27ec 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -6,7 +6,7 @@ use log::*; use std::fs::File; use std::io; use std::path::PathBuf; -use std::{net::IpAddr, thread}; +use std::thread; use clap::{App, Arg}; use daemonize::Daemonize; @@ -36,6 +36,11 @@ fn main() -> Result<(), io::Error> { .long("wal-producer") .takes_value(true) .help("connect to the WAL sender (postgres or wal_acceptor) on ip:port (default: 127.0.0.1:65432)")) + .arg(Arg::with_name("listen") + .short("l") + .long("listen") + .takes_value(true) + .help("listen for incoming page requests on ip:port (default: 127.0.0.1:5430)")) .arg(Arg::with_name("interactive") .short("i") .long("interactive") @@ -56,8 +61,8 @@ fn main() -> Result<(), io::Error> { data_dir: PathBuf::from("./"), daemonize: false, interactive: false, - wal_producer_ip: "127.0.0.1".parse::().unwrap(), - wal_producer_port: 65432, + wal_producer_addr: "127.0.0.1:65432".parse().unwrap(), + listen_addr: "127.0.0.1:5430".parse().unwrap(), skip_recovery: false, }; @@ -85,9 +90,11 @@ fn main() -> Result<(), io::Error> { } if let Some(addr) = arg_matches.value_of("wal_producer") { - let parts: Vec<&str> = addr.split(':').collect(); - conf.wal_producer_ip = parts[0].parse().unwrap(); - conf.wal_producer_port = parts[1].parse().unwrap(); + conf.wal_producer_addr = addr.parse().unwrap(); + } + + if let Some(addr) = arg_matches.value_of("listen") { + conf.listen_addr = addr.parse().unwrap(); } start_pageserver(conf) @@ -160,22 +167,24 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { // Launch the WAL receiver thread. It will try to connect to the WAL safekeeper, // and stream the WAL. If the connection is lost, it will reconnect on its own. // We just fire and forget it here. + let conf1 = conf.clone(); let walreceiver_thread = thread::Builder::new() .name("WAL receiver thread".into()) .spawn(|| { // thread code - walreceiver::thread_main(conf); + walreceiver::thread_main(conf1); }) .unwrap(); threads.push(walreceiver_thread); // GetPage@LSN requests are served by another thread. (It uses async I/O, // but the code in page_service sets up it own thread pool for that) + let conf2 = conf.clone(); let page_server_thread = thread::Builder::new() .name("Page Service thread".into()) .spawn(|| { // thread code - page_service::thread_main(); + page_service::thread_main(conf2); }) .unwrap(); threads.push(page_server_thread); diff --git a/src/control_plane.rs b/src/control_plane.rs index bee7cbd56b..cbffb4b20b 100644 --- a/src/control_plane.rs +++ b/src/control_plane.rs @@ -13,7 +13,7 @@ use std::process::Command; use std::str; use std::{ io::Write, - net::{IpAddr, Ipv4Addr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, }; use postgres::{Client, NoTls}; @@ -24,41 +24,96 @@ use postgres::{Client, NoTls}; // pub struct StorageControlPlane { wal_acceptors: Vec, - last_wal_acceptor_port: u32, - pager_servers: Vec, - last_page_server_port: u32, + page_servers: Vec, } impl StorageControlPlane { // postgres <-> page_server - // fn one_page_server(&mut self, pg_ip: IpAddr, pg_port: u32) -> StorageControlPlane {} + pub fn one_page_server(pg_addr: SocketAddr) -> StorageControlPlane { + let mut cplane = StorageControlPlane { + wal_acceptors: Vec::new(), + page_servers: Vec::new(), + }; + + let workdir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/pageserver1"); + + fs::create_dir(workdir.clone()).unwrap(); + + let pserver = PageServerNode{ + page_service_addr: "127.0.0.1:65200".parse().unwrap(), + wal_producer_addr: pg_addr, + data_dir: workdir + }; + pserver.start(); + + cplane.page_servers.push(pserver); + cplane + } // // postgres <-> wal_acceptor x3 <-> page_server // fn local(&mut self) -> StorageControlPlane { // } - fn get_page_server_conn_info() {} + pub fn page_server_addr(&self) -> &SocketAddr { + &self.page_servers[0].page_service_addr + } fn get_wal_acceptor_conn_info() {} } pub struct PageServerNode { - page_service_ip: IpAddr, - page_service_port: u32, - wal_producer_ip: IpAddr, - wal_producer_port: u32, + page_service_addr: SocketAddr, + wal_producer_addr: SocketAddr, data_dir: PathBuf, } impl PageServerNode { // TODO: method to force redo on a specific relation - pub fn start() {} + fn binary_path(&self) -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join("./target/debug/pageserver") + } + + pub fn start(&self) { + let status = Command::new(self.binary_path()) + .args(&["-D", self.data_dir.to_str().unwrap()]) + .args(&["-w", self.wal_producer_addr.to_string().as_str()]) + .args(&["-l", self.page_service_addr.to_string().as_str()]) + .arg("-d") + .arg("--skip-recovery") + .env_clear() + .status() + .expect("failed to execute initdb"); + + if !status.success() { + panic!("pageserver start failed"); + } + } + + pub fn stop(&self) { + let pifile = self.data_dir.join("pageserver.pid"); + let pid = fs::read_to_string(pifile).unwrap(); + let status = Command::new("kill") + .arg(pid) + .env_clear() + .status() + .expect("failed to execute kill"); + + if !status.success() { + panic!("kill start failed"); + } + } +} + +impl Drop for PageServerNode { + fn drop(&mut self) { + self.stop(); + // fs::remove_dir_all(self.data_dir.clone()).unwrap(); + } } pub struct WalAcceptorNode { - port: u32, - ip: IpAddr, + listen: SocketAddr, data_dir: PathBuf, } @@ -72,7 +127,7 @@ impl WalAcceptorNode {} pub struct ComputeControlPlane { pg_install_dir: PathBuf, work_dir: PathBuf, - last_assigned_port: u32, + last_assigned_port: u16, nodes: Vec, } @@ -92,7 +147,7 @@ impl ComputeControlPlane { } // TODO: check port availability and - fn get_port(&mut self) -> u32 { + fn get_port(&mut self) -> u16 { let port = self.last_assigned_port + 1; self.last_assigned_port += 1; port @@ -132,8 +187,7 @@ impl ComputeControlPlane { // listen for selected port node.append_conf( "postgresql.conf", - format!( - "\ + format!("\ max_wal_senders = 10\n\ max_replication_slots = 10\n\ hot_standby = on\n\ @@ -142,12 +196,7 @@ impl ComputeControlPlane { wal_level = replica\n\ listen_addresses = '{address}'\n\ port = {port}\n\ - ", - address = node.ip, - port = node.port - ) - .as_str(), - ); + ", address = node.ip, port = node.port).as_str()); node } @@ -157,7 +206,7 @@ impl ComputeControlPlane { pub struct PostgresNode { _node_id: usize, - port: u32, + port: u16, ip: IpAddr, pgdata: PathBuf, pg_install_dir: PathBuf, @@ -204,6 +253,10 @@ impl PostgresNode { self.pg_ctl("stop", true); } + pub fn addr(&self) -> SocketAddr { + SocketAddr::new(self.ip, self.port) + } + // XXX: cache that in control plane fn whoami(&self) -> String { let output = Command::new("whoami") @@ -241,6 +294,6 @@ impl Drop for PostgresNode { // TODO: put logs to a separate location to run `tail -F` on them fn drop(&mut self) { self.pg_ctl("stop", false); - fs::remove_dir_all(self.pgdata.clone()).unwrap(); + // fs::remove_dir_all(self.pgdata.clone()).unwrap(); } } diff --git a/src/lib.rs b/src/lib.rs index 1dfc13f74d..a45dd484a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use std::net::IpAddr; +use std::net::SocketAddr; use std::path::PathBuf; #[allow(dead_code)] @@ -15,11 +15,12 @@ pub mod tui_event; mod tui_logger; #[allow(dead_code)] +#[derive(Clone)] pub struct PageServerConf { pub data_dir: PathBuf, pub daemonize: bool, pub interactive: bool, - pub wal_producer_ip: IpAddr, - pub wal_producer_port: u32, + pub wal_producer_addr: SocketAddr, + pub listen_addr: SocketAddr, pub skip_recovery: bool, } diff --git a/src/page_service.rs b/src/page_service.rs index d9160db985..f811fb260a 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -19,6 +19,7 @@ use std::io; use log::*; use crate::page_cache; +use crate::PageServerConf; type Result = std::result::Result; @@ -205,7 +206,7 @@ impl FeMessage { /////////////////////////////////////////////////////////////////////////////// -pub fn thread_main() { +pub fn thread_main(conf: PageServerConf) { // Create a new thread pool // @@ -214,11 +215,10 @@ pub fn thread_main() { //let runtime = runtime::Runtime::new().unwrap(); let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap(); - let listen_address = "127.0.0.1:5430"; - info!("Starting page server on {}", listen_address); + info!("Starting page server on {}", conf.listen_addr); runtime.block_on(async { - let _unused = page_service_main(listen_address).await; + let _unused = page_service_main(conf.listen_addr.to_string().as_str()).await; }); } diff --git a/src/walreceiver.rs b/src/walreceiver.rs index b5ba88ebf6..3031a7cce9 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -45,7 +45,7 @@ pub fn thread_main(conf: PageServerConf) { async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // Connect to the database in replication mode. - let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_ip, conf.wal_producer_port); + let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_addr.ip(), conf.wal_producer_addr.port()); debug!("connecting to {}...", conn_str); let (mut rclient, connection) = connect_replication( conn_str.as_str(), diff --git a/tests/control_plane.rs b/tests/control_plane.rs index 8e607b83f6..e00ee02e6a 100644 --- a/tests/control_plane.rs +++ b/tests/control_plane.rs @@ -1,4 +1,5 @@ use pageserver::control_plane::ComputeControlPlane; +use pageserver::control_plane::StorageControlPlane; #[test] fn test_actions() { diff --git a/tests/pageserver.rs b/tests/pageserver.rs index 308a6cd530..9ab48dd453 100644 --- a/tests/pageserver.rs +++ b/tests/pageserver.rs @@ -1,10 +1,44 @@ +use std::thread::sleep; +use std::time::Duration; + +use pageserver::control_plane::ComputeControlPlane; +use pageserver::control_plane::StorageControlPlane; + // XXX: force all redo at the end -// -- restart + seqscan won'r read deleted stuff +// -- restart + seqscan won't read deleted stuff // -- pageserver api endpoint to check all rels // Handcrafted cases with wal records that are (were) problematic for redo. #[test] -fn test_redo_cases() {} +fn test_redo_cases() { + // Allocate postgres instance + let mut compute_cplane = ComputeControlPlane::local(); + let node = compute_cplane.new_vanilla_node(); + + // Start pageserver that reads WAL directly from that postgres + let storage_cplane = StorageControlPlane::one_page_server(node.addr()); + let pageserver_addr = storage_cplane.page_server_addr(); + + // Configure that node to take pages from pageserver + node.append_conf("postgresql.conf", format!("\ + page_server_connstring = 'host={} port={}'\n\ + ", pageserver_addr.ip(), pageserver_addr.port()).as_str()); + + println!("pew!"); + sleep(Duration::from_secs(5)); + + node.start(); + node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); + node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'"); + + let count: i64 = node + .safe_psql("postgres", "SELECT count(*) FROM t") + .first() + .unwrap() + .get(0); + + assert_eq!(count, 100000); +} // Runs pg_regress on a compute node #[test]