diff --git a/Cargo.lock b/Cargo.lock index 947d31bfa4..6d6cc17c8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8aebca1129a03dc6dc2b127edd729435bbc4a37e1d5f4d7513165089ceb02634" +[[package]] +name = "crossbeam-channel" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dca26ee1f8d361640700bde38b2c37d8c22b3ce2d360e1fc1c74ea4b0aa7d775" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.3" @@ -1045,6 +1055,7 @@ version = "0.1.0" dependencies = [ "byteorder", "bytes", + "crossbeam-channel", "futures", "lazy_static", "log", diff --git a/Cargo.toml b/Cargo.toml index ea4024534d..a77fdcf370 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +crossbeam-channel = "0.5.0" rand = "0.8.3" regex = "1.4.5" bytes = "1.0.1" diff --git a/src/bin/pageserver.rs b/src/bin/pageserver.rs index 8e26ade010..15e5af1236 100644 --- a/src/bin/pageserver.rs +++ b/src/bin/pageserver.rs @@ -12,19 +12,25 @@ use pageserver::walreceiver; use std::io::Error; fn main() -> Result<(), Error> { + let mut threads = Vec::new(); + // Initialize logger stderrlog::new() .verbosity(3) .module("pageserver") .init().unwrap(); info!("starting..."); - // First, restore the latest base backup from S3. (We don't persist anything - // to local disk at the moment, so we need to do this at every startup) - restore_s3::restore_main(); + // Initialize the WAL applicator + let walredo_thread = thread::spawn(|| { + walredo::wal_applicator_main(); + }); + threads.push(walredo_thread); - - let mut threads = Vec::new(); + // Before opening up for connections, restore the latest base backup from S3. + // (We don't persist anything to local disk at the moment, so we need to do + // this at every startup) + restore_s3::restore_main(); // Launch the WAL receiver thread. It will try to connect to the WAL safekeeper, // and stream the WAL. If the connection is lost, it will reconnect on its own. @@ -37,7 +43,6 @@ fn main() -> Result<(), Error> { // GetPage@LSN requests are served by another thread. (It uses async I/O, // but the code in page_service sets up it own thread pool for that) - let page_server_thread = thread::spawn(|| { // thread code page_service::thread_main(); diff --git a/src/page_cache.rs b/src/page_cache.rs index 6db4defd91..2c741abaf2 100644 --- a/src/page_cache.rs +++ b/src/page_cache.rs @@ -1,18 +1,133 @@ // // Page Cache holds all the different page versions and WAL records // -// +// The Page Cache is a BTreeMap, keyed by the RelFileNode an blocknumber, and the LSN. +// The BTreeMap is protected by a Mutex, and each cache entry is protected by another +// per-entry mutex. // +use core::ops::Bound::Included; use std::collections::{BTreeMap, HashMap}; use std::error::Error; +use std::sync::Arc; +use std::sync::Condvar; use std::sync::Mutex; use bytes::Bytes; use lazy_static::lazy_static; use rand::Rng; use log::*; -use crate::walredo; +use crossbeam_channel::unbounded; +use crossbeam_channel::{Sender, Receiver}; + +pub struct PageCache { + shared: Mutex, + + // Channel for communicating with the WAL redo process here. + pub walredo_sender: Sender>, + pub walredo_receiver: Receiver>, +} + +// +// Shared data structure, holding page cache and related auxiliary information +// +struct PageCacheShared { + + // The actual page cache + pagecache: BTreeMap>, + + // Relation n_blocks cache + // + // This hashtable should be updated together with the pagecache. Now it is + // accessed unreasonably often through the smgr_nblocks(). It is better to just + // cache it in postgres smgr and ask only on restart. + relsize_cache: HashMap, + + // What page versions do we hold in the cache? If we get GetPage with + // LSN < first_valid_lsn, that's an error because we (no longer) hold that + // page version. If we get a request > last_valid_lsn, we need to wait until + // we receive all the WAL up to the request. + // + first_valid_lsn: u64, + last_valid_lsn: u64 +} + +lazy_static! { + pub static ref PAGECACHE : PageCache = init_page_cache(); +} +fn init_page_cache() -> PageCache +{ + // Initialize the channel between the page cache and the WAL applicator + let (s, r) = unbounded(); + + PageCache { + shared: Mutex::new( + PageCacheShared { + pagecache: BTreeMap::new(), + relsize_cache: HashMap::new(), + first_valid_lsn: 0, + last_valid_lsn: 0, + }), + + walredo_sender: s, + walredo_receiver: r, + } + +} + + +// +// We store two kinds of entries in the page cache: +// +// 1. Ready-made images of the block +// 2. WAL records, to be applied on top of the "previous" entry +// +// Some WAL records will initialize the page from scratch. For such records, +// the 'will_init' flag is set. They don't need the previous page image before +// applying. The 'will_init' flag is set for records containing a full-page image, +// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages +// stored directly in the cache entry in that you still need to run the WAL redo +// routine to generate the page image. +// +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct CacheKey { + pub tag: BufferTag, + pub lsn: u64 +} + +pub struct CacheEntry { + pub key: CacheKey, + + pub content: Mutex, + + // Condition variable used by the WAL redo service, to wake up + // requester. + // + // FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar + // or something else. + pub walredo_condvar: Condvar +} + +pub struct CacheEntryContent { + pub page_image: Option, + pub wal_record: Option, + pub apply_pending: bool, +} + +impl CacheEntry { + fn new(key: CacheKey) -> CacheEntry { + CacheEntry { + key: key, + content: Mutex::new(CacheEntryContent { + page_image: None, + wal_record: None, + apply_pending: false, + }), + walredo_condvar: Condvar::new(), + } + } +} + #[derive(Eq, PartialEq, Hash, Clone, Copy)] pub struct RelTag { @@ -38,65 +153,6 @@ pub struct WALRecord { pub rec: Bytes } -// -// Shared data structure, holding page cache and related auxiliary information -// -struct PageCacheShared { - - // The actual page cache - pagecache: BTreeMap, - - // Relation n_blocks cache - // - // This hashtable should be updated together with the pagecache. Now it is - // accessed unreasonably often through the smgr_nblocks(). It is better to just - // cache it in postgres smgr and ask only on restart. - relsize_cache: HashMap, - - // What page versions do we hold in the cache? If we get GetPage with - // LSN < first_valid_lsn, that's an error because we (no longer) hold that - // page version. If we get a request > last_valid_lsn, we need to wait until - // we receive all the WAL up to the request. - // - first_valid_lsn: u64, - last_valid_lsn: u64 -} - -lazy_static! { - static ref PAGECACHE: Mutex = Mutex::new( - PageCacheShared { - pagecache: BTreeMap::new(), - relsize_cache: HashMap::new(), - first_valid_lsn: 0, - last_valid_lsn: 0, - }); -} - -// -// We store two kinds of entries in the page cache: -// -// 1. Ready-made images of the block -// 2. WAL records, to be applied on top of the "previous" entry -// -// Some WAL records will initialize the page from scratch. For such records, -// the 'will_init' flag is set. They don't need the previous page image before -// applying. The 'will_init' flag is set for records containing a full-page image, -// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages -// stored directly in the cache entry in that you still need to run the WAL redo -// routine to generate the page image. -// -#[derive(PartialEq, Eq, PartialOrd, Ord)] -pub struct CacheKey { - pub tag: BufferTag, - pub lsn: u64 -} - -#[derive(Clone)] -enum CacheEntry { - PageImage(Bytes), - - WALRecord(WALRecord) -} // Public interface functions @@ -120,66 +176,133 @@ pub fn get_page_at_lsn(tag: BufferTag, lsn: u64) -> Result }; let maxkey = CacheKey { tag: tag, - lsn: lsn + 1 + lsn: lsn }; - let shared = PAGECACHE.lock().unwrap(); + let entry_rc: Arc; + { + let shared = PAGECACHE.shared.lock().unwrap(); - if lsn > shared.last_valid_lsn { - // TODO: Wait for the WAL receiver to catch up - } - if lsn < shared.first_valid_lsn { - return Err(format!("LSN {} has already been removed", lsn))?; - } - - let pagecache = &shared.pagecache; - let entries = pagecache.range(&minkey .. &maxkey); - - let mut records: Vec = Vec::new(); - - let mut base_img: Option = None; - - for (_key, e) in entries.rev() { - match e { - CacheEntry::PageImage(img) => { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img.clone()); - break; - } - CacheEntry::WALRecord(rec) => { - records.push(rec.clone()); - - if rec.will_init { - debug!("WAL record at LSN {} initializes the page", rec.lsn); - } - } + if lsn > shared.last_valid_lsn { + // TODO: Wait for the WAL receiver to catch up } + if lsn < shared.first_valid_lsn { + return Err(format!("LSN {} has already been removed", lsn))?; + } + + let pagecache = &shared.pagecache; + + let mut entries = pagecache.range((Included(&minkey), Included(&maxkey))); + + let entry_opt = entries.next_back(); + + if entry_opt.is_none() { + static ZERO_PAGE:[u8; 8192] = [0 as u8; 8192]; + return Ok(Bytes::from_static(&ZERO_PAGE)); + /* return Err("could not find page image")?; */ + } + let (_key, entry) = entry_opt.unwrap(); + entry_rc = entry.clone(); + + // Now that we have a reference to the cache entry, drop the lock on the map. + // It's important to do this before waiting on the condition variable below, + // and better to do it as soon as possible to maximize concurrency. } - let page_img: Bytes; + // Lock the cache entry and dig the page image out of it. + let page_img; + { + let mut entry_content = entry_rc.content.lock().unwrap(); - if !records.is_empty() { - records.reverse(); + if let Some(img) = &entry_content.page_image { + assert!(!entry_content.apply_pending); + page_img = img.clone(); + } else if entry_content.wal_record.is_some() { - page_img = walredo::apply_wal_records(tag, base_img, &records)?; + // + // If this page needs to be reconstructed by applying some WAL, + // send a request to the WAL redo thread. + // + if !entry_content.apply_pending { + assert!(!entry_content.apply_pending); + entry_content.apply_pending = true; - debug!("applied {} WAL records to produce page image at LSN {}", records.len(), lsn); + let s = &PAGECACHE.walredo_sender; + s.send(entry_rc.clone())?; + } - // Here, we could put the new page image back to the page cache, to save effort if the - // same (or later) page version is requested again. It's a tradeoff though, as each - // page image consumes some memory - } else if base_img.is_some() { - page_img = base_img.unwrap(); - } else { - let zero_page = vec![0 as u8; 8192]; - page_img = Bytes::from(zero_page); - /* return Err("could not find page image")?; */ + while entry_content.apply_pending { + entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); + } + page_img = entry_content.page_image.as_ref().unwrap().clone(); + } else { + // No base image, and no WAL record. Huh? + return Err(format!("no page image or WAL record for requested page"))?; + } } return Ok(page_img); } +// +// Collect all the WAL records that are needed to reconstruct a page +// image for the given cache entry. +// +// Returns an old page image (if any), and a vector of WAL records to apply +// over it. +// +pub fn collect_records_for_apply(entry: &CacheEntry) -> (Option, Vec) +{ + // Scan the BTreeMap backwards, starting from the given entry. + let shared = PAGECACHE.shared.lock().unwrap(); + let pagecache = &shared.pagecache; + + let minkey = CacheKey { + tag: entry.key.tag, + lsn: 0 + }; + let maxkey = CacheKey { + tag: entry.key.tag, + lsn: entry.key.lsn + }; + let entries = pagecache.range((Included(&minkey), Included(&maxkey))); + + // the last entry in the range should be the CacheEntry we were given + //let _last_entry = entries.next_back(); + //assert!(last_entry == entry); + + let mut base_img: Option = None; + let mut records: Vec = Vec::new(); + + // Scan backwards, collecting the WAL records, until we hit an + // old page image. + for (_key, e) in entries.rev() { + let e = e.content.lock().unwrap(); + + if let Some(img) = &e.page_image { + // We have a base image. No need to dig deeper into the list of + // records + base_img = Some(img.clone()); + break; + } else if let Some(rec) = &e.wal_record { + + records.push(rec.clone()); + + // If this WAL record initializes the page, no need to dig deeper. + if rec.will_init { + debug!("WAL record at LSN {} initializes the page", rec.lsn); + break; + } + } else { + panic!("no base image and no WAL record on cache entry"); + } + } + + records.reverse(); + return (base_img, records); +} + + // // Adds a WAL record to the page cache // @@ -190,10 +313,10 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord) lsn: rec.lsn }; - let entry = CacheEntry::WALRecord(rec); + let entry = CacheEntry::new(key.clone()); + entry.content.lock().unwrap().wal_record = Some(rec); - let mut shared = PAGECACHE.lock().unwrap(); - // let pagecache = &mut shared.pagecache; + let mut shared = PAGECACHE.shared.lock().unwrap(); let rel_tag = RelTag { spcnode: tag.spcnode, @@ -206,12 +329,12 @@ pub fn put_wal_record(tag: BufferTag, rec: WALRecord) *rel_entry = tag.blknum + 1; } - let oldentry = shared.pagecache.insert(key, entry); + let oldentry = shared.pagecache.insert(key, Arc::new(entry)); assert!(oldentry.is_none()); } // -// +// Memorize a full image of a page version // pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes) { @@ -220,12 +343,13 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes) lsn: lsn }; - let entry = CacheEntry::PageImage(img); + let entry = CacheEntry::new(key.clone()); + entry.content.lock().unwrap().page_image = Some(img); - let mut shared = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); let pagecache = &mut shared.pagecache; - let oldentry = pagecache.insert(key, entry); + let oldentry = pagecache.insert(key, Arc::new(entry)); assert!(oldentry.is_none()); debug!("inserted page image for {}/{}/{}_{} blk {} at {}", @@ -235,7 +359,7 @@ pub fn put_page_image(tag: BufferTag, lsn: u64, img: Bytes) // pub fn advance_last_valid_lsn(lsn: u64) { - let mut shared = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); // Can't move backwards. assert!(lsn >= shared.last_valid_lsn); @@ -246,7 +370,7 @@ pub fn advance_last_valid_lsn(lsn: u64) // pub fn _advance_first_valid_lsn(lsn: u64) { - let mut shared = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); // Can't move backwards. assert!(lsn >= shared.first_valid_lsn); @@ -260,7 +384,7 @@ pub fn _advance_first_valid_lsn(lsn: u64) pub fn init_valid_lsn(lsn: u64) { - let mut shared = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); assert!(shared.first_valid_lsn == 0); assert!(shared.last_valid_lsn == 0); @@ -271,7 +395,7 @@ pub fn init_valid_lsn(lsn: u64) pub fn get_last_valid_lsn() -> u64 { - let shared = PAGECACHE.lock().unwrap(); + let shared = PAGECACHE.shared.lock().unwrap(); return shared.last_valid_lsn; } @@ -294,7 +418,7 @@ pub fn _test_get_page_at_lsn() let mut tag: Option = None; { - let shared = PAGECACHE.lock().unwrap(); + let shared = PAGECACHE.shared.lock().unwrap(); let pagecache = &shared.pagecache; if pagecache.is_empty() { @@ -332,7 +456,7 @@ pub fn _test_get_page_at_lsn() // the replica's current replay LSN. pub fn relsize_inc(rel: &RelTag, to: Option) { - let mut shared = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); let entry = shared.relsize_cache.entry(*rel).or_insert(0); if let Some(to) = to { @@ -344,14 +468,14 @@ pub fn relsize_inc(rel: &RelTag, to: Option) pub fn relsize_get(rel: &RelTag) -> u32 { - let mut shared = PAGECACHE.lock().unwrap(); + let mut shared = PAGECACHE.shared.lock().unwrap(); let entry = shared.relsize_cache.entry(*rel).or_insert(0); *entry } pub fn relsize_exist(rel: &RelTag) -> bool { - let shared = PAGECACHE.lock().unwrap(); + let shared = PAGECACHE.shared.lock().unwrap(); let relsize_cache = &shared.relsize_cache; relsize_cache.contains_key(rel) } diff --git a/src/page_service.rs b/src/page_service.rs index 2647dadbcd..9ed495f924 100644 --- a/src/page_service.rs +++ b/src/page_service.rs @@ -208,7 +208,9 @@ impl FeMessage { pub fn thread_main() { // Create a new thread pool - let runtime = runtime::Runtime::new().unwrap(); + //let runtime = runtime::Runtime::new().unwrap(); + let runtime = runtime::Builder::new_current_thread().enable_all().build().unwrap(); + let listen_address = "127.0.0.1:5430"; info!("Starting page server on {}", listen_address); diff --git a/src/walredo.rs b/src/walredo.rs index c0d58bf56b..81d4b81e65 100644 --- a/src/walredo.rs +++ b/src/walredo.rs @@ -14,59 +14,154 @@ // TODO: Even though the postgres code runs in a separate process, // it's not a secure sandbox. // -use std::process::{Command, Stdio}; -use std::io::{Read, Write, Error}; +use tokio::runtime::Runtime; +use tokio::process::{Command}; +use std::process::Stdio; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::AsyncBufReadExt; +use std::io::Error; use std::assert; +use std::sync::{Arc}; +use log::*; use bytes::{Bytes, BytesMut, BufMut}; use crate::page_cache::BufferTag; +use crate::page_cache::CacheEntry; use crate::page_cache::WALRecord; +use crate::page_cache; +// +// Main entry point for the WAL applicator thread. +// +pub fn wal_applicator_main() +{ + // We block on waiting for requests on the walredo request channel, but + // use async I/O to communicate with the child process. Initialize the + // runtime for the async part. + let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + + // Loop forever, handling requests as they come. + let walredo_channel_receiver = &page_cache::PAGECACHE.walredo_receiver; + loop { + let request = walredo_channel_receiver.recv().unwrap(); + + handle_apply_request(&runtime, request); + } +} + +fn handle_apply_request(runtime: &Runtime, entry_rc: Arc) +{ + let tag = entry_rc.key.tag; + let (base_img, records) = page_cache::collect_records_for_apply(entry_rc.as_ref()); + + let mut entry = entry_rc.content.lock().unwrap(); + entry.apply_pending = false; + + let apply_result = runtime.block_on(apply_wal_records(tag, base_img, records)); + + if let Err(e) = apply_result { + error!("could not apply WAL records: {}", e); + } else { + entry.page_image = Some(apply_result.unwrap()); + } + + // Wake up the requester, whether the operation succeeded or not. + entry_rc.walredo_condvar.notify_all(); +} // // Apply given WAL records ('records') over an old page image. Returns // new page image. // -pub fn apply_wal_records(tag: BufferTag, base_img: Option, records: &Vec) -> Result +async fn apply_wal_records(tag: BufferTag, base_img: Option, records: Vec) -> Result { // // Start postgres binary in special WAL redo mode. // - let mut child = + let child = Command::new("postgres") .arg("--wal-redo") .stdin(Stdio::piped()) + .stderr(Stdio::piped()) .stdout(Stdio::piped()) .spawn() .expect("postgres --wal-redo command failed to start"); - let stdin = child.stdin.as_mut().expect("Failed to open stdin"); + let mut stdin = child.stdin.expect("failed to open child's stdin"); + let stderr = child.stderr.expect("failed to open childs' stderr"); + let mut stdout = child.stdout.expect("failed to open childs' stdout"); - // Send base image, if any. (If the record initializes the page, previous page - // version is not needed.) - stdin.write(&build_begin_redo_for_block_msg(tag))?; - if base_img.is_some() { - stdin.write(&build_push_page_msg(tag, base_img.unwrap()))?; - } + // + // This async block sends all the commands to the process. + // + // For reasons I don't understand, this needs to be a "move" block; + // otherwise the stdin pipe doesn't get closed, despite the shutdown() + // call. + // + let f_stdin = async move { + // Send base image, if any. (If the record initializes the page, previous page + // version is not needed.) + stdin.write(&build_begin_redo_for_block_msg(tag)).await?; + if base_img.is_some() { + stdin.write(&build_push_page_msg(tag, base_img.unwrap())).await?; + } - // Send WAL records. - for rec in records.iter() { - let r = rec.clone(); + // Send WAL records. + for rec in records.iter() { + let r = rec.clone(); - stdin.write(&build_apply_record_msg(r.lsn, r.rec))?; - } + stdin.write(&build_apply_record_msg(r.lsn, r.rec)).await?; + } + debug!("sent {} WAL records to wal redo postgres process", records.len()); + + // Send GetPage command to get the result back + stdin.write(&build_get_page_msg(tag)).await?; + debug!("sent GetPage"); + stdin.flush().await?; + stdin.shutdown().await?; + debug!("stdin finished"); + Ok::<(), Error>(()) + }; + + // This async block reads the child's stderr, and forwards it to the logger + let f_stderr = async move { + let mut stderr_buffered = tokio::io::BufReader::new(stderr); + + let mut line = String::new(); + loop { + let res = stderr_buffered.read_line(&mut line).await; + if res.is_err() { + debug!("could not convert line to utf-8"); + continue; + } + if res.unwrap() == 0 { + break; + } + debug!("{}", line.trim()); + line.clear(); + } + Ok::<(), Error>(()) + }; // Read back new page image - stdin.write(&build_get_page_msg(tag))?; - let mut buf = vec![0u8; 8192]; - child.stdout.unwrap().read_exact(&mut buf)?; + let f_stdout = async move { + let mut buf = [0u8; 8192]; + + stdout.read_exact(&mut buf).await?; + debug!("got response"); + Ok::<[u8;8192], Error>(buf) + }; // Kill the process. This closes its stdin, which should signal the process // to terminate. TODO: SIGKILL if needed //child.wait(); - return Ok(Bytes::from(buf)); + let res = futures::try_join!(f_stdout, f_stdin, f_stderr)?; + + let buf = res.0; + + Ok::(Bytes::from(std::vec::Vec::from(buf))) } fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes