From 8beaf76c85693133001a0b33b867a7028e9c30b9 Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Fri, 23 Apr 2021 14:10:43 -0700
Subject: [PATCH 1/7] SeqWait: don't do wakeups under the lock

Clippy pointed out that `drop(waiters)` didn't do anything, because a
misplaced ";" caused `waiters` to have the unit type `()`.

This change makes the code do what was intended: the lock is dropped
first, then the wakeups are processed.
---
 zenith_utils/src/seqwait.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs
index bd94b8b350..633a862fde 100644
--- a/zenith_utils/src/seqwait.rs
+++ b/zenith_utils/src/seqwait.rs
@@ -63,7 +63,7 @@ impl SeqWait {
 
             // This will steal the entire waiters map.
             // When we drop it all waiters will be woken.
-            mem::take(&mut internal.waiters);
+            mem::take(&mut internal.waiters)
 
             // Drop the lock as we exit this scope.
         };

From 6dfe196c406a1046c23d5d33e1e702685ba14bd6 Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Thu, 22 Apr 2021 15:35:27 -0700
Subject: [PATCH 2/7] add .zenith to .gitignore

---
 .gitignore | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.gitignore b/.gitignore
index 768b75b413..8d2f8277b5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
 /tmp_install
 /tmp_check_cli
 .vscode
+.zenith

From fe79082e29ea083863ed61d23a9acb94fcb95fe0 Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Fri, 23 Apr 2021 14:46:50 -0700
Subject: [PATCH 3/7] require documentation in seqwait.rs

---
 zenith_utils/src/lib.rs     | 1 +
 zenith_utils/src/seqwait.rs | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs
index 8acd9cb84b..9de98202c6 100644
--- a/zenith_utils/src/lib.rs
+++ b/zenith_utils/src/lib.rs
@@ -1,4 +1,5 @@
 //! zenith_utils is intended to be a place to put code that is shared
 //! between other crates in this repository.
 
+/// SeqWait allows waiting for a future sequence number to arrive
 pub mod seqwait;
diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs
index 633a862fde..c30304ab6a 100644
--- a/zenith_utils/src/seqwait.rs
+++ b/zenith_utils/src/seqwait.rs
@@ -1,3 +1,5 @@
+#![warn(missing_docs)]
+
 use std::collections::BTreeMap;
 use std::mem;
 use std::sync::Mutex;

From 93d7d2ae2ab1f4b198070020d24b7845a4bf411a Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Sat, 24 Apr 2021 12:24:04 +0300
Subject: [PATCH 4/7] Refactor pagecache <-> WAL redo communication

After the rocksdb patch (commit 6aa38d3f7d), the CacheEntry struct was
used only momentarily in the communication between the page_cache and
the walredo modules. It was in fact not stored in any cache anymore.

For clarity, refactor the communication. There is now a WalRedoManager
struct, with a `request_redo` function, that can be used to request WAL
replay of a particular page. It sends a request to a queue like before,
but the queue has been replaced with tokio::sync::mpsc.

Previously, the resulting page image was stored directly in the
CacheEntry, and the requestor was notified using a condition variable.
Now, the requestor includes a 'oneshot' channel in the request, and the
WAL redo manager sends the response there.
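The request/response cycle is small enough to sketch. The following is
a simplified, self-contained illustration of the same pattern, with
placeholder types and a spawned task standing in for the real
pageserver structs and the dedicated WAL redo thread:

    use tokio::sync::{mpsc, oneshot};

    // Each request carries its own oneshot sender, so the manager can
    // reply directly to the requestor.
    struct Request {
        payload: u32,
        response_channel: oneshot::Sender<u32>,
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel::<Request>();

        // Manager side: drain the queue and answer each request
        // through its embedded oneshot channel.
        tokio::spawn(async move {
            while let Some(req) = rx.recv().await {
                let result = req.payload + 1; // stand-in for WAL redo work
                let _ = req.response_channel.send(result);
            }
        });

        // Requestor side: create a oneshot channel, enqueue the
        // request, and await the response.
        let (resp_tx, resp_rx) = oneshot::channel();
        tx.send(Request { payload: 41, response_channel: resp_tx })
            .expect("manager task is gone");
        assert_eq!(resp_rx.await.expect("no response"), 42);
    }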
--- Cargo.lock | 1 - pageserver/Cargo.toml | 1 - pageserver/src/page_cache.rs | 127 +++----- pageserver/src/walredo.rs | 566 +++++++++++++++++++++-------------- 4 files changed, 376 insertions(+), 319 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6d668d58b2..da38779279 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1381,7 +1381,6 @@ dependencies = [ "chrono", "clap", "crc32c", - "crossbeam-channel", "daemonize", "futures", "hex", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index a198f6403a..5309a77591 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" [dependencies] chrono = "0.4.19" -crossbeam-channel = "0.5.0" rand = "0.8.3" regex = "1.4.5" bytes = "1.0.1" diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8060b385f1..6132731688 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -7,11 +7,9 @@ use crate::restore_local_repo::restore_timeline; use crate::waldecoder::Oid; use crate::ZTimelineId; -use crate::{walredo, zenith_repo_dir, PageServerConf}; +use crate::{zenith_repo_dir, PageServerConf}; use anyhow::{bail, Context}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crossbeam_channel::unbounded; -use crossbeam_channel::{Receiver, Sender}; use lazy_static::lazy_static; use log::*; use rocksdb; @@ -19,11 +17,12 @@ use std::cmp::min; use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; use std::{convert::TryInto, ops::AddAssign}; use zenith_utils::seqwait::SeqWait; +use crate::walredo::WalRedoManager; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -34,9 +33,8 @@ pub struct PageCache { // RocksDB handle db: rocksdb::DB, - // Channel for communicating with the WAL redo process here. - pub walredo_sender: Sender>, - pub walredo_receiver: Receiver>, + // WAL redo manager + walredo_mgr: WalRedoManager, // Allows .await on the arrival of a particular LSN. seqwait_lsn: SeqWait, @@ -131,20 +129,11 @@ pub fn get_or_restore_pagecache( let result = Arc::new(pcache); + // Launch the WAL redo thread + result.walredo_mgr.launch(result.clone()); + pcaches.insert(timelineid, result.clone()); - // Initialize the WAL redo thread - // - // Now join_handle is not saved any where and we won'try restart tharead - // if it is dead. We may later stop that treads after some inactivity period - // and restart them on demand. 
- let conf_copy = conf.clone(); - let _walredo_thread = thread::Builder::new() - .name("WAL redo thread".into()) - .spawn(move || { - walredo::wal_redo_main(&conf_copy, timelineid); - }) - .unwrap(); if conf.gc_horizon != 0 { let conf_copy = conf.clone(); let _gc_thread = thread::Builder::new() @@ -162,7 +151,10 @@ pub fn get_or_restore_pagecache( fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { info!("Garbage collection thread started {}", timelineid); let pcache = get_pagecache(conf, timelineid).unwrap(); - pcache.do_gc(conf).unwrap(); + + let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); + + runtime.block_on(pcache.do_gc(conf)).unwrap(); } fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB { @@ -176,20 +168,19 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB } fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { - // Initialize the channel between the page cache and the WAL applicator - let (s, r) = unbounded(); PageCache { - db: open_rocksdb(&conf, timelineid), shared: Mutex::new(PageCacheShared { first_valid_lsn: 0, last_valid_lsn: 0, last_record_lsn: 0, }), - seqwait_lsn: SeqWait::new(0), - walredo_sender: s, - walredo_receiver: r, + db: open_rocksdb(&conf, timelineid), + + walredo_mgr: WalRedoManager::new(conf, timelineid), + + seqwait_lsn: SeqWait::new(0), num_entries: AtomicU64::new(0), num_page_images: AtomicU64::new(0), @@ -234,19 +225,6 @@ impl CacheKey { } } -pub struct CacheEntry { - pub key: CacheKey, - - pub content: Mutex, - - // Condition variable used by the WAL redo service, to wake up - // requester. - // - // FIXME: this takes quite a lot of space. Consider using parking_lot::Condvar - // or something else. 
- pub walredo_condvar: Condvar, -} - pub struct CacheEntryContent { pub page_image: Option, pub wal_record: Option, @@ -283,16 +261,6 @@ impl CacheEntryContent { } } -impl CacheEntry { - fn new(key: CacheKey, content: CacheEntryContent) -> CacheEntry { - CacheEntry { - key, - content: Mutex::new(content), - walredo_condvar: Condvar::new(), - } - } -} - #[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] pub struct RelTag { pub spcnode: u32, @@ -378,7 +346,8 @@ impl WALRecord { // Public interface functions impl PageCache { - fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { + async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { + let mut minbuf = BytesMut::new(); let mut maxbuf = BytesMut::new(); let cf = self @@ -429,10 +398,10 @@ impl PageCache { minkey.lsn = 0; // reconstruct most recent page version - if content.wal_record.is_some() { - trace!("Reconstruct most recent page {:?}", key); + if let Some(rec) = content.wal_record { + trace!("Reconstruct most recent page {:?}", key); // force reconstruction of most recent page version - self.reconstruct_page(key, content)?; + self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; reconstructed += 1; } @@ -449,12 +418,13 @@ impl PageCache { minbuf.clear(); minbuf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut minbuf); - if content.wal_record.is_some() { + if let Some(rec) = content.wal_record { minbuf.clear(); minbuf.extend_from_slice(&k); let key = CacheKey::unpack(&mut minbuf); + trace!("Reconstruct horizon page {:?}", key); - self.reconstruct_page(key, content)?; + self.walredo_mgr.request_redo(key.tag, rec.lsn).await?; truncated += 1; } } @@ -475,31 +445,6 @@ impl PageCache { } } - fn reconstruct_page(&self, key: CacheKey, content: CacheEntryContent) -> anyhow::Result { - let entry_rc = Arc::new(CacheEntry::new(key.clone(), content)); - - let mut entry_content = entry_rc.content.lock().unwrap(); - entry_content.apply_pending = true; - - let s = &self.walredo_sender; - s.send(entry_rc.clone())?; - - while entry_content.apply_pending { - entry_content = entry_rc.walredo_condvar.wait(entry_content).unwrap(); - } - // We should now have a page image. If we don't, it means that WAL redo - // failed to reconstruct it. WAL redo should've logged that error already. - let page_img = match &entry_content.page_image { - Some(p) => p.clone(), - None => { - error!("could not apply WAL to reconstruct page image for GetPage@LSN request"); - bail!("could not apply WAL to reconstruct page image"); - } - }; - self.put_page_image(key.tag, key.lsn, page_img.clone()); - Ok(page_img) - } - async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { let mut lsn = req_lsn; //When invalid LSN is requested, it means "don't wait, return latest version of the page" @@ -561,7 +506,7 @@ impl PageCache { return Ok(Bytes::from_static(&ZERO_PAGE)); /* return Err("could not find page image")?; */ } - let (k, v) = entry_opt.unwrap(); + let (_k, v) = entry_opt.unwrap(); buf.clear(); buf.extend_from_slice(&v); let content = CacheEntryContent::unpack(&mut buf); @@ -569,10 +514,9 @@ impl PageCache { if let Some(img) = &content.page_image { page_img = img.clone(); } else if content.wal_record.is_some() { - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - page_img = self.reconstruct_page(key, content)?; + + // Request the WAL redo manager to apply the WAL records for us. + page_img = self.walredo_mgr.request_redo(tag, lsn).await?; } else { // No base image, and no WAL record. Huh? 
bail!("no page image or WAL record for requested page"); @@ -602,10 +546,10 @@ impl PageCache { // Returns an old page image (if any), and a vector of WAL records to apply // over it. // - pub fn collect_records_for_apply(&self, entry: &CacheEntry) -> (Option, Vec) { + pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { let minkey = CacheKey { tag: BufferTag { - rel: entry.key.tag.rel, + rel: tag.rel, blknum: 0, }, lsn: 0, @@ -617,8 +561,15 @@ impl PageCache { let mut readopts = rocksdb::ReadOptions::default(); readopts.set_iterate_lower_bound(buf.to_vec()); + let key = CacheKey { + tag: BufferTag { + rel: tag.rel, + blknum: tag.blknum, + }, + lsn: lsn, + }; buf.clear(); - entry.key.pack(&mut buf); + key.pack(&mut buf); let iter = self.db.iterator_opt( rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), readopts, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 10fa9caf8e..4110ceef43 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -23,216 +23,333 @@ use std::io::prelude::*; use std::io::Error; use std::path::PathBuf; use std::process::Stdio; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use std::time::Instant; use tokio::io::AsyncBufReadExt; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; -use tokio::runtime::Runtime; +use tokio::sync::{oneshot, mpsc}; use tokio::time::timeout; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crate::page_cache; -use crate::page_cache::CacheEntry; +use crate::page_cache::BufferTag; +use crate::page_cache::PageCache; use crate::page_cache::WALRecord; use crate::ZTimelineId; -use crate::{page_cache::BufferTag, pg_constants, PageServerConf}; +use crate::{pg_constants, PageServerConf}; static TIMEOUT: Duration = Duration::from_secs(20); -// -// Main entry point for the WAL applicator thread. -// -pub fn wal_redo_main(conf: &PageServerConf, timelineid: ZTimelineId) { - info!("WAL redo thread started {}", timelineid); +/// +/// A WAL redo manager consists of two parts: WalRedoManager, and +/// WalRedoManagerInternal. WalRedoManager is the public struct +/// that can be used to send redo requests to the manager. +/// WalRedoManagerInternal is used by the manager thread itself. +/// +pub struct WalRedoManager { + conf: PageServerConf, + timelineid: ZTimelineId, - // We block on waiting for requests on the walredo request channel, but - // use async I/O to communicate with the child process. Initialize the - // runtime for the async part. - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + request_tx: mpsc::UnboundedSender, + request_rx: Mutex>>, +} - let pcache = page_cache::get_pagecache(conf, timelineid).unwrap(); +struct WalRedoManagerInternal { + _conf: PageServerConf, + timelineid: ZTimelineId, - // Loop forever, handling requests as they come. 
- let walredo_channel_receiver = &pcache.walredo_receiver; - loop { - let mut process: WalRedoProcess; - let datadir = format!("wal-redo/{}", timelineid); + pcache: Arc, + request_rx: mpsc::UnboundedReceiver, +} - info!("launching WAL redo postgres process {}", timelineid); - { - let _guard = runtime.enter(); - process = WalRedoProcess::launch(&datadir, &runtime).unwrap(); +#[derive(Debug)] +struct WalRedoRequest { + tag: BufferTag, + lsn: u64, + + response_channel: oneshot::Sender>, +} + +/// An error happened in WAL redo +#[derive(Debug, thiserror::Error)] +pub enum WalRedoError { + #[error(transparent)] + IoError(#[from] std::io::Error), +} + +/// +/// Public interface of WAL redo manager +/// +impl WalRedoManager +{ + /// + /// Create a new WalRedoManager. + /// + /// This only initializes the struct. You need to call WalRedoManager::launch to + /// start the thread that processes the requests. + pub fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> WalRedoManager { + + let (tx, rx) = mpsc::unbounded_channel(); + + WalRedoManager { + conf: conf.clone(), + timelineid, + request_tx: tx, + request_rx: Mutex::new(Some(rx)) } - info!("WAL redo postgres started"); + } - // Pretty arbitrarily, reuse the same Postgres process for 100 requests. - // After that, kill it and start a new one. This is mostly to avoid - // using up all shared buffers in Postgres's shared buffer cache; we don't - // want to write any pages to disk in the WAL redo process. - for _i in 1..100000 { - let request = walredo_channel_receiver.recv().unwrap(); + /// + /// Launch the WAL redo thread + /// + pub fn launch(&self, pcache: Arc) { - let result = handle_apply_request(&pcache, &process, &runtime, request); - if result.is_err() { - // Something went wrong with handling the request. It's not clear - // if the request was faulty, and the next request would succeed - // again, or if the 'postgres' process went haywire. To be safe, - // kill the 'postgres' process so that we will start from a clean - // slate, with a new process, for the next request. - break; - } - } + // Get mutable references to the values that we need to pass to the + // thread. + let request_rx = self.request_rx.lock().unwrap().take().unwrap(); + let conf_copy = self.conf.clone(); + let timelineid = self.timelineid; - // Time to kill the 'postgres' process. A new one will be launched on next - // iteration of the loop. - info!("killing WAL redo postgres process"); - let _ = runtime.block_on(process.stdin.get_mut().shutdown()); - let mut child = process.child; - drop(process.stdin); - let _ = runtime.block_on(child.wait()); + // Currently, the join handle is not saved anywhere and we + // won't try restart the thread if it dies. + let _walredo_thread = std::thread::Builder::new() + .name("WAL redo thread".into()) + .spawn(move || { + + // We block on waiting for requests on the walredo request channel, but + // use async I/O to communicate with the child process. Initialize the + // runtime for the async part. + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let mut internal = WalRedoManagerInternal { + _conf: conf_copy, + timelineid: timelineid, + pcache: pcache, + request_rx: request_rx, + }; + + runtime.block_on(internal.wal_redo_main()); + }) + .unwrap(); + } + + /// + /// Request the WAL redo manager to apply WAL records, to reconstruct the page image + /// of the given page version. 
+ /// + pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result { + + // Create a channel where to receive the response + let (tx, rx) = oneshot::channel::>(); + + let request = WalRedoRequest { + tag, lsn, + response_channel: tx, + }; + + self.request_tx.send(request).expect("could not send WAL redo request"); + + rx.await.expect("could not receive response to WAL redo request") } } -fn transaction_id_set_status_bit( - xl_info: u8, - xl_rmid: u8, - xl_xid: u32, - record: WALRecord, - page: &mut BytesMut, -) { - let info = xl_info & pg_constants::XLOG_XACT_OPMASK; - let mut status = 0; - if info == pg_constants::XLOG_XACT_COMMIT { - status = pg_constants::TRANSACTION_STATUS_COMMITTED; - } else if info == pg_constants::XLOG_XACT_ABORT { - status = pg_constants::TRANSACTION_STATUS_ABORTED; - } else { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", - status, - record.lsn >> 32, - record.lsn & 0xffffffff, - record.main_data_offset, record.rec.len()); - return; - } +/// +/// WAL redo thread +/// +impl WalRedoManagerInternal { - trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", - status, - record.lsn >> 32, - record.lsn & 0xffffffff, - record.main_data_offset, record.rec.len()); + // + // Main entry point for the WAL applicator thread. + // + async fn wal_redo_main(&mut self) { - let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) - / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + info!("WAL redo thread started {}", self.timelineid); - let byteptr = &mut page[byteno..byteno + 1]; - let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) - * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + // Loop forever, handling requests as they come. + loop { + let mut process: WalRedoProcess; + let datadir = format!("wal-redo/{}", self.timelineid); - let mut curval = byteptr[0]; - curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; + info!("launching WAL redo postgres process {}", self.timelineid); - let mut byteval = [0]; - byteval[0] = curval; - byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift); - byteval[0] |= status << bshift; + process = WalRedoProcess::launch(&datadir).await.unwrap(); + info!("WAL redo postgres started"); - byteptr.copy_from_slice(&byteval); - trace!( - "xl_xid {} byteno {} curval {} byteval {}", - xl_xid, - byteno, - curval, - byteval[0] - ); -} + // Pretty arbitrarily, reuse the same Postgres process for 100000 requests. + // After that, kill it and start a new one. This is mostly to avoid + // using up all shared buffers in Postgres's shared buffer cache; we don't + // want to write any pages to disk in the WAL redo process. 
+ for _i in 1..100000 { + let request = self.request_rx.recv().await.unwrap(); -fn handle_apply_request( - pcache: &page_cache::PageCache, - process: &WalRedoProcess, - runtime: &Runtime, - entry_rc: Arc, -) -> Result<(), Error> { - let tag = entry_rc.key.tag; - let lsn = entry_rc.key.lsn; - let (base_img, records) = pcache.collect_records_for_apply(entry_rc.as_ref()); + let result = self.handle_apply_request(&process, &request).await; + let result_ok = result.is_ok(); - let mut entry = entry_rc.content.lock().unwrap(); - assert!(entry.apply_pending); - entry.apply_pending = false; + // Send the result to the requester + let _ = request.response_channel.send(result); - let nrecords = records.len(); - - let start = Instant::now(); - - let apply_result: Result; - if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { - //TODO use base image if any - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - let zero_page_bytes: &[u8] = &ZERO_PAGE; - let mut page = BytesMut::from(zero_page_bytes); - - for record in records { - let mut buf = record.rec.clone(); - - // 1. Parse XLogRecord struct - // FIXME: refactor to avoid code duplication. - let _xl_tot_len = buf.get_u32_le(); - let xl_xid = buf.get_u32_le(); - let _xl_prev = buf.get_u64_le(); - let xl_info = buf.get_u8(); - let xl_rmid = buf.get_u8(); - buf.advance(2); // 2 bytes of padding - let _xl_crc = buf.get_u32_le(); - - if xl_rmid == pg_constants::RM_CLOG_ID { - let info = xl_info & !pg_constants::XLR_INFO_MASK; - if info == pg_constants::CLOG_ZEROPAGE { - page.clone_from_slice(zero_page_bytes); - trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", - record.lsn >> 32, - record.lsn & 0xffffffff, - record.main_data_offset, record.rec.len()); + if !result_ok { + // Something went wrong with handling the request. It's not clear + // if the request was faulty, and the next request would succeed + // again, or if the 'postgres' process went haywire. To be safe, + // kill the 'postgres' process so that we will start from a clean + // slate, with a new process, for the next request. + break; } - } else if xl_rmid == pg_constants::RM_XACT_ID { - transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page); } + + // Time to kill the 'postgres' process. A new one will be launched on next + // iteration of the loop. + info!("killing WAL redo postgres process"); + let _ = process.stdin.get_mut().shutdown().await; + let mut child = process.child; + drop(process.stdin); + let _ = child.wait().await; + } + } + + fn transaction_id_set_status_bit(&self, + xl_info: u8, + xl_rmid: u8, + xl_xid: u32, + record: WALRecord, + page: &mut BytesMut, + ) { + let info = xl_info & pg_constants::XLOG_XACT_OPMASK; + let mut status = 0; + if info == pg_constants::XLOG_XACT_COMMIT { + status = pg_constants::TRANSACTION_STATUS_COMMITTED; + } else if info == pg_constants::XLOG_XACT_ABORT { + status = pg_constants::TRANSACTION_STATUS_ABORTED; + } else { + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. 
lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + return; } - apply_result = Ok::(page.freeze()); - } else { - apply_result = process.apply_wal_records(runtime, tag, base_img, records); + trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}", + status, + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + + let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32) + / pg_constants::CLOG_XACTS_PER_BYTE) as usize; + + let byteptr = &mut page[byteno..byteno + 1]; + let bshift: u8 = ((xl_xid % pg_constants::CLOG_XACTS_PER_BYTE) + * pg_constants::CLOG_BITS_PER_XACT as u32) as u8; + + let mut curval = byteptr[0]; + curval = (curval >> bshift) & pg_constants::CLOG_XACT_BITMASK; + + let mut byteval = [0]; + byteval[0] = curval; + byteval[0] &= !(((1 << pg_constants::CLOG_BITS_PER_XACT as u8) - 1) << bshift); + byteval[0] |= status << bshift; + + byteptr.copy_from_slice(&byteval); + trace!( + "xl_xid {} byteno {} curval {} byteval {}", + xl_xid, + byteno, + curval, + byteval[0] + ); } - let duration = start.elapsed(); + /// + /// Process one request for WAL redo. + /// + async fn handle_apply_request(&self, + process: &WalRedoProcess, + request: &WalRedoRequest, + ) -> Result { + let pcache = &self.pcache; + let tag = request.tag; + let lsn = request.lsn; + let (base_img, records) = pcache.collect_records_for_apply(tag, lsn); - let result; + let nrecords = records.len(); - trace!( - "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", - nrecords, - duration.as_millis(), - lsn >> 32, - lsn & 0xffff_ffff - ); + let start = Instant::now(); - if let Err(e) = apply_result { - error!("could not apply WAL records: {}", e); - result = Err(e); - } else { - entry.page_image = Some(apply_result.unwrap()); - result = Ok(()); + let apply_result: Result; + if tag.rel.forknum == pg_constants::PG_XACT_FORKNUM as u8 { + //TODO use base image if any + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + let zero_page_bytes: &[u8] = &ZERO_PAGE; + let mut page = BytesMut::from(zero_page_bytes); + + for record in records { + let mut buf = record.rec.clone(); + + // 1. Parse XLogRecord struct + // FIXME: refactor to avoid code duplication. 
+ let _xl_tot_len = buf.get_u32_le(); + let xl_xid = buf.get_u32_le(); + let _xl_prev = buf.get_u64_le(); + let xl_info = buf.get_u8(); + let xl_rmid = buf.get_u8(); + buf.advance(2); // 2 bytes of padding + let _xl_crc = buf.get_u32_le(); + + if xl_rmid == pg_constants::RM_CLOG_ID { + let info = xl_info & !pg_constants::XLR_INFO_MASK; + if info == pg_constants::CLOG_ZEROPAGE { + page.clone_from_slice(zero_page_bytes); + trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}", + record.lsn >> 32, + record.lsn & 0xffffffff, + record.main_data_offset, record.rec.len()); + } + } else if xl_rmid == pg_constants::RM_XACT_ID { + self.transaction_id_set_status_bit(xl_info, xl_rmid, xl_xid, record, &mut page); + } + } + + apply_result = Ok::(page.freeze()); + } else { + apply_result = process.apply_wal_records(tag, base_img, records).await; + } + + let duration = start.elapsed(); + + let result: Result; + + trace!( + "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}", + nrecords, + duration.as_millis(), + lsn >> 32, + lsn & 0xffff_ffff + ); + + if let Err(e) = apply_result { + error!("could not apply WAL records: {}", e); + result = Err(WalRedoError::IoError(e)); + } else { + let img = apply_result.unwrap(); + + // Should we put the image back to the page cache? We don't have to, + // but then the next GetPage@LSN call will have to do the WAL redo + // again + pcache.put_page_image(tag, lsn, img.clone()); + + result = Ok(img); + } + + // The caller is responsible for sending the response + result } - - // Wake up the requester, whether the operation succeeded or not. - entry_rc.walredo_condvar.notify_all(); - - result } struct WalRedoProcess { @@ -248,17 +365,14 @@ impl WalRedoProcess { // Tests who run pageserver binary are setting proper PG_BIN_DIR // and PG_LIB_DIR so that WalRedo would start right postgres. We may later // switch to setting same things in pageserver config file. - fn launch(datadir: &str, runtime: &Runtime) -> Result { + async fn launch(datadir: &str) -> Result { // Create empty data directory for wal-redo postgres deleting old one. fs::remove_dir_all(datadir).ok(); - let initdb = runtime - .block_on( - Command::new("initdb") - .args(&["-D", datadir]) - .arg("-N") - .output(), - ) - .expect("failed to execute initdb"); + let initdb = Command::new("initdb") + .args(&["-D", datadir]) + .arg("-N") + .output() + .await.expect("failed to execute initdb"); if !initdb.status.success() { panic!( @@ -322,82 +436,76 @@ impl WalRedoProcess { // Apply given WAL records ('records') over an old page image. Returns // new page image. // - fn apply_wal_records( - &self, - runtime: &Runtime, + async fn apply_wal_records(&self, tag: BufferTag, base_img: Option, records: Vec, - ) -> Result { + ) -> Result { let mut stdin = self.stdin.borrow_mut(); let mut stdout = self.stdout.borrow_mut(); - runtime.block_on(async { - // - // This async block sends all the commands to the process. - // - // For reasons I don't understand, this needs to be a "move" block; - // otherwise the stdin pipe doesn't get closed, despite the shutdown() - // call. - // - let f_stdin = async { - // Send base image, if any. (If the record initializes the page, previous page - // version is not needed.) + + let f_stdin = async { + // Send base image, if any. (If the record initializes the page, previous page + // version is not needed.) 
+ timeout( + TIMEOUT, + stdin.write_all(&build_begin_redo_for_block_msg(tag)), + ) + .await??; + if base_img.is_some() { timeout( TIMEOUT, - stdin.write_all(&build_begin_redo_for_block_msg(tag)), + stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), ) - .await??; - if base_img.is_some() { - timeout( - TIMEOUT, - stdin.write_all(&build_push_page_msg(tag, base_img.unwrap())), - ) .await??; - } + } - // Send WAL records. - for rec in records.iter() { - let r = rec.clone(); + // Send WAL records. + for rec in records.iter() { + let r = rec.clone(); - stdin - .write_all(&build_apply_record_msg(r.lsn, r.rec)) - .await?; + stdin + .write_all(&build_apply_record_msg(r.lsn, r.rec)) + .await?; - //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", - // r.lsn >> 32, r.lsn & 0xffff_ffff); - } - //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", - // records.len(), lsn >> 32, lsn & 0xffff_ffff); + //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}", + // r.lsn >> 32, r.lsn & 0xffff_ffff); + } + //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}", + // records.len(), lsn >> 32, lsn & 0xffff_ffff); - // Send GetPage command to get the result back - timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; - timeout(TIMEOUT, stdin.flush()).await??; - //debug!("sent GetPage for {}", tag.blknum); - Ok::<(), Error>(()) - }; + // Send GetPage command to get the result back + timeout(TIMEOUT, stdin.write_all(&build_get_page_msg(tag))).await??; + timeout(TIMEOUT, stdin.flush()).await??; + //debug!("sent GetPage for {}", tag.blknum); + Ok::<(), Error>(()) + }; - // Read back new page image - let f_stdout = async { - let mut buf = [0u8; 8192]; + // Read back new page image + let f_stdout = async { + let mut buf = [0u8; 8192]; - timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; - //debug!("got response for {}", tag.blknum); - Ok::<[u8; 8192], Error>(buf) - }; + timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??; + //debug!("got response for {}", tag.blknum); + Ok::<[u8; 8192], Error>(buf) + }; - // Kill the process. This closes its stdin, which should signal the process - // to terminate. TODO: SIGKILL if needed - //child.wait(); + // Kill the process. This closes its stdin, which should signal the process + // to terminate. TODO: SIGKILL if needed + //child.wait(); - let res = futures::try_join!(f_stdout, f_stdin)?; + let res = futures::try_join!(f_stdout, f_stdin)?; - let buf = res.0; + let buf = res.0; - Ok::(Bytes::from(std::vec::Vec::from(buf))) - }) + Ok::(Bytes::from(std::vec::Vec::from(buf))) } } +// Functions for constructing messages to send to the postgres WAL redo +// process. See vendor/postgres/src/backend/tcop/zenith_wal_redo.c for +// explanation of the protocol. + fn build_begin_redo_for_block_msg(tag: BufferTag) -> Bytes { let len = 4 + 5 * 4; let mut buf = BytesMut::with_capacity(1 + len); From 021462da3e2f49c1cc313e7190c28b615399e3d0 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 24 Apr 2021 17:54:26 +0300 Subject: [PATCH 5/7] Refactor put_wal_record() so that it doesn't need to be marked 'async'. It was only marked as async because it calls relsize_get(), but relsize_get() will in fact never block when it's called with the max LSN value, like put_wal_record() does. Refactor to avoid marking put_wal_record() as 'async'. 
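The resulting split, abridged from the diff below:

    // May block waiting for the WAL to arrive, so it stays 'async'.
    pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
        self.wait_lsn(lsn).await?;
        self.relsize_get_nowait(rel, lsn)
    }

    // Never blocks. The caller must ensure that WAL has already been
    // received up to 'lsn'.
    fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
        // ... RocksDB lookup, unchanged ...
    }

The truncate path (put_rel_wal_record()) now calls relsize_get_nowait()
with the last valid LSN instead of awaiting relsize_get() with u64::MAX.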
---
 pageserver/src/page_cache.rs  | 21 ++++++++++++++-------
 pageserver/src/walreceiver.rs |  2 +-
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 6132731688..a3c6a8b703 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -634,9 +634,13 @@ impl PageCache {
     // Adds a relation-wide WAL record (like truncate) to the page cache,
     // associating it with all pages started with specified block number
     //
-    pub async fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
+    pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> {
         let mut key = CacheKey { tag, lsn: rec.lsn };
-        let old_rel_size = self.relsize_get(&tag.rel, u64::MAX).await?;
+
+        // What was the size of the relation before this record?
+        let last_lsn = self.last_valid_lsn.load(Ordering::Acquire);
+        let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?;
+
         let content = CacheEntryContent {
             page_image: None,
             wal_record: Some(rec),
@@ -762,11 +766,14 @@ impl PageCache {
         shared.last_record_lsn
     }
 
-    pub async fn relsize_get(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result<u32> {
-        let mut lsn = req_lsn;
-        if lsn != u64::MAX {
-            lsn = self.wait_lsn(lsn).await?;
-        }
+    pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
+        self.wait_lsn(lsn).await?;
+        self.relsize_get_nowait(rel, lsn)
+    }
+
+    fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result<u32> {
+
+        assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire));
 
         let mut key = CacheKey {
             tag: BufferTag {
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 65c543c64e..6671c13d25 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -266,7 +266,7 @@ async fn walreceiver_main(
                         rec: recdata.clone(),
                         main_data_offset: decoded.main_data_offset as u32,
                     };
-                    pcache.put_rel_wal_record(tag, rec).await?;
+                    pcache.put_rel_wal_record(tag, rec)?;
                 }
             } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
                 && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)

From 0fc05569e0a74ad8e752dd7be44acbab42622bac Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Sat, 24 Apr 2021 17:54:28 +0300
Subject: [PATCH 6/7] Improve comments in page_cache.rs.

Explain the mix of async and other functions in the page cache.
---
 pageserver/src/page_cache.rs | 47 +++++++++++++++++++++++++++++++++++-
 1 file changed, 46 insertions(+), 1 deletion(-)

diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index a3c6a8b703..cf97b02258 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -1,7 +1,32 @@
 //
 // Page Cache holds all the different page versions and WAL records
 //
-// The Page Cache is currenusing RocksDB for storing wal records and full page images, keyed by the RelFileNode, blocknumber, and the LSN.
+// Currently, the page cache uses RocksDB to store WAL records and
+// full page images, keyed by the RelFileNode, blocknumber, and the
+// LSN.
+//
+// On async vs blocking:
+//
+// All the functions that can block for any significant length of time
+// use Tokio async, and are marked as 'async'. That currently includes
+// all the "Get" functions that get a page image or a relation size.
+// The "Put" functions that add base images or WAL records to the cache
+// cannot block.
+// +// However, we have a funny definition of blocking: waiting on a Mutex +// that protects the in-memory data structures is not considered as +// blocking, as those are short waits, and there is no deadlock +// risk. It is *not* OK to do I/O or wait on other threads while +// holding a Mutex, however. +// +// Another wart is that we currently consider the RocksDB operations +// to be non-blocking, and we do those while holding a lock, and +// without async. That's fantasy, as RocksDB will do I/O, possibly a +// lot of it. But that's not a correctness issue, since the RocksDB +// calls will not call back to any of the other functions in page +// server. RocksDB is just a stopgap solution, to be replaced with +// something else, so it doesn't seem worth it to wrangle those calls +// into async model. // use crate::restore_local_repo::restore_timeline; @@ -445,6 +470,9 @@ impl PageCache { } } + // + // Wait until WAL has been received up to the given LSN. + // async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { let mut lsn = req_lsn; //When invalid LSN is requested, it means "don't wait, return latest version of the page" @@ -457,6 +485,7 @@ impl PageCache { lsn ); } + self.seqwait_lsn .wait_for_timeout(lsn, TIMEOUT) .await @@ -766,11 +795,19 @@ impl PageCache { shared.last_record_lsn } + // + // Get size of relation at given LSN. + // pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { self.wait_lsn(lsn).await?; self.relsize_get_nowait(rel, lsn) } + // + // Internal function to get relation size at given LSN. + // + // The caller must ensure that WAL has been received up to 'lsn'. + // fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); @@ -848,6 +885,9 @@ impl PageCache { Ok(false) } + // + // Get statistics to be displayed in the user interface. + // pub fn get_stats(&self) -> PageCacheStats { PageCacheStats { num_entries: self.num_entries.load(Ordering::Relaxed), @@ -911,6 +951,11 @@ impl PageCache { } } +// +// Get statistics to be displayed in the user interface. +// +// This combines the stats from all PageCache instances +// pub fn get_stats() -> PageCacheStats { let pcaches = PAGECACHES.lock().unwrap(); From 5e0cc89de80e6ad2a1caefc21e7205c7e5b3baa3 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Sat, 24 Apr 2021 17:54:31 +0300 Subject: [PATCH 7/7] Re-group functions in page_cache.rs, and add comments. --- pageserver/src/page_cache.rs | 919 ++++++++++++++++++----------------- 1 file changed, 467 insertions(+), 452 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index cf97b02258..8bd2621964 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -368,9 +368,474 @@ impl WALRecord { } } -// Public interface functions - impl PageCache { + + // Public GET interface functions + + /// + /// GetPage@LSN + /// + /// Returns an 8k page image + /// + pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + + let lsn = self.wait_lsn(req_lsn).await?; + + // Look up cache entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. 
+ let minkey = CacheKey { tag, lsn: 0 }; + let maxkey = CacheKey { tag, lsn }; + + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = rocksdb::ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + buf.clear(); + maxkey.pack(&mut buf); + let mut iter = self.db.iterator_opt( + rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), + readopts, + ); + let entry_opt = iter.next(); + + if entry_opt.is_none() { + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); + return Ok(Bytes::from_static(&ZERO_PAGE)); + /* return Err("could not find page image")?; */ + } + let (_k, v) = entry_opt.unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + let page_img: Bytes; + if let Some(img) = &content.page_image { + page_img = img.clone(); + } else if content.wal_record.is_some() { + + // Request the WAL redo manager to apply the WAL records for us. + page_img = self.walredo_mgr.request_redo(tag, lsn).await?; + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); + } + + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + debug!( + "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", + page_lsn_hi, + page_lsn_lo, + tag.rel.spcnode, + tag.rel.dbnode, + tag.rel.relnode, + tag.rel.forknum, + tag.blknum + ); + + Ok(page_img) + } + + /// + /// Get size of relation at given LSN. + /// + pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + self.wait_lsn(lsn).await?; + return self.relsize_get_nowait(rel, lsn); + } + + /// + /// Does relation exist at given LSN? + /// + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + let lsn = self.wait_lsn(req_lsn).await?; + + let key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, _v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + debug!("Relation {:?} exists at {}", rel, lsn); + return Ok(true); + } + } + debug!("Relation {:?} doesn't exist at {}", rel, lsn); + Ok(false) + } + + // Other public functions, for updating the page cache. + // These are used by the WAL receiver and WAL redo. + + /// + /// Collect all the WAL records that are needed to reconstruct a page + /// image for the given cache entry. + /// + /// Returns an old page image (if any), and a vector of WAL records to apply + /// over it. 
+ /// + pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { + let minkey = CacheKey { + tag: BufferTag { + rel: tag.rel, + blknum: 0, + }, + lsn: 0, + }; + + let mut buf = BytesMut::new(); + minkey.pack(&mut buf); + + let mut readopts = rocksdb::ReadOptions::default(); + readopts.set_iterate_lower_bound(buf.to_vec()); + + let key = CacheKey { + tag: BufferTag { + rel: tag.rel, + blknum: tag.blknum, + }, + lsn: lsn, + }; + buf.clear(); + key.pack(&mut buf); + let iter = self.db.iterator_opt( + rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), + readopts, + ); + + let mut base_img: Option = None; + let mut records: Vec = Vec::new(); + + // Scan backwards, collecting the WAL records, until we hit an + // old page image. + for (_k, v) in iter { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(img) = &content.page_image { + // We have a base image. No need to dig deeper into the list of + // records + base_img = Some(img.clone()); + break; + } else if let Some(rec) = &content.wal_record { + records.push(rec.clone()); + + // If this WAL record initializes the page, no need to dig deeper. + if rec.will_init { + break; + } + } else { + panic!("no base image and no WAL record on cache entry"); + } + } + + records.reverse(); + (base_img, records) + } + + /// + /// Adds a WAL record to the page cache + /// + pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { + let lsn = rec.lsn; + let key = CacheKey { tag, lsn }; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, + }; + + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + let _res = self.db.put(&key_buf[..], &val_buf[..]); + //trace!("put_wal_record lsn: {}", lsn); + + self.num_entries.fetch_add(1, Ordering::Relaxed); + self.num_wal_records.fetch_add(1, Ordering::Relaxed); + } + + /// + /// Adds a relation-wide WAL record (like truncate) to the page cache, + /// associating it with all pages started with specified block number + /// + pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { + let mut key = CacheKey { tag, lsn: rec.lsn }; + + // What was the size of the relation before this record? 
+ let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); + let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + apply_pending: false, + }; + // set new relation size + trace!("Truncate relation {:?}", tag); + let mut key_buf = BytesMut::new(); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + for blknum in tag.blknum..old_rel_size { + key_buf.clear(); + key.tag.blknum = blknum; + key.pack(&mut key_buf); + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + } + let n = (old_rel_size - tag.blknum) as u64; + self.num_entries.fetch_add(n, Ordering::Relaxed); + self.num_wal_records.fetch_add(n, Ordering::Relaxed); + Ok(()) + } + + /// + /// Memorize a full image of a page version + /// + pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { + let key = CacheKey { tag, lsn }; + let content = CacheEntryContent { + page_image: Some(img), + wal_record: None, + apply_pending: false, + }; + + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + + //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", + // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); + self.num_page_images.fetch_add(1, Ordering::Relaxed); + } + + pub fn create_database( + &self, + lsn: u64, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, + ) -> anyhow::Result<()> { + let mut buf = BytesMut::new(); + let key = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: 0u8, + }, + blknum: 0, + }, + lsn: 0, + }; + key.pack(&mut buf); + let iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Forward, + )); + let mut n = 0; + for (k, v) in iter { + buf.clear(); + buf.extend_from_slice(&k); + let mut key = CacheKey::unpack(&mut buf); + if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; + } + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + buf.clear(); + key.pack(&mut buf); + + self.db.put(&buf[..], v)?; + n += 1; + } + info!( + "Create database {}/{}, copy {} entries", + tablespace_id, db_id, n + ); + Ok(()) + } + + /// Remember that WAL has been received and added to the page cache up to the given LSN + pub fn advance_last_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + let oldlsn = shared.last_valid_lsn; + if lsn >= oldlsn { + shared.last_valid_lsn = lsn; + self.seqwait_lsn.advance(lsn); + + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + } else { + warn!( + "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", + oldlsn >> 32, + oldlsn & 0xffffffff, + lsn >> 32, + lsn & 0xffffffff + ); + } + } + + /// + /// Remember the (end of) last valid WAL record remembered in the page cache. + /// + /// NOTE: this updates last_valid_lsn as well. + /// + pub fn advance_last_record_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. 
+ assert!(lsn >= shared.last_valid_lsn); + assert!(lsn >= shared.last_record_lsn); + + shared.last_valid_lsn = lsn; + shared.last_record_lsn = lsn; + self.seqwait_lsn.advance(lsn); + + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); + } + + /// + /// Remember the beginning of valid WAL. + /// + /// TODO: This should be called by garbage collection, so that if an older + /// page is requested, we will return an error to the requestor. + pub fn _advance_first_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + // Can't move backwards. + assert!(lsn >= shared.first_valid_lsn); + + // Can't overtake last_valid_lsn (except when we're + // initializing the system and last_valid_lsn hasn't been set yet. + assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + + shared.first_valid_lsn = lsn; + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + } + + pub fn init_valid_lsn(&self, lsn: u64) { + let mut shared = self.shared.lock().unwrap(); + + assert!(shared.first_valid_lsn == 0); + assert!(shared.last_valid_lsn == 0); + assert!(shared.last_record_lsn == 0); + + shared.first_valid_lsn = lsn; + shared.last_valid_lsn = lsn; + shared.last_record_lsn = lsn; + + self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_record_lsn.store(lsn, Ordering::Relaxed); + } + + pub fn get_last_valid_lsn(&self) -> u64 { + let shared = self.shared.lock().unwrap(); + + shared.last_record_lsn + } + + // + // Get statistics to be displayed in the user interface. + // + pub fn get_stats(&self) -> PageCacheStats { + PageCacheStats { + num_entries: self.num_entries.load(Ordering::Relaxed), + num_page_images: self.num_page_images.load(Ordering::Relaxed), + num_wal_records: self.num_wal_records.load(Ordering::Relaxed), + num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), + last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), + last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), + } + } + + // Internal functions + + // + // Internal function to get relation size at given LSN. + // + // The caller must ensure that WAL has been received up to 'lsn'. 
+ // + fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + + assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); + + let mut key = CacheKey { + tag: BufferTag { + rel: *rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + + loop { + buf.clear(); + key.pack(&mut buf); + let mut iter = self.db.iterator(rocksdb::IteratorMode::From( + &buf[..], + rocksdb::Direction::Reverse, + )); + if let Some((k, v)) = iter.next() { + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == *rel { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(rec) = &content.wal_record { + if rec.truncate { + if tag.blknum > 0 { + key.tag.blknum = tag.blknum - 1; + continue; + } + break; + } + } + let relsize = tag.blknum + 1; + debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); + return Ok(relsize); + } + } + break; + } + debug!("Size of relation {:?} at {} is zero", rel, lsn); + Ok(0) + } + async fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { let mut minbuf = BytesMut::new(); @@ -499,456 +964,6 @@ impl PageCache { Ok(lsn) } - - // - // GetPage@LSN - // - // Returns an 8k page image - // - pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); - - let lsn = self.wait_lsn(req_lsn).await?; - - // Look up cache entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let minkey = CacheKey { tag, lsn: 0 }; - let maxkey = CacheKey { tag, lsn }; - - let mut buf = BytesMut::new(); - minkey.pack(&mut buf); - - let mut readopts = rocksdb::ReadOptions::default(); - readopts.set_iterate_lower_bound(buf.to_vec()); - - buf.clear(); - maxkey.pack(&mut buf); - let mut iter = self.db.iterator_opt( - rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), - readopts, - ); - let entry_opt = iter.next(); - - if entry_opt.is_none() { - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); - return Ok(Bytes::from_static(&ZERO_PAGE)); - /* return Err("could not find page image")?; */ - } - let (_k, v) = entry_opt.unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - let page_img: Bytes; - if let Some(img) = &content.page_image { - page_img = img.clone(); - } else if content.wal_record.is_some() { - - // Request the WAL redo manager to apply the WAL records for us. - page_img = self.walredo_mgr.request_redo(tag, lsn).await?; - } else { - // No base image, and no WAL record. Huh? - bail!("no page image or WAL record for requested page"); - } - - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - debug!( - "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", - page_lsn_hi, - page_lsn_lo, - tag.rel.spcnode, - tag.rel.dbnode, - tag.rel.relnode, - tag.rel.forknum, - tag.blknum - ); - - Ok(page_img) - } - - // - // Collect all the WAL records that are needed to reconstruct a page - // image for the given cache entry. - // - // Returns an old page image (if any), and a vector of WAL records to apply - // over it. 
- // - pub fn collect_records_for_apply(&self, tag: BufferTag, lsn: u64) -> (Option, Vec) { - let minkey = CacheKey { - tag: BufferTag { - rel: tag.rel, - blknum: 0, - }, - lsn: 0, - }; - - let mut buf = BytesMut::new(); - minkey.pack(&mut buf); - - let mut readopts = rocksdb::ReadOptions::default(); - readopts.set_iterate_lower_bound(buf.to_vec()); - - let key = CacheKey { - tag: BufferTag { - rel: tag.rel, - blknum: tag.blknum, - }, - lsn: lsn, - }; - buf.clear(); - key.pack(&mut buf); - let iter = self.db.iterator_opt( - rocksdb::IteratorMode::From(&buf[..], rocksdb::Direction::Reverse), - readopts, - ); - - let mut base_img: Option = None; - let mut records: Vec = Vec::new(); - - // Scan backwards, collecting the WAL records, until we hit an - // old page image. - for (_k, v) in iter { - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - if let Some(img) = &content.page_image { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img.clone()); - break; - } else if let Some(rec) = &content.wal_record { - records.push(rec.clone()); - - // If this WAL record initializes the page, no need to dig deeper. - if rec.will_init { - break; - } - } else { - panic!("no base image and no WAL record on cache entry"); - } - } - - records.reverse(); - (base_img, records) - } - - // - // Adds a WAL record to the page cache - // - pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { - let lsn = rec.lsn; - let key = CacheKey { tag, lsn }; - - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - apply_pending: false, - }; - - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - let _res = self.db.put(&key_buf[..], &val_buf[..]); - //trace!("put_wal_record lsn: {}", lsn); - - self.num_entries.fetch_add(1, Ordering::Relaxed); - self.num_wal_records.fetch_add(1, Ordering::Relaxed); - } - - // - // Adds a relation-wide WAL record (like truncate) to the page cache, - // associating it with all pages started with specified block number - // - pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { - let mut key = CacheKey { tag, lsn: rec.lsn }; - - // What was the size of the relation before this record? 
- let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); - let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; - - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - apply_pending: false, - }; - // set new relation size - trace!("Truncate relation {:?}", tag); - let mut key_buf = BytesMut::new(); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - for blknum in tag.blknum..old_rel_size { - key_buf.clear(); - key.tag.blknum = blknum; - key.pack(&mut key_buf); - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); - } - let n = (old_rel_size - tag.blknum) as u64; - self.num_entries.fetch_add(n, Ordering::Relaxed); - self.num_wal_records.fetch_add(n, Ordering::Relaxed); - Ok(()) - } - - // - // Memorize a full image of a page version - // - pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { - let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: Some(img), - wal_record: None, - apply_pending: false, - }; - - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); - - //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", - // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); - self.num_page_images.fetch_add(1, Ordering::Relaxed); - } - - // - pub fn advance_last_valid_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - let oldlsn = shared.last_valid_lsn; - if lsn >= oldlsn { - shared.last_valid_lsn = lsn; - self.seqwait_lsn.advance(lsn); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - } else { - warn!( - "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", - oldlsn >> 32, - oldlsn & 0xffffffff, - lsn >> 32, - lsn & 0xffffffff - ); - } - } - - // - // NOTE: this updates last_valid_lsn as well. - // - pub fn advance_last_record_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.last_valid_lsn); - assert!(lsn >= shared.last_record_lsn); - - shared.last_valid_lsn = lsn; - shared.last_record_lsn = lsn; - self.seqwait_lsn.advance(lsn); - - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); - } - - // - pub fn _advance_first_valid_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - // Can't move backwards. - assert!(lsn >= shared.first_valid_lsn); - - // Can't overtake last_valid_lsn (except when we're - // initializing the system and last_valid_lsn hasn't been set yet. 
- assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); - - shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - } - - pub fn init_valid_lsn(&self, lsn: u64) { - let mut shared = self.shared.lock().unwrap(); - - assert!(shared.first_valid_lsn == 0); - assert!(shared.last_valid_lsn == 0); - assert!(shared.last_record_lsn == 0); - - shared.first_valid_lsn = lsn; - shared.last_valid_lsn = lsn; - shared.last_record_lsn = lsn; - - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); - } - - pub fn get_last_valid_lsn(&self) -> u64 { - let shared = self.shared.lock().unwrap(); - - shared.last_record_lsn - } - - // - // Get size of relation at given LSN. - // - pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - self.wait_lsn(lsn).await?; - self.relsize_get_nowait(rel, lsn) - } - - // - // Internal function to get relation size at given LSN. - // - // The caller must ensure that WAL has been received up to 'lsn'. - // - fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - - assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); - - let mut key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - - loop { - buf.clear(); - key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, v)) = iter.next() { - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - if let Some(rec) = &content.wal_record { - if rec.truncate { - if tag.blknum > 0 { - key.tag.blknum = tag.blknum - 1; - continue; - } - break; - } - } - let relsize = tag.blknum + 1; - debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); - return Ok(relsize); - } - } - break; - } - debug!("Size of relation {:?} at {} is zero", rel, lsn); - Ok(0) - } - - pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { - let lsn = self.wait_lsn(req_lsn).await?; - - let key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); - let mut iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Reverse, - )); - if let Some((k, _v)) = iter.next() { - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - debug!("Relation {:?} exists at {}", rel, lsn); - return Ok(true); - } - } - debug!("Relation {:?} doesn't exist at {}", rel, lsn); - Ok(false) - } - - // - // Get statistics to be displayed in the user interface. 
- // - pub fn get_stats(&self) -> PageCacheStats { - PageCacheStats { - num_entries: self.num_entries.load(Ordering::Relaxed), - num_page_images: self.num_page_images.load(Ordering::Relaxed), - num_wal_records: self.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), - } - } - - pub fn create_database( - &self, - lsn: u64, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> anyhow::Result<()> { - let mut buf = BytesMut::new(); - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - lsn: 0, - }; - key.pack(&mut buf); - let iter = self.db.iterator(rocksdb::IteratorMode::From( - &buf[..], - rocksdb::Direction::Forward, - )); - let mut n = 0; - for (k, v) in iter { - buf.clear(); - buf.extend_from_slice(&k); - let mut key = CacheKey::unpack(&mut buf); - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; - buf.clear(); - key.pack(&mut buf); - - self.db.put(&buf[..], v)?; - n += 1; - } - info!( - "Create database {}/{}, copy {} entries", - tablespace_id, db_id, n - ); - Ok(()) - } } //