From 34f4207501413467d344a52ca6fa49a13deafba3 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 27 May 2021 20:07:50 +0300 Subject: [PATCH] Refactoring of the Repository/Timeline stuff - All timelines are now stored in the same rocksdb repository. The GET functions have been taught to follow the ancestors. - Change the way relation size is stored. Instead of inserting "tombstone" entries for blocks that are truncated away, store relation size as separate key-value entry for each relation - Add an abstraction for the key-value store: ObjectStore. It allows swapping RocksDB with some other key-value store easily. Perhaps we will write our own storage implementation using that interface, or perhaps we'll need a different abstraction, but this is a small improvement over status quo in any case. - Garbage Collection is broken and commented out. It's not clear where and how it should be implemented. --- Cargo.lock | 3 + pageserver/Cargo.toml | 2 +- pageserver/src/branches.rs | 5 +- pageserver/src/lib.rs | 3 + pageserver/src/object_repository.rs | 956 +++++++++++++++++++++++++++ pageserver/src/object_store.rs | 61 ++ pageserver/src/page_cache.rs | 11 +- pageserver/src/repository.rs | 40 +- pageserver/src/repository/rocksdb.rs | 955 -------------------------- pageserver/src/restore_local_repo.rs | 6 +- pageserver/src/rocksdb_storage.rs | 208 ++++++ pageserver/src/walreceiver.rs | 4 - pageserver/src/walredo.rs | 2 +- 13 files changed, 1266 insertions(+), 990 deletions(-) create mode 100644 pageserver/src/object_repository.rs create mode 100644 pageserver/src/object_store.rs delete mode 100644 pageserver/src/repository/rocksdb.rs create mode 100644 pageserver/src/rocksdb_storage.rs diff --git a/Cargo.lock b/Cargo.lock index 15480055cd..72e34638cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,9 @@ name = "bytes" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +dependencies = [ + "serde", +] [[package]] name = "cassowary" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index cf073ad4d6..cf6f720b79 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -10,7 +10,7 @@ edition = "2018" chrono = "0.4.19" rand = "0.8.3" regex = "1.4.5" -bytes = "1.0.1" +bytes = { version = "1.0.1", features = ['serde'] } byteorder = "1.4.3" futures = "0.3.13" lazy_static = "1.4.0" diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 84e737ba6d..8f52e7fc3c 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -98,8 +98,11 @@ pub fn init_repo(conf: &'static PageServerConf, repo_dir: &Path) -> Result<()> { // and we failed to run initdb again in the same directory. This has been solved for the // rapid init+start case now, but the general race condition remains if you restart the // server quickly. - let repo = crate::repository::rocksdb::RocksRepository::new( + let storage = crate::rocksdb_storage::RocksObjectStore::create(conf)?; + + let repo = crate::object_repository::ObjectRepository::new( conf, + std::sync::Arc::new(storage), std::sync::Arc::new(crate::walredo::DummyRedoManager {}), ); let timeline = repo.create_empty_timeline(tli, Lsn(lsn))?; diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 11e049e908..93563a0f7b 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -8,10 +8,13 @@ use std::time::Duration; pub mod basebackup; pub mod branches; +pub mod object_repository; +pub mod object_store; pub mod page_cache; pub mod page_service; pub mod repository; pub mod restore_local_repo; +pub mod rocksdb_storage; pub mod tui; pub mod tui_event; mod tui_logger; diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs new file mode 100644 index 0000000000..ebb84a97b8 --- /dev/null +++ b/pageserver/src/object_repository.rs @@ -0,0 +1,956 @@ +//! +//! 
Implementation of the Repository/Timeline traits, using a key-value store +//! (ObjectStore) for the actual storage. +//! +//! This maps the relation-oriented operations in the Timeline interface into +//! objects stored in an ObjectStore. Relation size is stored as a separate object +//! in the key-value store. If a page is written beyond the current end-of-file, +//! we also insert the new size as a new "page version" in the key-value store. +//! +//! Also, this implements Copy-on-Write forking of timelines. For each timeline, +//! we store the parent timeline in the object store, in a little metadata blob. +//! When we need to find a version of a page, we walk the timeline history backwards +//! until we find the page we're looking for, making a separate lookup into the +//! key-value store for each timeline. + +use crate::object_store::{ObjectKey, ObjectStore}; +use crate::repository::*; +use crate::restore_local_repo::import_timeline_wal; +use crate::walredo::WalRedoManager; +use crate::{PageServerConf, ZTimelineId}; +use anyhow::{bail, Context, Result}; +use bytes::Bytes; +use log::*; +use postgres_ffi::pg_constants; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::collections::HashSet; +use std::convert::TryInto; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; +use zenith_utils::bin_ser::BeSer; +use zenith_utils::lsn::{AtomicLsn, Lsn}; +use zenith_utils::seqwait::SeqWait; + +/// +/// A repository corresponds to one .zenith directory. One repository holds multiple +/// timelines, forked off from the same initial call to 'initdb'. +/// +pub struct ObjectRepository { + obj_store: Arc, + conf: &'static PageServerConf, + timelines: Mutex>>, + walredo_mgr: Arc, +} + +// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. 
+static TIMEOUT: Duration = Duration::from_secs(600); + +impl ObjectRepository { + pub fn new( + conf: &'static PageServerConf, + obj_store: Arc, + walredo_mgr: Arc, + ) -> ObjectRepository { + ObjectRepository { + conf, + obj_store, + timelines: Mutex::new(HashMap::new()), + walredo_mgr, + } + } +} + +impl Repository for ObjectRepository { + /// Get Timeline handle for given zenith timeline ID. + fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { + let mut timelines = self.timelines.lock().unwrap(); + + match timelines.get(&timelineid) { + Some(timeline) => Ok(timeline.clone()), + None => { + let timeline = ObjectTimeline::open( + Arc::clone(&self.obj_store), + timelineid, + self.walredo_mgr.clone(), + )?; + + // Load any new WAL after the last checkpoint into the repository. + info!( + "Loading WAL for timeline {} starting at {}", + timelineid, + timeline.get_last_record_lsn() + ); + let wal_dir = self.conf.timeline_path(timelineid).join("wal"); + import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; + + let timeline_rc = Arc::new(timeline); + + if self.conf.gc_horizon != 0 { + ObjectTimeline::launch_gc_thread(self.conf, timeline_rc.clone()); + } + + timelines.insert(timelineid, timeline_rc.clone()); + + Ok(timeline_rc) + } + } + } + + /// Create a new, empty timeline. 
The caller is responsible for loading data into it + fn create_empty_timeline( + &self, + timelineid: ZTimelineId, + start_lsn: Lsn, + ) -> Result> { + let mut timelines = self.timelines.lock().unwrap(); + + // Write metadata key + let metadata = MetadataEntry { + last_valid_lsn: start_lsn, + last_record_lsn: start_lsn, + ancestor_timeline: None, + ancestor_lsn: start_lsn, + }; + self.obj_store.put( + &timeline_metadata_key(timelineid), + Lsn(0), + &MetadataEntry::ser(&metadata)?, + )?; + + info!("Created empty timeline {}", timelineid); + + let timeline = ObjectTimeline::open( + Arc::clone(&self.obj_store), + timelineid, + self.walredo_mgr.clone(), + )?; + + let timeline_rc = Arc::new(timeline); + let r = timelines.insert(timelineid, timeline_rc.clone()); + assert!(r.is_none()); + + // don't start the garbage collector for unit tests, either. + + Ok(timeline_rc) + } + + /// Branch a timeline + fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { + // just to check the source timeline exists + let _ = self.get_timeline(src)?; + + // Write a metadata key, noting the ancestor of the new timeline. There is initially + // no data in it, but all the read-calls know to look into the ancestor. + let metadata = MetadataEntry { + last_valid_lsn: at_lsn, + last_record_lsn: at_lsn, + ancestor_timeline: Some(src), + ancestor_lsn: at_lsn, + }; + self.obj_store.put( + &timeline_metadata_key(dst), + Lsn(0), + &MetadataEntry::ser(&metadata)?, + )?; + + Ok(()) + } +} + +/// +/// A handle to a specific timeline in the repository. This is the API +/// that's exposed to the rest of the system. +/// +pub struct ObjectTimeline { + timelineid: ZTimelineId, + + // Backing key-value store + obj_store: Arc, + + // WAL redo manager, for reconstructing page versions from WAL records. + walredo_mgr: Arc, + + // What page versions do we hold in the cache? 
If we get a request > last_valid_lsn, + // we need to wait until we receive all the WAL up to the request. The SeqWait + // provides functions for that. TODO: If we get a request for an old LSN, such that + // the versions have already been garbage collected away, we should throw an error, + // but we don't track that currently. + // + // last_record_lsn points to the end of last processed WAL record. + // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL + // after the end of last record, but not the whole next record yet. In the + // page cache, we care about last_valid_lsn, but if the WAL receiver needs to + // restart the streaming, it needs to restart at the end of last record, so + // we track them separately. last_record_lsn should perhaps be in + // walreceiver.rs instead of here, but it seems convenient to keep all three + // values together. + // + last_valid_lsn: SeqWait, + last_record_lsn: AtomicLsn, + + ancestor_timeline: Option, + ancestor_lsn: Lsn, +} + +impl ObjectTimeline { + /// Open a Timeline handle. + /// + /// Loads the metadata for the timeline into memory. + fn open( + obj_store: Arc, + timelineid: ZTimelineId, + walredo_mgr: Arc, + ) -> Result { + // Load metadata into memory + let v = obj_store + .get(&timeline_metadata_key(timelineid), Lsn(0)) + .with_context(|| "timeline not found in repository")?; + let metadata = MetadataEntry::des(&v)?; + + let timeline = ObjectTimeline { + timelineid, + obj_store, + walredo_mgr, + last_valid_lsn: SeqWait::new(metadata.last_valid_lsn), + last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0), + ancestor_timeline: metadata.ancestor_timeline, + ancestor_lsn: metadata.ancestor_lsn, + }; + Ok(timeline) + } +} + +impl Timeline for ObjectTimeline { + //------------------------------------------------------------------------------ + // Public GET functions + //------------------------------------------------------------------------------ + + /// Look up given page in the cache. 
+ fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result { + let lsn = self.wait_lsn(req_lsn)?; + + self.get_page_at_lsn_nowait(tag, lsn) + } + + /// Get size of relation + fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result { + let lsn = self.wait_lsn(lsn)?; + + match self.relsize_get_nowait(rel, lsn)? { + Some(nblocks) => Ok(nblocks), + None => bail!("relation {} not found at {}", rel, lsn), + } + } + + /// Does relation exist at given LSN? + fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result { + let lsn = self.wait_lsn(req_lsn)?; + let key = relation_size_key(self.timelineid, rel); + let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?; + if let Some((_key, _val)) = iter.next().transpose()? { + debug!("Relation {} exists at {}", rel, lsn); + return Ok(true); + } + debug!("Relation {} doesn't exist at {}", rel, lsn); + Ok(false) + } + + /// Get a list of all distinct relations in given tablespace and database. + fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result> { + // List all relations in this timeline. + let mut all_rels = self + .obj_store + .list_rels(self.timelineid, spcnode, dbnode, lsn)?; + + // Also list all relations in ancestor timelines. If a relation hasn't been modified + // after the fork, there will be no trace of it in the object store with the current + // timeline id. + let mut prev_timeline: Option = self.ancestor_timeline; + let mut lsn = self.ancestor_lsn; + while let Some(timeline) = prev_timeline { + let this_rels = self.obj_store.list_rels(timeline, spcnode, dbnode, lsn)?; + + for rel in this_rels { + all_rels.insert(rel); + } + + // Load ancestor metadata. 
+ let v = self + .obj_store + .get(&timeline_metadata_key(timeline), Lsn(0)) + .with_context(|| "timeline not found in repository")?; + let metadata = MetadataEntry::des(&v)?; + + prev_timeline = metadata.ancestor_timeline; + lsn = metadata.ancestor_lsn; + } + + Ok(all_rels) + } + + //------------------------------------------------------------------------------ + // Public PUT functions, to update the repository with new page versions. + // + // These are called by the WAL receiver to digest WAL records. + //------------------------------------------------------------------------------ + + /// Put a new page version that can be constructed from a WAL record + /// + /// This will implicitly extend the relation, if the page is beyond the + /// current end-of-file. + fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()> { + let lsn = rec.lsn; + let key = ObjectKey { + timeline: self.timelineid, + buf_tag: tag, + }; + let val = PageEntry::WALRecord(rec); + + self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?; + debug!( + "put_wal_record rel {} blk {} at {}", + tag.rel, tag.blknum, lsn + ); + + // Also check if this created or extended the file + let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + + if tag.blknum >= old_nblocks { + let new_nblocks = tag.blknum + 1; + let key = relation_size_key(self.timelineid, tag.rel); + let val = RelationSizeEntry::Size(new_nblocks); + + trace!( + "Extended relation {} from {} to {} blocks at {}", + tag.rel, + old_nblocks, + new_nblocks, + lsn + ); + + self.obj_store + .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?; + } + + Ok(()) + } + + /// + /// Memorize a full image of a page version + /// + fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()> { + let key = ObjectKey { + timeline: self.timelineid, + buf_tag: tag, + }; + let val = PageEntry::Page(img); + + self.obj_store.put(&key, lsn, &PageEntry::ser(&val)?)?; + + debug!( + "put_page_image rel {} blk {} at {}", 
+ tag.rel, tag.blknum, lsn + ); + + // Also check if this created or extended the file + let old_nblocks = self.relsize_get_nowait(tag.rel, lsn)?.unwrap_or(0); + + if tag.blknum >= old_nblocks { + let new_nblocks = tag.blknum + 1; + let key = relation_size_key(self.timelineid, tag.rel); + let val = RelationSizeEntry::Size(new_nblocks); + + trace!( + "Extended relation {} from {} to {} blocks at {}", + tag.rel, + old_nblocks, + new_nblocks, + lsn + ); + + self.obj_store + .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?; + } + + Ok(()) + } + + /// + /// Records a truncation of the relation by storing its new size, in blocks, + /// as a new version of the relation size entry at the given LSN + /// + fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> { + let key = relation_size_key(self.timelineid, rel); + let val = RelationSizeEntry::Size(nblocks); + + info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); + + self.obj_store + .put(&key, lsn, &RelationSizeEntry::ser(&val)?)?; + + Ok(()) + } + + /// Remember that all WAL before the given LSN has been processed. + /// + /// The WAL receiver calls this after the put_* functions, to indicate that + /// all WAL before this point has been digested. Before that, if you call + /// GET on an earlier LSN, it will block. + fn advance_last_valid_lsn(&self, lsn: Lsn) { + let old = self.last_valid_lsn.advance(lsn); + + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last valid LSN backwards (was {}, new {})", + old, lsn + ); + } + } + + fn get_last_valid_lsn(&self) -> Lsn { + self.last_valid_lsn.load() + } + + fn init_valid_lsn(&self, lsn: Lsn) { + let old = self.last_valid_lsn.advance(lsn); + assert!(old == Lsn(0)); + let old = self.last_record_lsn.fetch_max(lsn); + assert!(old == Lsn(0)); + } + + /// Like `advance_last_valid_lsn`, but this always points to the end of + /// a WAL record, not in the middle of one. + /// + /// This must be <= last valid LSN. 
This is tracked separately from last + /// valid LSN, so that the WAL receiver knows where to restart streaming. + /// + /// NOTE: this updates last_valid_lsn as well. + fn advance_last_record_lsn(&self, lsn: Lsn) { + // Can't move backwards. + let old = self.last_record_lsn.fetch_max(lsn); + assert!(old <= lsn); + + // Also advance last_valid_lsn + let old = self.last_valid_lsn.advance(lsn); + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last record LSN backwards (was {}, new {})", + old, lsn + ); + } + } + fn get_last_record_lsn(&self) -> Lsn { + self.last_record_lsn.load() + } + + /// + /// Flush to disk all data that was written with the put_* functions + /// + /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't + /// know anything about them here in the repository. + + // Flush all the changes written so far with PUT functions to disk. + // RocksDB writes out things as we go (?), so we don't need to do much here. We just + // write out the last valid and record LSNs. + fn checkpoint(&self) -> Result<()> { + let metadata = MetadataEntry { + last_valid_lsn: self.last_valid_lsn.load(), + last_record_lsn: self.last_record_lsn.load(), + ancestor_timeline: self.ancestor_timeline, + ancestor_lsn: self.ancestor_lsn, + }; + self.obj_store.put( + &timeline_metadata_key(self.timelineid), + Lsn(0), + &MetadataEntry::ser(&metadata)?, + )?; + + trace!("checkpoint at {}", metadata.last_valid_lsn); + + Ok(()) + } +} + +impl ObjectTimeline { + fn get_page_at_lsn_nowait(&self, tag: BufferTag, lsn: Lsn) -> Result { + // Look up the page entry. If it's a page image, return that. If it's a WAL record, + // ask the WAL redo service to reconstruct the page image from the WAL records. + let searchkey = ObjectKey { + timeline: self.timelineid, + buf_tag: tag, + }; + let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; + + if let Some((version_lsn, value)) = iter.next().transpose()? 
{ + let page_img: Bytes; + + match PageEntry::des(&value)? { + PageEntry::Page(img) => { + page_img = img; + } + PageEntry::WALRecord(_rec) => { + // Request the WAL redo manager to apply the WAL records for us. + let (base_img, records) = self.collect_records_for_apply(tag, lsn)?; + page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; + + self.put_page_image(tag, lsn, page_img.clone())?; + } + } + // FIXME: assumes little-endian. Only used for the debugging log though + let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); + let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); + trace!( + "Returning page with LSN {:X}/{:X} for {} blk {} from {} (request {})", + page_lsn_hi, + page_lsn_lo, + tag.rel, + tag.blknum, + version_lsn, + lsn + ); + return Ok(page_img); + } + static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; + trace!("page {} blk {} at {} not found", tag.rel, tag.blknum, lsn); + Ok(Bytes::from_static(&ZERO_PAGE)) + /* return Err("could not find page image")?; */ + } + + /// + /// Internal function to get relation size at given LSN. + /// + /// The caller must ensure that WAL has been received up to 'lsn'. + /// + fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result> { + let key = relation_size_key(self.timelineid, rel); + let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?; + + if let Some((version_lsn, value)) = iter.next().transpose()? { + match RelationSizeEntry::des(&value)? 
{ + RelationSizeEntry::Size(nblocks) => { + trace!( + "relation {} has size {} at {} (request {})", + rel, + nblocks, + version_lsn, + lsn + ); + Ok(Some(nblocks)) + } + RelationSizeEntry::Unlink => { + trace!( + "relation {} not found; it was dropped at lsn {}", + rel, + version_lsn + ); + Ok(None) + } + } + } else { + info!("relation {} not found at {}", rel, lsn); + Ok(None) + } + } + + /// + /// Collect all the WAL records that are needed to reconstruct a page + /// image for the given cache entry. + /// + /// Returns an old page image (if any), and a vector of WAL records to apply + /// over it. + /// + fn collect_records_for_apply( + &self, + tag: BufferTag, + lsn: Lsn, + ) -> Result<(Option, Vec)> { + let mut base_img: Option = None; + let mut records: Vec = Vec::new(); + + // Scan backwards, collecting the WAL records, until we hit an + // old page image. + let searchkey = ObjectKey { + timeline: self.timelineid, + buf_tag: tag, + }; + let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; + while let Some((_key, value)) = iter.next().transpose()? { + match PageEntry::des(&value)? { + PageEntry::Page(img) => { + // We have a base image. No need to dig deeper into the list of + // records + base_img = Some(img); + break; + } + PageEntry::WALRecord(rec) => { + records.push(rec.clone()); + // If this WAL record initializes the page, no need to dig deeper. + if rec.will_init { + break; + } + } + } + } + records.reverse(); + Ok((base_img, records)) + } + + fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc) { + let _gc_thread = thread::Builder::new() + .name("Garbage collection thread".into()) + .spawn(move || { + // FIXME + timeline_rc.do_gc(conf).expect("GC thread died"); + }) + .unwrap(); + } + + fn do_gc(&self, conf: &'static PageServerConf) -> Result<()> { + loop { + thread::sleep(conf.gc_period); + + // FIXME: broken + /* + let last_lsn = self.get_last_valid_lsn(); + + // checked_sub() returns None on overflow. 
+ if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { + let mut maxkey = StorageKey { + tag: BufferTag { + rel: RelTag { + spcnode: u32::MAX, + dbnode: u32::MAX, + relnode: u32::MAX, + forknum: u8::MAX, + }, + blknum: u32::MAX, + }, + lsn: Lsn::MAX, + }; + let now = Instant::now(); + let mut reconstructed = 0u64; + let mut truncated = 0u64; + let mut inspected = 0u64; + let mut deleted = 0u64; + loop { + let mut iter = self.db.raw_iterator(); + iter.seek_for_prev(maxkey.to_bytes()); + if iter.valid() { + let key = StorageKey::des(iter.key().unwrap()); + let val = StorageValue::des(iter.value().unwrap()); + + inspected += 1; + + // Construct boundaries for old records cleanup + maxkey.tag = key.tag; + let last_lsn = key.lsn; + maxkey.lsn = min(horizon, last_lsn); // do not remove last version + + let mut minkey = maxkey.clone(); + minkey.lsn = Lsn(0); // first version + + // reconstruct most recent page version + if let StorageValueContent::Image(_) = val.content { + // force reconstruction of most recent page version + let (base_img, records) = + self.collect_records_for_apply(key.tag, key.lsn); + + trace!( + "Reconstruct most recent page {} blk {} at {} from {} records", + key.tag.rel, + key.tag.blknum, + key.lsn, + records.len() + ); + + let new_img = self + .walredo_mgr + .request_redo(key.tag, key.lsn, base_img, records)?; + self.put_page_image(key.tag, key.lsn, new_img.clone()); + + reconstructed += 1; + } + + iter.seek_for_prev(maxkey.to_bytes()); + if iter.valid() { + // do not remove last version + if last_lsn > horizon { + // locate most recent record before horizon + let key = StorageKey::des(iter.key().unwrap()); + if key.tag == maxkey.tag { + let val = StorageValue::des(iter.value().unwrap()); + if let StorageValueContent::Image(_) = val.content { + let (base_img, records) = + self.collect_records_for_apply(key.tag, key.lsn); + trace!("Reconstruct horizon page {} blk {} at {} from {} records", + key.tag.rel, key.tag.blknum, key.lsn, 
records.len()); + let new_img = self + .walredo_mgr + .request_redo(key.tag, key.lsn, base_img, records)?; + self.put_page_image(key.tag, key.lsn, new_img.clone()); + + truncated += 1; + } else { + trace!( + "Keeping horizon page {} blk {} at {}", + key.tag.rel, + key.tag.blknum, + key.lsn + ); + } + } + } else { + trace!( + "Last page {} blk {} at {}, horizon {}", + key.tag.rel, + key.tag.blknum, + key.lsn, + horizon + ); + } + // remove records prior to horizon + loop { + iter.prev(); + if !iter.valid() { + break; + } + let key = StorageKey::des(iter.key().unwrap()); + if key.tag != maxkey.tag { + break; + } + let mut val = StorageValue::des(iter.value().unwrap()); + if val.alive { + val.alive = false; + self.storage.put(key, val)?; + deleted += 1; + trace!( + "deleted: {} blk {} at {}", + key.tag.rel, + key.tag.blknum, + key.lsn + ); + } else { + break; + } + } + } + maxkey = minkey; + } else { + break; + } + } + info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted", + now.elapsed(), inspected, reconstructed, truncated, deleted); + } + */ + } + } + + // + // Wait until WAL has been received up to the given LSN. + // + fn wait_lsn(&self, mut lsn: Lsn) -> Result { + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. + if lsn == Lsn(0) { + let last_valid_lsn = self.last_valid_lsn.load(); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + last_valid_lsn, + lsn + ); + lsn = last_valid_lsn; + } + trace!( + "Start waiting for LSN {}, valid LSN is {}", + lsn, + self.last_valid_lsn.load() + ); + self.last_valid_lsn + .wait_for_timeout(lsn, TIMEOUT) + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {} to arrive. 
valid LSN in {}", + lsn, + self.last_valid_lsn.load(), + ) + })?; + //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); + + Ok(lsn) + } + + /// + /// Iterate through object versions with given key, in reverse LSN order. + /// + /// This implements following the timeline history over the plain + /// ObjectStore::object_versions function, which doesn't know + /// about the relationships between timeline. + /// + fn object_versions<'a>( + &self, + obj_store: &'a dyn ObjectStore, + key: &ObjectKey, + lsn: Lsn, + ) -> Result> { + let current_iter = obj_store.object_versions(key, lsn)?; + + Ok(ObjectVersionIter { + obj_store, + buf_tag: key.buf_tag, + current_iter, + ancestor_timeline: self.ancestor_timeline, + ancestor_lsn: self.ancestor_lsn, + }) + } +} + +/// +/// We store two kinds of page versions in the repository: +/// +/// 1. Ready-made images of the block +/// 2. WAL records, to be applied on top of the "previous" entry +/// +/// Some WAL records will initialize the page from scratch. For such records, +/// the 'will_init' flag is set. They don't need the previous page image before +/// applying. The 'will_init' flag is set for records containing a full-page image, +/// and for records with the BKPBLOCK_WILL_INIT flag. These differ from PageImages +/// stored directly in the cache entry in that you still need to run the WAL redo +/// routine to generate the page image. +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +enum PageEntry { + Page(Bytes), + WALRecord(WALRecord), +} + +/// +/// In addition to page versions, we store relation size as a separate, versioned, +/// object. +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RelationSizeEntry { + Size(u32), + + /// Tombstone for a dropped relation. + // + // TODO: Not used. Currently, we never drop relations. The parsing + // of relation drops in COMMIT/ABORT records has not been + // implemented. 
We should also have a mechanism to remove + // "orphaned" relfiles, if the compute node crashes before writing + // the COMMIT/ABORT record. + Unlink, +} + +const fn relation_size_key(timelineid: ZTimelineId, rel: RelTag) -> ObjectKey { + ObjectKey { + timeline: timelineid, + buf_tag: BufferTag { + rel, + blknum: u32::MAX, + }, + } +} + +/// +/// In addition to those per-page and per-relation entries, we also +/// store a little metadata blob for each timeline. It is stored using +/// ROCKSDB_SPECIAL_FORKNUM. +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetadataEntry { + last_valid_lsn: Lsn, + last_record_lsn: Lsn, + ancestor_timeline: Option, + ancestor_lsn: Lsn, +} + +const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey { + ObjectKey { + timeline: timelineid, + buf_tag: BufferTag { + rel: RelTag { + forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM, + spcnode: 0, + dbnode: 0, + relnode: 0, + }, + blknum: 0, + }, + } +} + +/// +/// Iterator for `object_versions`. Returns all page versions of a given block, in +/// reverse LSN order. This implements the traversal of ancestor timelines. If +/// a page isn't found in the most recent timeline, this iterates to the parent, +/// until a page version is found. +/// +struct ObjectVersionIter<'a> { + obj_store: &'a dyn ObjectStore, + + buf_tag: BufferTag, + + /// Iterator on the current timeline. + current_iter: Box)> + 'a>, + + /// Ancestor of the current timeline being iterated. + ancestor_timeline: Option, + ancestor_lsn: Lsn, +} + +impl<'a> Iterator for ObjectVersionIter<'a> { + type Item = Result<(Lsn, Vec)>; + + fn next(&mut self) -> Option { + self.next_result().transpose() + } +} + +impl<'a> ObjectVersionIter<'a> { + /// + /// "transposed" version of the standard Iterator::next function. + /// + /// The rust standard Iterator::next function returns an + /// Option of a Result, but it's more convenient to work with + /// Result of an Option so that you can use ? to check for errors. 
+ /// + fn next_result(&mut self) -> Result)>> { + loop { + // If there is another entry on the current timeline, return it. + if let Some(result) = self.current_iter.next() { + return Ok(Some(result)); + } + + // Out of entries on this timeline. Move to the ancestor, if any. + if let Some(ancestor_timeline) = self.ancestor_timeline { + let searchkey = ObjectKey { + timeline: ancestor_timeline, + buf_tag: self.buf_tag, + }; + let ancestor_iter = self + .obj_store + .object_versions(&searchkey, self.ancestor_lsn)?; + + // Load the parent timeline's metadata. (We don't + // actually need it yet, only if we need to follow to + // the grandparent timeline) + let v = self + .obj_store + .get(&timeline_metadata_key(ancestor_timeline), Lsn(0)) + .with_context(|| "timeline not found in repository")?; + let ancestor_metadata = MetadataEntry::des(&v)?; + + self.ancestor_timeline = ancestor_metadata.ancestor_timeline; + self.ancestor_lsn = ancestor_metadata.ancestor_lsn; + self.current_iter = ancestor_iter; + } else { + return Ok(None); + } + } + } +} diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs new file mode 100644 index 0000000000..7ff6fd0285 --- /dev/null +++ b/pageserver/src/object_store.rs @@ -0,0 +1,61 @@ +use crate::repository::{BufferTag, RelTag}; +use crate::ZTimelineId; +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::iter::Iterator; +use zenith_utils::lsn::Lsn; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ObjectKey { + pub timeline: ZTimelineId, + pub buf_tag: BufferTag, +} + +/// +/// Low-level storage abstraction. +/// +/// All the data in the repository is stored in a key-value store. This trait +/// abstracts the details of the key-value store. 
+/// +/// A simple key-value store would support just GET and PUT operations with +/// a key, but the upper layer needs slightly complicated read operations +/// +/// The most frequently used function is 'object_versions'. It is used +/// to look up a page version. It is LSN aware, in that the caller +/// specifies an LSN, and the function returns all values for that +/// block with the same or older LSN. +/// +pub trait ObjectStore: Send + Sync { + /// + /// Store a value with given key. + /// + fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()>; + + /// Read entry with the exact given key. + /// + /// This is used for retrieving metadata with special key that doesn't + /// correspond to any real relation. + fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result>; + + /// Iterate through all page versions of one object. + /// + /// Returns all page versions in descending LSN order, along with the LSN + /// of each page version. + fn object_versions<'a>( + &'a self, + key: &ObjectKey, + lsn: Lsn, + ) -> Result)> + 'a>>; + + /// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'. + /// + /// This is used to implement 'create database' + fn list_rels( + &self, + timelineid: ZTimelineId, + spcnode: u32, + dbnode: u32, + lsn: Lsn, + ) -> Result>; +} diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index e295b27903..add3e3ba7f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -3,30 +3,33 @@ //! isn't much here. If we implement multi-tenancy, this will probably be changed into //! a hash map, keyed by the tenant ID. -use crate::repository::rocksdb::RocksRepository; +use crate::object_repository::ObjectRepository; use crate::repository::Repository; +use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::PostgresRedoManager; use crate::PageServerConf; use lazy_static::lazy_static; use std::sync::{Arc, Mutex}; lazy_static! 
{ - pub static ref REPOSITORY: Mutex>> = Mutex::new(None); + pub static ref REPOSITORY: Mutex>> = Mutex::new(None); } pub fn init(conf: &'static PageServerConf) { let mut m = REPOSITORY.lock().unwrap(); + let obj_store = RocksObjectStore::open(conf).unwrap(); + // Set up a WAL redo manager, for applying WAL records. let walredo_mgr = PostgresRedoManager::new(conf); // we have already changed current dir to the repository. - let repo = RocksRepository::new(conf, Arc::new(walredo_mgr)); + let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr)); *m = Some(Arc::new(repo)); } -pub fn get_repository() -> Arc { +pub fn get_repository() -> Arc { let o = &REPOSITORY.lock().unwrap(); Arc::clone(o.as_ref().unwrap()) } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 3da371f497..0113f98a00 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,5 +1,3 @@ -pub mod rocksdb; - use crate::ZTimelineId; use anyhow::Result; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -13,7 +11,7 @@ use zenith_utils::lsn::Lsn; /// /// A repository corresponds to one .zenith directory. One repository holds multiple /// timelines, forked off from the same initial call to 'initdb'. -pub trait Repository { +pub trait Repository: Send + Sync { /// Get Timeline handle for given zenith timeline ID. fn get_timeline(&self, timelineid: ZTimelineId) -> Result>; @@ -30,7 +28,7 @@ pub trait Repository { //fn get_stats(&self) -> RepositoryStats; } -pub trait Timeline { +pub trait Timeline: Send + Sync { //------------------------------------------------------------------------------ // Public GET functions //------------------------------------------------------------------------------ @@ -57,10 +55,10 @@ pub trait Timeline { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. 
- fn put_wal_record(&self, tag: BufferTag, rec: WALRecord); + fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) -> Result<()>; /// Like put_wal_record, but with ready-made image of the page. - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes); + fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) -> Result<()>; /// Truncate relation fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()>; @@ -156,7 +154,7 @@ pub struct BufferTag { pub blknum: u32, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct WALRecord { pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, @@ -196,6 +194,8 @@ impl WALRecord { #[cfg(test)] mod tests { use super::*; + use crate::object_repository::ObjectRepository; + use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; use crate::PageServerConf; use postgres_ffi::pg_constants; @@ -250,9 +250,11 @@ mod tests { // OK in a test. 
let conf: &'static PageServerConf = Box::leak(Box::new(conf)); + let obj_store = RocksObjectStore::create(conf)?; + let walredo_mgr = TestRedoManager {}; - let repo = rocksdb::RocksRepository::new(conf, Arc::new(walredo_mgr)); + let repo = ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr)); Ok(Box::new(repo)) } @@ -269,21 +271,17 @@ mod tests { let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; tline.init_valid_lsn(Lsn(1)); - tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2")); - tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2")); - tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3")); - tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4")); - tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5")); + tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TEST_BUF(0), Lsn(2), TEST_IMG("foo blk 0 at 2"))?; + tline.put_page_image(TEST_BUF(0), Lsn(3), TEST_IMG("foo blk 0 at 3"))?; + tline.put_page_image(TEST_BUF(1), Lsn(4), TEST_IMG("foo blk 1 at 4"))?; + tline.put_page_image(TEST_BUF(2), Lsn(5), TEST_IMG("foo blk 2 at 5"))?; tline.advance_last_valid_lsn(Lsn(5)); - // FIXME: The rocksdb implementation erroneously returns 'true' here, even - // though the relation was created only at a later LSN - // rocksdb implementation erroneosly returns 'true' here - assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, true); // CORRECT: false - - // And this probably should throw an error, becaue the relation doesn't exist at Lsn(1) yet - assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(1))?, 0); // CORRECT: throw error + // The relation was created at LSN 2, not visible at LSN 1 yet. 
+ assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(1))?, false); + assert!(tline.get_rel_size(TESTREL_A, Lsn(1)).is_err()); assert_eq!(tline.get_rel_exists(TESTREL_A, Lsn(2))?, true); assert_eq!(tline.get_rel_size(TESTREL_A, Lsn(2))?, 1); @@ -364,7 +362,7 @@ mod tests { for i in 0..pg_constants::RELSEG_SIZE + 1 { let img = TEST_IMG(&format!("foo blk {} at {}", i, Lsn(lsn))); lsn += 1; - tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img); + tline.put_page_image(TEST_BUF(i as u32), Lsn(lsn), img)?; } tline.advance_last_valid_lsn(Lsn(lsn)); diff --git a/pageserver/src/repository/rocksdb.rs b/pageserver/src/repository/rocksdb.rs deleted file mode 100644 index c561cd8f07..0000000000 --- a/pageserver/src/repository/rocksdb.rs +++ /dev/null @@ -1,955 +0,0 @@ -// -// A Repository holds all the different page versions and WAL records -// -// This implementation uses RocksDB to store WAL wal records and -// full page images, keyed by the RelFileNode, blocknumber, and the -// LSN. - -use crate::repository::{BufferTag, RelTag, Repository, Timeline, WALRecord}; -use crate::restore_local_repo::import_timeline_wal; -use crate::walredo::WalRedoManager; -use crate::PageServerConf; -use crate::ZTimelineId; -use anyhow::{anyhow, bail, Context, Result}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; -use log::*; -use postgres_ffi::pg_constants; -use serde::{Deserialize, Serialize}; -use std::cmp::min; -use std::collections::HashMap; -use std::collections::HashSet; -use std::convert::TryInto; -use std::str::FromStr; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::{Duration, Instant}; -use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::{AtomicLsn, Lsn}; -use zenith_utils::seqwait::SeqWait; - -// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. 
-static TIMEOUT: Duration = Duration::from_secs(600); - -pub struct RocksRepository { - conf: &'static PageServerConf, - timelines: Mutex>>, - - walredo_mgr: Arc, -} - -pub struct RocksTimeline { - // RocksDB handle - db: rocksdb::DB, - - // WAL redo manager - walredo_mgr: Arc, - - // What page versions do we hold in the cache? If we get a request > last_valid_lsn, - // we need to wait until we receive all the WAL up to the request. The SeqWait - // provides functions for that. TODO: If we get a request for an old LSN, such that - // the versions have already been garbage collected away, we should throw an error, - // but we don't track that currently. - // - // last_record_lsn points to the end of last processed WAL record. - // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL - // after the end of last record, but not the whole next record yet. In the - // page cache, we care about last_valid_lsn, but if the WAL receiver needs to - // restart the streaming, it needs to restart at the end of last record, so - // we track them separately. last_record_lsn should perhaps be in - // walreceiver.rs instead of here, but it seems convenient to keep all three - // values together. - // - last_valid_lsn: SeqWait, - last_record_lsn: AtomicLsn, - - // Counters, for metrics collection. - pub num_entries: AtomicU64, - pub num_page_images: AtomicU64, - pub num_wal_records: AtomicU64, - pub num_getpage_requests: AtomicU64, -} - -// -// We store two kinds of entries in the repository: -// -// 1. Ready-made images of the block -// 2. WAL records, to be applied on top of the "previous" entry -// -// Some WAL records will initialize the page from scratch. For such records, -// the 'will_init' flag is set. They don't need the previous page image before -// applying. The 'will_init' flag is set for records containing a full-page image, -// and for records with the BKPBLOCK_WILL_INIT flag. 
These differ from PageImages -// stored directly in the cache entry in that you still need to run the WAL redo -// routine to generate the page image. -// -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize, Deserialize)] -struct CacheKey { - pub tag: BufferTag, - pub lsn: Lsn, -} - -// -// In addition to those per-page entries, the 'last_valid_lsn' and 'last_record_lsn' -// values are also persisted in the rocskdb repository. They are stored with CacheKeys -// with ROCKSDB_SPECIAL_FORKNUM, and 'blknum' indicates which value it is. The -// rest of the key fields are zero. We use a CacheKey as the key for these too, -// so that whenever we iterate through keys in the repository, we can safely parse -// the key blob as CacheKey without checking for these special values first. -// -// FIXME: This is quite a similar concept to the special entries created by -// `BufferTag::fork` function. Merge them somehow? These special keys are specific -// to the rocksb implementation, not exposed to the rest of the system, but the -// other special forks created by `BufferTag::fork` are also used elsewhere. -// -impl CacheKey { - const fn special(id: u32) -> CacheKey { - CacheKey { - tag: BufferTag { - rel: RelTag { - forknum: pg_constants::ROCKSDB_SPECIAL_FORKNUM, - spcnode: 0, - dbnode: 0, - relnode: 0, - }, - blknum: id, - }, - lsn: Lsn(0), - } - } - - fn is_special(&self) -> bool { - self.tag.rel.forknum == pg_constants::ROCKSDB_SPECIAL_FORKNUM - } -} - -static LAST_VALID_LSN_KEY: CacheKey = CacheKey::special(0); -static LAST_VALID_RECORD_LSN_KEY: CacheKey = CacheKey::special(1); - -enum CacheEntryContent { - PageImage(Bytes), - WALRecord(WALRecord), - Truncation, -} - -// The serialized representation of a CacheEntryContent begins with -// single byte that indicates what kind of entry it is. 
There is also -// an UNUSED_VERSION_FLAG that is not represented in the CacheEntryContent -// at all, you must peek into the first byte of the serialized representation -// to read it. -const CONTENT_PAGE_IMAGE: u8 = 1u8; -const CONTENT_WAL_RECORD: u8 = 2u8; -const CONTENT_TRUNCATION: u8 = 3u8; - -const CONTENT_KIND_MASK: u8 = 3u8; // bitmask that covers the above - -const UNUSED_VERSION_FLAG: u8 = 4u8; - -impl CacheEntryContent { - pub fn pack(&self, buf: &mut BytesMut) { - match self { - CacheEntryContent::PageImage(image) => { - buf.put_u8(CONTENT_PAGE_IMAGE); - buf.put_u16(image.len() as u16); - buf.put_slice(&image[..]); - } - CacheEntryContent::WALRecord(rec) => { - buf.put_u8(CONTENT_WAL_RECORD); - rec.pack(buf); - } - CacheEntryContent::Truncation => { - buf.put_u8(CONTENT_TRUNCATION); - } - } - } - pub fn unpack(buf: &mut Bytes) -> CacheEntryContent { - let kind = buf.get_u8() & CONTENT_KIND_MASK; - - match kind { - CONTENT_PAGE_IMAGE => { - let len = buf.get_u16() as usize; - let mut dst = vec![0u8; len]; - buf.copy_to_slice(&mut dst); - CacheEntryContent::PageImage(Bytes::from(dst)) - } - CONTENT_WAL_RECORD => CacheEntryContent::WALRecord(WALRecord::unpack(buf)), - CONTENT_TRUNCATION => CacheEntryContent::Truncation, - _ => unreachable!(), - } - } - - fn from_slice(slice: &[u8]) -> Self { - let mut buf = Bytes::copy_from_slice(slice); - Self::unpack(&mut buf) - } - - fn to_bytes(&self) -> BytesMut { - let mut buf = BytesMut::new(); - self.pack(&mut buf); - buf - } -} - -impl RocksRepository { - pub fn new( - conf: &'static PageServerConf, - walredo_mgr: Arc, - ) -> RocksRepository { - RocksRepository { - conf, - timelines: Mutex::new(HashMap::new()), - walredo_mgr, - } - } -} - -impl RocksRepository { - fn get_rocks_timeline(&self, timelineid: ZTimelineId) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); - - match timelines.get(&timelineid) { - Some(timeline) => Ok(timeline.clone()), - None => { - let timeline = - 
RocksTimeline::open(self.conf, timelineid, self.walredo_mgr.clone())?; - - // Load any new WAL after the last checkpoint into the repository. - info!( - "Loading WAL for timeline {} starting at {}", - timelineid, - timeline.get_last_record_lsn() - ); - let wal_dir = self.conf.timeline_path(timelineid).join("wal"); - import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; - - let timeline_rc = Arc::new(timeline); - - if self.conf.gc_horizon != 0 { - RocksTimeline::launch_gc_thread(self.conf, timeline_rc.clone()); - } - - timelines.insert(timelineid, timeline_rc.clone()); - - Ok(timeline_rc) - } - } - } -} - -// Get handle to a given timeline. It is assumed to already exist. -impl Repository for RocksRepository { - fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { - Ok(self.get_rocks_timeline(timelineid)?) - } - - fn create_empty_timeline( - &self, - timelineid: ZTimelineId, - start_lsn: Lsn, - ) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); - - let timeline = - RocksTimeline::create(&self.conf, timelineid, self.walredo_mgr.clone(), start_lsn)?; - - let timeline_rc = Arc::new(timeline); - let r = timelines.insert(timelineid, timeline_rc.clone()); - assert!(r.is_none()); - - // don't start the garbage collector for unit tests, either. - - Ok(timeline_rc) - } - - fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { - let src_timeline = self.get_rocks_timeline(src)?; - - info!("branching at {}", at_lsn); - - let dst_timeline = - RocksTimeline::create(&self.conf, dst, self.walredo_mgr.clone(), at_lsn)?; - - // Copy all entries <= LSN - // - // This is very inefficient, a far cry from the promise of cheap copy-on-write - // branching. But it will do for now. 
- let mut iter = src_timeline.db.raw_iterator(); - iter.seek_to_first(); - while iter.valid() { - let k = iter.key().unwrap(); - let key = CacheKey::des(k)?; - - if !key.is_special() && key.lsn <= at_lsn { - let v = iter.value().unwrap(); - dst_timeline.db.put(k, v)?; - } - iter.next(); - } - Ok(()) - } -} - -impl RocksTimeline { - /// common options used by `open` and `create` - fn get_rocksdb_opts() -> rocksdb::Options { - let mut opts = rocksdb::Options::default(); - opts.set_use_fsync(true); - opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| { - if (val[0] & UNUSED_VERSION_FLAG) != 0 { - rocksdb::compaction_filter::Decision::Remove - } else { - rocksdb::compaction_filter::Decision::Keep - } - }); - opts - } - - /// Open a RocksDB database, and load the last valid and record LSNs into memory. - fn open( - conf: &PageServerConf, - timelineid: ZTimelineId, - walredo_mgr: Arc, - ) -> Result { - let path = conf.timeline_path(timelineid); - let db = rocksdb::DB::open(&RocksTimeline::get_rocksdb_opts(), path)?; - - // Load these into memory - let lsnstr = db - .get(LAST_VALID_LSN_KEY.ser()?) - .with_context(|| "last_valid_lsn not found in repository")? - .ok_or(anyhow!("empty last_valid_lsn"))?; - let last_valid_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?; - let lsnstr = db - .get(LAST_VALID_RECORD_LSN_KEY.ser()?) - .with_context(|| "last_record_lsn not found in repository")? - .ok_or(anyhow!("empty last_record_lsn"))?; - let last_record_lsn = Lsn::from_str(std::str::from_utf8(&lsnstr)?)?; - - let timeline = RocksTimeline { - db, - walredo_mgr, - - last_valid_lsn: SeqWait::new(last_valid_lsn), - last_record_lsn: AtomicLsn::new(last_record_lsn.0), - - num_entries: AtomicU64::new(0), - num_page_images: AtomicU64::new(0), - num_wal_records: AtomicU64::new(0), - num_getpage_requests: AtomicU64::new(0), - }; - Ok(timeline) - } - - /// Create a new RocksDB database. 
It is initally empty, except for the last - /// valid and last record LSNs, which are set to 'start_lsn'. - fn create( - conf: &PageServerConf, - timelineid: ZTimelineId, - walredo_mgr: Arc, - start_lsn: Lsn, - ) -> Result { - let path = conf.timeline_path(timelineid); - let mut opts = RocksTimeline::get_rocksdb_opts(); - opts.create_if_missing(true); - opts.set_error_if_exists(true); - let db = rocksdb::DB::open(&opts, path)?; - - let timeline = RocksTimeline { - db, - walredo_mgr, - - last_valid_lsn: SeqWait::new(start_lsn), - last_record_lsn: AtomicLsn::new(start_lsn.0), - - num_entries: AtomicU64::new(0), - num_page_images: AtomicU64::new(0), - num_wal_records: AtomicU64::new(0), - num_getpage_requests: AtomicU64::new(0), - }; - // Write the initial last_valid/record_lsn values - timeline.checkpoint()?; - Ok(timeline) - } - - fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc) { - let timeline_rc_copy = timeline_rc.clone(); - let _gc_thread = thread::Builder::new() - .name("Garbage collection thread".into()) - .spawn(move || { - // FIXME - timeline_rc_copy.do_gc(conf).expect("GC thread died"); - }) - .unwrap(); - } - - /// - /// Collect all the WAL records that are needed to reconstruct a page - /// image for the given cache entry. - /// - /// Returns an old page image (if any), and a vector of WAL records to apply - /// over it. - /// - fn collect_records_for_apply( - &self, - tag: BufferTag, - lsn: Lsn, - ) -> (Option, Vec) { - let key = CacheKey { tag, lsn }; - let mut base_img: Option = None; - let mut records: Vec = Vec::new(); - - let mut iter = self.db.raw_iterator(); - let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); - iter.seek_for_prev(serialized_key); - - // Scan backwards, collecting the WAL records, until we hit an - // old page image. 
- while iter.valid() { - let key = CacheKey::des(iter.key().unwrap()).unwrap(); - if key.tag != tag { - break; - } - let content = CacheEntryContent::from_slice(iter.value().unwrap()); - if let CacheEntryContent::PageImage(img) = content { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img); - break; - } else if let CacheEntryContent::WALRecord(rec) = content { - records.push(rec.clone()); - // If this WAL record initializes the page, no need to dig deeper. - if rec.will_init { - break; - } - } else { - panic!("no base image and no WAL record on cache entry"); - } - iter.prev(); - } - records.reverse(); - (base_img, records) - } - - // Internal functions - - // - // Internal function to get relation size at given LSN. - // - // The caller must ensure that WAL has been received up to 'lsn'. - // - fn relsize_get_nowait(&self, rel: RelTag, lsn: Lsn) -> Result { - assert!(lsn <= self.last_valid_lsn.load()); - - let mut key = CacheKey { - tag: BufferTag { - rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut iter = self.db.raw_iterator(); - loop { - iter.seek_for_prev(key.ser()?); - if iter.valid() { - let thiskey = CacheKey::des(iter.key().unwrap())?; - if thiskey.tag.rel == rel { - // Ignore entries with later LSNs. 
- if thiskey.lsn > lsn { - key.tag.blknum = thiskey.tag.blknum; - continue; - } - - let content = CacheEntryContent::from_slice(iter.value().unwrap()); - if let CacheEntryContent::Truncation = content { - if thiskey.tag.blknum > 0 { - key.tag.blknum = thiskey.tag.blknum - 1; - continue; - } - break; - } - let relsize = thiskey.tag.blknum + 1; - debug!("Size of relation {} at {} is {}", rel, lsn, relsize); - return Ok(relsize); - } - } - break; - } - debug!("Size of relation {} at {} is zero", rel, lsn); - Ok(0) - } - - fn do_gc(&self, conf: &'static PageServerConf) -> Result { - loop { - thread::sleep(conf.gc_period); - let last_lsn = self.get_last_valid_lsn(); - - // checked_sub() returns None on overflow. - if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) { - let mut maxkey = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: u32::MAX, - dbnode: u32::MAX, - relnode: u32::MAX, - forknum: u8::MAX, - }, - blknum: u32::MAX, - }, - lsn: Lsn::MAX, - }; - let now = Instant::now(); - let mut reconstructed = 0u64; - let mut truncated = 0u64; - let mut inspected = 0u64; - let mut deleted = 0u64; - loop { - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(maxkey.ser()?); - if iter.valid() { - let key = CacheKey::des(iter.key().unwrap())?; - let v = iter.value().unwrap(); - - inspected += 1; - - // Construct boundaries for old records cleanup - maxkey.tag = key.tag; - let last_lsn = key.lsn; - maxkey.lsn = min(horizon, last_lsn); // do not remove last version - - let mut minkey = maxkey.clone(); - minkey.lsn = Lsn(0); // first version - - // reconstruct most recent page version - if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { - // force reconstruction of most recent page version - let (base_img, records) = - self.collect_records_for_apply(key.tag, key.lsn); - - trace!( - "Reconstruct most recent page {} blk {} at {} from {} records", - key.tag.rel, - key.tag.blknum, - key.lsn, - records.len() - ); - - let new_img = self - .walredo_mgr - 
.request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); - - reconstructed += 1; - } - - iter.seek_for_prev(maxkey.ser()?); - if iter.valid() { - // do not remove last version - if last_lsn > horizon { - // locate most recent record before horizon - let key = CacheKey::des(iter.key().unwrap())?; - if key.tag == maxkey.tag { - let v = iter.value().unwrap(); - if (v[0] & CONTENT_KIND_MASK) == CONTENT_WAL_RECORD { - let (base_img, records) = - self.collect_records_for_apply(key.tag, key.lsn); - trace!("Reconstruct horizon page {} blk {} at {} from {} records", - key.tag.rel, key.tag.blknum, key.lsn, records.len()); - let new_img = self - .walredo_mgr - .request_redo(key.tag, key.lsn, base_img, records)?; - self.put_page_image(key.tag, key.lsn, new_img.clone()); - - truncated += 1; - } else { - trace!( - "Keeping horizon page {} blk {} at {}", - key.tag.rel, - key.tag.blknum, - key.lsn - ); - } - } - } else { - trace!( - "Last page {} blk {} at {}, horizon {}", - key.tag.rel, - key.tag.blknum, - key.lsn, - horizon - ); - } - // remove records prior to horizon - loop { - iter.prev(); - if !iter.valid() { - break; - } - let key = CacheKey::des(iter.key().unwrap())?; - if key.tag != maxkey.tag { - break; - } - let v = iter.value().unwrap(); - if (v[0] & UNUSED_VERSION_FLAG) == 0 { - let mut v = v.to_owned(); - v[0] |= UNUSED_VERSION_FLAG; - self.db.put(key.ser()?, &v[..])?; - deleted += 1; - trace!( - "deleted: {} blk {} at {}", - key.tag.rel, - key.tag.blknum, - key.lsn - ); - } else { - break; - } - } - } - maxkey = minkey; - } else { - break; - } - } - info!("Garbage collection completed in {:?}:\n{} version chains inspected, {} pages reconstructed, {} version histories truncated, {} versions deleted", - now.elapsed(), inspected, reconstructed, truncated, deleted); - } - } - } - - // - // Wait until WAL has been received up to the given LSN. 
- // - fn wait_lsn(&self, mut lsn: Lsn) -> Result { - // When invalid LSN is requested, it means "don't wait, return latest version of the page" - // This is necessary for bootstrap. - if lsn == Lsn(0) { - let last_valid_lsn = self.last_valid_lsn.load(); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - last_valid_lsn, - lsn - ); - lsn = last_valid_lsn; - } - trace!( - "Start waiting for LSN {}, valid LSN is {}", - lsn, - self.last_valid_lsn.load() - ); - self.last_valid_lsn - .wait_for_timeout(lsn, TIMEOUT) - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}", - lsn, - self.last_valid_lsn.load(), - ) - })?; - //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); - - Ok(lsn) - } -} - -impl Timeline for RocksTimeline { - // Public GET interface functions - - /// - /// GetPage@LSN - /// - /// Returns an 8k page image - /// - fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> Result { - self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); - - let lsn = self.wait_lsn(req_lsn)?; - - // Look up cache entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let key = CacheKey { tag, lsn }; - - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.ser()?); - - if iter.valid() { - let key = CacheKey::des(iter.key().unwrap())?; - if key.tag == tag { - let content = CacheEntryContent::from_slice(iter.value().unwrap()); - let page_img: Bytes; - if let CacheEntryContent::PageImage(img) = content { - page_img = img; - } else if let CacheEntryContent::WALRecord(_rec) = content { - // Request the WAL redo manager to apply the WAL records for us. 
- let (base_img, records) = self.collect_records_for_apply(tag, lsn); - page_img = self.walredo_mgr.request_redo(tag, lsn, base_img, records)?; - - self.put_page_image(tag, lsn, page_img.clone()); - } else { - // No base image, and no WAL record. Huh? - bail!("no page image or WAL record for requested page"); - } - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = - u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = - u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - debug!( - "Returning page with LSN {:X}/{:X} for {} blk {}", - page_lsn_hi, page_lsn_lo, tag.rel, tag.blknum - ); - return Ok(page_img); - } - } - static ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - debug!( - "Page {} blk {} at {}({}) not found", - tag.rel, tag.blknum, req_lsn, lsn - ); - Ok(Bytes::from_static(&ZERO_PAGE)) - /* return Err("could not find page image")?; */ - } - - /// - /// Get size of relation at given LSN. - /// - fn get_rel_size(&self, rel: RelTag, lsn: Lsn) -> Result { - let lsn = self.wait_lsn(lsn)?; - self.relsize_get_nowait(rel, lsn) - } - - /// - /// Does relation exist at given LSN? - /// - /// FIXME: this actually returns true, if the relation exists at *any* LSN - fn get_rel_exists(&self, rel: RelTag, req_lsn: Lsn) -> Result { - let lsn = self.wait_lsn(req_lsn)?; - - let key = CacheKey { - tag: BufferTag { - rel, - blknum: u32::MAX, - }, - lsn, - }; - let mut iter = self.db.raw_iterator(); - iter.seek_for_prev(key.ser()?); - if iter.valid() { - let key = CacheKey::des(iter.key().unwrap())?; - if key.tag.rel == rel { - debug!("Relation {} exists at {}", rel, lsn); - return Ok(true); - } - } - debug!("Relation {} doesn't exist at {}", rel, lsn); - Ok(false) - } - - /// Get a list of all distinct relations in given tablespace and database. - /// - /// TODO: This implementation is very inefficient, it scans - /// through all entries in the given database. 
In practice, this - /// is used for CREATE DATABASE, and usually the template database is small. - /// But if it's not, this will be slow. - fn list_rels<'a>(&'a self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result> { - trace!("list_rels spcnode {} dbnode {} at {}", spcnode, dbnode, lsn); - - let mut rels: HashSet = HashSet::new(); - - let searchkey = CacheKey { - tag: BufferTag { - rel: RelTag { - spcnode: spcnode, - dbnode: dbnode, - relnode: 0, - forknum: 0u8, - }, - blknum: 0, - }, - lsn: Lsn(0), - }; - let mut iter = self.db.raw_iterator(); - iter.seek(searchkey.ser()?); - while iter.valid() { - let key = CacheKey::des(iter.key().unwrap())?; - if key.tag.rel.spcnode != spcnode || key.tag.rel.dbnode != dbnode { - break; - } - - if key.lsn < lsn { - rels.insert(key.tag.rel); - } - iter.next(); - } - - Ok(rels) - } - - // Other public functions, for updating the repository. - // These are used by the WAL receiver and WAL redo. - - /// - /// Adds a WAL record to the repository - /// - fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { - let lsn = rec.lsn; - let key = CacheKey { tag, lsn }; - - let content = CacheEntryContent::WALRecord(rec); - - let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); - let _res = self.db.put(serialized_key, content.to_bytes()); - trace!( - "put_wal_record rel {} blk {} at {}", - tag.rel, - tag.blknum, - lsn - ); - - self.num_entries.fetch_add(1, Ordering::Relaxed); - self.num_wal_records.fetch_add(1, Ordering::Relaxed); - } - - /// - /// Adds a relation-wide WAL record (like truncate) to the repository, - /// associating it with all pages started with specified block number - /// - fn put_truncation(&self, rel: RelTag, lsn: Lsn, nblocks: u32) -> Result<()> { - // What was the size of the relation before this record? 
- let last_lsn = self.last_valid_lsn.load(); - let old_rel_size = self.relsize_get_nowait(rel, last_lsn)?; - - let content = CacheEntryContent::Truncation; - // set new relation size - trace!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); - - for blknum in nblocks..old_rel_size { - let key = CacheKey { - tag: BufferTag { rel, blknum }, - lsn, - }; - trace!("put_wal_record lsn: {}", key.lsn); - let _res = self.db.put(key.ser()?, content.to_bytes()); - } - let n = (old_rel_size - nblocks) as u64; - self.num_entries.fetch_add(n, Ordering::Relaxed); - self.num_wal_records.fetch_add(n, Ordering::Relaxed); - Ok(()) - } - - /// - /// Memorize a full image of a page version - /// - fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { - let img_len = img.len(); - let key = CacheKey { tag, lsn }; - let content = CacheEntryContent::PageImage(img); - - let mut val_buf = content.to_bytes(); - - // Zero size of page image indicates that page can be removed - if img_len == 0 { - if (val_buf[0] & UNUSED_VERSION_FLAG) != 0 { - // records already marked for deletion - return; - } else { - // delete truncated multixact page - val_buf[0] |= UNUSED_VERSION_FLAG; - } - } - - trace!("put_wal_record lsn: {}", key.lsn); - let serialized_key = key.ser().expect("serialize CacheKey should always succeed"); - let _res = self.db.put(serialized_key, content.to_bytes()); - - trace!( - "put_page_image rel {} blk {} at {}", - tag.rel, - tag.blknum, - lsn - ); - self.num_page_images.fetch_add(1, Ordering::Relaxed); - } - - /// Remember that WAL has been received and added to the timeline up to the given LSN - fn advance_last_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last valid LSN backwards (was {}, new {})", - old, lsn - ); - } - } - - /// - /// Remember the (end of) last valid WAL record remembered for the timeline. 
- /// - /// NOTE: this updates last_valid_lsn as well. - /// - fn advance_last_record_lsn(&self, lsn: Lsn) { - // Can't move backwards. - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old <= lsn); - - // Also advance last_valid_lsn - let old = self.last_valid_lsn.advance(lsn); - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last record LSN backwards (was {}, new {})", - old, lsn - ); - } - } - - fn get_last_record_lsn(&self) -> Lsn { - self.last_record_lsn.load() - } - - fn init_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - assert!(old == Lsn(0)); - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old == Lsn(0)); - } - - fn get_last_valid_lsn(&self) -> Lsn { - self.last_valid_lsn.load() - } - - // Flush all the changes written so far with PUT functions to disk. - // RocksDB writes out things as we go (?), so we don't need to do much here. We just - // write out the last valid and record LSNs. - fn checkpoint(&self) -> Result<()> { - let last_valid_lsn = self.last_valid_lsn.load(); - self.db - .put(LAST_VALID_LSN_KEY.ser()?, last_valid_lsn.to_string())?; - self.db.put( - LAST_VALID_RECORD_LSN_KEY.ser()?, - self.last_record_lsn.load().to_string(), - )?; - - trace!("checkpoint at {}", last_valid_lsn); - - Ok(()) - } - - // - // Get statistics to be displayed in the user interface. 
- // - // FIXME - /* - fn get_stats(&self) -> TimelineStats { - TimelineStats { - num_entries: self.num_entries.load(Ordering::Relaxed), - num_page_images: self.num_page_images.load(Ordering::Relaxed), - num_wal_records: self.num_wal_records.load(Ordering::Relaxed), - num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - } - } - */ -} diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index 4cfcdc9898..0d7e4ae508 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -145,7 +145,7 @@ fn import_relfile( }, blknum, }; - timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf)); + timeline.put_page_image(tag, lsn, Bytes::copy_from_slice(&buf))?; /* if oldest_lsn == 0 || p.lsn < oldest_lsn { oldest_lsn = p.lsn; @@ -273,7 +273,7 @@ pub fn save_decoded_record( main_data_offset: decoded.main_data_offset as u32, }; - timeline.put_wal_record(tag, rec); + timeline.put_wal_record(tag, rec)?; } // Handle a few special record types @@ -359,7 +359,7 @@ fn save_create_database( info!("copying block {:?} to {:?}", src_key, dst_key); - timeline.put_page_image(dst_key, lsn, content); + timeline.put_page_image(dst_key, lsn, content)?; num_blocks_copied += 1; } diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs new file mode 100644 index 0000000000..c5e461031b --- /dev/null +++ b/pageserver/src/rocksdb_storage.rs @@ -0,0 +1,208 @@ +//! +//! An implementation of the ObjectStore interface, backed by RocksDB +//! 
+use crate::object_store::{ObjectKey, ObjectStore};
+use crate::repository::{BufferTag, RelTag};
+use crate::PageServerConf;
+use crate::ZTimelineId;
+use anyhow::{bail, Result};
+use serde::{Deserialize, Serialize};
+use std::collections::HashSet;
+use zenith_utils::bin_ser::BeSer;
+use zenith_utils::lsn::Lsn;
+
+/// Key under which one object version is stored in RocksDB: the object key
+/// followed by the LSN of that version. It is turned into bytes with `ser()`
+/// before being handed to RocksDB.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct StorageKey {
+    obj_key: ObjectKey,
+    lsn: Lsn,
+}
+
+/// `ObjectStore` implementation that keeps all object versions in one RocksDB
+/// database.
+pub struct RocksObjectStore {
+    _conf: &'static PageServerConf,
+
+    // RocksDB handle
+    db: rocksdb::DB,
+}
+
+impl ObjectStore for RocksObjectStore {
+    /// Fetch the value stored for `key` at exactly `lsn`.
+    ///
+    /// Returns an error if no entry exists for that key/LSN pair, or if
+    /// serializing the key or reading from RocksDB fails.
+    fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result<Vec<u8>> {
+        let val = self.db.get(StorageKey::ser(&StorageKey {
+            obj_key: key.clone(),
+            lsn,
+        })?)?;
+        if let Some(val) = val {
+            Ok(val)
+        } else {
+            bail!("could not find page {:?}", key);
+        }
+    }
+
+    /// Store `value` as the version of `key` at `lsn`.
+    fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> {
+        self.db.put(
+            StorageKey::ser(&StorageKey {
+                obj_key: key.clone(),
+                lsn,
+            })?,
+            value,
+        )?;
+        Ok(())
+    }
+
+    /// Iterate through page versions of given page, starting from the given LSN.
+    /// The versions are walked in descending LSN order.
+    fn object_versions<'a>(
+        &'a self,
+        key: &ObjectKey,
+        lsn: Lsn,
+    ) -> Result<Box<dyn Iterator<Item = (Lsn, Vec<u8>)> + 'a>> {
+        let iter = RocksObjectVersionIter::new(&self.db, key, lsn)?;
+        Ok(Box::new(iter))
+    }
+
+    /// Get a list of all distinct relations in given tablespace and database.
+    ///
+    /// Only entries that belong to `timelineid` and whose LSN is below `lsn`
+    /// contribute to the result.
+    ///
+    /// TODO: This implementation is very inefficient, it scans
+    /// through all entries in the given database. In practice, this
+    /// is used for CREATE DATABASE, and usually the template database is small.
+    /// But if it's not, this will be slow.
+    fn list_rels(
+        &self,
+        timelineid: ZTimelineId,
+        spcnode: u32,
+        dbnode: u32,
+        lsn: Lsn,
+    ) -> Result<HashSet<RelTag>> {
+        // FIXME: This scans everything. Very slow
+
+        let mut rels: HashSet<RelTag> = HashSet::new();
+
+        let searchkey = StorageKey {
+            obj_key: ObjectKey {
+                timeline: timelineid,
+                buf_tag: BufferTag {
+                    rel: RelTag {
+                        spcnode,
+                        dbnode,
+                        relnode: 0,
+                        forknum: 0u8,
+                    },
+                    blknum: 0,
+                },
+            },
+            lsn: Lsn(0),
+        };
+        let mut iter = self.db.raw_iterator();
+        iter.seek(searchkey.ser()?);
+        while iter.valid() {
+            let key = StorageKey::des(iter.key().expect("valid iterator has a key"))?;
+            if key.obj_key.buf_tag.rel.spcnode != spcnode
+                || key.obj_key.buf_tag.rel.dbnode != dbnode
+            {
+                break;
+            }
+
+            // All timelines share this store, so the scan can run into entries
+            // of another timeline with the same tablespace/database. Skip
+            // those instead of reporting their relations.
+            if key.obj_key.timeline == searchkey.obj_key.timeline && key.lsn < lsn {
+                rels.insert(key.obj_key.buf_tag.rel);
+            }
+            iter.next();
+        }
+        // `valid()` also turns false when the iterator hit a read error;
+        // report that instead of returning a silently truncated set.
+        iter.status()?;
+
+        Ok(rels)
+    }
+}
+
+impl RocksObjectStore {
+    /// Open a RocksDB database.
+    ///
+    /// The database lives in the `rocksdb-storage` directory under
+    /// `conf.workdir`.
+    pub fn open(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
+        let path = conf.workdir.join("rocksdb-storage");
+        let db = rocksdb::DB::open(&Self::get_rocksdb_opts(), path)?;
+
+        let storage = RocksObjectStore { _conf: conf, db };
+        Ok(storage)
+    }
+
+    /// Create a new, empty RocksDB database.
+    ///
+    /// Creates the `rocksdb-storage` directory under `conf.workdir` and opens
+    /// the database with `error_if_exists` set, so an existing database is
+    /// never reused.
+    pub fn create(conf: &'static PageServerConf) -> Result<RocksObjectStore> {
+        let path = conf.workdir.join("rocksdb-storage");
+        std::fs::create_dir(&path)?;
+
+        let mut opts = Self::get_rocksdb_opts();
+        opts.create_if_missing(true);
+        opts.set_error_if_exists(true);
+        let db = rocksdb::DB::open(&opts, &path)?;
+
+        let obj_store = RocksObjectStore { _conf: conf, db };
+        Ok(obj_store)
+    }
+
+    /// common options used by `open` and `create`
+    fn get_rocksdb_opts() -> rocksdb::Options {
+        let mut opts = rocksdb::Options::default();
+        opts.set_use_fsync(true);
+        opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
+
+        // FIXME
+        /*
+        opts.set_compaction_filter("ttl", move |_level: u32, _key: &[u8], val: &[u8]| {
+            if (val[0] & UNUSED_VERSION_FLAG) != 0 {
+                rocksdb::compaction_filter::Decision::Remove
+            } else {
+                rocksdb::compaction_filter::Decision::Keep
+            }
+        });
+        */
+        opts
+    }
+}
+
+///
+/// Iterator for `object_versions`. Returns all page versions of a given block, in
+/// reverse LSN order.
+///
+/// Versions stored under a different timeline are never returned, even when
+/// that timeline holds an entry for the same buffer tag.
+///
+struct RocksObjectVersionIter<'a> {
+    /// Key (timeline and buffer tag) whose versions are being walked.
+    obj_key: ObjectKey,
+    /// Raw RocksDB cursor; `new` positions it with `seek_for_prev`.
+    dbiter: rocksdb::DBRawIterator<'a>,
+    /// True until `next` has run once; the first call must not step the cursor.
+    first_call: bool,
+}
+impl<'a> RocksObjectVersionIter<'a> {
+    /// Position a cursor on the last stored entry that sorts at or before
+    /// (`obj_key`, `lsn`). Whether that entry actually belongs to `obj_key`
+    /// is checked lazily in `next`.
+    fn new(
+        db: &'a rocksdb::DB,
+        obj_key: &ObjectKey,
+        lsn: Lsn,
+    ) -> Result<RocksObjectVersionIter<'a>> {
+        let key = StorageKey {
+            obj_key: obj_key.clone(),
+            lsn,
+        };
+        let mut dbiter = db.raw_iterator();
+        dbiter.seek_for_prev(StorageKey::ser(&key)?); // locate last entry
+        Ok(RocksObjectVersionIter {
+            first_call: true,
+            obj_key: obj_key.clone(),
+            dbiter,
+        })
+    }
+}
+impl<'a> Iterator for RocksObjectVersionIter<'a> {
+    type Item = (Lsn, Vec<u8>);
+
+    /// Yield the next older version as `(lsn, value)`, or `None` once the
+    /// cursor leaves the entries of `obj_key` or runs off the start of the
+    /// database.
+    ///
+    /// Panics if a stored key cannot be decoded as a `StorageKey`.
+    fn next(&mut self) -> std::option::Option<Self::Item> {
+        if self.first_call {
+            self.first_call = false;
+        } else {
+            self.dbiter.prev(); // walk backwards
+        }
+
+        // `key()` is `None` exactly when the cursor is no longer valid.
+        let raw_key = self.dbiter.key()?;
+        let key = StorageKey::des(raw_key).expect("stored key must decode as StorageKey");
+        // The timeline has to be compared too: all timelines share this
+        // store, so the previous entry may carry the same buffer tag but
+        // belong to another timeline (e.g. the tag has no entry yet on a
+        // freshly created branch).
+        if key.obj_key.timeline != self.obj_key.timeline
+            || key.obj_key.buf_tag != self.obj_key.buf_tag
+        {
+            return None;
+        }
+        let val = self.dbiter.value()?;
+
+        Some((key.lsn, val.to_vec()))
+    }
+}
diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 3110ab26f4..c24cf64690 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -186,10 +186,6 @@ fn walreceiver_main( while let Some((lsn, recdata)) = waldecoder.poll_decode()? { let decoded = decode_wal_record(recdata.clone()); restore_local_repo::save_decoded_record(&*timeline, decoded, recdata, lsn)?; - - // Now that this record has been handled, let the page cache know that - // it is up-to-date to this LSN - timeline.advance_last_record_lsn(lsn); last_rec_lsn = lsn; } diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index f1101e1e82..e2b0761481 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -259,7 +259,7 @@ impl PostgresRedoManagerInternal { let result: Result; - trace!( + debug!( "applied {} WAL records in {} ms to reconstruct page image at LSN {}", nrecords, duration.as_millis(),