diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index c1ebc96685..756566caa3 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -27,4 +27,4 @@ jobs: - name: Run test run: | - cargo test --test test_pageserver -- --nocapture --test-threads=1 \ No newline at end of file + cargo test --test test_pageserver -- --nocapture --test-threads=1 diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index aba466a1e6..13c37f8259 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -7,6 +7,7 @@ use std::{fs::File, str::FromStr}; use std::io; use std::path::PathBuf; use std::thread; +use std::fs; use clap::{App, Arg}; use daemonize::Daemonize; @@ -20,7 +21,6 @@ use pageserver::page_service; use pageserver::restore_s3; use pageserver::tui; use pageserver::walreceiver; -use pageserver::walredo; use pageserver::PageServerConf; fn main() -> Result<(), io::Error> { @@ -61,7 +61,7 @@ fn main() -> Result<(), io::Error> { data_dir: PathBuf::from("./"), daemonize: false, interactive: false, - wal_producer_connstr: String::from_str("host=127.0.0.1 port=65432 user=zenith").unwrap(), + wal_producer_connstr: None, listen_addr: "127.0.0.1:5430".parse().unwrap(), skip_recovery: false, }; @@ -90,7 +90,7 @@ fn main() -> Result<(), io::Error> { } if let Some(addr) = arg_matches.value_of("wal_producer") { - conf.wal_producer_connstr = String::from_str(addr).unwrap(); + conf.wal_producer_connstr = Some(String::from_str(addr).unwrap()); } if let Some(addr) = arg_matches.value_of("listen") { @@ -148,43 +148,51 @@ fn start_pageserver(conf: PageServerConf) -> Result<(), io::Error> { info!("starting..."); - // Initialize the WAL applicator - let walredo_thread = thread::Builder::new() - .name("WAL redo thread".into()) - .spawn(|| { - walredo::wal_applicator_main(); - }) - .unwrap(); - threads.push(walredo_thread); - // Before opening up for connections, restore the latest base backup from S3. // (We don't persist anything to local disk at the moment, so we need to do // this at every startup) if !conf.skip_recovery { - restore_s3::restore_main(); + restore_s3::restore_main(&conf); } - // Launch the WAL receiver thread. It will try to connect to the WAL safekeeper, - // and stream the WAL. If the connection is lost, it will reconnect on its own. - // We just fire and forget it here. - let conf1 = conf.clone(); - let walreceiver_thread = thread::Builder::new() - .name("WAL receiver thread".into()) - .spawn(|| { - // thread code - walreceiver::thread_main(conf1); - }) - .unwrap(); - threads.push(walreceiver_thread); + // Create the directory for wal-redo datadirs + match fs::create_dir(conf.data_dir.join("wal-redo")) { + Ok(_) => {}, + Err(e) => match e.kind() { + io::ErrorKind::AlreadyExists => {} + _ => { + panic!("Failed to create wal-redo data directory: {}", e); + } + } + } + + // Launch the WAL receiver thread if the pageserver was started with the --wal-producer + // option. It will try to connect to the WAL safekeeper, and stream the WAL. If + // the connection is lost, it will reconnect on its own. We just fire and forget + // it here. + // + // All other WAL receivers are started on demand by a "callmemaybe" command + // sent to the pageserver.
+ let conf_copy = conf.clone(); + if let Some(wal_producer) = conf.wal_producer_connstr { + let conf = conf_copy.clone(); + let walreceiver_thread = thread::Builder::new() + .name("static WAL receiver thread".into()) + .spawn(move || { + walreceiver::thread_main(conf, &wal_producer); + }) + .unwrap(); + threads.push(walreceiver_thread); + } // GetPage@LSN requests are served by another thread. (It uses async I/O, // but the code in page_service sets up it own thread pool for that) - let conf2 = conf.clone(); + let conf = conf_copy.clone(); let page_server_thread = thread::Builder::new() .name("Page Service thread".into()) .spawn(|| { // thread code - page_service::thread_main(conf2); + page_service::thread_main(conf); }) .unwrap(); threads.push(page_server_thread); diff --git a/src/control_plane.rs b/src/control_plane.rs index 2fddda4dfb..8cb004863a 100644 --- a/src/control_plane.rs +++ b/src/control_plane.rs @@ -7,7 +7,7 @@ // local installations. // -use std::fs::{self, OpenOptions}; +use std::{fs::{self, OpenOptions}, rc::Rc}; use std::path::{Path, PathBuf}; use std::process::Command; use std::str; @@ -45,7 +45,7 @@ pub struct StorageControlPlane { impl StorageControlPlane { // postgres <-> page_server - pub fn one_page_server(pg_connstr: String) -> StorageControlPlane { + pub fn one_page_server() -> StorageControlPlane { let mut cplane = StorageControlPlane { wal_acceptors: Vec::new(), page_servers: Vec::new(), @@ -53,7 +53,6 @@ impl StorageControlPlane { let pserver = PageServerNode { page_service_addr: "127.0.0.1:65200".parse().unwrap(), - wal_producer_connstr: pg_connstr, data_dir: TEST_WORKDIR.join("pageserver") }; pserver.init(); @@ -73,26 +72,25 @@ impl StorageControlPlane { fn get_wal_acceptor_conn_info() {} + pub fn page_server_psql(&self, sql: &str) -> Vec { + let addr = &self.page_servers[0].page_service_addr; - pub fn simple_query_storage(&self, db: &str, user: &str, sql: &str) -> Vec { let connstring = format!( "host={} port={} dbname={} user={}", - self.page_server_addr().ip(), - self.page_server_addr().port(), - db, - user + addr.ip(), + addr.port(), + "no_db", + "no_user", ); - let mut client = Client::connect(connstring.as_str(), NoTls).unwrap(); - println!("Running {}", sql); + println!("Pageserver query: '{}'", sql); client.simple_query(sql).unwrap() } } pub struct PageServerNode { page_service_addr: SocketAddr, - wal_producer_connstr: String, data_dir: PathBuf, } @@ -119,17 +117,15 @@ impl PageServerNode { } pub fn start(&self) { - println!("Starting pageserver at '{}', wal_producer='{}'", self.page_service_addr, self.wal_producer_connstr); + println!("Starting pageserver at '{}'", self.page_service_addr); let status = Command::new(CARGO_BIN_DIR.join("pageserver")) .args(&["-D", self.data_dir.to_str().unwrap()]) - .args(&["-w", self.wal_producer_connstr.as_str()]) .args(&["-l", self.page_service_addr.to_string().as_str()]) .arg("-d") .arg("--skip-recovery") .env_clear() .env("PATH", PG_BIN_DIR.to_str().unwrap()) // path to postres-wal-redo binary - .env("PGDATA", self.data_dir.join("wal_redo_pgdata")) // postres-wal-redo pgdata .status() .expect("failed to start pageserver"); @@ -172,19 +168,22 @@ impl WalAcceptorNode {} // // ComputeControlPlane // -pub struct ComputeControlPlane { +pub struct ComputeControlPlane<'a> { pg_bin_dir: PathBuf, work_dir: PathBuf, last_assigned_port: u16, - nodes: Vec, + storage_cplane: &'a StorageControlPlane, + nodes: Vec>, } -impl ComputeControlPlane { - pub fn local() -> ComputeControlPlane { +impl ComputeControlPlane<'_> { + + pub fn 
local(storage_cplane : &StorageControlPlane) -> ComputeControlPlane { ComputeControlPlane { pg_bin_dir: PG_BIN_DIR.to_path_buf(), work_dir: TEST_WORKDIR.to_path_buf(), last_assigned_port: 65431, + storage_cplane: storage_cplane, nodes: Vec::new(), } } @@ -196,7 +195,7 @@ impl ComputeControlPlane { port } - pub fn new_vanilla_node(&mut self) -> &PostgresNode { + pub fn new_vanilla_node<'a>(&mut self) -> &Rc { // allocate new node entry with generated port let node_id = self.nodes.len() + 1; let node = PostgresNode { @@ -206,7 +205,7 @@ impl ComputeControlPlane { pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), pg_bin_dir: self.pg_bin_dir.clone(), }; - self.nodes.push(node); + self.nodes.push(Rc::new(node)); let node = self.nodes.last().unwrap(); // initialize data directory @@ -260,7 +259,7 @@ impl ComputeControlPlane { pgdata: self.work_dir.join(format!("compute/pg{}", node_id)), pg_bin_dir: self.pg_bin_dir.clone(), }; - self.nodes.push(node); + self.nodes.push(Rc::new(node)); let node = self.nodes.last().unwrap(); // initialize data directory w/o files @@ -297,6 +296,20 @@ impl ComputeControlPlane { node } + + pub fn new_node(&mut self) -> Rc { + let storage_cplane = self.storage_cplane; + let node = self.new_vanilla_node(); + + let pserver = storage_cplane.page_server_addr(); + + // Configure that node to take pages from pageserver + node.append_conf("postgresql.conf", format!("\ + page_server_connstring = 'host={} port={}'\n\ + ", pserver.ip(), pserver.port()).as_str()); + + node.clone() + } } /////////////////////////////////////////////////////////////////////////////// @@ -339,8 +352,11 @@ impl PostgresNode { } } - pub fn start(&self) { - println!("Started postgres node at '{}'", self.connstr()); + pub fn start(&self, storage_cplane: &StorageControlPlane) { + let _res = storage_cplane + .page_server_psql(format!("callmemaybe {}", self.connstr()).as_str()); + + println!("Starting postgres node at '{}'", self.connstr()); self.pg_ctl("start", true); } @@ -349,11 +365,11 @@ impl PostgresNode { } pub fn stop(&self) { - self.pg_ctl("stop", true); + self.pg_ctl("-m immediate stop", true); } pub fn connstr(&self) -> String { - format!("user={} host={} port={}", self.whoami(), self.ip, self.port) + format!("host={} port={} user={}", self.ip, self.port, self.whoami()) } // XXX: cache that in control plane @@ -370,7 +386,6 @@ impl PostgresNode { } pub fn safe_psql(&self, db: &str, sql: &str) -> Vec { - // XXX: user! 
let connstring = format!( "host={} port={} dbname={} user={}", self.ip, diff --git a/src/lib.rs b/src/lib.rs index 7b66b188de..a6d00e879b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,18 +19,18 @@ pub mod xlog_utils; mod tui_logger; #[allow(dead_code)] -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct PageServerConf { pub data_dir: PathBuf, pub daemonize: bool, pub interactive: bool, - pub wal_producer_connstr: String, + pub wal_producer_connstr: Option<String>, pub listen_addr: SocketAddr, pub skip_recovery: bool, } #[allow(dead_code)] -#[derive(Debug,Clone)] +#[derive(Debug, Clone)] pub struct WalAcceptorConf { pub data_dir: PathBuf, pub no_sync: bool, diff --git a/src/page_cache.rs b/src/page_cache.rs index 10db125e94..918f7c6108 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -7,25 +7,27 @@ // use core::ops::Bound::Included; -use std::convert::TryInto; +use std::{convert::TryInto, ops::AddAssign}; use std::collections::{BTreeMap, HashMap}; use std::error::Error; -use std::sync::Arc; -use std::sync::Condvar; -use std::sync::Mutex; +use std::sync::{Arc,Condvar, Mutex}; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::time::Duration; +use std::thread; +// use tokio::sync::RwLock; use bytes::Bytes; use lazy_static::lazy_static; use rand::Rng; use log::*; +use crate::{PageServerConf, walredo}; + use crossbeam_channel::unbounded; use crossbeam_channel::{Sender, Receiver}; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. -static TIMEOUT: Duration = Duration::from_secs(20); +static TIMEOUT: Duration = Duration::from_secs(60); pub struct PageCache { shared: Mutex<PageCacheShared>, @@ -49,6 +51,7 @@ pub struct PageCache { pub last_record_lsn: AtomicU64, } +#[derive(Clone)] pub struct PageCacheStats { pub num_entries: u64, pub num_page_images: u64, @@ -59,6 +62,21 @@ pub struct PageCacheStats { pub last_record_lsn: u64, } +impl AddAssign for PageCacheStats { + + fn add_assign(&mut self, other: Self) { + *self = Self { + num_entries: self.num_entries + other.num_entries, + num_page_images: self.num_page_images + other.num_page_images, + num_wal_records: self.num_wal_records + other.num_wal_records, + num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests, + first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn, + last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn, + last_record_lsn: self.last_record_lsn + other.last_record_lsn, + } + } +} + // // Shared data structure, holding page cache and related auxiliary information // @@ -94,8 +112,31 @@ struct PageCacheShared { } lazy_static! { - pub static ref PAGECACHE : PageCache = init_page_cache(); + pub static ref PAGECACHES : Mutex<HashMap<u64, Arc<PageCache>>> = Mutex::new(HashMap::new()); } + +pub fn get_pagecahe(conf: PageServerConf, sys_id : u64) -> Arc<PageCache> { + let mut pcaches = PAGECACHES.lock().unwrap(); + + if !pcaches.contains_key(&sys_id) { + pcaches.insert(sys_id, Arc::new(init_page_cache())); + + // Initialize the WAL redo thread + // + // For now the join_handle is not saved anywhere and we won't try to restart the thread + // if it is dead. We may later stop those threads after some inactivity period + // and restart them on demand.
+ let _walredo_thread = thread::Builder::new() + .name("WAL redo thread".into()) + .spawn(move || { + walredo::wal_redo_main(conf, sys_id); + }) + .unwrap(); + } + + pcaches.get(&sys_id).unwrap().clone() +} + fn init_page_cache() -> PageCache { // Initialize the channel between the page cache and the WAL applicator @@ -208,14 +249,16 @@ pub struct WALRecord { // Public interface functions +impl PageCache { + // // GetPage@LSN // // Returns an 8k page image // -pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result> +pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result> { - PAGECACHE.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); // Look up cache entry. If it's a page image, return that. If it's a WAL record, // ask the WAL redo service to reconstruct the page image from the WAL records. @@ -224,14 +267,14 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result let entry_rc: Arc; { - let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); let mut waited = false; while lsn > shared.last_valid_lsn { // TODO: Wait for the WAL receiver to catch up waited = true; trace!("not caught up yet: {}, requested {}", shared.last_valid_lsn, lsn); - let wait_result = PAGECACHE.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap(); + let wait_result = self.valid_lsn_condvar.wait_timeout(shared, TIMEOUT).unwrap(); shared = wait_result.0; if wait_result.1.timed_out() { @@ -283,7 +326,7 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result assert!(!entry_content.apply_pending); entry_content.apply_pending = true; - let s = &PAGECACHE.walredo_sender; + let s = &self.walredo_sender; s.send(entry_rc.clone())?; } @@ -323,10 +366,10 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result // Returns an old page image (if any), and a vector of WAL records to apply // over it. // -pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option, Vec) +pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option, Vec) { // Scan the BTreeMap backwards, starting from the given entry. - let shared = PAGECACHE.shared.lock().unwrap(); + let shared = self.shared.lock().unwrap(); let pagecache = &shared.pagecache; let minkey = CacheKey { @@ -377,7 +420,7 @@ pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option, Vec= shared.last_valid_lsn); shared.last_valid_lsn = lsn; - PAGECACHE.valid_lsn_condvar.notify_all(); + self.valid_lsn_condvar.notify_all(); - PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); } // // NOTE: this updates last_valid_lsn as well. // -pub fn advance_last_record_lsn(lsn: u64) +pub fn advance_last_record_lsn(&self, lsn: u64) { - let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); // Can't move backwards. 
assert!(lsn >= shared.last_valid_lsn); @@ -465,16 +508,16 @@ pub fn advance_last_record_lsn(lsn: u64) shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - PAGECACHE.valid_lsn_condvar.notify_all(); + self.valid_lsn_condvar.notify_all(); - PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); - PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); } // -pub fn _advance_first_valid_lsn(lsn: u64) +pub fn _advance_first_valid_lsn(&self, lsn: u64) { - let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); // Can't move backwards. assert!(lsn >= shared.first_valid_lsn); @@ -484,12 +527,12 @@ pub fn _advance_first_valid_lsn(lsn: u64) assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); shared.first_valid_lsn = lsn; - PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn, Ordering::Relaxed); } -pub fn init_valid_lsn(lsn: u64) +pub fn init_valid_lsn(&self, lsn: u64) { - let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); assert!(shared.first_valid_lsn == 0); assert!(shared.last_valid_lsn == 0); @@ -498,14 +541,15 @@ pub fn init_valid_lsn(lsn: u64) shared.first_valid_lsn = lsn; shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - PAGECACHE.first_valid_lsn.store(lsn, Ordering::Relaxed); - PAGECACHE.last_valid_lsn.store(lsn, Ordering::Relaxed); - PAGECACHE.last_record_lsn.store(lsn, Ordering::Relaxed); + + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); } -pub fn get_last_record_lsn() -> u64 +pub fn get_last_valid_lsn(&self) -> u64 { - let shared = PAGECACHE.shared.lock().unwrap(); + let shared = self.shared.lock().unwrap(); return shared.last_record_lsn; } @@ -517,7 +561,7 @@ pub fn get_last_record_lsn() -> u64 // 2. Request that page with GetPage@LSN, using Max LSN (i.e. get the latest page version) // // -pub fn _test_get_page_at_lsn() +pub fn _test_get_page_at_lsn(&self) { // for quick testing of the get_page_at_lsn() funcion. // @@ -527,7 +571,7 @@ pub fn _test_get_page_at_lsn() let mut tag: Option = None; { - let shared = PAGECACHE.shared.lock().unwrap(); + let shared = self.shared.lock().unwrap(); let pagecache = &shared.pagecache; if pagecache.is_empty() { @@ -548,7 +592,7 @@ pub fn _test_get_page_at_lsn() } info!("testing GetPage@LSN for block {}", tag.unwrap().blknum); - match get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { + match self.get_page_at_lsn(tag.unwrap(), 0xffff_ffff_ffff_eeee) { Ok(_img) => { // This prints out the whole page image. //println!("{:X?}", img); @@ -563,9 +607,9 @@ pub fn _test_get_page_at_lsn() // FIXME: Shouldn't relation size also be tracked with an LSN? // If a replica is lagging behind, it needs to get the size as it was on // the replica's current replay LSN. 
-pub fn relsize_inc(rel: &RelTag, to: Option) +pub fn relsize_inc(&self, rel: &RelTag, to: Option) { - let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); let entry = shared.relsize_cache.entry(*rel).or_insert(0); if let Some(to) = to { @@ -575,29 +619,51 @@ pub fn relsize_inc(rel: &RelTag, to: Option) } } -pub fn relsize_get(rel: &RelTag) -> u32 +pub fn relsize_get(&self, rel: &RelTag) -> u32 { - let mut shared = PAGECACHE.shared.lock().unwrap(); + let mut shared = self.shared.lock().unwrap(); let entry = shared.relsize_cache.entry(*rel).or_insert(0); *entry } -pub fn relsize_exist(rel: &RelTag) -> bool +pub fn relsize_exist(&self, rel: &RelTag) -> bool { - let shared = PAGECACHE.shared.lock().unwrap(); + let shared = self.shared.lock().unwrap(); let relsize_cache = &shared.relsize_cache; relsize_cache.contains_key(rel) } -pub fn get_stats() -> PageCacheStats +pub fn get_stats(&self) -> PageCacheStats { PageCacheStats { - num_entries: PAGECACHE.num_entries.load(Ordering::Relaxed), - num_page_images: PAGECACHE.num_page_images.load(Ordering::Relaxed), - num_wal_records: PAGECACHE.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: PAGECACHE.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: PAGECACHE.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: PAGECACHE.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: PAGECACHE.last_record_lsn.load(Ordering::Relaxed), + num_entries: self.num_entries.load(Ordering::Relaxed), + num_page_images: self.num_page_images.load(Ordering::Relaxed), + num_wal_records: self.num_wal_records.load(Ordering::Relaxed), + num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), + last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), + last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), } } + +} + +pub fn get_stats() -> PageCacheStats +{ + let pcaches = PAGECACHES.lock().unwrap(); + + let mut stats = PageCacheStats { + num_entries: 0, + num_page_images: 0, + num_wal_records: 0, + num_getpage_requests: 0, + first_valid_lsn: 0, + last_valid_lsn: 0, + last_record_lsn: 0, + }; + + pcaches.iter().for_each(|(_sys_id, pcache)| { + stats += pcache.get_stats(); + }); + stats +} diff --git a/src/page_service.rs b/src/page_service.rs index 761e78e841..a527ce7538 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -15,11 +15,13 @@ use tokio::runtime; use tokio::task; use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, Bytes, BytesMut}; -use std::io; +use std::{io}; +use std::thread; use log::*; use crate::page_cache; use crate::PageServerConf; +use crate::walreceiver; type Result = std::result::Result; @@ -221,43 +223,38 @@ pub fn thread_main(conf: PageServerConf) { info!("Starting page server on {}", conf.listen_addr); runtime.block_on(async { - let _unused = page_service_main(conf.listen_addr.to_string().as_str()).await; + let listener = TcpListener::bind(conf.listen_addr).await.unwrap(); + + loop { + let (socket, peer_addr) = listener.accept().await.unwrap(); + debug!("accepted connection from {}", peer_addr); + let mut conn_handler = Connection::new(conf.clone(), socket); + + task::spawn(async move { + if let Err(err) = conn_handler.run().await { + error!("error: {}", err); + } + }); + } }); } -async fn page_service_main(listen_address: &str) { - - let listener = TcpListener::bind(listen_address).await.unwrap(); - - loop { - let (socket, peer_addr) = 
listener.accept().await.unwrap(); - - debug!("accepted connection from {}", peer_addr); - - let mut conn_handler = Connection::new(socket); - - task::spawn(async move { - if let Err(err) = conn_handler.run().await { - error!("error: {}", err); - } - }); - } -} - #[derive(Debug)] struct Connection { stream: BufWriter<TcpStream>, buffer: BytesMut, - init_done: bool + init_done: bool, + conf: PageServerConf, } impl Connection { - pub fn new(socket: TcpStream) -> Connection { + pub fn new(conf: PageServerConf, socket: TcpStream) -> Connection { Connection { stream: BufWriter::new(socket), buffer: BytesMut::with_capacity(10 * 1024), - init_done: false + init_done: false, + conf: conf } } @@ -433,10 +430,40 @@ impl Connection { async fn process_query(&mut self, q : &FeQueryMessage) -> Result<()> { trace!("got query {:?}", q.body); - if q.body.starts_with(b"pagestream") { - self.handle_pagerequests().await - } else if q.body.starts_with(b"controlfile") { + if q.body.starts_with(b"controlfile") { + self.handle_controlfile().await + + } else if q.body.starts_with(b"pagestream ") { + let (_l,r) = q.body.split_at("pagestream ".len()); + let mut r = r.to_vec(); + r.pop(); + let sysid = String::from_utf8(r).unwrap().trim().to_string(); + let sysid: u64 = sysid.parse().unwrap(); // XXX + + self.handle_pagerequests(sysid).await + + } else if q.body.starts_with(b"callmemaybe ") { + + let (_l,r) = q.body.split_at("callmemaybe ".len()); + let mut r = r.to_vec(); + r.pop(); + let connstr = String::from_utf8(r).unwrap().trim().to_string(); + + let conf_copy = self.conf.clone(); + let _walreceiver_thread = thread::Builder::new() + .name("WAL receiver thread".into()) + .spawn(move || { + walreceiver::thread_main(conf_copy,&connstr); + }) + .unwrap(); + + // generic ack: + self.write_message_noflush(&BeMessage::RowDescription).await?; + self.write_message_noflush(&BeMessage::DataRow).await?; + self.write_message_noflush(&BeMessage::CommandComplete).await?; + self.write_message(&BeMessage::ReadyForQuery).await + } else if q.body.starts_with(b"status") { self.write_message_noflush(&BeMessage::RowDescription).await?; self.write_message_noflush(&BeMessage::DataRow).await?; @@ -456,10 +483,9 @@ impl Connection { self.write_message_noflush(&BeMessage::ControlFile).await?; self.write_message_noflush(&BeMessage::CommandComplete).await?; self.write_message(&BeMessage::ReadyForQuery).await - } - async fn handle_pagerequests(&mut self) -> Result<()> { + async fn handle_pagerequests(&mut self, sysid: u64) -> Result<()> { /* switch client to COPYBOTH */ self.stream.write_u8(b'W').await?; @@ -468,13 +494,15 @@ impl Connection { self.stream.write_i16(0).await?; /* numAttributes */ self.stream.flush().await?; + let pcache = page_cache::get_pagecahe(self.conf.clone(), sysid); + loop { let message = self.read_message().await?; // XXX: none seems to appear a lot in log. // Do we have conditions for busy-loop here?
if let Some(m) = &message { - info!("query: {:?}", m); + info!("query({}): {:?}", sysid, m); }; if message.is_none() { @@ -492,7 +520,7 @@ impl Connection { forknum: req.forknum, }; - let exist = page_cache::relsize_exist(&tag); + let exist = pcache.relsize_exist(&tag); self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: exist, @@ -520,7 +548,7 @@ impl Connection { forknum: req.forknum, }; - let n_blocks = page_cache::relsize_get(&tag); + let n_blocks = pcache.relsize_get(&tag); self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { ok: true, @@ -535,27 +563,25 @@ impl Connection { forknum: req.forknum, blknum: req.blkno }; - let lsn = req.lsn; - let msg; - { - let p = page_cache::get_page_at_lsn(buf_tag, lsn); - if p.is_ok() { - msg = ZenithReadResponse { + let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) { + Ok(p) => { + BeMessage::ZenithReadResponse(ZenithReadResponse { ok: true, n_blocks: 0, - page: p.unwrap() - }; - } else { + page: p + }) + }, + Err(e) => { const ZERO_PAGE:[u8; 8192] = [0; 8192]; - msg = ZenithReadResponse { + error!("get_page_at_lsn: {}", e); + BeMessage::ZenithReadResponse(ZenithReadResponse { ok: false, n_blocks: 0, page: Bytes::from_static(&ZERO_PAGE) - } + }) } - } - let msg = BeMessage::ZenithReadResponse(msg); + }; self.write_message(&msg).await? @@ -568,7 +594,7 @@ impl Connection { forknum: req.forknum, }; - page_cache::relsize_inc(&tag, None); + pcache.relsize_inc(&tag, None); self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: true, @@ -583,7 +609,7 @@ impl Connection { forknum: req.forknum, }; - page_cache::relsize_inc(&tag, Some(req.blkno)); + pcache.relsize_inc(&tag, Some(req.blkno)); self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: true, diff --git a/src/restore_s3.rs b/src/restore_s3.rs index 67b08e2007..17ed726c81 100644 --- a/src/restore_s3.rs +++ b/src/restore_s3.rs @@ -22,7 +22,7 @@ use tokio::runtime; use futures::future; -use crate::page_cache; +use crate::{PageServerConf, page_cache}; struct Storage { region: Region, @@ -30,12 +30,12 @@ struct Storage { bucket: String } -pub fn restore_main() { +pub fn restore_main(conf: &PageServerConf) { // Create a new thread pool let runtime = runtime::Runtime::new().unwrap(); runtime.block_on(async { - let result = restore_chunk().await; + let result = restore_chunk(conf).await; match result { Ok(_) => { return; }, @@ -55,7 +55,7 @@ pub fn restore_main() { // // Load it all into the page cache. 
// -async fn restore_chunk() -> Result<(), S3Error> { +async fn restore_chunk(conf: &PageServerConf) -> Result<(), S3Error> { let backend = Storage { region: Region::Custom { @@ -79,6 +79,8 @@ async fn restore_chunk() -> Result<(), S3Error> { // List out contents of directory let results: Vec = bucket.list("relationdata/".to_string(), Some("".to_string())).await?; + // TODO: get that from backup + let sys_id: u64 = 42; let mut oldest_lsn = 0; let mut slurp_futures: Vec<_> = Vec::new(); @@ -98,7 +100,7 @@ async fn restore_chunk() -> Result<(), S3Error> { oldest_lsn = p.lsn; } let b = bucket.clone(); - let f = slurp_base_file(b, key.to_string(), p); + let f = slurp_base_file(conf, sys_id, b, key.to_string(), p); slurp_futures.push(f); } @@ -110,7 +112,9 @@ async fn restore_chunk() -> Result<(), S3Error> { if oldest_lsn == 0 { panic!("no base backup found"); } - page_cache::init_valid_lsn(oldest_lsn); + + let pcache = page_cache::get_pagecahe(conf.clone(), sys_id); + pcache.init_valid_lsn(oldest_lsn); info!("{} files to restore...", slurp_futures.len()); @@ -266,7 +270,7 @@ fn parse_rel_file_path(path: &str) -> Result= 8192 { let tag = page_cache::BufferTag { @@ -290,7 +296,7 @@ async fn slurp_base_file(bucket: Bucket, s3path: String, parsed: ParsedBaseImage blknum: blknum }; - page_cache::put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); + pcache.put_page_image(tag, parsed.lsn, bytes.copy_to_bytes(8192)); blknum += 1; } diff --git a/src/walreceiver.rs b/src/walreceiver.rs index d0c9c4b18b..ad4c82fc14 100644 --- a/src/walreceiver.rs +++ b/src/walreceiver.rs @@ -22,9 +22,9 @@ use postgres_protocol::message::backend::ReplicationMessage; // // This is the entry point for the WAL receiver thread. // -pub fn thread_main(conf: PageServerConf) { +pub fn thread_main(conf: PageServerConf, wal_producer_connstr: &String) { - info!("WAL receiver thread started"); + info!("WAL receiver thread started: '{}'", wal_producer_connstr); let runtime = runtime::Builder::new_current_thread() .enable_all() @@ -33,7 +33,7 @@ pub fn thread_main(conf: PageServerConf) { runtime.block_on( async { loop { - let _res = walreceiver_main(&conf).await; + let _res = walreceiver_main(conf.clone(), wal_producer_connstr).await; // TODO: print/log the error info!("WAL streaming connection failed, retrying in 1 second...: {:?}", _res); @@ -42,12 +42,12 @@ pub fn thread_main(conf: PageServerConf) { }); } -async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { +async fn walreceiver_main(conf: PageServerConf, wal_producer_connstr: &String) -> Result<(), Error> { // Connect to the database in replication mode. - debug!("connecting to {}...", conf.wal_producer_connstr); + debug!("connecting to {}...", wal_producer_connstr); let (mut rclient, connection) = connect_replication( - conf.wal_producer_connstr.as_str(), + wal_producer_connstr.as_str(), NoTls, ReplicationMode::Physical ).await?; @@ -63,13 +63,16 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { let _identify_system = rclient.identify_system().await?; + let sysid : u64 = _identify_system.systemid().parse().unwrap(); + let pcache = page_cache::get_pagecahe(conf, sysid); + // // Start streaming the WAL, from where we left off previously. // - let mut startpoint = page_cache::get_last_record_lsn(); + let mut startpoint = pcache.get_last_valid_lsn(); if startpoint == 0 { // FIXME: Or should we just error out? 
- page_cache::init_valid_lsn(u64::from(_identify_system.xlogpos())); + pcache.init_valid_lsn(u64::from(_identify_system.xlogpos())); startpoint = u64::from(_identify_system.xlogpos()); } else { // There might be some padding after the last full record, skip it. @@ -126,12 +129,12 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { rec: recdata.clone() }; - page_cache::put_wal_record(tag, rec); + pcache.put_wal_record(tag, rec); } // Now that this record has been handled, let the page cache know that // it is up-to-date to this LSN - page_cache::advance_last_record_lsn(lsn); + pcache.advance_last_valid_lsn(lsn); } else { break; @@ -144,7 +147,7 @@ async fn walreceiver_main(conf: &PageServerConf) -> Result<(), Error> { // better reflect that, because GetPage@LSN requests might also point in the // middle of a record, if the request LSN was taken from the server's current // flush ptr. - page_cache::advance_last_valid_lsn(endlsn); + pcache.advance_last_valid_lsn(endlsn); } ReplicationMessage::PrimaryKeepAlive(_keepalive) => { diff --git a/src/walredo.rs b/src/walredo.rs index 4f5a6fbfee..5f11c25a9a 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -16,7 +16,7 @@ // use tokio::runtime::Runtime; use tokio::process::{Command, Child, ChildStdin, ChildStdout}; -use std::process::Stdio; +use std::{path::PathBuf, process::Stdio}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::AsyncBufReadExt; use tokio::time::timeout; @@ -24,13 +24,14 @@ use std::io::Error; use std::cell::RefCell; use std::assert; use std::sync::{Arc}; +use std::fs; use log::*; use std::time::Instant; use std::time::Duration; use bytes::{Bytes, BytesMut, BufMut}; -use crate::page_cache::BufferTag; +use crate::{PageServerConf, page_cache::BufferTag}; use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; use crate::page_cache; @@ -40,25 +41,28 @@ static TIMEOUT: Duration = Duration::from_secs(20); // // Main entry point for the WAL applicator thread. // -pub fn wal_applicator_main() +pub fn wal_redo_main(conf: PageServerConf, sys_id: u64) { - info!("WAL redo thread started"); + info!("WAL redo thread started {}", sys_id); // We block on waiting for requests on the walredo request channel, but // use async I/O to communicate with the child process. Initialize the // runtime for the async part. let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let pcache = page_cache::get_pagecahe(conf.clone(), sys_id); + // Loop forever, handling requests as they come. - let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver; + let walredo_channel_receiver = &pcache.walredo_receiver; loop { let mut process: WalRedoProcess; + let datadir = conf.data_dir.join(format!("wal-redo/{}", sys_id)); - info!("launching WAL redo postgres process"); + info!("launching WAL redo postgres process {}", sys_id); { let _guard = runtime.enter(); - process = WalRedoProcess::launch().unwrap(); + process = WalRedoProcess::launch(&datadir, &runtime).unwrap(); } // Pretty arbitrarily, reuse the same Postgres process for 100 requests. @@ -66,10 +70,9 @@ pub fn wal_applicator_main() // using up all shared buffers in Postgres's shared buffer cache; we don't // want to write any pages to disk in the WAL redo process. 
for _i in 1..100 { - let request = walredo_channel_receiver.recv().unwrap(); - let result = handle_apply_request(&process, &runtime, request); + let result = handle_apply_request(&pcache, &process, &runtime, request); if result.is_err() { // On error, kill the process. break; @@ -84,11 +87,11 @@ } } -fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error> +fn handle_apply_request(pcache: &page_cache::PageCache, process: &WalRedoProcess, runtime: &Runtime, entry_rc: Arc<CacheEntry>) -> Result<(), Error> { let tag = entry_rc.key.tag; let lsn = entry_rc.key.lsn; - let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref()); + let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref()); let mut entry = entry_rc.content.lock().unwrap(); entry.apply_pending = false; @@ -110,7 +113,7 @@ fn handle_apply_request(process: &WalRedoProcess, runtime: &Runtime, entry_rc: A result = Err(e); } else { entry.page_image = Some(apply_result.unwrap()); - page_cache::PAGECACHE.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + pcache.num_page_images.fetch_add(1, std::sync::atomic::Ordering::Relaxed); result = Ok(()); } @@ -128,19 +131,39 @@ struct WalRedoProcess { impl WalRedoProcess { - fn launch() -> Result<WalRedoProcess, Error> { - // - // Start postgres binary in special WAL redo mode. - // + // + // Start postgres binary in special WAL redo mode. + // + // Tests that run the pageserver binary set the proper PG_BIN_DIR + // and PG_LIB_DIR so that WalRedo starts the right postgres. We may later + // switch to setting the same things in the pageserver config file. + fn launch(datadir: &PathBuf, runtime: &Runtime) -> Result<WalRedoProcess, Error> { + + // Create an empty data directory for wal-redo postgres, deleting the old one.
+ fs::remove_dir_all(datadir.to_str().unwrap()).ok(); + let initdb = runtime.block_on(Command::new("initdb") + .args(&["-D", datadir.to_str().unwrap()]) + .arg("-N") + .status() + ).expect("failed to execute initdb"); + + if !initdb.success() { + panic!("initdb failed"); + } + + // Start postgres itself let mut child = Command::new("postgres") .arg("--wal-redo") .stdin(Stdio::piped()) .stderr(Stdio::piped()) .stdout(Stdio::piped()) + .env("PGDATA", datadir.to_str().unwrap()) .spawn() .expect("postgres --wal-redo command failed to start"); + info!("launched WAL redo postgres process on {}", datadir.to_str().unwrap()); + let stdin = child.stdin.take().expect("failed to open child's stdin"); let stderr = child.stderr.take().expect("failed to open child's stderr"); let stdout = child.stdout.take().expect("failed to open child's stdout"); @@ -159,7 +182,7 @@ impl WalRedoProcess { if res.unwrap() == 0 { break; } - debug!("{}", line.trim()); + debug!("wal-redo-postgres: {}", line.trim()); line.clear(); } Ok::<(), Error>(()) diff --git a/tests/test_control_plane.rs b/tests/test_control_plane.rs index 8e607b83f6..8357070a36 100644 --- a/tests/test_control_plane.rs +++ b/tests/test_control_plane.rs @@ -1,21 +1,6 @@ -use pageserver::control_plane::ComputeControlPlane; #[test] fn test_actions() { - let mut cplane = ComputeControlPlane::local(); - let node = cplane.new_vanilla_node(); - - node.start(); - node.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); - node.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'"); - - let count: i64 = node - .safe_psql("postgres", "SELECT count(*) FROM t") - .first() - .unwrap() - .get(0); - - assert_eq!(count, 100000); } #[test] diff --git a/tests/test_pageserver.rs b/tests/test_pageserver.rs index 96512fe470..5ee78736d2 100644 --- a/tests/test_pageserver.rs +++ b/tests/test_pageserver.rs @@ -11,19 +11,9 @@ use pageserver::control_plane::StorageControlPlane; // Handcrafted cases with wal records that are (were) problematic for redo. 
#[test] fn test_redo_cases() { - // Allocate postgres instance, but don't start - let mut compute_cplane = ComputeControlPlane::local(); - // Create compute node without files, only datadir structure - let node = compute_cplane.new_minimal_node(); - // Start pageserver that reads WAL directly from that postgres - let storage_cplane = StorageControlPlane::one_page_server(node.connstr()); - let pageserver_addr = storage_cplane.page_server_addr(); - - // Configure that node to take pages from pageserver - node.append_conf("postgresql.conf", format!("\ - page_server_connstring = 'host={} port={}'\n\ - ", pageserver_addr.ip(), pageserver_addr.port()).as_str()); + let storage_cplane = StorageControlPlane::one_page_server(); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); // Request info needed to build control file storage_cplane.simple_query_storage("postgres", node.whoami().as_str(), "controlfile"); @@ -31,7 +21,8 @@ fn test_redo_cases() { node.setup_controlfile(); // start postgres - node.start(); + let node = compute_cplane.new_node(); + node.start(&storage_cplane); println!("await pageserver connection..."); sleep(Duration::from_secs(3)); @@ -61,30 +52,57 @@ fn test_redo_cases() { // Runs pg_regress on a compute node #[test] fn test_regress() { - // Allocate postgres instance, but don't start - let mut compute_cplane = ComputeControlPlane::local(); - let node = compute_cplane.new_vanilla_node(); - // Start pageserver that reads WAL directly from that postgres - let storage_cplane = StorageControlPlane::one_page_server(node.connstr()); - let pageserver_addr = storage_cplane.page_server_addr(); - - // Configure that node to take pages from pageserver - node.append_conf("postgresql.conf", format!("\ - page_server_connstring = 'host={} port={}'\n\ - ", pageserver_addr.ip(), pageserver_addr.port()).as_str()); + let storage_cplane = StorageControlPlane::one_page_server(); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); // start postgres - node.start(); + let node = compute_cplane.new_node(); + node.start(&storage_cplane); println!("await pageserver connection..."); sleep(Duration::from_secs(3)); - ////////////////////////////////////////////////////////////////// - - pageserver::control_plane::regress_check(node); + pageserver::control_plane::regress_check(&node); } -// Runs recovery with minio +// Run two postgres instances on one pageserver #[test] -fn test_pageserver_recovery() {} +fn test_pageserver_multitenancy() { + // Start pageserver that reads WAL directly from that postgres + let storage_cplane = StorageControlPlane::one_page_server(); + let mut compute_cplane = ComputeControlPlane::local(&storage_cplane); + + // Allocate postgres instance, but don't start + let node1 = compute_cplane.new_node(); + let node2 = compute_cplane.new_node(); + node1.start(&storage_cplane); + node2.start(&storage_cplane); + + // XXX: add some extension func to postgres to check walsender conn + // XXX: or better just drop that + println!("await pageserver connection..."); + sleep(Duration::from_secs(3)); + + // check node1 + node1.safe_psql("postgres", "CREATE TABLE t(key int primary key, value text)"); + node1.safe_psql("postgres", "INSERT INTO t SELECT generate_series(1,100000), 'payload'"); + let count: i64 = node1 + .safe_psql("postgres", "SELECT sum(key) FROM t") + .first() + .unwrap() + .get(0); + println!("sum = {}", count); + assert_eq!(count, 5000050000); + + // check node2 + node2.safe_psql("postgres", "CREATE TABLE t(key int primary key, value 
text)"); + node2.safe_psql("postgres", "INSERT INTO t SELECT generate_series(100000,200000), 'payload'"); + let count: i64 = node2 + .safe_psql("postgres", "SELECT sum(key) FROM t") + .first() + .unwrap() + .get(0); + println!("sum = {}", count); + assert_eq!(count, 15000150000); +} diff --git a/vendor/postgres b/vendor/postgres index d6b1fc3603..b0fe036e41 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit d6b1fc36035a8b667fa12cf2d4ffa1d738630f43 +Subproject commit b0fe036e411f3de53c3eedd45c964d2263ad91a7