From b484b896b669d321675c88e55aa279a4ebca59be Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Wed, 5 May 2021 09:28:36 +0300 Subject: [PATCH] Refactor the functionality of page_cache.rs.

This moves things around:

- The PageCache is split into two structs: Repository and Timeline. A Repository holds multiple Timelines. In order to get a page version, you must first get a reference to the Repository, then the Timeline in the repository, and finally call the get_page_at_lsn() function on the Timeline object. This sounds complicated, but because each connection from a compute node, and each WAL receiver, only deals with one timeline at a time, the callers can get the reference to the Timeline object once and hold onto it. The Timeline corresponds most closely to the old PageCache object.

- Repository and Timeline are now abstract traits, so that we can support multiple implementations. I don't actually expect us to have multiple implementations for long. We have the RocksDB implementation now, but as soon as we have a different implementation that's usable, I expect that we will retire the RocksDB implementation. But I think this abstraction works as good documentation in any case: it's now easier to see what the interface for storing and loading pages from the repository is, by looking at the Repository/Timeline traits. The abstract traits are in repository.rs, and the RocksDB implementation of them is in repository/rocksdb.rs.

- page_cache.rs is now a "switchboard" to get a handle to the repository. Currently, the page server can only handle one repository at a time, so there isn't much there, but in the future we might do multi-tenancy there.

--- pageserver/src/bin/pageserver.rs | 4 +- pageserver/src/lib.rs | 1 + pageserver/src/page_cache.rs | 920 +-------------------------- pageserver/src/page_service.rs | 40 +- pageserver/src/repository.rs | 144 +++++ pageserver/src/repository/rocksdb.rs | 811 +++++++++++++++++++++++ pageserver/src/restore_local_repo.rs | 58 +- pageserver/src/tui.rs | 10 +- pageserver/src/walreceiver.rs | 29 +- pageserver/src/walredo.rs | 4 +- 10 files changed, 1040 insertions(+), 981 deletions(-) create mode 100644 pageserver/src/repository.rs create mode 100644 pageserver/src/repository/rocksdb.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 1572d0522a..16bdb43fd1 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -16,7 +16,7 @@ use daemonize::Daemonize; use slog::{Drain, FnValue}; -use pageserver::{page_service, tui, zenith_repo_dir, PageServerConf}; +use pageserver::{page_cache, page_service, tui, zenith_repo_dir, PageServerConf}; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; const DEFAULT_GC_PERIOD_SEC: u64 = 10; @@ -173,6 +173,8 @@ fn start_pageserver(conf: &PageServerConf) -> Result<()> { }, } + page_cache::init(conf); + // GetPage@LSN requests are served by another thread. 
(It uses async I/O, // but the code in page_service sets up it own thread pool for that) let conf_copy = conf.clone(); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 84e80f3ecb..63570e59cf 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -7,6 +7,7 @@ use std::time::Duration; pub mod basebackup; pub mod page_cache; pub mod page_service; +pub mod repository; pub mod restore_local_repo; pub mod tui; pub mod tui_event; diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 562bbd392b..5891444d2e 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -1,917 +1,25 @@ -// -// Page Cache holds all the different page versions and WAL records -// -// Currently, the page cache uses RocksDB to store WAL wal records and -// full page images, keyed by the RelFileNode, blocknumber, and the -// LSN. +//! This module acts as a switchboard to access different repositories managed by this +//! page server. Currently, a Page Server can only manage one repository, so there +//! isn't much here. If we implement multi-tenancy, this will probably be changed into +//! a hash map, keyed by the tenant ID. -use crate::restore_local_repo::restore_timeline; -use crate::waldecoder::Oid; -use crate::walredo::WalRedoManager; -use crate::ZTimelineId; -use crate::{zenith_repo_dir, PageServerConf}; -use anyhow::{bail, Context}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use crate::PageServerConf; +//use crate::repository::Repository; +use crate::repository::rocksdb::RocksRepository; use lazy_static::lazy_static; -use log::*; -use std::cmp::min; -use std::collections::HashMap; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::{Duration, Instant}; -use std::{convert::TryInto, ops::AddAssign}; -use zenith_utils::lsn::{AtomicLsn, Lsn}; -use zenith_utils::seqwait::SeqWait; - -// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. -static TIMEOUT: Duration = Duration::from_secs(60); - -pub struct PageCache { - // RocksDB handle - db: rocksdb::DB, - - // WAL redo manager - walredo_mgr: WalRedoManager, - - // What page versions do we hold in the cache? If we get GetPage with - // LSN < first_valid_lsn, that's an error because we (no longer) hold that - // page version. If we get a request > last_valid_lsn, we need to wait until - // we receive all the WAL up to the request. The SeqWait provides functions - // for that. - // - // last_record_lsn points to the end of last processed WAL record. - // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL - // after the end of last record, but not the whole next record yet. In the - // page cache, we care about last_valid_lsn, but if the WAL receiver needs to - // restart the streaming, it needs to restart at the end of last record, so - // we track them separately. last_record_lsn should perhaps be in - // walreceiver.rs instead of here, but it seems convenient to keep all three - // values together. - // - first_valid_lsn: AtomicLsn, - last_valid_lsn: SeqWait, - last_record_lsn: AtomicLsn, - - // Counters, for metrics collection. 
- pub num_entries: AtomicU64, - pub num_page_images: AtomicU64, - pub num_wal_records: AtomicU64, - pub num_getpage_requests: AtomicU64, -} - -#[derive(Clone)] -pub struct PageCacheStats { - pub num_entries: u64, - pub num_page_images: u64, - pub num_wal_records: u64, - pub num_getpage_requests: u64, -} - -impl AddAssign for PageCacheStats { - fn add_assign(&mut self, other: Self) { - self.num_entries += other.num_entries; - self.num_page_images += other.num_page_images; - self.num_wal_records += other.num_wal_records; - self.num_getpage_requests += other.num_getpage_requests; - } -} lazy_static! { - pub static ref PAGECACHES: Mutex>> = - Mutex::new(HashMap::new()); + pub static ref REPOSITORY: Mutex>> = Mutex::new(None); } -// Get Page Cache for given timeline. It is assumed to already exist. -pub fn get_pagecache(_conf: &PageServerConf, timelineid: ZTimelineId) -> Option> { - let pcaches = PAGECACHES.lock().unwrap(); +pub fn init(conf: &PageServerConf) { + let mut m = REPOSITORY.lock().unwrap(); - match pcaches.get(&timelineid) { - Some(pcache) => Some(pcache.clone()), - None => None, - } + *m = Some(Arc::new(RocksRepository::new(conf))); } -pub fn get_or_restore_pagecache( - conf: &PageServerConf, - timelineid: ZTimelineId, -) -> anyhow::Result> { - let mut pcaches = PAGECACHES.lock().unwrap(); - match pcaches.get(&timelineid) { - Some(pcache) => Ok(pcache.clone()), - None => { - let pcache = init_page_cache(conf, timelineid); - - restore_timeline(conf, &pcache, timelineid)?; - - let result = Arc::new(pcache); - - pcaches.insert(timelineid, result.clone()); - - if conf.gc_horizon != 0 { - let conf_copy = conf.clone(); - let _gc_thread = thread::Builder::new() - .name("Garbage collection thread".into()) - .spawn(move || { - gc_thread_main(&conf_copy, timelineid); - }) - .unwrap(); - } - Ok(result) - } - } -} - -fn gc_thread_main(conf: &PageServerConf, timelineid: ZTimelineId) { - info!("Garbage collection thread started {}", timelineid); - let pcache = get_pagecache(conf, timelineid).unwrap(); - - pcache.do_gc(conf).unwrap(); -} - -fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB { - let path = zenith_repo_dir().join(timelineid.to_string()); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_use_fsync(true); - opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| { - if (val[0] & UNUSED_VERSION_FLAG) != 0 { - rocksdb::compaction_filter::Decision::Remove - } else { - rocksdb::compaction_filter::Decision::Keep - } - }); - rocksdb::DB::open(&opts, &path).unwrap() -} - -fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { - PageCache { - db: open_rocksdb(&conf, timelineid), - - walredo_mgr: WalRedoManager::new(conf, timelineid), - - first_valid_lsn: AtomicLsn::new(0), - last_valid_lsn: SeqWait::new(Lsn(0)), - last_record_lsn: AtomicLsn::new(0), - - num_entries: AtomicU64::new(0), - num_page_images: AtomicU64::new(0), - num_wal_records: AtomicU64::new(0), - num_getpage_requests: AtomicU64::new(0), - } -} - -// -// We store two kinds of entries in the page cache: -// -// 1. Ready-made images of the block -// 2. WAL records, to be applied on top of the "previous" entry -// -// Some WAL records will initialize the page from scratch. For such records, -// the 'will_init' flag is set. They don't need the previous page image before -// applying. 
The 'will_init' flag is set for records containing a full-page image, -// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages -// stored directly in the cache entry in that you still need to run the WAL redo -// routine to generate the page image. -// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] -pub struct CacheKey { - pub tag: BufferTag, - pub lsn: Lsn, -} - -impl CacheKey { - pub fn pack(&self, buf: &mut BytesMut) { - self.tag.pack(buf); - buf.put_u64(self.lsn.0); - } - pub fn unpack(buf: &mut BytesMut) -> CacheKey { - CacheKey { - tag: BufferTag::unpack(buf), - lsn: Lsn::from(buf.get_u64()), - } - } -} - -pub struct CacheEntryContent { - pub page_image: Option, - pub wal_record: Option, -} - -const PAGE_IMAGE_FLAG: u8 = 1u8; -const UNUSED_VERSION_FLAG: u8 = 2u8; - -impl CacheEntryContent { - pub fn pack(&self, buf: &mut BytesMut) { - if let Some(image) = &self.page_image { - buf.put_u8(PAGE_IMAGE_FLAG); - buf.put_u16(image.len() as u16); - buf.put_slice(&image[..]); - } else if let Some(rec) = &self.wal_record { - buf.put_u8(0); - rec.pack(buf); - } - } - pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { - if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 { - let mut dst = vec![0u8; buf.get_u16() as usize]; - buf.copy_to_slice(&mut dst); - CacheEntryContent { - page_image: Some(Bytes::from(dst)), - wal_record: None, - } - } else { - CacheEntryContent { - page_image: None, - wal_record: Some(WALRecord::unpack(buf)), - } - } - } -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] -pub struct RelTag { - pub spcnode: u32, - pub dbnode: u32, - pub relnode: u32, - pub forknum: u8, -} - -impl RelTag { - pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u32(self.spcnode); - buf.put_u32(self.dbnode); - buf.put_u32(self.relnode); - buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres - } - pub fn unpack(buf: &mut BytesMut) -> RelTag { - RelTag { - spcnode: buf.get_u32(), - dbnode: buf.get_u32(), - relnode: buf.get_u32(), - forknum: buf.get_u32() as u8, - } - } -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] -pub struct BufferTag { - pub rel: RelTag, - pub blknum: u32, -} - -impl BufferTag { - pub fn pack(&self, buf: &mut BytesMut) { - self.rel.pack(buf); - buf.put_u32(self.blknum); - } - pub fn unpack(buf: &mut BytesMut) -> BufferTag { - BufferTag { - rel: RelTag::unpack(buf), - blknum: buf.get_u32(), - } - } -} - -#[derive(Debug, Clone)] -pub struct WALRecord { - pub lsn: Lsn, // LSN at the *end* of the record - pub will_init: bool, - pub truncate: bool, - pub rec: Bytes, - // Remember the offset of main_data in rec, - // so that we don't have to parse the record again. - // If record has no main_data, this offset equals rec.len(). 
- pub main_data_offset: u32, -} - -impl WALRecord { - pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u64(self.lsn.0); - buf.put_u8(self.will_init as u8); - buf.put_u8(self.truncate as u8); - buf.put_u32(self.main_data_offset); - buf.put_u32(self.rec.len() as u32); - buf.put_slice(&self.rec[..]); - } - pub fn unpack(buf: &mut BytesMut) -> WALRecord { - let lsn = Lsn::from(buf.get_u64()); - let will_init = buf.get_u8() != 0; - let truncate = buf.get_u8() != 0; - let main_data_offset = buf.get_u32(); - let mut dst = vec![0u8; buf.get_u32() as usize]; - buf.copy_to_slice(&mut dst); - WALRecord { - lsn, - will_init, - truncate, - rec: Bytes::from(dst), - main_data_offset, - } - } -} - -impl PageCache { - // Public GET interface functions - - /// - /// GetPage@LSN - /// - /// Returns an 8k page image - /// - pub fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> anyhow::Result { - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); - - let lsn = self.wait_lsn(req_lsn)?; - - // Look up cache entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let key = CacheKey { tag, lsn }; - - let mut buf = BytesMut::new(); - key.pack(&mut buf); - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); - - if iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - if key.tag == tag { - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - let page_img: Bytes; - if let Some(img) = &content.page_image { - page_img = img.clone(); - } else if content.wal_record.is_some() { - // Request the WAL redo manager to apply the WAL records for us. - let (base_img, records) = self.collect_records_for_apply(tag, lsn); - page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; - - self.put_page_image(tag, lsn, page_img.clone()); - } else { - // No base image, and no WAL record. Huh? - bail!("no page image or WAL record for requested page"); - } - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = - u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = - u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - debug!( - "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", - page_lsn_hi, - page_lsn_lo, - tag.rel.spcnode, - tag.rel.dbnode, - tag.rel.relnode, - tag.rel.forknum, - tag.blknum - ); - return Ok(page_img); - } - } - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); - Ok(Bytes::from_static(&ZERO_PAGE)) - /* return Err("could not find page image")?; */ - } - - /// - /// Get size of relation at given LSN. - /// - pub fn relsize_get(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { - self.wait_lsn(lsn)?; - self.relsize_get_nowait(rel, lsn) - } - - /// - /// Does relation exist at given LSN? 
- /// - pub fn relsize_exist(&self, rel: &RelTag, req_lsn: Lsn) -> anyhow::Result { - let lsn = self.wait_lsn(req_lsn)?; - - let key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - key.pack(&mut buf); - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); - if iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - debug!("Relation {:?} exists at {}", rel, lsn); - return Ok(true); - } - } - debug!("Relation {:?} doesn't exist at {}", rel, lsn); - Ok(false) - } - - // Other public functions, for updating the page cache. - // These are used by the WAL receiver and WAL redo. - - /// - /// Collect all the WAL records that are needed to reconstruct a page - /// image for the given cache entry. - /// - /// Returns an old page image (if any), and a vector of WAL records to apply - /// over it. - /// - pub fn collect_records_for_apply( - &self, - tag: BufferTag, - lsn: Lsn, - ) -> (Option, Vec) { - let mut buf = BytesMut::new(); - let key = CacheKey { tag, lsn }; - key.pack(&mut buf); - - let mut base_img: Option = None; - let mut records: Vec = Vec::new(); - - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); - - // Scan backwards, collecting the WAL records, until we hit an - // old page image. - while iter.valid() { - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - if key.tag != tag { - break; - } - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - if let Some(img) = &content.page_image { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img.clone()); - break; - } else if let Some(rec) = &content.wal_record { - records.push(rec.clone()); - // If this WAL record initializes the page, no need to dig deeper. - if rec.will_init { - break; - } - } else { - panic!("no base image and no WAL record on cache entry"); - } - iter.prev(); - } - records.reverse(); - (base_img, records) - } - - /// - /// Adds a WAL record to the page cache - /// - pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { - let lsn = rec.lsn; - let key = CacheKey { tag, lsn }; - - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - }; - - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - let _res = self.db.put(&key_buf[..], &val_buf[..]); - //trace!("put_wal_record lsn: {}", lsn); - - self.num_entries.fetch_add(1, Ordering::Relaxed); - self.num_wal_records.fetch_add(1, Ordering::Relaxed); - } - - /// - /// Adds a relation-wide WAL record (like truncate) to the page cache, - /// associating it with all pages started with specified block number - /// - pub fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { - let mut key = CacheKey { tag, lsn: rec.lsn }; - - // What was the size of the relation before this record? 
- let last_lsn = self.last_valid_lsn.load(); - let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; - - let content = CacheEntryContent { - page_image: None, - wal_record: Some(rec), - }; - // set new relation size - trace!("Truncate relation {:?}", tag); - let mut key_buf = BytesMut::new(); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - for blknum in tag.blknum..old_rel_size { - key_buf.clear(); - key.tag.blknum = blknum; - key.pack(&mut key_buf); - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); - } - let n = (old_rel_size - tag.blknum) as u64; - self.num_entries.fetch_add(n, Ordering::Relaxed); - self.num_wal_records.fetch_add(n, Ordering::Relaxed); - Ok(()) - } - - /// - /// Memorize a full image of a page version - /// - pub fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { - let key = CacheKey { tag, lsn }; - let content = CacheEntryContent { - page_image: Some(img), - wal_record: None, - }; - - let mut key_buf = BytesMut::new(); - key.pack(&mut key_buf); - let mut val_buf = BytesMut::new(); - content.pack(&mut val_buf); - - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(&key_buf[..], &val_buf[..]); - - //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", - // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); - self.num_page_images.fetch_add(1, Ordering::Relaxed); - } - - pub fn create_database( - &self, - lsn: Lsn, - db_id: Oid, - tablespace_id: Oid, - src_db_id: Oid, - src_tablespace_id: Oid, - ) -> anyhow::Result<()> { - let mut buf = BytesMut::new(); - let key = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: src_tablespace_id, - dbnode: src_db_id, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - key.pack(&mut buf); - let mut iter = self.db.raw_iterator(); - iter.seek(&buf[..]); - let mut n = 0; - while iter.valid() { - let k = iter.key().unwrap(); - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let mut key = CacheKey::unpack(&mut buf); - if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { - break; - } - key.tag.rel.spcnode = tablespace_id; - key.tag.rel.dbnode = db_id; - key.lsn = lsn; - buf.clear(); - key.pack(&mut buf); - - self.db.put(&buf[..], v)?; - n += 1; - iter.next(); - } - info!( - "Create database {}/{}, copy {} entries", - tablespace_id, db_id, n - ); - Ok(()) - } - - /// Remember that WAL has been received and added to the page cache up to the given LSN - pub fn advance_last_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last valid LSN backwards (was {}, new {})", - old, lsn - ); - } - } - - /// - /// Remember the (end of) last valid WAL record remembered in the page cache. - /// - /// NOTE: this updates last_valid_lsn as well. - /// - pub fn advance_last_record_lsn(&self, lsn: Lsn) { - // Can't move backwards. - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old <= lsn); - - // Also advance last_valid_lsn - let old = self.last_valid_lsn.advance(lsn); - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last record LSN backwards (was {}, new {})", - old, lsn - ); - } - } - - /// - /// Remember the beginning of valid WAL. - /// - /// TODO: This should be called by garbage collection, so that if an older - /// page is requested, we will return an error to the requestor. 
- pub fn _advance_first_valid_lsn(&self, lsn: Lsn) { - // Can't overtake last_valid_lsn (except when we're - // initializing the system and last_valid_lsn hasn't been set yet. - let last_valid_lsn = self.last_valid_lsn.load(); - assert!(last_valid_lsn == Lsn(0) || lsn < last_valid_lsn); - - let old = self.first_valid_lsn.fetch_max(lsn); - // Can't move backwards. - assert!(lsn >= old); - } - - pub fn init_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - assert!(old == Lsn(0)); - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old == Lsn(0)); - let old = self.first_valid_lsn.fetch_max(lsn); - assert!(old == Lsn(0)); - } - - pub fn get_last_valid_lsn(&self) -> Lsn { - self.last_valid_lsn.load() - } - - // - // Get statistics to be displayed in the user interface. - // - pub fn get_stats(&self) -> PageCacheStats { - PageCacheStats { - num_entries: self.num_entries.load(Ordering::Relaxed), - num_page_images: self.num_page_images.load(Ordering::Relaxed), - num_wal_records: self.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - } - } - - // Internal functions - - // - // Internal function to get relation size at given LSN. - // - // The caller must ensure that WAL has been received up to 'lsn'. - // - fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { - assert!(lsn <= self.last_valid_lsn.load()); - - let mut key = CacheKey { - tag: BufferTag { - rel: *rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut buf = BytesMut::new(); - let mut iter = self.db.raw_iterator(); - - loop { - buf.clear(); - key.pack(&mut buf); - iter.seek_for_prev(&buf[..]); - if iter.valid() { - let k = iter.key().unwrap(); - let v = iter.value().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let tag = BufferTag::unpack(&mut buf); - if tag.rel == *rel { - buf.clear(); - buf.extend_from_slice(&v); - let content = CacheEntryContent::unpack(&mut buf); - if let Some(rec) = &content.wal_record { - if rec.truncate { - if tag.blknum > 0 { - key.tag.blknum = tag.blknum - 1; - continue; - } - break; - } - } - let relsize = tag.blknum + 1; - debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); - return Ok(relsize); - } - } - break; - } - debug!("Size of relation {:?} at {} is zero", rel, lsn); - Ok(0) - } - - fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result { - let mut buf = BytesMut::new(); - loop { - thread::sleep(conf.gc_period); - let last_lsn = self.get_last_valid_lsn(); - - // checked_sub() returns None on overflow. 
- if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { - let mut maxkey = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: u32::MAX, - dbnode: u32::MAX, - relnode: u32::MAX, - forknum: u8::MAX, - }, - blknum: u32::MAX, - }, - lsn: Lsn::MAX, - }; - let now = Instant::now(); - let mut reconstructed = 0u64; - let mut truncated = 0u64; - let mut inspected = 0u64; - let mut deleted = 0u64; - loop { - buf.clear(); - maxkey.pack(&mut buf); - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(&buf[..]); - if iter.valid() { - let k = iter.key().unwrap(); - let v = iter.value().unwrap(); - - inspected += 1; - - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - - // Construct boundaries for old records cleanup - maxkey.tag = key.tag; - let last_lsn = key.lsn; - maxkey.lsn = min(horizon, last_lsn); // do not remove last version - - let mut minkey = maxkey.clone(); - minkey.lsn = Lsn(0); // first version - - // reconstruct most recent page version - if (v[0] & PAGE_IMAGE_FLAG) == 0 { - trace!("Reconstruct most recent page {:?}", key); - // force reconstruction of most recent page version - let (base_img, records) = - self.collect_records_for_apply(key.tag, key.lsn); - let new_img = self - .walredo_mgr - .request_redo(key.tag, key.lsn, base_img, records)?; - - self.put_page_image(key.tag, key.lsn, new_img.clone()); - - reconstructed += 1; - } - - buf.clear(); - maxkey.pack(&mut buf); - - iter.seek_for_prev(&buf[..]); - if iter.valid() { - // do not remove last version - if last_lsn > horizon { - // locate most recent record before horizon - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - if key.tag == maxkey.tag { - let v = iter.value().unwrap(); - if (v[0] & PAGE_IMAGE_FLAG) == 0 { - trace!("Reconstruct horizon page {:?}", key); - let (base_img, records) = - self.collect_records_for_apply(key.tag, key.lsn); - let new_img = self - .walredo_mgr - .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); - - truncated += 1; - } - } - } - // remove records prior to horizon - loop { - iter.prev(); - if !iter.valid() { - break; - } - let k = iter.key().unwrap(); - buf.clear(); - buf.extend_from_slice(&k); - let key = CacheKey::unpack(&mut buf); - if key.tag != maxkey.tag { - break; - } - let v = iter.value().unwrap(); - if (v[0] & UNUSED_VERSION_FLAG) == 0 { - let mut v = v.to_owned(); - v[0] |= UNUSED_VERSION_FLAG; - self.db.put(k, &v[..])?; - deleted += 1; - } else { - break; - } - } - } - maxkey = minkey; - } else { - break; - } - } - info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted", - now.elapsed(), inspected, reconstructed, truncated, deleted); - } - } - } - - // - // Wait until WAL has been received up to the given LSN. - // - fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { - // When invalid LSN is requested, it means "don't wait, return latest version of the page" - // This is necessary for bootstrap. 
- if lsn == Lsn(0) { - let last_valid_lsn = self.last_valid_lsn.load(); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - last_valid_lsn, - lsn - ); - lsn = last_valid_lsn; - } - - self.last_valid_lsn - .wait_for_timeout(lsn, TIMEOUT) - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {} to arrive", - lsn - ) - })?; - - Ok(lsn) - } -} - -// -// Get statistics to be displayed in the user interface. -// -// This combines the stats from all PageCache instances -// -pub fn get_stats() -> PageCacheStats { - let pcaches = PAGECACHES.lock().unwrap(); - - let mut stats = PageCacheStats { - num_entries: 0, - num_page_images: 0, - num_wal_records: 0, - num_getpage_requests: 0, - }; - - pcaches.iter().for_each(|(_sys_id, pcache)| { - stats += pcache.get_stats(); - }); - stats +pub fn get_repository() -> Arc { + let o = &REPOSITORY.lock().unwrap(); + Arc::clone(o.as_ref().unwrap()) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index e4e8a8d876..a75935e15d 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -24,6 +24,7 @@ use zenith_utils::lsn::Lsn; use crate::basebackup; use crate::page_cache; +use crate::repository::{BufferTag, RelTag, Repository, Timeline}; use crate::restore_local_repo; use crate::walreceiver; use crate::PageServerConf; @@ -698,8 +699,9 @@ impl Connection { let connstr: String = String::from(caps.get(2).unwrap().as_str()); // Check that the timeline exists - let pcache = page_cache::get_or_restore_pagecache(&self.conf, timelineid); - if pcache.is_err() { + let repository = page_cache::get_repository(); + let timeline = repository.get_or_restore_timeline(timelineid); + if timeline.is_err() { return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("client requested callmemaybe on timeline {} which does not exist in page server", timelineid))); @@ -731,13 +733,14 @@ impl Connection { fn handle_pagerequests(&mut self, timelineid: ZTimelineId) -> Result<()> { // Check that the timeline exists - let pcache = page_cache::get_or_restore_pagecache(&self.conf, timelineid); - if pcache.is_err() { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - format!("client requested pagestream on timeline {} which does not exist in page server", timelineid))); - } - let pcache = pcache.unwrap(); + let repository = page_cache::get_repository(); + let timeline = repository + .get_timeline(timelineid) + .map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + format!("client requested pagestream on timeline {} which does not exist in page server", timelineid)) + })?; /* switch client to COPYBOTH */ self.stream.write_u8(b'W')?; @@ -760,14 +763,14 @@ impl Connection { match message { Some(FeMessage::ZenithExistsRequest(req)) => { - let tag = page_cache::RelTag { + let tag = RelTag { spcnode: req.spcnode, dbnode: req.dbnode, relnode: req.relnode, forknum: req.forknum, }; - let exist = pcache.relsize_exist(&tag, req.lsn).unwrap_or(false); + let exist = timeline.get_relsize_exists(tag, req.lsn).unwrap_or(false); self.write_message(&BeMessage::ZenithStatusResponse(ZenithStatusResponse { ok: exist, @@ -775,14 +778,14 @@ impl Connection { }))? 
} Some(FeMessage::ZenithNblocksRequest(req)) => { - let tag = page_cache::RelTag { + let tag = RelTag { spcnode: req.spcnode, dbnode: req.dbnode, relnode: req.relnode, forknum: req.forknum, }; - let n_blocks = pcache.relsize_get(&tag, req.lsn).unwrap_or(0); + let n_blocks = timeline.get_relsize(tag, req.lsn).unwrap_or(0); self.write_message(&BeMessage::ZenithNblocksResponse(ZenithStatusResponse { ok: true, @@ -790,8 +793,8 @@ impl Connection { }))? } Some(FeMessage::ZenithReadRequest(req)) => { - let buf_tag = page_cache::BufferTag { - rel: page_cache::RelTag { + let buf_tag = BufferTag { + rel: RelTag { spcnode: req.spcnode, dbnode: req.dbnode, relnode: req.relnode, @@ -800,7 +803,7 @@ impl Connection { blknum: req.blkno, }; - let msg = match pcache.get_page_at_lsn(buf_tag, req.lsn) { + let msg = match timeline.get_page_at_lsn(buf_tag, req.lsn) { Ok(p) => BeMessage::ZenithReadResponse(ZenithReadResponse { ok: true, n_blocks: 0, @@ -826,8 +829,9 @@ impl Connection { fn handle_basebackup_request(&mut self, timelineid: ZTimelineId) -> Result<()> { // check that the timeline exists - let pcache = page_cache::get_or_restore_pagecache(&self.conf, timelineid); - if pcache.is_err() { + let repository = page_cache::get_repository(); + let timeline = repository.get_or_restore_timeline(timelineid); + if timeline.is_err() { return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("client requested basebackup on timeline {} which does not exist in page server", timelineid))); diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs new file mode 100644 index 0000000000..88b3698d48 --- /dev/null +++ b/pageserver/src/repository.rs @@ -0,0 +1,144 @@ +pub mod rocksdb; + +use crate::waldecoder::Oid; +use crate::ZTimelineId; +use anyhow::Result; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use std::sync::Arc; +use zenith_utils::lsn::Lsn; + +/// +/// A repository corresponds to one .zenith directory. One repository holds multiple +/// timelines, forked off from the same initial call to 'initdb'. +/// +pub trait Repository { + // FIXME: I wish these would return an abstract `&dyn Timeline` instead + fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<rocksdb::RocksTimeline>>; + fn get_or_restore_timeline( + &self, + timelineid: ZTimelineId, + ) -> Result<Arc<rocksdb::RocksTimeline>>; + + //fn get_stats(&self) -> RepositoryStats; +} + +pub trait Timeline { + + /// Look up given page in the cache. + fn get_page_at_lsn(&self, tag: BufferTag, lsn: Lsn) -> Result<Bytes>; + + /// Get size of relation + fn get_relsize(&self, tag: RelTag, lsn: Lsn) -> Result<u32>; + + /// Does relation exist? 
+ fn get_relsize_exists(&self, tag: RelTag, lsn: Lsn) -> Result<bool>; + + // Functions used by WAL receiver + + fn put_wal_record(&self, tag: BufferTag, rec: WALRecord); + fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()>; + fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes); + fn create_database( + &self, + lsn: Lsn, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, + ) -> Result<()>; + + fn advance_last_record_lsn(&self, lsn: Lsn); + fn advance_last_valid_lsn(&self, lsn: Lsn); + fn init_valid_lsn(&self, lsn: Lsn); + fn get_last_valid_lsn(&self) -> Lsn; +} + +#[derive(Clone)] +pub struct RepositoryStats { + pub num_entries: u64, + pub num_page_images: u64, + pub num_wal_records: u64, + pub num_getpage_requests: u64, +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Hash, Ord, Clone, Copy)] +pub struct RelTag { + pub spcnode: u32, + pub dbnode: u32, + pub relnode: u32, + pub forknum: u8, +} + +impl RelTag { + pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u32(self.spcnode); + buf.put_u32(self.dbnode); + buf.put_u32(self.relnode); + buf.put_u32(self.forknum as u32); // encode forknum as u32 to provide compatibility with wal_redo_postgres + } + pub fn unpack(buf: &mut BytesMut) -> RelTag { + RelTag { + spcnode: buf.get_u32(), + dbnode: buf.get_u32(), + relnode: buf.get_u32(), + forknum: buf.get_u32() as u8, + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy)] +pub struct BufferTag { + pub rel: RelTag, + pub blknum: u32, +} + +impl BufferTag { + pub fn pack(&self, buf: &mut BytesMut) { + self.rel.pack(buf); + buf.put_u32(self.blknum); + } + pub fn unpack(buf: &mut BytesMut) -> BufferTag { + BufferTag { + rel: RelTag::unpack(buf), + blknum: buf.get_u32(), + } + } +} + +#[derive(Debug, Clone)] +pub struct WALRecord { + pub lsn: Lsn, // LSN at the *end* of the record + pub will_init: bool, + pub truncate: bool, + pub rec: Bytes, + // Remember the offset of main_data in rec, + // so that we don't have to parse the record again. + // If record has no main_data, this offset equals rec.len(). + pub main_data_offset: u32, +} + +impl WALRecord { + pub fn pack(&self, buf: &mut BytesMut) { + buf.put_u64(self.lsn.0); + buf.put_u8(self.will_init as u8); + buf.put_u8(self.truncate as u8); + buf.put_u32(self.main_data_offset); + buf.put_u32(self.rec.len() as u32); + buf.put_slice(&self.rec[..]); + } + pub fn unpack(buf: &mut BytesMut) -> WALRecord { + let lsn = Lsn::from(buf.get_u64()); + let will_init = buf.get_u8() != 0; + let truncate = buf.get_u8() != 0; + let main_data_offset = buf.get_u32(); + let mut dst = vec![0u8; buf.get_u32() as usize]; + buf.copy_to_slice(&mut dst); + WALRecord { + lsn, + will_init, + truncate, + rec: Bytes::from(dst), + main_data_offset, + } + } +} diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs new file mode 100644 index 0000000000..af6a4c9f1f --- /dev/null +++ b/pageserver/src/repository/rocksdb.rs @@ -0,0 +1,811 @@ +// +// A Repository holds all the different page versions and WAL records +// +// This implementation uses RocksDB to store WAL records and +// full page images, keyed by the RelFileNode, blocknumber, and the +// LSN. 
+ +use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; +use crate::restore_local_repo::restore_timeline; +use crate::waldecoder::Oid; +use crate::walredo::WalRedoManager; +use crate::ZTimelineId; +use crate::{zenith_repo_dir, PageServerConf}; +use anyhow::{bail, Context, Result}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use log::*; +use std::cmp::min; +use std::collections::HashMap; +use std::convert::TryInto; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Duration, Instant}; +use zenith_utils::lsn::{AtomicLsn, Lsn}; +use zenith_utils::seqwait::SeqWait; + +// Timeout when waiting for the WAL receiver to catch up to an LSN given in a GetPage@LSN call. +static TIMEOUT: Duration = Duration::from_secs(60); + +pub struct RocksRepository { + conf: PageServerConf, + timelines: Mutex<HashMap<ZTimelineId, Arc<RocksTimeline>>>, +} + +pub struct RocksTimeline { + // RocksDB handle + db: rocksdb::DB, + + // WAL redo manager + walredo_mgr: WalRedoManager, + + // What page versions do we hold in the cache? If we get GetPage with + // LSN < first_valid_lsn, that's an error because we (no longer) hold that + // page version. If we get a request > last_valid_lsn, we need to wait until + // we receive all the WAL up to the request. The SeqWait provides functions + // for that. + // + // last_record_lsn points to the end of last processed WAL record. + // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL + // after the end of last record, but not the whole next record yet. In the + // page cache, we care about last_valid_lsn, but if the WAL receiver needs to + // restart the streaming, it needs to restart at the end of last record, so + // we track them separately. last_record_lsn should perhaps be in + // walreceiver.rs instead of here, but it seems convenient to keep all three + // values together. + // + first_valid_lsn: AtomicLsn, + last_valid_lsn: SeqWait<Lsn>, + last_record_lsn: AtomicLsn, + + // Counters, for metrics collection. + pub num_entries: AtomicU64, + pub num_page_images: AtomicU64, + pub num_wal_records: AtomicU64, + pub num_getpage_requests: AtomicU64, +} + +// +// We store two kinds of entries in the repository: +// +// 1. Ready-made images of the block +// 2. WAL records, to be applied on top of the "previous" entry +// +// Some WAL records will initialize the page from scratch. For such records, +// the 'will_init' flag is set. They don't need the previous page image before +// applying. The 'will_init' flag is set for records containing a full-page image, +// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages +// stored directly in the cache entry in that you still need to run the WAL redo +// routine to generate the page image. 
+// +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +pub struct CacheKey { + pub tag: BufferTag, + pub lsn: Lsn, +} + +impl CacheKey { + pub fn pack(&self, buf: &mut BytesMut) { + self.tag.pack(buf); + buf.put_u64(self.lsn.0); + } + pub fn unpack(buf: &mut BytesMut) -> CacheKey { + CacheKey { + tag: BufferTag::unpack(buf), + lsn: Lsn::from(buf.get_u64()), + } + } +} + +pub struct CacheEntryContent { + pub page_image: Option<Bytes>, + pub wal_record: Option<WALRecord>, +} + +const PAGE_IMAGE_FLAG: u8 = 1u8; +const UNUSED_VERSION_FLAG: u8 = 2u8; + +impl CacheEntryContent { + pub fn pack(&self, buf: &mut BytesMut) { + if let Some(image) = &self.page_image { + buf.put_u8(PAGE_IMAGE_FLAG); + buf.put_u16(image.len() as u16); + buf.put_slice(&image[..]); + } else if let Some(rec) = &self.wal_record { + buf.put_u8(0); + rec.pack(buf); + } + } + pub fn unpack(buf: &mut BytesMut) -> CacheEntryContent { + if (buf.get_u8() & PAGE_IMAGE_FLAG) != 0 { + let mut dst = vec![0u8; buf.get_u16() as usize]; + buf.copy_to_slice(&mut dst); + CacheEntryContent { + page_image: Some(Bytes::from(dst)), + wal_record: None, + } + } else { + CacheEntryContent { + page_image: None, + wal_record: Some(WALRecord::unpack(buf)), + } + } + } +} + +impl RocksRepository { + pub fn new(conf: &PageServerConf) -> RocksRepository { + RocksRepository { + conf: conf.clone(), + timelines: Mutex::new(HashMap::new()), + } + } +} + +// Get handle to a given timeline. It is assumed to already exist. +impl Repository for RocksRepository { + fn get_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<RocksTimeline>> { + let timelines = self.timelines.lock().unwrap(); + + match timelines.get(&timelineid) { + Some(timeline) => Ok(timeline.clone()), + None => bail!("timeline not found"), + } + } + + fn get_or_restore_timeline(&self, timelineid: ZTimelineId) -> Result<Arc<RocksTimeline>> { + let mut timelines = self.timelines.lock().unwrap(); + + match timelines.get(&timelineid) { + Some(timeline) => Ok(timeline.clone()), + None => { + let timeline = RocksTimeline::new(&self.conf, timelineid); + + restore_timeline(&self.conf, &timeline, timelineid)?; + + let timeline_rc = Arc::new(timeline); + + timelines.insert(timelineid, timeline_rc.clone()); + + if self.conf.gc_horizon != 0 { + let conf_copy = self.conf.clone(); + let timeline_rc_copy = timeline_rc.clone(); + let _gc_thread = thread::Builder::new() + .name("Garbage collection thread".into()) + .spawn(move || { + // FIXME + timeline_rc_copy.do_gc(&conf_copy).expect("GC thread died"); + }) + .unwrap(); + } + Ok(timeline_rc) + } + } + } +} + +impl RocksTimeline { + fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB { + let path = zenith_repo_dir().join(timelineid.to_string()); + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_use_fsync(true); + opts.set_compression_type(rocksdb::DBCompressionType::Lz4); + opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| { + if (val[0] & UNUSED_VERSION_FLAG) != 0 { + rocksdb::compaction_filter::Decision::Remove + } else { + rocksdb::compaction_filter::Decision::Keep + } + }); + rocksdb::DB::open(&opts, &path).unwrap() + } + + fn new(conf: &PageServerConf, timelineid: ZTimelineId) -> RocksTimeline { + RocksTimeline { + db: RocksTimeline::open_rocksdb(&conf, timelineid), + + walredo_mgr: WalRedoManager::new(conf, timelineid), + + first_valid_lsn: AtomicLsn::new(0), + last_valid_lsn: SeqWait::new(Lsn(0)), + last_record_lsn: AtomicLsn::new(0), + + num_entries: AtomicU64::new(0), + num_page_images: 
AtomicU64::new(0), + num_wal_records: AtomicU64::new(0), + num_getpage_requests: AtomicU64::new(0), + } + } +} + +impl RocksTimeline { + /// + /// Collect all the WAL records that are needed to reconstruct a page + /// image for the given cache entry. + /// + /// Returns an old page image (if any), and a vector of WAL records to apply + /// over it. + /// + fn collect_records_for_apply( + &self, + tag: BufferTag, + lsn: Lsn, + ) -> (Option<Bytes>, Vec<WALRecord>) { + let mut buf = BytesMut::new(); + let key = CacheKey { tag, lsn }; + key.pack(&mut buf); + + let mut base_img: Option<Bytes> = None; + let mut records: Vec<WALRecord> = Vec::new(); + + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + + // Scan backwards, collecting the WAL records, until we hit an + // old page image. + while iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag != tag { + break; + } + let v = iter.value().unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(img) = &content.page_image { + // We have a base image. No need to dig deeper into the list of + // records + base_img = Some(img.clone()); + break; + } else if let Some(rec) = &content.wal_record { + records.push(rec.clone()); + // If this WAL record initializes the page, no need to dig deeper. + if rec.will_init { + break; + } + } else { + panic!("no base image and no WAL record on cache entry"); + } + iter.prev(); + } + records.reverse(); + (base_img, records) + } + + // Internal functions + + // + // Internal function to get relation size at given LSN. + // + // The caller must ensure that WAL has been received up to 'lsn'. + // + fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> anyhow::Result<u32> { + assert!(lsn <= self.last_valid_lsn.load()); + + let mut key = CacheKey { + tag: BufferTag { + rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + let mut iter = self.db.raw_iterator(); + + loop { + buf.clear(); + key.pack(&mut buf); + iter.seek_for_prev(&buf[..]); + if iter.valid() { + let k = iter.key().unwrap(); + let v = iter.value().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == rel { + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + if let Some(rec) = &content.wal_record { + if rec.truncate { + if tag.blknum > 0 { + key.tag.blknum = tag.blknum - 1; + continue; + } + break; + } + } + let relsize = tag.blknum + 1; + debug!("Size of relation {:?} at {} is {}", rel, lsn, relsize); + return Ok(relsize); + } + } + break; + } + debug!("Size of relation {:?} at {} is zero", rel, lsn); + Ok(0) + } + + fn do_gc(&self, conf: &PageServerConf) -> anyhow::Result<Bytes> { + let mut buf = BytesMut::new(); + loop { + thread::sleep(conf.gc_period); + let last_lsn = self.get_last_valid_lsn(); + + // checked_sub() returns None on overflow. 
+ if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { + let mut maxkey = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: u32::MAX, + dbnode: u32::MAX, + relnode: u32::MAX, + forknum: u8::MAX, + }, + blknum: u32::MAX, + }, + lsn: Lsn::MAX, + }; + let now = Instant::now(); + let mut reconstructed = 0u64; + let mut truncated = 0u64; + let mut inspected = 0u64; + let mut deleted = 0u64; + loop { + buf.clear(); + maxkey.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + if iter.valid() { + let k = iter.key().unwrap(); + let v = iter.value().unwrap(); + + inspected += 1; + + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + + // Construct boundaries for old records cleanup + maxkey.tag = key.tag; + let last_lsn = key.lsn; + maxkey.lsn = min(horizon, last_lsn); // do not remove last version + + let mut minkey = maxkey.clone(); + minkey.lsn = Lsn(0); // first version + + // reconstruct most recent page version + if (v[0] & PAGE_IMAGE_FLAG) == 0 { + trace!("Reconstruct most recent page {:?}", key); + // force reconstruction of most recent page version + let (base_img, records) = + self.collect_records_for_apply(key.tag, key.lsn); + let new_img = self + .walredo_mgr + .request_redo(key.tag, key.lsn, base_img, records)?; + + self.put_page_image(key.tag, key.lsn, new_img.clone()); + + reconstructed += 1; + } + + buf.clear(); + maxkey.pack(&mut buf); + + iter.seek_for_prev(&buf[..]); + if iter.valid() { + // do not remove last version + if last_lsn > horizon { + // locate most recent record before horizon + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag == maxkey.tag { + let v = iter.value().unwrap(); + if (v[0] & PAGE_IMAGE_FLAG) == 0 { + trace!("Reconstruct horizon page {:?}", key); + let (base_img, records) = + self.collect_records_for_apply(key.tag, key.lsn); + let new_img = self + .walredo_mgr + .request_redo(key.tag, key.lsn, base_img, records)?; + self.put_page_image(key.tag, key.lsn, new_img.clone()); + + truncated += 1; + } + } + } + // remove records prior to horizon + loop { + iter.prev(); + if !iter.valid() { + break; + } + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag != maxkey.tag { + break; + } + let v = iter.value().unwrap(); + if (v[0] & UNUSED_VERSION_FLAG) == 0 { + let mut v = v.to_owned(); + v[0] |= UNUSED_VERSION_FLAG; + self.db.put(k, &v[..])?; + deleted += 1; + } else { + break; + } + } + } + maxkey = minkey; + } else { + break; + } + } + info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted", + now.elapsed(), inspected, reconstructed, truncated, deleted); + } + } + } + + // + // Wait until WAL has been received up to the given LSN. + // + fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result<Lsn> { + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. 
+ if lsn == Lsn(0) { + let last_valid_lsn = self.last_valid_lsn.load(); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + last_valid_lsn, + lsn + ); + lsn = last_valid_lsn; + } + + self.last_valid_lsn + .wait_for_timeout(lsn, TIMEOUT) + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn + ) + })?; + + Ok(lsn) + } +} + +impl Timeline for RocksTimeline { + // Public GET interface functions + + /// + /// GetPage@LSN + /// + /// Returns an 8k page image + /// + fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result<Bytes> { + self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); + + let lsn = self.wait_lsn(req_lsn)?; + + // Look up cache entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. + let key = CacheKey { tag, lsn }; + + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + + if iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let key = CacheKey::unpack(&mut buf); + if key.tag == tag { + let v = iter.value().unwrap(); + buf.clear(); + buf.extend_from_slice(&v); + let content = CacheEntryContent::unpack(&mut buf); + let page_img: Bytes; + if let Some(img) = &content.page_image { + page_img = img.clone(); + } else if content.wal_record.is_some() { + // Request the WAL redo manager to apply the WAL records for us. + let (base_img, records) = self.collect_records_for_apply(tag, lsn); + page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; + + self.put_page_image(tag, lsn, page_img.clone()); + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); + } + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = + u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = + u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + debug!( + "Returning page with LSN {:X}/{:X} for {}/{}/{}.{} blk {}", + page_lsn_hi, + page_lsn_lo, + tag.rel.spcnode, + tag.rel.dbnode, + tag.rel.relnode, + tag.rel.forknum, + tag.blknum + ); + return Ok(page_img); + } + } + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + debug!("Page {:?} at {}({}) not found", tag, req_lsn, lsn); + Ok(Bytes::from_static(&ZERO_PAGE)) + /* return Err("could not find page image")?; */ + } + + /// + /// Get size of relation at given LSN. + /// + fn get_relsize(&self, rel: RelTag, lsn: Lsn) -> Result<u32> { + self.wait_lsn(lsn)?; + self.relsize_get_nowait(rel, lsn) + } + + /// + /// Does relation exist at given LSN? + /// + fn get_relsize_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result<bool> { + let lsn = self.wait_lsn(req_lsn)?; + + let key = CacheKey { + tag: BufferTag { + rel, + blknum: u32::MAX, + }, + lsn, + }; + let mut buf = BytesMut::new(); + key.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(&buf[..]); + if iter.valid() { + let k = iter.key().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let tag = BufferTag::unpack(&mut buf); + if tag.rel == rel { + debug!("Relation {:?} exists at {}", rel, lsn); + return Ok(true); + } + } + debug!("Relation {:?} doesn't exist at {}", rel, lsn); + Ok(false) + } + + // Other public functions, for updating the repository. + // These are used by the WAL receiver and WAL redo. 
+ + /// + /// Adds a WAL record to the repository + /// + fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { + let lsn = rec.lsn; + let key = CacheKey { tag, lsn }; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + }; + + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + let _res = self.db.put(&key_buf[..], &val_buf[..]); + //trace!("put_wal_record lsn: {}", lsn); + + self.num_entries.fetch_add(1, Ordering::Relaxed); + self.num_wal_records.fetch_add(1, Ordering::Relaxed); + } + + /// + /// Adds a relation-wide WAL record (like truncate) to the repository, + /// associating it with all pages started with specified block number + /// + fn put_rel_wal_record(&self, tag: BufferTag, rec: WALRecord) -> anyhow::Result<()> { + let mut key = CacheKey { tag, lsn: rec.lsn }; + + // What was the size of the relation before this record? + let last_lsn = self.last_valid_lsn.load(); + let old_rel_size = self.relsize_get_nowait(tag.rel, last_lsn)?; + + let content = CacheEntryContent { + page_image: None, + wal_record: Some(rec), + }; + // set new relation size + trace!("Truncate relation {:?}", tag); + + let mut key_buf = BytesMut::new(); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + for blknum in tag.blknum..old_rel_size { + key_buf.clear(); + key.tag.blknum = blknum; + key.pack(&mut key_buf); + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + } + let n = (old_rel_size - tag.blknum) as u64; + self.num_entries.fetch_add(n, Ordering::Relaxed); + self.num_wal_records.fetch_add(n, Ordering::Relaxed); + Ok(()) + } + + /// + /// Memorize a full image of a page version + /// + fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { + let key = CacheKey { tag, lsn }; + let content = CacheEntryContent { + page_image: Some(img), + wal_record: None, + }; + + let mut key_buf = BytesMut::new(); + key.pack(&mut key_buf); + let mut val_buf = BytesMut::new(); + content.pack(&mut val_buf); + + trace!("put_wal_record lsn: {}", key.lsn); + let _res = self.db.put(&key_buf[..], &val_buf[..]); + + //debug!("inserted page image for {}/{}/{}_{} blk {} at {}", + // tag.spcnode, tag.dbnode, tag.relnode, tag.forknum, tag.blknum, lsn); + self.num_page_images.fetch_add(1, Ordering::Relaxed); + } + + fn create_database( + &self, + lsn: Lsn, + db_id: Oid, + tablespace_id: Oid, + src_db_id: Oid, + src_tablespace_id: Oid, + ) -> anyhow::Result<()> { + let mut buf = BytesMut::new(); + let key = CacheKey { + tag: BufferTag { + rel: RelTag { + spcnode: src_tablespace_id, + dbnode: src_db_id, + relnode: 0, + forknum: 0u8, + }, + blknum: 0, + }, + lsn: Lsn(0), + }; + key.pack(&mut buf); + let mut iter = self.db.raw_iterator(); + iter.seek(&buf[..]); + let mut n = 0; + while iter.valid() { + let k = iter.key().unwrap(); + let v = iter.value().unwrap(); + buf.clear(); + buf.extend_from_slice(&k); + let mut key = CacheKey::unpack(&mut buf); + if key.tag.rel.spcnode != src_tablespace_id || key.tag.rel.dbnode != src_db_id { + break; + } + key.tag.rel.spcnode = tablespace_id; + key.tag.rel.dbnode = db_id; + key.lsn = lsn; + buf.clear(); + key.pack(&mut buf); + + self.db.put(&buf[..], v)?; + n += 1; + iter.next(); + } + info!( + "Create database {}/{}, copy {} entries", + tablespace_id, db_id, n + ); + Ok(()) + } + + /// Remember that WAL has been received and added to the timeline up to the given LSN + fn advance_last_valid_lsn(&self, lsn: 
Lsn) { + let old = self.last_valid_lsn.advance(lsn); + + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last valid LSN backwards (was {}, new {})", + old, lsn + ); + } + } + + /// + /// Remember the (end of) last valid WAL record remembered for the timeline. + /// + /// NOTE: this updates last_valid_lsn as well. + /// + fn advance_last_record_lsn(&self, lsn: Lsn) { + // Can't move backwards. + let old = self.last_record_lsn.fetch_max(lsn); + assert!(old <= lsn); + + // Also advance last_valid_lsn + let old = self.last_valid_lsn.advance(lsn); + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last record LSN backwards (was {}, new {})", + old, lsn + ); + } + } + + /// + /// Remember the beginning of valid WAL. + /// + /// TODO: This should be called by garbage collection, so that if an older + /// page is requested, we will return an error to the requestor. + /* + fn _advance_first_valid_lsn(&self, lsn: Lsn) { + // Can't overtake last_valid_lsn (except when we're + // initializing the system and last_valid_lsn hasn't been set yet. + let last_valid_lsn = self.last_valid_lsn.load(); + assert!(last_valid_lsn == Lsn(0) || lsn < last_valid_lsn); + + let old = self.first_valid_lsn.fetch_max(lsn); + // Can't move backwards. + assert!(lsn >= old); + } + */ + + fn init_valid_lsn(&self, lsn: Lsn) { + let old = self.last_valid_lsn.advance(lsn); + assert!(old == Lsn(0)); + let old = self.last_record_lsn.fetch_max(lsn); + assert!(old == Lsn(0)); + let old = self.first_valid_lsn.fetch_max(lsn); + assert!(old == Lsn(0)); + } + + fn get_last_valid_lsn(&self) -> Lsn { + self.last_valid_lsn.load() + } + + // + // Get statistics to be displayed in the user interface. + // + // FIXME + /* + fn get_stats(&self) -> TimelineStats { + TimelineStats { + num_entries: self.num_entries.load(Ordering::Relaxed), + num_page_images: self.num_page_images.load(Ordering::Relaxed), + num_wal_records: self.num_wal_records.load(Ordering::Relaxed), + num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), + } + } + */ +} diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 2882c671d3..6b3cb2da15 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -26,10 +26,7 @@ use std::path::{Path, PathBuf}; use anyhow::Result; use bytes::Bytes; -use crate::page_cache; -use crate::page_cache::BufferTag; -use crate::page_cache::PageCache; -use crate::page_cache::RelTag; +use crate::repository::{BufferTag, RelTag, Timeline, WALRecord}; use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; @@ -48,10 +45,10 @@ const GLOBALTABLESPACE_OID: u32 = 1664; // pub fn restore_timeline( conf: &PageServerConf, - pcache: &PageCache, - timeline: ZTimelineId, + timeline: &dyn Timeline, + timelineid: ZTimelineId, ) -> Result<()> { - let timelinepath = PathBuf::from("timelines").join(timeline.to_string()); + let timelinepath = PathBuf::from("timelines").join(timelineid.to_string()); if !timelinepath.exists() { anyhow::bail!("timeline {} does not exist in the page server's repository"); @@ -59,7 +56,7 @@ pub fn restore_timeline( // Scan .zenith/timelines//snapshots let snapshotspath = PathBuf::from("timelines") - .join(timeline.to_string()) + .join(timelineid.to_string()) .join("snapshots"); let mut last_snapshot_lsn: Lsn = Lsn(0); @@ -72,7 +69,7 @@ pub fn restore_timeline( // FIXME: pass filename as Path instead of str? 
         let filename_str = filename.into_string().unwrap();
-        restore_snapshot(conf, pcache, timeline, &filename_str)?;
+        restore_snapshot(conf, timeline, timelineid, &filename_str)?;
         info!("restored snapshot at {:?}", filename_str);
     }
@@ -83,9 +80,9 @@
         );
         // TODO return error?
     }
 
-    pcache.init_valid_lsn(last_snapshot_lsn);
+    timeline.init_valid_lsn(last_snapshot_lsn);
 
-    restore_wal(conf, pcache, timeline, last_snapshot_lsn)?;
+    restore_wal(timeline, timelineid, last_snapshot_lsn)?;
 
     Ok(())
 }
@@ -110,12 +107,12 @@ pub fn find_latest_snapshot(_conf: &PageServerConf, timeline: ZTimelineId) -> Re
 
 fn restore_snapshot(
     conf: &PageServerConf,
-    pcache: &PageCache,
-    timeline: ZTimelineId,
+    timeline: &dyn Timeline,
+    timelineid: ZTimelineId,
     snapshot: &str,
 ) -> Result<()> {
     let snapshotpath = PathBuf::from("timelines")
-        .join(timeline.to_string())
+        .join(timelineid.to_string())
         .join("snapshots")
         .join(snapshot);
@@ -131,8 +128,6 @@ fn restore_snapshot(
                 // Load any relation files into the page server
                 _ => restore_relfile(
-                    conf,
-                    pcache,
                     timeline,
                     snapshot,
                     GLOBALTABLESPACE_OID,
@@ -160,8 +155,6 @@ fn restore_snapshot(
                     // Load any relation files into the page server
                     _ => restore_relfile(
-                        conf,
-                        pcache,
                         timeline,
                         snapshot,
                         DEFAULTTABLESPACE_OID,
@@ -175,8 +168,8 @@ fn restore_snapshot(
         let entry = entry?;
         restore_nonrelfile(
             conf,
-            pcache,
             timeline,
+            timelineid,
             snapshot,
             pg_constants::PG_XACT_FORKNUM,
             &entry.path(),
@@ -188,9 +181,7 @@ fn restore_snapshot(
 }
 
 fn restore_relfile(
-    _conf: &PageServerConf,
-    pcache: &PageCache,
-    _timeline: ZTimelineId,
+    timeline: &dyn Timeline,
     snapshot: &str,
     spcoid: u32,
     dboid: u32,
@@ -225,7 +216,7 @@ fn restore_relfile(
                 },
                 blknum,
             };
-            pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
+            timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
             /*
             if oldest_lsn == 0 || p.lsn < oldest_lsn {
                 oldest_lsn = p.lsn;
@@ -254,8 +245,8 @@ fn restore_relfile(
 
 fn restore_nonrelfile(
     _conf: &PageServerConf,
-    pcache: &PageCache,
-    _timeline: ZTimelineId,
+    timeline: &dyn Timeline,
+    _timelineid: ZTimelineId,
     snapshot: &str,
     forknum: u32,
     path: &Path,
@@ -283,7 +274,7 @@ fn restore_nonrelfile(
                 },
                 blknum,
             };
-            pcache.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
+            timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf));
             /*
             if oldest_lsn == 0 || p.lsn < oldest_lsn {
                 oldest_lsn = p.lsn;
@@ -312,13 +303,8 @@ fn restore_nonrelfile(
 // Scan WAL on a timeline, starting from given LSN, and load all the records
 // into the page cache.
-fn restore_wal(
-    _conf: &PageServerConf,
-    pcache: &PageCache,
-    timeline: ZTimelineId,
-    startpoint: Lsn,
-) -> Result<()> {
-    let walpath = format!("timelines/{}/wal", timeline);
+fn restore_wal(timeline: &dyn Timeline, timelineid: ZTimelineId, startpoint: Lsn) -> Result<()> {
+    let walpath = format!("timelines/{}/wal", timelineid);
 
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
@@ -381,7 +367,7 @@ fn restore_wal(
                     },
                     blknum: blk.blkno,
                 };
-                let rec = page_cache::WALRecord {
+                let rec = WALRecord {
                     lsn,
                     will_init: blk.will_init || blk.apply_image,
                     truncate: false,
@@ -389,11 +375,11 @@ fn restore_wal(
                     main_data_offset: decoded.main_data_offset as u32,
                 };
 
-                pcache.put_wal_record(tag, rec);
+                timeline.put_wal_record(tag, rec);
             }
 
             // Now that this record has been handled, let the page cache know that
             // it is up-to-date to this LSN
-            pcache.advance_last_valid_lsn(lsn);
+            timeline.advance_last_valid_lsn(lsn);
             last_lsn = lsn;
         } else {
             break;
diff --git a/pageserver/src/tui.rs b/pageserver/src/tui.rs
index c0e1d657b3..360b3cb876 100644
--- a/pageserver/src/tui.rs
+++ b/pageserver/src/tui.rs
@@ -229,7 +229,7 @@ impl<'a> Widget for LogWidget<'a> {
 // Render a widget to show some metrics
 struct MetricsWidget {}
 
-fn get_metric_u64(title: &str, value: u64) -> Spans {
+fn _get_metric_u64(title: &str, value: u64) -> Spans {
     Spans::from(vec![
         Span::styled(format!("{:<20}", title), Style::default()),
         Span::raw(": "),
@@ -260,9 +260,11 @@ impl tui::widgets::Widget for MetricsWidget {
         block.render(area, buf);
 
+        #[allow(unused_mut)]
         let mut lines: Vec<Spans> = Vec::new();
 
-        let page_cache_stats = crate::page_cache::get_stats();
+        // FIXME
+        //let page_cache_stats = crate::page_cache::get_stats();
 
         // This is not used since LSNs were removed from page cache stats.
         // Maybe it will be used in the future?
@@ -275,7 +277,7 @@ impl tui::widgets::Widget for MetricsWidget {
         lines.push(get_metric_str("Valid LSN range", &lsnrange));
         lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str));
         */
-
+/*
         lines.push(get_metric_u64(
             "# of cache entries",
             page_cache_stats.num_entries,
@@ -292,7 +294,7 @@ impl tui::widgets::Widget for MetricsWidget {
             "# of GetPage@LSN calls",
             page_cache_stats.num_getpage_requests,
         ));
-
+*/
         let text = Text::from(lines);
 
         Paragraph::new(text).render(inner_area, buf);
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs
index 4868395738..801895509c 100644
--- a/pageserver/src/walreceiver.rs
+++ b/pageserver/src/walreceiver.rs
@@ -7,7 +7,7 @@
 //!
 
 use crate::page_cache;
-use crate::page_cache::{BufferTag, RelTag};
+use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord};
 use crate::waldecoder::*;
 use crate::PageServerConf;
 use crate::ZTimelineId;
@@ -128,7 +128,7 @@ fn thread_main(conf: &PageServerConf, timelineid: ZTimelineId) {
 
 fn walreceiver_main(
     runtime: &Runtime,
-    conf: &PageServerConf,
+    _conf: &PageServerConf,
     timelineid: ZTimelineId,
     wal_producer_connstr: &str,
 ) -> Result<(), Error> {
@@ -152,13 +152,14 @@ fn walreceiver_main(
     let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
     let mut caught_up = false;
 
-    let pcache = page_cache::get_pagecache(&conf, timelineid).unwrap();
+    let repository = page_cache::get_repository();
+    let timeline = repository.get_timeline(timelineid).unwrap();
 
     //
     // Start streaming the WAL, from where we left off previously.
     //
-    let mut startpoint = pcache.get_last_valid_lsn();
-    let last_valid_lsn = pcache.get_last_valid_lsn();
+    let mut startpoint = timeline.get_last_valid_lsn();
+    let last_valid_lsn = timeline.get_last_valid_lsn();
     if startpoint == Lsn(0) {
         // If we start here with identify.xlogpos we will have a race condition with
         // postgres start: an insert into postgres may request a page that was modified with an lsn
@@ -168,7 +169,7 @@ fn walreceiver_main(
         // different like having an 'initdb' method on a pageserver (or importing some shared
         // empty database snapshot), so for now I just put the start of the first segment which
         // seems to be a valid record.
-        pcache.init_valid_lsn(Lsn(0x0100_0000));
+        timeline.init_valid_lsn(Lsn(0x0100_0000));
         startpoint = Lsn(0x0100_0000);
     } else {
         // There might be some padding after the last full record, skip it.
@@ -230,7 +231,7 @@ fn walreceiver_main(
                         blknum: blk.blkno,
                     };
 
-                    let rec = page_cache::WALRecord {
+                    let rec = WALRecord {
                         lsn,
                         will_init: blk.will_init || blk.apply_image,
                         truncate: false,
@@ -238,7 +239,7 @@ fn walreceiver_main(
                         main_data_offset: decoded.main_data_offset as u32,
                     };
 
-                    pcache.put_wal_record(tag, rec);
+                    timeline.put_wal_record(tag, rec);
                 }
                 // include truncate wal record in all pages
                 if decoded.xl_rmid == pg_constants::RM_SMGR_ID
@@ -256,21 +257,21 @@ fn walreceiver_main(
                         },
                         blknum: truncate.blkno,
                     };
-                    let rec = page_cache::WALRecord {
+                    let rec = WALRecord {
                         lsn,
                         will_init: false,
                         truncate: true,
                         rec: recdata.clone(),
                         main_data_offset: decoded.main_data_offset as u32,
                     };
-                    pcache.put_rel_wal_record(tag, rec)?;
+                    timeline.put_rel_wal_record(tag, rec)?;
                 }
             } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID
                 && (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
                     == pg_constants::XLOG_DBASE_CREATE
             {
                 let createdb = XlCreateDatabase::decode(&decoded);
-                pcache.create_database(
+                timeline.create_database(
                     lsn,
                     createdb.db_id,
                     createdb.tablespace_id,
@@ -280,7 +281,7 @@ fn walreceiver_main(
                 }
                 // Now that this record has been handled, let the page cache know that
                 // it is up-to-date to this LSN
-                pcache.advance_last_record_lsn(lsn);
+                timeline.advance_last_record_lsn(lsn);
             } else {
                 break;
             }
@@ -292,7 +293,7 @@ fn walreceiver_main(
             // better reflect that, because GetPage@LSN requests might also point in the
             // middle of a record, if the request LSN was taken from the server's current
             // flush ptr.
-            pcache.advance_last_valid_lsn(endlsn);
+            timeline.advance_last_valid_lsn(endlsn);
 
             if !caught_up && endlsn >= end_of_wal {
                 info!("caught up at LSN {}", endlsn);
@@ -313,7 +314,7 @@ fn walreceiver_main(
     );
     if reply_requested {
         // TODO: More thought should go into what values are sent here.
-        let last_lsn = PgLsn::from(u64::from(pcache.get_last_valid_lsn()));
+        let last_lsn = PgLsn::from(u64::from(timeline.get_last_valid_lsn()));
         let write_lsn = last_lsn;
         let flush_lsn = last_lsn;
         let apply_lsn = PgLsn::INVALID;
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index e5c978399d..9ac1ccb526 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -34,8 +34,8 @@ use tokio::process::{ChildStdin, ChildStdout, Command};
 use tokio::time::timeout;
 use zenith_utils::lsn::Lsn;
 
-use crate::page_cache::BufferTag;
-use crate::page_cache::WALRecord;
+use crate::repository::BufferTag;
+use crate::repository::WALRecord;
 use crate::PageServerConf;
 use crate::ZTimelineId;
 use postgres_ffi::pg_constants;
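
For reviewers, a minimal sketch of the calling convention this patch introduces,
mirroring what walreceiver_main() now does: fetch the Repository once through the
page_cache switchboard, look up the Timeline once, and reuse that handle for all
subsequent calls. The sketch is illustrative only and is not part of the patch;
the import paths and the helper function are assumptions, while get_repository(),
get_timeline(), get_last_valid_lsn() and advance_last_valid_lsn() are the entry
points that appear in the diffs above.

    // Sketch only, not part of this patch. Assumes page_cache::init(conf) has
    // already run at server startup (see pageserver.rs) and that `timelineid`
    // names a timeline that exists in the repository.
    use pageserver::page_cache;
    use pageserver::repository::{Repository, Timeline};
    use pageserver::ZTimelineId;

    fn attach_to_timeline(timelineid: ZTimelineId) {
        // The page server currently manages exactly one repository.
        let repository = page_cache::get_repository();

        // Each WAL receiver (or compute-node connection) works against a
        // single timeline, so the Timeline handle is fetched once and kept.
        let timeline = repository.get_timeline(timelineid).unwrap();

        // WAL ingestion advances the valid-LSN watermark after each record;
        // GetPage@LSN requests wait until the watermark reaches their LSN.
        let lsn = timeline.get_last_valid_lsn();
        timeline.advance_last_valid_lsn(lsn);
    }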