diff --git a/Cargo.lock b/Cargo.lock index 6d6cc17c8f..aef4a83785 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + [[package]] name = "arrayref" version = "0.3.6" @@ -315,6 +324,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -1055,6 +1079,7 @@ version = "0.1.0" dependencies = [ "byteorder", "bytes", + "clap", "crossbeam-channel", "futures", "lazy_static", @@ -1660,6 +1685,12 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "strsim" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + [[package]] name = "subtle" version = "2.4.0" @@ -1700,6 +1731,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "1.0.24" @@ -1898,6 +1938,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + [[package]] name = "unicode-xid" version = "0.2.1" @@ -1937,6 +1983,12 @@ version = "1.1.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34b2f665b594b07095e3ac3f718e13c2197143416fae4c5706cffb7b1af8d7f1" +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index a77fdcf370..9dd4a9b37b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ futures = "0.3.13" lazy_static = "1.4.0" log = "0.4.14" stderrlog = "0.5.1" +clap = "2.33.0" rust-s3 = { git = "https://github.com/hlinnaka/rust-s3", features = ["no-verify-ssl"] } tokio = { version = "1.3.0", features = ["full"] } tokio-stream = { version = "0.1.4" } diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index 74a038292d..6135543f55 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -2,58 +2,119 @@ // Main entry point for the Page Server executable // -use std::thread; use log::*; +use std::io::Error; +use std::{net::IpAddr, thread}; + +use clap::{App, Arg}; use pageserver::page_service; use pageserver::restore_s3; use pageserver::walreceiver; use pageserver::walredo; - -use std::io::Error; +use pageserver::PageServerConf; fn main() -> Result<(), Error> { + let arg_matches = App::new("Zenith page server") + .about("Materializes WAL stream to pages and serves them to the postgres") + .arg(Arg::with_name("datadir") + .short("D") + .long("dir") + .takes_value(true) + .help("Path to the page server data directory")) + .arg(Arg::with_name("wal_producer") + .short("w") + .long("wal-producer") + .takes_value(true) + .help("connect to the WAL sender (postgres or wal_acceptor) on ip:port (default: 127.0.0.1:65432)")) + .arg(Arg::with_name("daemonize") + .short("d") + .long("daemonize") + .takes_value(false) + .help("Run in the background")) + .arg(Arg::with_name("skip_recovery") + .long("skip-recovery") + .takes_value(false) + 
.help("Skip S3 recovery procedure and start empty")) + .get_matches(); + + let mut conf = PageServerConf { + data_dir: String::from("."), + daemonize: false, + wal_producer_ip: "127.0.0.1".parse::<IpAddr>().unwrap(), + wal_producer_port: 65432, + skip_recovery: false, + }; + + if let Some(dir) = arg_matches.value_of("datadir") { + conf.data_dir = String::from(dir); + } + + if arg_matches.is_present("daemonize") { + conf.daemonize = true; + } + + if arg_matches.is_present("skip_recovery") { + conf.skip_recovery = true; + } + + if let Some(addr) = arg_matches.value_of("wal_producer") { + let parts: Vec<&str> = addr.split(':').collect(); + conf.wal_producer_ip = parts[0].parse().unwrap(); + conf.wal_producer_port = parts[1].parse().unwrap(); + } + + start_pageserver(conf) +} + +fn start_pageserver(conf: PageServerConf) -> Result<(), Error> { let mut threads = Vec::new(); // Initialize logger stderrlog::new() .verbosity(3) .module("pageserver") - .init().unwrap(); + .init() + .unwrap(); info!("starting..."); // Initialize the WAL applicator let walredo_thread = thread::Builder::new() - .name("WAL redo thread".into()).spawn( - || { - walredo::wal_applicator_main(); - }).unwrap(); + .name("WAL redo thread".into()) + .spawn(|| { + walredo::wal_applicator_main(); + }) + .unwrap(); threads.push(walredo_thread); - + // Before opening up for connections, restore the latest base backup from S3. // (We don't persist anything to local disk at the moment, so we need to do // this at every startup) - restore_s3::restore_main(); + if !conf.skip_recovery { + restore_s3::restore_main(); + } // Launch the WAL receiver thread. It will try to connect to the WAL safekeeper, // and stream the WAL. If the connection is lost, it will reconnect on its own. // We just fire and forget it here. 
let walreceiver_thread = thread::Builder::new() - .name("WAL receiver thread".into()).spawn( - || { - // thread code - walreceiver::thread_main(); - }).unwrap(); + .name("WAL receiver thread".into()) + .spawn(|| { + // thread code + walreceiver::thread_main(conf); + }) + .unwrap(); threads.push(walreceiver_thread); // GetPage@LSN requests are served by another thread. (It uses async I/O, // but the code in page_service sets up it own thread pool for that) let page_server_thread = thread::Builder::new() - .name("Page Service thread".into()).spawn( - || { - // thread code - page_service::thread_main(); - }).unwrap(); + .name("Page Service thread".into()) + .spawn(|| { + // thread code + page_service::thread_main(); + }) + .unwrap(); threads.push(page_server_thread); // never returns. diff --git a/src/control_plane.rs b/src/control_plane.rs index db2a456710..e260813e64 100644 --- a/src/control_plane.rs +++ b/src/control_plane.rs @@ -7,58 +7,91 @@ // local installations. // -use std::{io::Write, net::{IpAddr, Ipv4Addr}}; -use std::process::{Command}; -use std::path::{Path, PathBuf}; use std::fs::{self, OpenOptions}; +use std::path::{Path, PathBuf}; +use std::process::Command; use std::str; +use std::{ + io::Write, + net::{IpAddr, Ipv4Addr}, +}; use postgres::{Client, NoTls}; -// // -// // I'm intendedly modelling storage and compute control planes as a separate entities -// // as it is closer to the actual setup. -// // -// pub struct StorageControlPlane { -// keepers: Vec, -// pagers: Vec -// } +// +// I'm intendedly modelling storage and compute control planes as a separate entities +// as it is closer to the actual setup. 
+// +pub struct StorageControlPlane { + wal_acceptors: Vec<WalAcceptorNode>, + last_wal_acceptor_port: u32, + pager_servers: Vec<PageServerNode>, + last_page_server_port: u32, +} -// impl StorageControlPlane { -// fn new_pageserver(&mut self) { +impl StorageControlPlane { + // postgres <-> page_server + // fn one_page_server(&mut self, pg_ip: IpAddr, pg_port: u32) -> StorageControlPlane {} -// } + // // postgres <-> wal_acceptor x3 <-> page_server + // fn local(&mut self) -> StorageControlPlane { + // } -// fn new_safekeeper(&mut self) { + fn get_page_server_conn_info() {} -// } -// } + fn get_wal_acceptor_conn_info() {} +} +pub struct PageServerNode { + page_service_ip: IpAddr, + page_service_port: u32, + wal_producer_ip: IpAddr, + wal_producer_port: u32, + data_dir: PathBuf, +} + +impl PageServerNode { + // TODO: method to force redo on a specific relation + + pub fn start() {} +} + +pub struct WalAcceptorNode { + port: u32, + ip: IpAddr, + data_dir: PathBuf, +} + +impl WalAcceptorNode {} + +/////////////////////////////////////////////////////////////////////////////// + +// +// ComputeControlPlane +// pub struct ComputeControlPlane { pg_install_dir: PathBuf, work_dir: PathBuf, last_assigned_port: u32, - nodes: Vec<PostgresNode> + nodes: Vec<PostgresNode>, } impl ComputeControlPlane { pub fn local() -> ComputeControlPlane { // postgres configure and `make temp-install` are using this path - let pg_install_dir = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("../tmp_install/"); + let pg_install_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("../tmp_install/"); - let work_dir = Path::new(env!("CARGO_MANIFEST_DIR")) - .join("tmp_install/"); + let work_dir = Path::new(env!("CARGO_MANIFEST_DIR")).join("tmp_install/"); ComputeControlPlane { pg_install_dir: pg_install_dir, work_dir: work_dir, last_assigned_port: 65431, - nodes: Vec::new() + nodes: Vec::new(), } } - // TODO: check port availability and + // TODO: check port availability and fn get_port(&mut self) -> u32 { let port = self.last_assigned_port + 1; 
self.last_assigned_port += 1; @@ -72,8 +105,8 @@ impl ComputeControlPlane { node_id: node_id, port: self.get_port(), ip: IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - pgdata: self.work_dir.join(format!("compute/pg{}",node_id)), - pg_install_dir: self.pg_install_dir.clone() + pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), + pg_install_dir: self.pg_install_dir.clone(), }; self.nodes.push(node); let node = self.nodes.last().unwrap(); @@ -97,7 +130,10 @@ impl ComputeControlPlane { // ", node.ip).as_str()); // listen for selected port - node.append_conf("postgresql.conf", format!("\ + node.append_conf( + "postgresql.conf", + format!( + "\ max_wal_senders = 10\n\ max_replication_slots = 10\n\ hot_standby = on\n\ @@ -107,24 +143,27 @@ impl ComputeControlPlane { listen_addresses = '{address}'\n\ port = {port}\n\ ", - address = node.ip, - port = node.port - ).as_str()); + address = node.ip, + port = node.port + ) + .as_str(), + ); node } } +/////////////////////////////////////////////////////////////////////////////// + pub struct PostgresNode { node_id: usize, port: u32, ip: IpAddr, pgdata: PathBuf, - pg_install_dir: PathBuf + pg_install_dir: PathBuf, } impl PostgresNode { - pub fn append_conf(&self, config: &str, opts: &str) { OpenOptions::new() .append(true) @@ -138,9 +177,11 @@ impl PostgresNode { let pg_ctl_path = self.pg_install_dir.join("bin/pg_ctl"); let pg_ctl = Command::new(pg_ctl_path) .args(&[ - "-D", self.pgdata.to_str().unwrap(), - "-l", self.pgdata.join("log").to_str().unwrap(), - action + "-D", + self.pgdata.to_str().unwrap(), + "-l", + self.pgdata.join("log").to_str().unwrap(), + action, ]) .env_clear() .status() @@ -178,8 +219,13 @@ impl PostgresNode { pub fn safe_psql(&self, db: &str, sql: &str) -> Vec<postgres::Row> { // XXX: user! 
- let connstring = format!("host={} port={} dbname={} user={}", - self.ip, self.port, db, self.whoami()); + let connstring = format!( + "host={} port={} dbname={} user={}", + self.ip, + self.port, + db, + self.whoami() + ); let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); client.query(sql, &[]).unwrap() } @@ -198,4 +244,3 @@ impl Drop for PostgresNode { fs::remove_dir_all(self.pgdata.clone()).unwrap(); } } - diff --git a/src/lib.rs b/src/lib.rs index ca5c6c9779..875a447e14 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,19 @@ +use std::net::IpAddr; + +#[allow(dead_code)] +pub mod control_plane; + +pub mod page_cache; pub mod page_service; pub mod restore_s3; -pub mod walreceiver; -pub mod control_plane; pub mod waldecoder; -pub mod page_cache; +pub mod walreceiver; pub mod walredo; + +pub struct PageServerConf { + pub data_dir: String, + pub daemonize: bool, + pub wal_producer_ip: IpAddr, + pub wal_producer_port: u32, + pub skip_recovery: bool, +} diff --git a/src/walreceiver.rs b/src/walreceiver.rs index e41413e0c0..59c927d39c 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -14,6 +14,7 @@ use tokio::time::{sleep, Duration}; use crate::waldecoder::WalStreamDecoder; use crate::page_cache; use crate::page_cache::BufferTag; +use crate::PageServerConf; use tokio_postgres::{connect_replication, NoTls, Error, ReplicationMode}; use postgres_protocol::message::backend::ReplicationMessage; @@ -21,7 +22,7 @@ use postgres_protocol::message::backend::ReplicationMessage; // // This is the entry point for the WAL receiver thread. 
// -pub fn thread_main() { +pub fn thread_main(conf: PageServerConf) { info!("Starting WAL receiver"); @@ -32,7 +33,7 @@ pub fn thread_main() { runtime.block_on( async { loop { - let _res = walreceiver_main().await; + let _res = walreceiver_main(&conf).await; // TODO: print/log the error info!("WAL streaming connection failed, retrying in 5 seconds..."); @@ -41,13 +42,16 @@ pub fn thread_main() { }); } -async fn walreceiver_main() -> Result<(), Error> { +async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // Connect to the database in replication mode. - debug!("connecting..."); - let (mut rclient, connection) = - connect_replication("host=localhost user=zenith port=65432", NoTls, ReplicationMode::Physical).await?; - + let conn_str = format!("host={} user=zenith port={}", conf.wal_producer_ip, conf.wal_producer_port); + debug!("connecting to {}...", conn_str); + let (mut rclient, connection) = connect_replication( + conn_str.as_str(), + NoTls, + ReplicationMode::Physical + ).await?; debug!("connected!"); // The connection object performs the actual communication with the database,