diff --git a/Cargo.lock b/Cargo.lock index 6d785fa9af..75483c0a18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,25 +152,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bindgen" -version = "0.57.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd4865004a46a0aafb2a0a5eb19d3c9fc46ee5f063a6cfc605c69ac9ecf5263d" -dependencies = [ - "bitflags", - "cexpr 0.4.0", - "clang-sys", - "lazy_static", - "lazycell", - "peeking_take_while", - "proc-macro2", - "quote", - "regex", - "rustc-hash", - "shlex 0.1.1", -] - [[package]] name = "bindgen" version = "0.59.1" @@ -178,7 +159,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "453c49e5950bb0eb63bb3df640e31618846c89d5b7faa54040d76e98e0134375" dependencies = [ "bitflags", - "cexpr 0.5.0", + "cexpr", "clang-sys", "clap", "env_logger", @@ -190,7 +171,7 @@ dependencies = [ "quote", "regex", "rustc-hash", - "shlex 1.0.0", + "shlex", "which", ] @@ -265,18 +246,6 @@ name = "cc" version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70cc2f62c6ce1868963827bd677764c62d07c3d9a3e1fb1177ee1a9ab199eb2" -dependencies = [ - "jobserver", -] - -[[package]] -name = "cexpr" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4aedb84272dbe89af497cf81375129abda4fc0a9e7c5d317498c15cc30c0d27" -dependencies = [ - "nom 5.1.2", -] [[package]] name = "cexpr" @@ -284,7 +253,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db507a7679252d2276ed0dd8113c6875ec56d3089f9225b2b42c30cc1f8e5c89" dependencies = [ - "nom 6.1.2", + "nom", ] [[package]] @@ -903,15 +872,6 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" -[[package]] -name = "jobserver" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "972f5ae5d1cb9c6ae417789196c803205313edde988685da5e3aae0827b9e7fd" -dependencies = [ - "libc", -] - [[package]] name = "js-sys" version = "0.3.51" @@ -963,18 +923,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "librocksdb-sys" -version = "6.17.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5da125e1c0f22c7cae785982115523a0738728498547f415c9054cb17c7e89f9" -dependencies = [ - "bindgen 0.57.0", - "cc", - "glob", - "libc", -] - [[package]] name = "lock_api" version = "0.4.4" @@ -1109,16 +1057,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nom" -version = "5.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" -dependencies = [ - "memchr", - "version_check", -] - [[package]] name = "nom" version = "6.1.2" @@ -1260,7 +1198,6 @@ dependencies = [ "postgres_ffi", "rand", "regex", - "rocksdb", "routerify", "rust-s3", "scopeguard", @@ -1442,7 +1379,7 @@ name = "postgres_ffi" version = "0.1.0" dependencies = [ "anyhow", - "bindgen 0.59.1", + "bindgen", "byteorder", "bytes", "chrono", @@ -1683,16 +1620,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "rocksdb" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c749134fda8bfc90d0de643d59bfc841dcb3ac8a1062e12b6754bd60235c48b3" -dependencies = [ - "libc", - "librocksdb-sys", -] - [[package]] name = "routerify" version = "2.2.0" @@ -1916,12 +1843,6 @@ dependencies = [ "opaque-debug", ] -[[package]] 
-name = "shlex" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" - [[package]] name = "shlex" version = "1.0.0" diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 10e70c1485..7daaa5e7be 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -42,7 +42,7 @@ pub struct LocalEnv { #[serde(with = "hex")] pub tenantid: ZTenantId, - // Repository format, 'rocksdb' or 'layered' or None for default + // Repository format, only 'layered' supported currently, or None for default pub repository_format: Option, // jwt auth token used for communication with pageserver diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 460514dc73..e5cf9b43ee 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -30,8 +30,6 @@ tokio-stream = { version = "0.1.5" } postgres-types = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres-protocol = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858" } -# by default rust-rocksdb tries to build a lot of compression algos. Use lz4 only for now as it is simplest dependency. -rocksdb = { version = "0.16.0", features = ["lz4"], default-features = false } routerify = "2" anyhow = "1.0" crc32c = "0.6.0" diff --git a/pageserver/README b/pageserver/README index 1c02f77052..033bdf247c 100644 --- a/pageserver/README +++ b/pageserver/README @@ -74,9 +74,8 @@ Repository corresponds to one .zenith directory. Repository is needed to manage Timelines. Each repository has associated WAL redo service. -Now we have two implementations of Repository: -- ObjectRepository uses RocksDB as a storage -- LayeredRepository uses custom storage format, described in layered_repository/README.md +There is currently only one implementation of the Repository trait: LayeredRepository. +It uses custom storage format, described in layered_repository/README.md #### Timeline Timeline is a page cache workhorse that accepts page changes @@ -89,13 +88,6 @@ Each Branch lives in a corresponding timeline and has an ancestor. To get full snapshot of data at certain moment we need to traverse timeline and its ancestors. -#### ObjectRepository -ObjectRepository implements Repository and has associated ObjectStore and WAL redo service. - -#### ObjectStore -ObjectStore is an interface for key-value store for page images and wal records. -Currently it has one implementation - RocksDB. - #### WAL redo service WAL redo service - service that runs PostgreSQL in a special wal_redo mode to apply given WAL records over an old page image and return new page image. 
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 4a23d737c4..e780bd8f27 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -138,10 +138,9 @@ impl CfgFileParams { } let repository_format = match self.repository_format.as_ref() { - Some(repo_format_str) if repo_format_str == "rocksdb" => RepositoryFormat::RocksDb, Some(repo_format_str) if repo_format_str == "layered" => RepositoryFormat::Layered, Some(repo_format_str) => anyhow::bail!( - "invalid --repository-format '{}', must be 'rocksdb' or 'layered'", + "invalid --repository-format '{}', only 'layered' supported", repo_format_str ), None => RepositoryFormat::Layered, // default @@ -240,7 +239,7 @@ fn main() -> Result<()> { Arg::with_name("repository-format") .long("repository-format") .takes_value(true) - .help("Which repository implementation to use, 'rocksdb' or 'layered'"), + .help("Which repository implementation to use, only 'layered' supported currently"), ) .get_matches(); diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index ba82ee3848..5247901823 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -20,7 +20,6 @@ use log::*; use zenith_utils::lsn::Lsn; use crate::logger; -use crate::object_repository::ObjectRepository; use crate::page_cache; use crate::restore_local_repo; use crate::walredo::WalRedoManager; @@ -102,16 +101,6 @@ pub fn create_repo( RepositoryFormat::Layered => Arc::new( crate::layered_repository::LayeredRepository::new(conf, wal_redo_manager, tenantid), ), - RepositoryFormat::RocksDb => { - let obj_store = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?; - - Arc::new(ObjectRepository::new( - conf, - Arc::new(obj_store), - wal_redo_manager, - tenantid, - )) - } }; // Load data into pageserver diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 032d9a7331..a7bb17db88 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -12,15 +12,11 @@ pub mod branches; pub mod http; pub mod layered_repository; pub mod logger; -pub mod object_key; -pub mod object_repository; -pub mod object_store; pub mod page_cache; pub mod page_service; pub mod relish; pub mod repository; pub mod restore_local_repo; -pub mod rocksdb_storage; pub mod waldecoder; pub mod walreceiver; pub mod walredo; @@ -63,7 +59,6 @@ pub struct PageServerConf { #[derive(Debug, Clone, PartialEq)] pub enum RepositoryFormat { Layered, - RocksDb, } impl PageServerConf { diff --git a/pageserver/src/object_key.rs b/pageserver/src/object_key.rs deleted file mode 100644 index 42f6156c78..0000000000 --- a/pageserver/src/object_key.rs +++ /dev/null @@ -1,49 +0,0 @@ -//! -//! Common structs shared by object_repository.rs and object_store.rs. -//! - -use crate::relish::RelishTag; -use serde::{Deserialize, Serialize}; -use zenith_utils::zid::ZTimelineId; - -/// -/// ObjectKey is the key type used to identify objects stored in an object -/// repository. It is shared between object_repository.rs and object_store.rs. -/// It is mostly opaque to ObjectStore, it just stores and retrieves objects -/// using the key given by the caller. -/// -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ObjectKey { - pub timeline: ZTimelineId, - pub tag: ObjectTag, -} - -/// -/// ObjectTag is a part of ObjectKey that is specific to the type of -/// the stored object. -/// -/// NB: the order of the enum values is significant! 
In particular, -/// rocksdb_storage.rs assumes that TimelineMetadataTag is first -/// -/// Buffer is the kind of object that is accessible by the public -/// get_page_at_lsn() / put_page_image() / put_wal_record() functions in -/// the repository.rs interface. The rest are internal objects stored in -/// the key-value store, to store various metadata. They're not directly -/// accessible outside object_repository.rs -/// -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] -pub enum ObjectTag { - // dummy tag preceeding all other keys - FirstTag, - - // Metadata about a timeline. Not versioned. - TimelineMetadataTag, - - // These objects store metadata about one relish. Currently it's used - // just to track the relish's size. It's not used for non-blocky relishes - // at all. - RelationMetadata(RelishTag), - - // These are the pages exposed in the public Repository/Timeline interface. - Buffer(RelishTag, u32), -} diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs deleted file mode 100644 index 46e619237a..0000000000 --- a/pageserver/src/object_repository.rs +++ /dev/null @@ -1,1219 +0,0 @@ -//! -//! Implementation of the Repository/Timeline traits, using a key-value store -//! (ObjectStore) to for the actual storage. -//! -//! This maps the relation-oriented operations in the Timeline interface into -//! objects stored in an ObjectStore. Relation size is stored as a separate object -//! in the key-value store. If a page is written beyond the current end-of-file, -//! we also insert the new size as a new "page version" in the key-value store. -//! -//! Also, this implements Copy-on-Write forking of timelines. For each timeline, -//! we store the parent timeline in the object store, in a little metadata blob. -//! When we need to find a version of a page, we walk the timeline history backwards -//! until we find the page we're looking for, making a separate lookup into the -//! key-value store for each timeline. - -use crate::object_key::*; -use crate::object_store::ObjectStore; -use crate::relish::*; -use crate::repository::*; -use crate::restore_local_repo::import_timeline_wal; -use crate::walredo::WalRedoManager; -use crate::PageServerConf; -use anyhow::{anyhow, bail, Context, Result}; -use bytes::Bytes; -use log::*; - -use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap, HashSet}; -use std::convert::TryInto; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; -use std::time::{Duration, Instant}; -use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::{AtomicLsn, Lsn}; -use zenith_utils::seqwait::SeqWait; -use zenith_utils::zid::ZTenantId; -use zenith_utils::zid::ZTimelineId; - -/// -/// A repository corresponds to one .zenith directory. One repository holds multiple -/// timelines, forked off from the same initial call to 'initdb'. -/// -pub struct ObjectRepository { - obj_store: Arc, - conf: &'static PageServerConf, - timelines: Mutex>>, - walredo_mgr: Arc, - tenantid: ZTenantId, -} - -// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. 
-static TIMEOUT: Duration = Duration::from_secs(600); - -impl ObjectRepository { - pub fn new( - conf: &'static PageServerConf, - obj_store: Arc, - walredo_mgr: Arc, - tenantid: ZTenantId, - ) -> ObjectRepository { - ObjectRepository { - conf, - obj_store, - timelines: Mutex::new(HashMap::new()), - walredo_mgr, - tenantid, - } - } -} - -impl Repository for ObjectRepository { - /// Get Timeline handle for given zenith timeline ID. - fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); - - match timelines.get(&timelineid) { - Some(timeline) => Ok(timeline.clone()), - None => { - let timeline = ObjectTimeline::open( - Arc::clone(&self.obj_store), - timelineid, - self.walredo_mgr.clone(), - )?; - - // Load any new WAL after the last checkpoint into the repository. - info!( - "Loading WAL for timeline {} starting at {}", - timelineid, - timeline.get_last_record_lsn() - ); - let wal_dir = self.conf.wal_dir_path(&timelineid, &self.tenantid); - import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; - - let timeline_rc = Arc::new(timeline); - - if self.conf.gc_horizon != 0 { - ObjectTimeline::launch_gc_thread(self.conf, timeline_rc.clone()); - } - - timelines.insert(timelineid, timeline_rc.clone()); - - Ok(timeline_rc) - } - } - } - - /// Create a new, empty timeline. The caller is responsible for loading data into it - fn create_empty_timeline( - &self, - timelineid: ZTimelineId, - start_lsn: Lsn, - ) -> Result> { - let mut timelines = self.timelines.lock().unwrap(); - - // Write initial metadata key. - let metadata = MetadataEntry { - last_valid_lsn: start_lsn, - last_record_lsn: start_lsn, - prev_record_lsn: Lsn(0), - ancestor_timeline: None, - ancestor_lsn: start_lsn, - }; - let val = ObjectValue::TimelineMetadata(metadata); - self.obj_store.put( - &timeline_metadata_key(timelineid), - Lsn(0), - &ObjectValue::ser(&val)?, - )?; - - info!("Created empty timeline {}", timelineid); - - let timeline = ObjectTimeline::open( - Arc::clone(&self.obj_store), - timelineid, - self.walredo_mgr.clone(), - )?; - - let timeline_rc = Arc::new(timeline); - let r = timelines.insert(timelineid, timeline_rc.clone()); - assert!(r.is_none()); - - // don't start the garbage collector for unit tests, either. - - Ok(timeline_rc) - } - - /// Branch a timeline - fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, at_lsn: Lsn) -> Result<()> { - let src_timeline = self.get_timeline(src)?; - - trace!("branch_timeline at lsn {}", at_lsn); - // Write a metadata key, noting the ancestor of the new timeline. There is initially - // no data in it, but all the read-calls know to look into the ancestor. - let metadata = MetadataEntry { - last_valid_lsn: at_lsn, - last_record_lsn: at_lsn, - prev_record_lsn: src_timeline.get_prev_record_lsn(), - ancestor_timeline: Some(src), - ancestor_lsn: at_lsn, - }; - let val = ObjectValue::TimelineMetadata(metadata); - self.obj_store.put( - &timeline_metadata_key(dst), - Lsn(0), - &ObjectValue::ser(&val)?, - )?; - - Ok(()) - } - - fn gc_iteration( - &self, - timelineid: Option, - horizon: u64, - compact: bool, - ) -> Result { - if let Some(timelineid) = timelineid { - let timelines = self.timelines.lock().unwrap(); - - // FIXME: If the timeline isn't opened yet, we don't open it just for GC. - if let Some(timeline) = timelines.get(&timelineid) { - return timeline.gc_iteration(horizon, compact); - } - } else { - // FIXME: the object repository doesn't support GC on all timelines. 
Should - // iterate all the timelines here - bail!("GC of all timelines not implemented"); - } - return Ok(GcResult::default()); - } -} - -/// -/// Relation metadata (currently only size) is stored in separate record -/// in object store and is update by each put operation: we need to check if -/// new block size is greater or equal than existed relation size and if so: update -/// the relation metadata. So it requires extra object storage lookup at each -/// put operation. To avoid this lookup we maintain in-memory cache of relation metadata. -/// To prevent memory overflow metadata only of the most recent version of relation is cached. -/// If page server needs to access some older version, then object storage has to be accessed. -/// -struct RelishMetadata { - size: Option, // size of the relish (None if unlinked) - last_updated: Lsn, // lsn of last metadata update (used to determine when cache value can be used) -} - -/// -/// A handle to a specific timeline in the repository. This is the API -/// that's exposed to the rest of the system. -/// -pub struct ObjectTimeline { - timelineid: ZTimelineId, - - // Backing key-value store - obj_store: Arc, - - // WAL redo manager, for reconstructing page versions from WAL records. - walredo_mgr: Arc, - - // What page versions do we hold in the cache? If we get a request > last_valid_lsn, - // we need to wait until we receive all the WAL up to the request. The SeqWait - // provides functions for that. TODO: If we get a request for an old LSN, such that - // the versions have already been garbage collected away, we should throw an error, - // but we don't track that currently. - // - // last_record_lsn points to the end of last processed WAL record. - // It can lag behind last_valid_lsn, if the WAL receiver has received some WAL - // after the end of last record, but not the whole next record yet. In the - // page cache, we care about last_valid_lsn, but if the WAL receiver needs to - // restart the streaming, it needs to restart at the end of last record, so - // we track them separately. last_record_lsn should perhaps be in - // walreceiver.rs instead of here, but it seems convenient to keep all three - // values together. - // - last_valid_lsn: SeqWait, - last_record_lsn: AtomicLsn, - prev_record_lsn: AtomicLsn, - - ancestor_timeline: Option, - ancestor_lsn: Lsn, - - rel_meta: RwLock>, -} - -impl ObjectTimeline { - /// Open a Timeline handle. - /// - /// Loads the metadata for the timeline into memory. - fn open( - obj_store: Arc, - timelineid: ZTimelineId, - walredo_mgr: Arc, - ) -> Result { - // Load metadata into memory - let v = obj_store - .get(&timeline_metadata_key(timelineid), Lsn(0)) - .with_context(|| "timeline not found in repository")?; - let metadata = ObjectValue::des_timeline_metadata(&v)?; - - let timeline = ObjectTimeline { - timelineid, - obj_store, - walredo_mgr, - last_valid_lsn: SeqWait::new(metadata.last_valid_lsn), - last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0), - prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0), - ancestor_timeline: metadata.ancestor_timeline, - ancestor_lsn: metadata.ancestor_lsn, - rel_meta: RwLock::new(BTreeMap::new()), - }; - Ok(timeline) - } -} - -impl Timeline for ObjectTimeline { - //------------------------------------------------------------------------------ - // Public GET functions - //------------------------------------------------------------------------------ - - /// Look up given page in the cache. 
- fn get_page_at_lsn(&self, tag: RelishTag, blknum: u32, req_lsn: Lsn) -> Result { - let lsn = self.wait_lsn(req_lsn)?; - - self.get_page_at_lsn_nowait(tag, blknum, lsn) - } - - fn get_page_at_lsn_nowait(&self, rel: RelishTag, blknum: u32, req_lsn: Lsn) -> Result { - if !rel.is_blocky() && blknum != 0 { - bail!( - "invalid request for block {} for non-blocky relish {}", - blknum, - rel - ); - } - - // Handle truncated non-rel relishes - // We should never return a stale or zeroed page for the truncated SLRU segment. - // XXX if this will turn out to be performance critical, - // move this check out of the funciton. - // - match rel { - RelishTag::Slru { .. } | RelishTag::TwoPhase { .. } => { - if !self.get_rel_exists(rel, req_lsn).unwrap_or(false) { - trace!("{:?} at {} doesn't exist", rel, req_lsn); - return Err(anyhow!("non-rel relish doesn't exist")); - } - } - _ => (), - }; - - const ZERO_PAGE: [u8; 8192] = [0u8; 8192]; - // Look up the page entry. If it's a page image, return that. If it's a WAL record, - // ask the WAL redo service to reconstruct the page image from the WAL records. - let object_tag = ObjectTag::Buffer(rel, blknum); - let searchkey = ObjectKey { - timeline: self.timelineid, - tag: object_tag, - }; - let mut iter = self.object_versions(&*self.obj_store, &searchkey, req_lsn)?; - - if let Some((lsn, value)) = iter.next().transpose()? { - let page_img: Bytes; - - match ObjectValue::des(&value)? { - ObjectValue::Page(PageEntry::Page(img)) => { - page_img = img; - } - ObjectValue::Page(PageEntry::WALRecord(_rec)) => { - // Request the WAL redo manager to apply the WAL records for us. - let (base_img, records) = self.collect_records_for_apply(rel, blknum, lsn)?; - page_img = self - .walredo_mgr - .request_redo(rel, blknum, lsn, base_img, records)?; - - // Garbage collection assumes that we remember the materialized page - // version. Otherwise we could opt to not do it, with the downside that - // the next GetPage@LSN call of the same page version would have to - // redo the WAL again. - self.put_page_image(rel, blknum, lsn, page_img.clone(), false)?; - } - _ => bail!("Invalid object kind, expected a page entry"), - } - // FIXME: assumes little-endian. Only used for the debugging log though - let page_lsn_hi = u32::from_le_bytes(page_img.get(0..4).unwrap().try_into().unwrap()); - let page_lsn_lo = u32::from_le_bytes(page_img.get(4..8).unwrap().try_into().unwrap()); - - trace!( - "Returning page with LSN {:X}/{:X} for {:?} from {} (request {})", - page_lsn_hi, - page_lsn_lo, - object_tag, - lsn, - req_lsn - ); - return Ok(page_img); - } - trace!("page {:?} at {} not found", object_tag, req_lsn); - Ok(Bytes::from_static(&ZERO_PAGE)) - /* return Err("could not find page image")?; */ - } - - /// Get size of a relish in number of blocks - /// Return None if the relish was truncated - fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result> { - if !rel.is_physical() { - bail!( - "invalid get_relish_size request for non-physical relish {}", - rel - ); - } - - let lsn = self.wait_lsn(lsn)?; - return self.relsize_get_nowait(rel, lsn); - } - - /// Does relation exist at given LSN? - fn get_rel_exists(&self, rel: RelishTag, req_lsn: Lsn) -> Result { - if let Some(_) = self.get_relish_size(rel, req_lsn)? 
{ - trace!("Relation {} exists at {}", rel, req_lsn); - return Ok(true); - } - - trace!("Relation {} doesn't exist at {}", rel, req_lsn); - Ok(false) - } - - /// Get a list of non-relational objects - fn list_nonrels<'a>(&'a self, lsn: Lsn) -> Result> { - // List all non-relations in this timeline. - let mut all_rels = self.obj_store.list_nonrels(self.timelineid, lsn)?; - - // Also list all nonrelations in ancestor timelines. If a nonrelation hasn't been modified - // after the fork, there will be no trace of it in the object store with the current - // timeline id. - let mut prev_timeline: Option = self.ancestor_timeline; - let mut lsn = self.ancestor_lsn; - while let Some(timeline) = prev_timeline { - let this_rels = self.obj_store.list_nonrels(timeline, lsn)?; - - for rel in this_rels { - all_rels.insert(rel); - } - - // Load ancestor metadata. - let v = self - .obj_store - .get(&timeline_metadata_key(timeline), Lsn(0)) - .with_context(|| "timeline not found in repository")?; - let metadata = ObjectValue::des_timeline_metadata(&v)?; - - prev_timeline = metadata.ancestor_timeline; - lsn = metadata.ancestor_lsn; - } - - Ok(all_rels) - } - - /// Get a list of all distinct relations in given tablespace and database. - fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result> { - // List all relations in this timeline. - let mut all_rels = self - .obj_store - .list_rels(self.timelineid, spcnode, dbnode, lsn)?; - - // Also list all relations in ancestor timelines. If a relation hasn't been modified - // after the fork, there will be no trace of it in the object store with the current - // timeline id. - let mut prev_timeline: Option = self.ancestor_timeline; - let mut lsn = self.ancestor_lsn; - while let Some(timeline) = prev_timeline { - let this_rels = self.obj_store.list_rels(timeline, spcnode, dbnode, lsn)?; - - for rel in this_rels { - all_rels.insert(rel); - } - - // Load ancestor metadata. - let v = self - .obj_store - .get(&timeline_metadata_key(timeline), Lsn(0)) - .with_context(|| "timeline not found in repository")?; - let metadata = ObjectValue::des_timeline_metadata(&v)?; - - prev_timeline = metadata.ancestor_timeline; - lsn = metadata.ancestor_lsn; - } - - Ok(all_rels) - } - - //------------------------------------------------------------------------------ - // Public PUT functions, to update the repository with new page versions. - // - // These are called by the WAL receiver to digest WAL records. - //------------------------------------------------------------------------------ - - /// Put a new page version that can be constructed from a WAL record - /// - /// This will implicitly extend the relation, if the page is beyond the - /// current end-of-file. 
- fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { - if !rel.is_blocky() && blknum != 0 { - bail!( - "invalid request for block {} for non-blocky relish {}", - blknum, - rel - ); - } - - let lsn = rec.lsn; - self.put_page_entry(&rel, blknum, lsn, PageEntry::WALRecord(rec))?; - debug!("put_wal_record {} at {}", rel, lsn); - - if rel.is_blocky() { - // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(rel, lsn)?.unwrap_or(0); - - if blknum >= old_nblocks { - let new_nblocks = blknum + 1; - - trace!( - "Extended {} from {} to {} blocks at {}", - rel, - old_nblocks, - new_nblocks, - lsn - ); - - self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(new_nblocks))?; - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - rel, - RelishMetadata { - size: Some(new_nblocks), - last_updated: lsn, - }, - ); - } - } else if let RelishTag::TwoPhase { xid: _ } = rel { - // This is non-blocky relish, so we put dummy relsize entry - // just to save the fact that such a segment exists at this lsn. - // GC will use this information to preserve segment if necessary. - self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(1))?; - } - Ok(()) - } - - /// Unlink relation. This method is used for marking dropped Relishes. - /// - /// Note: each SLRU segment in PostgreSQL is considered a separate relish, - /// so we can use unlink to truncate SLRU segments. - fn put_unlink(&self, rel_tag: RelishTag, lsn: Lsn) -> Result<()> { - self.put_relsize_entry(&rel_tag, lsn, RelationSizeEntry::Unlink)?; - - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - rel_tag, - RelishMetadata { - size: None, - last_updated: lsn, - }, - ); - Ok(()) - } - - /// - /// Memorize a full image of a page version - /// - fn put_page_image( - &self, - rel: RelishTag, - blknum: u32, - lsn: Lsn, - img: Bytes, - update_meta: bool, - ) -> Result<()> { - if !rel.is_blocky() && blknum != 0 { - bail!( - "invalid request for block {} for non-blocky relish {}", - blknum, - rel - ); - } - - self.put_page_entry(&rel, blknum, lsn, PageEntry::Page(img))?; - - debug!("put_page_image {} at {}", rel, lsn); - - if !update_meta { - return Ok(()); - } - - if rel.is_blocky() { - // Also check if this created or extended the file - let old_nblocks = self.relsize_get_nowait(rel, lsn)?.unwrap_or(0); - - if blknum >= old_nblocks { - let new_nblocks = blknum + 1; - - trace!( - "Extended {} from {} to {} blocks at {}", - rel, - old_nblocks, - new_nblocks, - lsn - ); - - self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(new_nblocks))?; - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - rel, - RelishMetadata { - size: Some(new_nblocks), - last_updated: lsn, - }, - ); - } - } else if let RelishTag::TwoPhase { xid: _ } = rel { - // This is non-blocky relish, so we put dummy relsize entry - // just to save the fact that such a segment exists at this lsn. - // GC will use this information to preserve segment if necessary. - self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(1))?; - } - Ok(()) - } - - /// - /// Adds a relation-wide WAL record (like truncate) to the repository, - /// associating it with all pages started with specified block number - /// - /// Note: This is only for regular relation truncations. - /// To truncate SLRU segments use put_unlink() function. 
- /// - fn put_truncation(&self, rel: RelishTag, lsn: Lsn, nblocks: u32) -> Result<()> { - match rel { - RelishTag::Relation(_) => {} - _ => bail!("invalid truncation for non-rel relish {}", rel), - }; - - info!("Truncate relation {} to {} blocks at {}", rel, nblocks, lsn); - - self.put_relsize_entry(&rel, lsn, RelationSizeEntry::Size(nblocks))?; - let mut rel_meta = self.rel_meta.write().unwrap(); - rel_meta.insert( - rel, - RelishMetadata { - size: Some(nblocks), - last_updated: lsn, - }, - ); - - Ok(()) - } - - /// Remember the all WAL before the given LSN has been processed. - /// - /// The WAL receiver calls this after the put_* functions, to indicate that - /// all WAL before this point has been digested. Before that, if you call - /// GET on an earlier LSN, it will block. - fn advance_last_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last valid LSN backwards (was {}, new {})", - old, lsn - ); - } - } - - fn get_last_valid_lsn(&self) -> Lsn { - self.last_valid_lsn.load() - } - - fn init_valid_lsn(&self, lsn: Lsn) { - let old = self.last_valid_lsn.advance(lsn); - assert!(old == Lsn(0)); - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old == Lsn(0)); - self.prev_record_lsn.store(Lsn(0)); - } - - /// Like `advance_last_valid_lsn`, but this always points to the end of - /// a WAL record, not in the middle of one. - /// - /// This must be <= last valid LSN. This is tracked separately from last - /// valid LSN, so that the WAL receiver knows where to restart streaming. - /// - /// NOTE: this updates last_valid_lsn as well. - fn advance_last_record_lsn(&self, lsn: Lsn) { - // Can't move backwards. - let old = self.last_record_lsn.fetch_max(lsn); - assert!(old <= lsn); - - // Use old value of last_record_lsn as prev_record_lsn - self.prev_record_lsn.fetch_max(Lsn((old.0 + 7) & !7)); - - // Also advance last_valid_lsn - let old = self.last_valid_lsn.advance(lsn); - // Can't move backwards. - if lsn < old { - warn!( - "attempted to move last record LSN backwards (was {}, new {})", - old, lsn - ); - } - } - fn get_last_record_lsn(&self) -> Lsn { - self.last_record_lsn.load() - } - - fn get_prev_record_lsn(&self) -> Lsn { - self.prev_record_lsn.load() - } - - /// - /// Flush to disk all data that was written with the put_* functions - /// - /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't - /// know anything about them here in the repository. - - // Flush all the changes written so far with PUT functions to disk. - // RocksDB writes out things as we go (?), so we don't need to do much here. We just - // write out the last valid and record LSNs. - fn checkpoint(&self) -> Result<()> { - let metadata = MetadataEntry { - last_valid_lsn: self.last_valid_lsn.load(), - last_record_lsn: self.last_record_lsn.load(), - prev_record_lsn: self.prev_record_lsn.load(), - ancestor_timeline: self.ancestor_timeline, - ancestor_lsn: self.ancestor_lsn, - }; - trace!("checkpoint at {}", metadata.last_valid_lsn); - - self.put_timeline_metadata_entry(metadata)?; - - Ok(()) - } -} - -impl ObjectTimeline { - /// - /// Internal function to get relation size at given LSN. - /// - /// The caller must ensure that WAL has been received up to 'lsn'. 
- /// - fn relsize_get_nowait(&self, rel: RelishTag, lsn: Lsn) -> Result> { - { - let rel_meta = self.rel_meta.read().unwrap(); - if let Some(meta) = rel_meta.get(&rel) { - if meta.last_updated <= lsn { - return Ok(meta.size); - } - } - } - let key = relation_size_key(self.timelineid, rel); - let mut iter = self.object_versions(&*self.obj_store, &key, lsn)?; - - if let Some((version_lsn, value)) = iter.next().transpose()? { - match ObjectValue::des_relsize(&value)? { - RelationSizeEntry::Size(nblocks) => { - trace!( - "relation {} has size {} at {} (request {})", - rel, - nblocks, - version_lsn, - lsn - ); - Ok(Some(nblocks)) - } - RelationSizeEntry::Unlink => { - trace!( - "relation {} not found; it was dropped at lsn {}", - rel, - version_lsn - ); - Ok(None) - } - } - } else { - trace!("relation {} not found at {}", rel, lsn); - Ok(None) - } - } - - /// - /// Collect all the WAL records that are needed to reconstruct a page - /// image for the given cache entry. - /// - /// Returns an old page image (if any), and a vector of WAL records to apply - /// over it. - /// - fn collect_records_for_apply( - &self, - rel: RelishTag, - blknum: u32, - lsn: Lsn, - ) -> Result<(Option, Vec)> { - let mut base_img: Option = None; - let mut records: Vec = Vec::new(); - - // Scan backwards, collecting the WAL records, until we hit an - // old page image. - let searchkey = ObjectKey { - timeline: self.timelineid, - tag: ObjectTag::Buffer(rel, blknum), - }; - let mut iter = self.object_versions(&*self.obj_store, &searchkey, lsn)?; - while let Some((_key, value)) = iter.next().transpose()? { - match ObjectValue::des_page(&value)? { - PageEntry::Page(img) => { - // We have a base image. No need to dig deeper into the list of - // records - base_img = Some(img); - break; - } - PageEntry::WALRecord(rec) => { - records.push(rec.clone()); - // If this WAL record initializes the page, no need to dig deeper. - if rec.will_init { - break; - } - } - } - } - records.reverse(); - Ok((base_img, records)) - } - - fn launch_gc_thread(conf: &'static PageServerConf, timeline_rc: Arc) { - let _gc_thread = thread::Builder::new() - .name("Garbage collection thread".into()) - .spawn(move || { - // FIXME - timeline_rc.gc_loop(conf).expect("GC thread died"); - }) - .unwrap(); - } - - fn gc_loop(&self, conf: &'static PageServerConf) -> Result<()> { - loop { - thread::sleep(conf.gc_period); - self.gc_iteration(conf.gc_horizon, false)?; - } - } - - // - // Wait until WAL has been received up to the given LSN. - // - fn wait_lsn(&self, mut lsn: Lsn) -> Result { - // When invalid LSN is requested, it means "don't wait, return latest version of the page" - // This is necessary for bootstrap. - if lsn == Lsn(0) { - let last_valid_lsn = self.last_valid_lsn.load(); - trace!( - "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - last_valid_lsn, - lsn - ); - lsn = last_valid_lsn; - } - trace!( - "Start waiting for LSN {}, valid LSN is {}", - lsn, - self.last_valid_lsn.load() - ); - self.last_valid_lsn - .wait_for_timeout(lsn, TIMEOUT) - .with_context(|| { - format!( - "Timed out while waiting for WAL record at LSN {} to arrive. valid LSN in {}", - lsn, - self.last_valid_lsn.load(), - ) - })?; - //trace!("Stop waiting for LSN {}, valid LSN is {}", lsn, self.last_valid_lsn.load()); - - Ok(lsn) - } - - /// - /// Iterate through object versions with given key, in reverse LSN order. 
- /// - /// This implements following the timeline history over the plain - /// ObjectStore::object_versions function, which doesn't know - /// about the relationships between timeline. - /// - fn object_versions<'a>( - &self, - obj_store: &'a dyn ObjectStore, - key: &ObjectKey, - lsn: Lsn, - ) -> Result> { - let current_iter = obj_store.object_versions(key, lsn)?; - - Ok(ObjectVersionIter { - obj_store, - object_tag: key.tag, - current_iter, - ancestor_timeline: self.ancestor_timeline, - ancestor_lsn: self.ancestor_lsn, - }) - } - - // - // Helper functions to store different kinds of objects to the underlying ObjectStore - // - fn put_page_entry(&self, tag: &RelishTag, blknum: u32, lsn: Lsn, val: PageEntry) -> Result<()> { - let key = ObjectKey { - timeline: self.timelineid, - tag: ObjectTag::Buffer(*tag, blknum), - }; - let val = ObjectValue::Page(val); - - self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?) - } - - fn put_relsize_entry(&self, tag: &RelishTag, lsn: Lsn, val: RelationSizeEntry) -> Result<()> { - let key = relation_size_key(self.timelineid, *tag); - let val = ObjectValue::RelationSize(val); - - self.obj_store.put(&key, lsn, &ObjectValue::ser(&val)?) - } - - fn put_timeline_metadata_entry(&self, val: MetadataEntry) -> Result<()> { - let key = timeline_metadata_key(self.timelineid); - let val = ObjectValue::TimelineMetadata(val); - - self.obj_store.put(&key, Lsn(0), &ObjectValue::ser(&val)?) - } - - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { - let last_lsn = self.get_last_valid_lsn(); - let mut result: GcResult = Default::default(); - - // checked_sub() returns None on overflow. - if let Some(horizon) = last_lsn.checked_sub(horizon) { - // WAL is large enough to perform GC - let now = Instant::now(); - // Iterate through all objects in timeline - for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { - result.inspected += 1; - match obj { - ObjectTag::RelationMetadata(_) => { - // Do not need to reconstruct page images, - // just delete all old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - let content = vers.1; - match ObjectValue::des(&content[..])? { - ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - result.dropped += 1; - } - _ => (), // preserve last version - } - last_version = false; - result.truncated += 1; - result.n_relations += 1; - } else { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } - } - ObjectTag::Buffer(rel, blknum) => { - if rel.is_blocky() { - // Reconstruct page at horizon unless relation was dropped - // and delete all older versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - result.truncated += 1; - last_version = false; - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - if rel_size > blknum { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(rel, blknum, lsn)?; - continue; - } - debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); - } else { - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? 
- { - debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); - continue; - } - debug!("Relation {} was dropped at {}", rel, lsn); - } - // relation was dropped or truncated so this block can be removed - } - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } else { - // versioned always materialized objects: no need to reconstruct pages - - // Remove old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - last_version = false; - // Don't preserve last version for unlinked relishes - match rel { - RelishTag::TwoPhase { .. } => { - if !self.get_rel_exists(rel, last_lsn)? { - self.obj_store.unlink(&key, lsn)?; - result.prep_deleted += 1; - } - } - // TODO treat unlinked FileNodeMap too - _ => (), - } - } else { - // delete deteriorated version - self.obj_store.unlink(&key, lsn)?; - - match rel { - RelishTag::TwoPhase { .. } => { - result.prep_deleted += 1; - } - RelishTag::Checkpoint => { - result.chkp_deleted += 1; - } - RelishTag::ControlFile => { - result.control_deleted += 1; - } - RelishTag::FileNodeMap { .. } => { - result.filenodemap_deleted += 1; - } - _ => { - bail!( - "unexpected non-blocky object found during GC {}", - rel - ); - } - }; - } - } - } - } - _ => (), // do nothing - } - } - result.elapsed = now.elapsed(); - info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", - result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); - if compact { - self.obj_store.compact(); - } - } - Ok(result) - } -} - -/// -/// We store several kinds of objects in the repository. -/// We have per-page, per-relation and per-timeline entries. -/// -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ObjectValue { - Page(PageEntry), - RelationSize(RelationSizeEntry), - TimelineMetadata(MetadataEntry), -} - -/// -/// This is what we store for each page in the object store. Use -/// ObjectTag::RelationBuffer as key. -/// -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum PageEntry { - /// Ready-made image of the block - Page(Bytes), - - /// WAL record, to be applied on top of the "previous" entry - /// - /// Some WAL records will initialize the page from scratch. For such records, - /// the 'will_init' flag is set. They don't need the previous page image before - /// applying. The 'will_init' flag is set for records containing a full-page image, - /// and for records with the BKPBLOCK_WILL_INIT flag. These differ from Page images - /// stored directly in the repository in that you still need to run the WAL redo - /// routine to generate the page image. - WALRecord(WALRecord), -} - -/// -/// In addition to page versions, we store relation size as a separate, versioned, -/// object. That way we can answer nblocks requests faster, and we also use it to -/// support relation truncation without having to add a tombstone page version for -/// each block that is truncated away. -/// -/// Use ObjectTag::RelationMetadata as the key. -/// -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum RelationSizeEntry { - Size(u32), - - /// Tombstone for a dropped relation. 
- Unlink, -} - -const fn relation_size_key(timelineid: ZTimelineId, rel: RelishTag) -> ObjectKey { - ObjectKey { - timeline: timelineid, - tag: ObjectTag::RelationMetadata(rel), - } -} - -/// -/// In addition to the per-page and per-relation entries, we also store -/// a little metadata blob for each timeline. This is not versioned, use -/// ObjectTag::TimelineMetadataTag with constant Lsn(0) as the key. -/// -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct MetadataEntry { - last_valid_lsn: Lsn, - last_record_lsn: Lsn, - prev_record_lsn: Lsn, - ancestor_timeline: Option, - ancestor_lsn: Lsn, -} - -const fn timeline_metadata_key(timelineid: ZTimelineId) -> ObjectKey { - ObjectKey { - timeline: timelineid, - tag: ObjectTag::TimelineMetadataTag, - } -} - -/// -/// Helper functions to deserialize ObjectValue, when the caller knows what kind of -/// a value it should be. -/// -/// There are no matching helper functions for serializing. Instead, there are -/// `put_page_entry`, `put_relsize_entry`, and `put_timeline_metadata_entry` helper -/// functions in ObjectTimeline that both construct the right kind of key and -/// serialize the value in the same call. -/// -impl ObjectValue { - fn des_page(v: &[u8]) -> Result { - match ObjectValue::des(&v)? { - ObjectValue::Page(p) => Ok(p), - _ => { - bail!("Invalid object kind, expected a page entry"); - } - } - } - - fn des_relsize(v: &[u8]) -> Result { - match ObjectValue::des(&v)? { - ObjectValue::RelationSize(rs) => Ok(rs), - _ => { - bail!("Invalid object kind, expected a relation size entry"); - } - } - } - - fn des_timeline_metadata(v: &[u8]) -> Result { - match ObjectValue::des(&v)? { - ObjectValue::TimelineMetadata(t) => Ok(t), - _ => { - bail!("Invalid object kind, expected a timeline metadata entry"); - } - } - } -} - -/// -/// Iterator for `object_versions`. Returns all page versions of a given block, in -/// reverse LSN order. This implements the traversal of ancestor timelines. If -/// a page isn't found in the most recent timeline, this iterates to the parent, -/// until a page version is found. -/// -struct ObjectVersionIter<'a> { - obj_store: &'a dyn ObjectStore, - - object_tag: ObjectTag, - - /// Iterator on the current timeline. - current_iter: Box)> + 'a>, - - /// Ancestor of the current timeline being iterated. - ancestor_timeline: Option, - ancestor_lsn: Lsn, -} - -impl<'a> Iterator for ObjectVersionIter<'a> { - type Item = Result<(Lsn, Vec)>; - - fn next(&mut self) -> Option { - self.next_result().transpose() - } -} - -impl<'a> ObjectVersionIter<'a> { - /// - /// "transposed" version of the standard Iterator::next function. - /// - /// The rust standard Iterator::next function returns an - /// Option of a Result, but it's more convenient to work with - /// Result of a Option so that you can use ? to check for errors. - /// - fn next_result(&mut self) -> Result)>> { - loop { - // If there is another entry on the current timeline, return it. - if let Some(result) = self.current_iter.next() { - return Ok(Some(result)); - } - - // Out of entries on this timeline. Move to the ancestor, if any. - if let Some(ancestor_timeline) = self.ancestor_timeline { - let searchkey = ObjectKey { - timeline: ancestor_timeline, - tag: self.object_tag, - }; - let ancestor_iter = self - .obj_store - .object_versions(&searchkey, self.ancestor_lsn)?; - - // Load the parent timeline's metadata. 
(We don't - // actually need it yet, only if we need to follow to - // the grandparent timeline) - let v = self - .obj_store - .get(&timeline_metadata_key(ancestor_timeline), Lsn(0)) - .with_context(|| "timeline not found in repository")?; - - let ancestor_metadata = ObjectValue::des_timeline_metadata(&v)?; - self.ancestor_timeline = ancestor_metadata.ancestor_timeline; - self.ancestor_lsn = ancestor_metadata.ancestor_lsn; - self.current_iter = ancestor_iter; - } else { - return Ok(None); - } - } - } -} diff --git a/pageserver/src/object_store.rs b/pageserver/src/object_store.rs deleted file mode 100644 index 10eb3fcea6..0000000000 --- a/pageserver/src/object_store.rs +++ /dev/null @@ -1,92 +0,0 @@ -//! Low-level key-value storage abstraction. -//! -use crate::object_key::*; -use crate::relish::*; -use anyhow::Result; -use std::collections::HashSet; -use std::iter::Iterator; -use zenith_utils::lsn::Lsn; -use zenith_utils::zid::ZTimelineId; - -/// -/// Low-level storage abstraction. -/// -/// All the data in the repository is stored in a key-value store. This trait -/// abstracts the details of the key-value store. -/// -/// A simple key-value store would support just GET and PUT operations with -/// a key, but the upper layer needs slightly complicated read operations -/// -/// The most frequently used function is 'object_versions'. It is used -/// to look up a page version. It is LSN aware, in that the caller -/// specifies an LSN, and the function returns all values for that -/// block with the same or older LSN. -/// -pub trait ObjectStore: Send + Sync { - /// - /// Store a value with given key. - /// - fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()>; - - /// Read entry with the exact given key. - /// - /// This is used for retrieving metadata with special key that doesn't - /// correspond to any real relation. - fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result>; - - /// Read key greater or equal than specified - fn get_next_key(&self, key: &ObjectKey) -> Result>; - - /// Iterate through all page versions of one object. - /// - /// Returns all page versions in descending LSN order, along with the LSN - /// of each page version. - fn object_versions<'a>( - &'a self, - key: &ObjectKey, - lsn: Lsn, - ) -> Result)> + 'a>>; - - /// Iterate through versions of all objects in a timeline. - /// - /// Returns objects in increasing key-version order. - /// Returns all versions up to and including the specified LSN. - fn objects<'a>( - &'a self, - timeline: ZTimelineId, - lsn: Lsn, - ) -> Result)>> + 'a>>; - - /// Iterate through all keys with given tablespace and database ID, and LSN <= 'lsn'. - /// Both dbnode and spcnode can be InvalidId (0) which means get all relations in tablespace/cluster - /// - /// This is used to implement 'create database' - fn list_rels( - &self, - timelineid: ZTimelineId, - spcnode: u32, - dbnode: u32, - lsn: Lsn, - ) -> Result>; - - /// Iterate through non-rel relishes - /// - /// This is used to prepare tarball for new node startup. - fn list_nonrels<'a>(&'a self, timelineid: ZTimelineId, lsn: Lsn) -> Result>; - - /// Iterate through objects tags. If nonrel_only, then only non-relationa data is iterated. - /// - /// This is used to implement GC and preparing tarball for new node startup - /// Returns objects in increasing key-version order. - fn list_objects<'a>( - &'a self, - timelineid: ZTimelineId, - lsn: Lsn, - ) -> Result + 'a>>; - - /// Unlink object (used by GC). This mehod may actually delete object or just mark it for deletion. 
- fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()>; - - // Compact storage and remove versions marged for deletion - fn compact(&self); -} diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index b4990eaba1..f8d6097da3 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -3,9 +3,7 @@ use crate::branches; use crate::layered_repository::LayeredRepository; -use crate::object_repository::ObjectRepository; use crate::repository::Repository; -use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::PostgresRedoManager; use crate::{PageServerConf, RepositoryFormat}; use anyhow::{anyhow, bail, Result}; @@ -43,16 +41,6 @@ pub fn init(conf: &'static PageServerConf) { LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); repo } - RepositoryFormat::RocksDb => { - let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap(); - - Arc::new(ObjectRepository::new( - conf, - Arc::new(obj_store), - Arc::new(walredo_mgr), - tenantid, - )) - } }; info!("initialized storage for tenant: {}", &tenantid); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c75c96a5de..97f71e5cdb 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -24,13 +24,13 @@ use std::{io, net::TcpStream}; use zenith_metrics::{register_histogram_vec, HistogramVec}; use zenith_utils::auth::{self, JwtAuth}; use zenith_utils::auth::{Claims, Scope}; +use zenith_utils::lsn::Lsn; use zenith_utils::postgres_backend::PostgresBackend; use zenith_utils::postgres_backend::{self, AuthType}; use zenith_utils::pq_proto::{ BeMessage, FeMessage, RowDescriptor, HELLO_WORLD_ROW, SINGLE_COL_ROWDESC, }; use zenith_utils::zid::{ZTenantId, ZTimelineId}; -use zenith_utils::lsn::Lsn; use crate::basebackup; use crate::branches; @@ -596,15 +596,6 @@ impl postgres_backend::Handler for PageServerHandler { let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ - RowDescriptor::int8_col(b"n_relations"), - RowDescriptor::int8_col(b"truncated"), - RowDescriptor::int8_col(b"deleted"), - RowDescriptor::int8_col(b"prep_deleted"), - RowDescriptor::int8_col(b"slru_deleted"), - RowDescriptor::int8_col(b"chkp_deleted"), - RowDescriptor::int8_col(b"control_deleted"), - RowDescriptor::int8_col(b"filenodemap_deleted"), - RowDescriptor::int8_col(b"dropped"), RowDescriptor::int8_col(b"snapshot_relfiles_total"), RowDescriptor::int8_col(b"snapshot_relfiles_needed_by_cutoff"), RowDescriptor::int8_col(b"snapshot_relfiles_needed_by_branches"), @@ -620,15 +611,6 @@ impl postgres_backend::Handler for PageServerHandler { RowDescriptor::int8_col(b"elapsed"), ]))? 
.write_message_noflush(&BeMessage::DataRow(&[ - Some(&result.n_relations.to_string().as_bytes()), - Some(&result.truncated.to_string().as_bytes()), - Some(&result.deleted.to_string().as_bytes()), - Some(&result.prep_deleted.to_string().as_bytes()), - Some(&result.slru_deleted.to_string().as_bytes()), - Some(&result.chkp_deleted.to_string().as_bytes()), - Some(&result.control_deleted.to_string().as_bytes()), - Some(&result.filenodemap_deleted.to_string().as_bytes()), - Some(&result.dropped.to_string().as_bytes()), Some(&result.snapshot_relfiles_total.to_string().as_bytes()), Some( &result diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index d005615bef..ef6366880d 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -55,20 +55,6 @@ pub trait Repository: Send + Sync { /// #[derive(Default)] pub struct GcResult { - // FIXME: These counters make sense for the ObjectRepository. They are not used - // by the LayeredRepository. - pub n_relations: u64, - pub inspected: u64, - pub truncated: u64, - pub deleted: u64, - pub prep_deleted: u64, // RelishTag::Twophase - pub slru_deleted: u64, // RelishTag::Slru - pub chkp_deleted: u64, // RelishTag::Checkpoint - pub control_deleted: u64, // RelishTag::ControlFile - pub filenodemap_deleted: u64, // RelishTag::FileNodeMap - pub dropped: u64, - - // These are used for the LayeredRepository instead pub snapshot_relfiles_total: u64, pub snapshot_relfiles_needed_by_cutoff: u64, pub snapshot_relfiles_needed_by_branches: u64, @@ -88,11 +74,6 @@ pub struct GcResult { impl AddAssign for GcResult { fn add_assign(&mut self, other: Self) { - self.n_relations += other.n_relations; - self.truncated += other.truncated; - self.deleted += other.deleted; - self.dropped += other.dropped; - self.snapshot_relfiles_total += other.snapshot_relfiles_total; self.snapshot_relfiles_needed_by_cutoff += other.snapshot_relfiles_needed_by_cutoff; self.snapshot_relfiles_needed_by_branches += other.snapshot_relfiles_needed_by_branches; @@ -241,8 +222,6 @@ impl WALRecord { mod tests { use super::*; use crate::layered_repository::LayeredRepository; - use crate::object_repository::ObjectRepository; - use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; use crate::{PageServerConf, RepositoryFormat}; use postgres_ffi::pg_constants; @@ -279,10 +258,7 @@ mod tests { static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); - fn get_test_repo( - test_name: &str, - repository_format: RepositoryFormat, - ) -> Result> { + fn get_test_repo(test_name: &str) -> Result> { let repo_dir = PathBuf::from(format!("../tmp_check/test_{}", test_name)); let _ = fs::remove_dir_all(&repo_dir); fs::create_dir_all(&repo_dir)?; @@ -299,7 +275,7 @@ mod tests { pg_distrib_dir: "".into(), auth_type: AuthType::Trust, auth_validation_public_key_path: None, - repository_format, + repository_format: RepositoryFormat::Layered, }; // Make a static copy of the config. This can never be free'd, but that's // OK in a test. @@ -315,35 +291,14 @@ mod tests { Arc::new(walredo_mgr), tenantid, )), - RepositoryFormat::RocksDb => { - let obj_store = RocksObjectStore::create(conf, &tenantid)?; - - Box::new(ObjectRepository::new( - conf, - Arc::new(obj_store), - Arc::new(walredo_mgr), - tenantid, - )) - } }; Ok(repo) } - /// Test get_relsize() and truncation. 
#[test] - fn test_relsize_rocksdb() -> Result<()> { - let repo = get_test_repo("test_relsize_rocksdb", RepositoryFormat::RocksDb)?; - test_relsize(&*repo) - } - - #[test] - fn test_relsize_layered() -> Result<()> { - let repo = get_test_repo("test_relsize_layered", RepositoryFormat::Layered)?; - test_relsize(&*repo) - } - - fn test_relsize(repo: &dyn Repository) -> Result<()> { + fn test_relsize() -> Result<()> { + let repo = get_test_repo("test_relsize")?; // get_timeline() with non-existent timeline id should fail //repo.get_timeline("11223344556677881122334455667788"); @@ -428,22 +383,9 @@ mod tests { /// Test get_relsize() and truncation with a file larger than 1 GB, so that it's /// split into multiple 1 GB segments in Postgres. - /// - /// This isn't very interesting with the RocksDb implementation, as we don't pay - /// any attention to Postgres segment boundaries there. #[test] - fn test_large_rel_rocksdb() -> Result<()> { - let repo = get_test_repo("test_large_rel_rocksdb", RepositoryFormat::RocksDb)?; - test_large_rel(&*repo) - } - - #[test] - fn test_large_rel_layered() -> Result<()> { - let repo = get_test_repo("test_large_rel_layered", RepositoryFormat::Layered)?; - test_large_rel(&*repo) - } - - fn test_large_rel(repo: &dyn Repository) -> Result<()> { + fn test_large_rel() -> Result<()> { + let repo = get_test_repo("test_large_rel")?; let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; @@ -498,22 +440,12 @@ mod tests { Ok(()) } - #[test] - fn test_branch_rocksdb() -> Result<()> { - let repo = get_test_repo("test_branch_rocksdb", RepositoryFormat::RocksDb)?; - test_branch(&*repo) - } - - #[test] - fn test_branch_layered() -> Result<()> { - let repo = get_test_repo("test_branch_layered", RepositoryFormat::Layered)?; - test_branch(&*repo) - } - /// /// Test branch creation /// - fn test_branch(repo: &dyn Repository) -> Result<()> { + #[test] + fn test_branch() -> Result<()> { + let repo = get_test_repo("test_branch")?; let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; diff --git a/pageserver/src/rocksdb_storage.rs b/pageserver/src/rocksdb_storage.rs deleted file mode 100644 index 41ed86d76f..0000000000 --- a/pageserver/src/rocksdb_storage.rs +++ /dev/null @@ -1,475 +0,0 @@ -//! -//! An implementation of the ObjectStore interface, backed by RocksDB -//! -use crate::object_key::*; -use crate::object_store::ObjectStore; -use crate::relish::*; -use crate::PageServerConf; -use anyhow::{bail, Result}; -use serde::{Deserialize, Serialize}; -use std::collections::HashSet; -use std::sync::{Arc, Mutex}; -use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::Lsn; -use zenith_utils::zid::ZTenantId; -use zenith_utils::zid::ZTimelineId; - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct StorageKey { - obj_key: ObjectKey, - lsn: Lsn, -} - -impl StorageKey { - /// The first key for a given timeline - fn timeline_start(timeline: ZTimelineId) -> Self { - Self { - obj_key: ObjectKey { - timeline, - tag: ObjectTag::TimelineMetadataTag, - }, - lsn: Lsn(0), - } - } -} - -/// -/// RocksDB very inefficiently delete random record. Instead of it we have to use merge -/// filter, which allows to throw away records at LSM merge phase. -/// Unfortunately, it is hard (if ever possible) to determine whether version can be removed -/// at merge time. Version ca be removed if: -/// 1. 
It is above PITR horizon (we need to get current LSN and gc_horizon from config) -/// 2. Page is reconstructed at horizon (all WAL records above horizon are applied and can be removed) -/// -/// So we have GC process which reconstructs pages at horizon and mark deteriorated WAL record -/// for deletion. To mark object for deletion we can either set some flag in object itself. -/// But it is complicated with new object value format, because RocksDB storage knows nothing about -/// this format. Also updating whole record just to set one bit seems to be inefficient in any case. -/// This is why we keep keys of marked for deletion versions in HashSet in memory. -/// When LSM merge filter found key in this map, it removes it from the set preventing memory overflow. -/// -struct GarbageCollector { - garbage: Mutex>>, -} - -impl GarbageCollector { - fn new() -> GarbageCollector { - GarbageCollector { - garbage: Mutex::new(HashSet::new()), - } - } - - /// Called by GC to mark version as delete - fn mark_for_deletion(&self, key: &[u8]) { - let mut garbage = self.garbage.lock().unwrap(); - garbage.insert(key.to_vec()); - } - - /// Called by LSM merge filter. If it finds key in the set, then - /// it doesn't merge it and removes from this set. - fn was_deleted(&self, key: &[u8]) -> bool { - let key = key.to_vec(); - let mut garbage = self.garbage.lock().unwrap(); - garbage.remove(&key) - } -} - -pub struct RocksObjectStore { - _conf: &'static PageServerConf, - - // RocksDB handle - db: rocksdb::DB, - gc: Arc, -} - -impl ObjectStore for RocksObjectStore { - fn get(&self, key: &ObjectKey, lsn: Lsn) -> Result> { - let val = self.db.get(StorageKey::ser(&StorageKey { - obj_key: key.clone(), - lsn, - })?)?; - if let Some(val) = val { - Ok(val) - } else { - bail!("could not find page {:?}", key); - } - } - - fn get_next_key(&self, key: &ObjectKey) -> Result> { - let mut iter = self.db.raw_iterator(); - let search_key = StorageKey { - obj_key: key.clone(), - lsn: Lsn(0), - }; - iter.seek(search_key.ser()?); - if !iter.valid() { - Ok(None) - } else { - let key = StorageKey::des(iter.key().unwrap())?; - Ok(Some(key.obj_key.clone())) - } - } - - fn put(&self, key: &ObjectKey, lsn: Lsn, value: &[u8]) -> Result<()> { - self.db.put( - StorageKey::ser(&StorageKey { - obj_key: key.clone(), - lsn, - })?, - value, - )?; - Ok(()) - } - - fn unlink(&self, key: &ObjectKey, lsn: Lsn) -> Result<()> { - self.gc.mark_for_deletion(&StorageKey::ser(&StorageKey { - obj_key: key.clone(), - lsn, - })?); - Ok(()) - } - - /// Iterate through page versions of given page, starting from the given LSN. - /// The versions are walked in descending LSN order. - fn object_versions<'a>( - &'a self, - key: &ObjectKey, - lsn: Lsn, - ) -> Result)> + 'a>> { - let iter = RocksObjectVersionIter::new(&self.db, key, lsn)?; - Ok(Box::new(iter)) - } - - /// Iterate through all timeline objects - fn list_objects<'a>( - &'a self, - timeline: ZTimelineId, - lsn: Lsn, - ) -> Result + 'a>> { - let iter = RocksObjectIter::new(&self.db, timeline, lsn)?; - Ok(Box::new(iter)) - } - - /// Get a list of all distinct relations in given tablespace and database. - /// - /// TODO: This implementation is very inefficient, it scans - /// through all entries in the given database. In practice, this - /// is used for CREATE DATABASE, and usually the template database is small. - /// But if it's not, this will be slow. - fn list_rels( - &self, - timelineid: ZTimelineId, - spcnode: u32, - dbnode: u32, - lsn: Lsn, - ) -> Result> { - // FIXME: This scans everything. 
Very slow - - let mut rels: HashSet = HashSet::new(); - - let mut search_rel_tag = RelTag { - spcnode, - dbnode, - relnode: 0, - forknum: 0u8, - }; - let mut iter = self.db.raw_iterator(); - loop { - let search_key = StorageKey { - obj_key: ObjectKey { - timeline: timelineid, - tag: ObjectTag::RelationMetadata(RelishTag::Relation(search_rel_tag)), - }, - lsn: Lsn(0), - }; - iter.seek(search_key.ser()?); - if !iter.valid() { - break; - } - let key = StorageKey::des(iter.key().unwrap())?; - - if let ObjectTag::RelationMetadata(RelishTag::Relation(rel_tag)) = key.obj_key.tag { - if spcnode != 0 && rel_tag.spcnode != spcnode - || dbnode != 0 && rel_tag.dbnode != dbnode - { - break; - } - if key.lsn <= lsn { - // visible in this snapshot - rels.insert(rel_tag); - } - search_rel_tag = rel_tag; - // skip to next relation - // FIXME: What if relnode is u32::MAX ? - search_rel_tag.relnode += 1; - } else { - // no more relation metadata entries - break; - } - } - - Ok(rels) - } - - /// Get a list of all distinct NON-relations in timeline - /// that are visible at given lsn. - /// - /// TODO: This implementation is very inefficient, it scans - /// through all non-rel page versions in the system. In practice, this - /// is used when initializing a new compute node, and the non-rel files - /// are never very large nor change very frequently, so this will do for now. - fn list_nonrels(&self, timelineid: ZTimelineId, lsn: Lsn) -> Result> { - let mut rels: HashSet = HashSet::new(); - - let search_key = StorageKey { - obj_key: ObjectKey { - timeline: timelineid, - tag: ObjectTag::Buffer(FIRST_NONREL_RELISH_TAG, 0), - }, - lsn: Lsn(0), - }; - - let mut iter = self.db.raw_iterator(); - iter.seek(search_key.ser()?); - while iter.valid() { - let key = StorageKey::des(iter.key().unwrap())?; - - if key.obj_key.timeline != timelineid { - // reached end of this timeline in the store - break; - } - - if let ObjectTag::Buffer(rel_tag, _blknum) = key.obj_key.tag { - if key.lsn <= lsn { - // visible in this snapshot - rels.insert(rel_tag); - } - } - // TODO: we could skip to next relation here like we do in list_rels(), - // but hopefully there are not that many SLRU segments or other non-rel - // entries for it to matter. - iter.next(); - } - - Ok(rels) - } - - /// Iterate through versions of all objects in a timeline. - /// - /// Returns objects in increasing key-version order. - /// Returns all versions up to and including the specified LSN. - fn objects<'a>( - &'a self, - timeline: ZTimelineId, - lsn: Lsn, - ) -> Result)>> + 'a>> { - let start_key = StorageKey::timeline_start(timeline); - let start_key_bytes = StorageKey::ser(&start_key)?; - let iter = self.db.iterator(rocksdb::IteratorMode::From( - &start_key_bytes, - rocksdb::Direction::Forward, - )); - - Ok(Box::new(RocksObjects { - iter, - timeline, - lsn, - })) - } - - fn compact(&self) { - self.db.compact_range::<&[u8], &[u8]>(None, None); - } -} - -impl RocksObjectStore { - /// Open a RocksDB database. - pub fn open(conf: &'static PageServerConf, tenantid: &ZTenantId) -> Result { - let opts = Self::get_rocksdb_opts(); - let obj_store = Self::new(conf, opts, tenantid)?; - Ok(obj_store) - } - - /// Create a new, empty RocksDB database. 
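Before the `create`/`new` constructors below, it is worth spelling out the ordering property that the iterators above depend on: StorageKey serializes (object key, LSN) with big-endian encoding, so RocksDB's byte-wise key order sorts first by object and then by LSN, and `seek_for_prev` on (key, lsn) lands on the newest version at or below the requested LSN. A toy illustration of that property using a plain BTreeMap (not part of the patch; the page numbers and LSNs are made up):

    use std::collections::BTreeMap;

    // Big-endian encoding of (page, lsn): byte-wise order equals logical order.
    fn encode(page: u32, lsn: u64) -> Vec<u8> {
        let mut key = page.to_be_bytes().to_vec();
        key.extend_from_slice(&lsn.to_be_bytes());
        key
    }

    fn main() {
        let mut store: BTreeMap<Vec<u8>, &str> = BTreeMap::new();
        store.insert(encode(7, 100), "page 7 @ LSN 100");
        store.insert(encode(7, 200), "page 7 @ LSN 200");
        store.insert(encode(8, 50), "page 8 @ LSN 50");

        // Equivalent of seek_for_prev(page 7, lsn 150): newest version at or below LSN 150.
        let (_key, value) = store.range(..=encode(7, 150)).next_back().unwrap();
        assert_eq!(*value, "page 7 @ LSN 100");
        // The real iterator additionally checks the decoded tag, since a seek like
        // this can land on a different object's version when none qualifies.
    }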
- pub fn create(conf: &'static PageServerConf, tenantid: &ZTenantId) -> Result { - let path = conf.tenant_path(&tenantid).join("rocksdb-storage"); - std::fs::create_dir(&path)?; - - let mut opts = Self::get_rocksdb_opts(); - opts.create_if_missing(true); - opts.set_error_if_exists(true); - let obj_store = Self::new(conf, opts, tenantid)?; - Ok(obj_store) - } - - fn new( - conf: &'static PageServerConf, - mut opts: rocksdb::Options, - tenantid: &ZTenantId, - ) -> Result { - let path = conf.tenant_path(&tenantid).join("rocksdb-storage"); - let gc = Arc::new(GarbageCollector::new()); - let gc_ref = gc.clone(); - opts.set_compaction_filter("ttl", move |_level: u32, key: &[u8], _val: &[u8]| { - if gc_ref.was_deleted(key) { - rocksdb::compaction_filter::Decision::Remove - } else { - rocksdb::compaction_filter::Decision::Keep - } - }); - let db = rocksdb::DB::open(&opts, &path)?; - let obj_store = RocksObjectStore { - _conf: conf, - db, - gc, - }; - Ok(obj_store) - } - - /// common options used by `open` and `create` - fn get_rocksdb_opts() -> rocksdb::Options { - let mut opts = rocksdb::Options::default(); - opts.set_use_fsync(true); - opts.set_compression_type(rocksdb::DBCompressionType::Lz4); - opts - } -} - -/// -/// Iterator for `object_versions`. Returns all page versions of a given block, in -/// reverse LSN order. -/// -struct RocksObjectVersionIter<'a> { - obj_key: ObjectKey, - dbiter: rocksdb::DBRawIterator<'a>, - first_call: bool, -} -impl<'a> RocksObjectVersionIter<'a> { - fn new( - db: &'a rocksdb::DB, - obj_key: &ObjectKey, - lsn: Lsn, - ) -> Result> { - let key = StorageKey { - obj_key: obj_key.clone(), - lsn, - }; - let mut dbiter = db.raw_iterator(); - dbiter.seek_for_prev(StorageKey::ser(&key)?); // locate last entry - Ok(RocksObjectVersionIter { - first_call: true, - obj_key: obj_key.clone(), - dbiter, - }) - } -} -impl<'a> Iterator for RocksObjectVersionIter<'a> { - type Item = (Lsn, Vec); - - fn next(&mut self) -> std::option::Option { - if self.first_call { - self.first_call = false; - } else { - self.dbiter.prev(); // walk backwards - } - - if !self.dbiter.valid() { - return None; - } - let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap(); - if key.obj_key.tag != self.obj_key.tag { - return None; - } - let val = self.dbiter.value().unwrap(); - let result = val.to_vec(); - - Some((key.lsn, result)) - } -} - -struct RocksObjects<'r> { - iter: rocksdb::DBIterator<'r>, - timeline: ZTimelineId, - lsn: Lsn, -} - -impl<'r> Iterator for RocksObjects<'r> { - // TODO consider returning Box<[u8]> - type Item = Result<(ObjectTag, Lsn, Vec)>; - - fn next(&mut self) -> Option { - self.next_result().transpose() - } -} - -impl<'r> RocksObjects<'r> { - fn next_result(&mut self) -> Result)>> { - for (key_bytes, v) in &mut self.iter { - let key = StorageKey::des(&key_bytes)?; - - if key.obj_key.timeline != self.timeline { - return Ok(None); - } - - if key.lsn > self.lsn { - // TODO can speed up by seeking iterator - continue; - } - - return Ok(Some((key.obj_key.tag, key.lsn, v.to_vec()))); - } - - Ok(None) - } -} - -/// -/// Iterator for `list_objects`. 
Returns all objects preceeding specified LSN -/// -struct RocksObjectIter<'a> { - timeline: ZTimelineId, - key: StorageKey, - lsn: Lsn, - dbiter: rocksdb::DBRawIterator<'a>, -} -impl<'a> RocksObjectIter<'a> { - fn new(db: &'a rocksdb::DB, timeline: ZTimelineId, lsn: Lsn) -> Result> { - let key = StorageKey { - obj_key: ObjectKey { - timeline, - tag: ObjectTag::FirstTag, - }, - lsn: Lsn(0), - }; - let dbiter = db.raw_iterator(); - Ok(RocksObjectIter { - key, - timeline, - lsn, - dbiter, - }) - } -} -impl<'a> Iterator for RocksObjectIter<'a> { - type Item = ObjectTag; - - fn next(&mut self) -> std::option::Option { - loop { - self.dbiter.seek(StorageKey::ser(&self.key).unwrap()); - if !self.dbiter.valid() { - return None; - } - let key = StorageKey::des(self.dbiter.key().unwrap()).unwrap(); - if key.obj_key.timeline != self.timeline { - // End of this timeline - return None; - } - self.key = key.clone(); - self.key.lsn = Lsn(u64::MAX); // next seek should skip all versions - if key.lsn <= self.lsn { - // visible in this snapshot - return Some(key.obj_key.tag); - } - } - } -} diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 069f84e2ff..cf84df7906 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -8,7 +8,7 @@ use crate::page_cache; use crate::relish::*; use crate::restore_local_repo; use crate::waldecoder::*; -use crate::{PageServerConf, RepositoryFormat}; +use crate::PageServerConf; use anyhow::{Error, Result}; use lazy_static::lazy_static; use log::*; @@ -264,12 +264,6 @@ fn walreceiver_main( )?; if newest_segno - oldest_segno >= 10 { - // FIXME: The layered repository performs checkpointing in a separate thread, so this - // isn't needed anymore. Remove 'checkpoint' from the Timeline trait altogether? - if conf.repository_format == RepositoryFormat::RocksDb { - timeline.checkpoint()?; - } - // TODO: This is where we could remove WAL older than last_rec_lsn. //remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?; } diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py deleted file mode 100644 index 2e58b5096d..0000000000 --- a/test_runner/batch_others/test_gc.py +++ /dev/null @@ -1,105 +0,0 @@ -import pytest - -from contextlib import closing -from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver -import psycopg2.extras - -pytest_plugins = ("fixtures.zenith_fixtures") - -# -# Test Garbage Collection of old page versions. -# -# This test is pretty tightly coupled with the current implementation of page version storage -# and garbage collection in object_repository.rs. -# -@pytest.mark.skip(reason="""" - Current GC test is flaky and overly strict. Since we are migrating to the layered repo format - with different GC implementation let's just silence this test for now. This test only - works with the RocksDB implementation. -""") -def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): - zenith_cli.run(["branch", "test_gc", "empty"]) - pg = postgres.create_start('test_gc') - - with closing(pg.connect()) as conn: - with conn.cursor() as cur: - with closing(pageserver.connect()) as psconn: - with psconn.cursor(cursor_factory = psycopg2.extras.DictCursor) as pscur: - - # Get the timeline ID of our branch. 
We need it for the 'do_gc' command - cur.execute("SHOW zenith.zenith_timeline") - timeline = cur.fetchone()[0] - - # Create a test table - cur.execute("CREATE TABLE foo(x integer)") - - # Run GC, to clear out any old page versions left behind in the catalogs by - # the CREATE TABLE command. We want to have a clean slate with no garbage - # before running the actual tests below, otherwise the counts won't match - # what we expect. - print("Running GC before test") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") - row = pscur.fetchone() - print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) - # remember the number of relations - n_relations = row['n_relations'] - assert n_relations > 0 - - # Insert a row. The first insert will also create a metadata entry for the - # relation, with size == 1 block. Hence, bump up the expected relation count. - n_relations += 1 - print("Inserting one row and running GC") - cur.execute("INSERT INTO foo VALUES (1)") - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") - row = pscur.fetchone() - print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) - assert row['n_relations'] == n_relations - assert row['dropped'] == 0 - assert row['truncated'] == 31 - assert row['deleted'] == 4 - - # Insert two more rows and run GC. - print("Inserting two more rows and running GC") - cur.execute("INSERT INTO foo VALUES (2)") - cur.execute("INSERT INTO foo VALUES (3)") - - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") - row = pscur.fetchone() - print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) - assert row['n_relations'] == n_relations - assert row['dropped'] == 0 - assert row['truncated'] == 31 - assert row['deleted'] == 4 - - # Insert one more row. It creates one more page version, but doesn't affect the - # relation size. - print("Inserting one more row") - cur.execute("INSERT INTO foo VALUES (3)") - - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") - row = pscur.fetchone() - print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) - assert row['n_relations'] == n_relations - assert row['dropped'] == 0 - assert row['truncated'] == 31 - assert row['deleted'] == 2 - - # Run GC again, with no changes in the database. Should not remove anything. - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") - row = pscur.fetchone() - print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) - assert row['n_relations'] == n_relations - assert row['dropped'] == 0 - assert row['truncated'] == 31 - assert row['deleted'] == 0 - - # - # Test DROP TABLE checks that relation data and metadata was deleted by GC from object storage - # - cur.execute("DROP TABLE foo") - - pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") - row = pscur.fetchone() - print("GC duration {elapsed} ms, relations: {n_relations}, dropped {dropped}, truncated: {truncated}, deleted: {deleted}".format_map(row)) - # Each relation fork is counted separately, hence 3. 
- assert row['dropped'] == 3 diff --git a/zenith/src/main.rs b/zenith/src/main.rs index f39e1f480a..58160481d9 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -67,7 +67,7 @@ fn main() -> Result<()> { .long("repository-format") .takes_value(false) .value_name("repository-format") - .help("Choose repository format, 'layered' or 'rocksdb'") + .help("Choose repository format; only 'layered' is currently supported") ), ) .subcommand(
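With test_gc.py removed, a comparable smoke test against the layered repository would issue the same 'do_gc' command over the page service's libpq protocol; after this patch the reply row carries only the snapshot_relfiles_* statistics left in GcResult. A rough sketch using the blocking rust-postgres client follows; the connection parameters (address, port, user) are assumptions about a local setup, not taken from this patch.

    use postgres::{Client, NoTls, SimpleQueryMessage};

    fn do_gc_smoke_test(tenant: &str, timeline: &str) -> Result<(), postgres::Error> {
        // Assumed local pageserver libpq endpoint; adjust to your environment.
        let mut client = Client::connect("host=127.0.0.1 port=64000 user=zenith", NoTls)?;
        let messages = client.simple_query(&format!("do_gc {} {} 0", tenant, timeline))?;
        for msg in messages {
            if let SimpleQueryMessage::Row(row) = msg {
                // With the ObjectRepository counters gone, the DataRow holds only
                // the snapshot_relfiles_* counters (plus timing information).
                println!("first reported counter: {:?}", row.get(0));
            }
        }
        Ok(())
    }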