From 2450f82de5eecb1bfb30185ef8f5b21d0bfd4be5 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas
Date: Mon, 16 Aug 2021 10:06:48 +0300
Subject: [PATCH] Introduce a new "layered" repository implementation.

This replaces the RocksDB-based implementation with an approach using
"snapshot files" on disk, and in-memory btreemaps to hold the recent
changes.

This makes the repository implementation a configuration option. You can
choose 'layered' or 'rocksdb' with "zenith init --repository-format=".
The unit tests have been refactored to exercise both implementations.
'layered' is now the default.

Push/pull is not implemented. The 'test_history_inmemory' test has been
commented out accordingly. It's not clear how we will implement that
functionality; probably by copying the snapshot files directly.
---
 Cargo.lock                                    |   99 +-
 control_plane/src/local_env.rs                |    6 +
 control_plane/src/storage.rs                  |    6 +-
 pageserver/Cargo.toml                         |    1 +
 pageserver/src/bin/pageserver.rs              |   26 +-
 pageserver/src/branches.rs                    |   30 +-
 pageserver/src/layered_repository.rs          | 1212 +++++++++++++++++
 pageserver/src/layered_repository/README.md   |  298 ++++
 .../src/layered_repository/inmemory_layer.rs  |  534 ++++++++
 .../src/layered_repository/layer_map.rs       |  132 ++
 .../src/layered_repository/snapshot_layer.rs  |  631 +++++++++
 .../src/layered_repository/storage_layer.rs   |  123 ++
 pageserver/src/lib.rs                         |    9 +
 pageserver/src/page_cache.rs                  |   32 +-
 pageserver/src/page_service.rs                |   49 +
 pageserver/src/relish.rs                      |   11 +-
 pageserver/src/repository.rs                  |  147 +-
 pageserver/src/restore_local_repo.rs          |   15 +-
 pageserver/src/walreceiver.rs                 |    8 +-
 test_runner/batch_others/test_gc.py           |    3 +-
 test_runner/batch_others/test_snapfiles_gc.py |  122 ++
 vendor/postgres                               |    2 +-
 zenith/src/main.rs                            |   12 +-
 zenith_utils/src/zid.rs                       |    2 +-
 24 files changed, 3435 insertions(+), 75 deletions(-)
 create mode 100644 pageserver/src/layered_repository.rs
 create mode 100644 pageserver/src/layered_repository/README.md
 create mode 100644 pageserver/src/layered_repository/inmemory_layer.rs
 create mode 100644 pageserver/src/layered_repository/layer_map.rs
 create mode 100644 pageserver/src/layered_repository/snapshot_layer.rs
 create mode 100644 pageserver/src/layered_repository/storage_layer.rs
 create mode 100644 test_runner/batch_others/test_snapfiles_gc.py

diff --git a/Cargo.lock b/Cargo.lock
index 67c00293b0..9bf6f0b7fd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,7 +1,5 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3 - [[package]] name = "ahash" version = "0.4.7" @@ -82,6 +80,30 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "aversion" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41992ab8cfcc3026ef9abceffe0c2b0479c043183fc23825e30d22baab6df334" +dependencies = [ + "aversion-macros", + "byteorder", + "serde", + "serde_cbor", + "thiserror", +] + +[[package]] +name = "aversion-macros" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ba5785f953985aa0caca927ba4005880f3b4f53de87f134e810ae3549f744d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "aws-creds" version = "0.26.0" @@ -166,6 +188,18 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bookfile" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efa3e2086414e1bbecbc10730f265e5b079ab4ea0b830e7219a70dab6471e753" +dependencies = [ + "aversion", + "byteorder", + "serde", + "thiserror", +] + [[package]] name = "boxfnonce" version = "0.1.1" @@ -646,6 +680,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62aca2aba2d62b4a7f5b33f3712cb1b0692779a56fb510499d5c0aa594daeaf3" + [[package]] name = "hashbrown" version = "0.9.1" @@ -1139,6 +1179,7 @@ name = "pageserver" version = "0.1.0" dependencies = [ "anyhow", + "bookfile", "byteorder", "bytes", "chrono", @@ -1276,24 +1317,6 @@ dependencies = [ "tokio-postgres 0.7.1", ] -[[package]] -name = "postgres-protocol" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" -dependencies = [ - "base64 0.13.0", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand", - "sha2", - "stringprep", -] - [[package]] name = "postgres-protocol" version = "0.6.1" @@ -1313,14 +1336,21 @@ dependencies = [ ] [[package]] -name = "postgres-types" -version = "0.2.1" +name = "postgres-protocol" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +checksum = "ff3e0f70d32e20923cabf2df02913be7c1842d4c772db8065c00fcfdd1d1bff3" dependencies = [ + "base64 0.13.0", + "byteorder", "bytes", "fallible-iterator", - "postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", ] [[package]] @@ -1333,6 +1363,17 @@ dependencies = [ "postgres-protocol 0.6.1 (git+https://github.com/zenithdb/rust-postgres.git?rev=9eb0dbfbeb6a6c1b79099b9f7ae4a8c021877858)", ] +[[package]] +name = "postgres-types" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "430f4131e1b7657b0cd9a2b0c3408d77c9a43a042d300b8c77f981dffcc43a2f" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "postgres_ffi" version = "0.1.0" @@ -1735,6 +1776,16 @@ dependencies = [ "xml-rs", ] +[[package]] +name = "serde_cbor" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1e18acfa2f90e8b735b2836ab8d538de304cbb6729a7360729ea5a895d15a622" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.126" diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 084285cb16..10e70c1485 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -42,6 +42,9 @@ pub struct LocalEnv { #[serde(with = "hex")] pub tenantid: ZTenantId, + // Repository format, 'rocksdb' or 'layered' or None for default + pub repository_format: Option, + // jwt auth token used for communication with pageserver pub auth_token: String, @@ -101,6 +104,7 @@ pub fn init( remote_pageserver: Option<&str>, tenantid: ZTenantId, auth_type: AuthType, + repository_format: Option<&str>, ) -> Result<()> { // check if config already exists let base_path = base_path(); @@ -176,6 +180,7 @@ pub fn init( base_data_dir: base_path, remotes: BTreeMap::default(), tenantid, + repository_format: repository_format.map(|x| x.into()), auth_token, auth_type, private_key_path, @@ -194,6 +199,7 @@ pub fn init( base_data_dir: base_path, remotes: BTreeMap::default(), tenantid, + repository_format: repository_format.map(|x| x.into()), auth_token, auth_type, private_key_path, diff --git a/control_plane/src/storage.rs b/control_plane/src/storage.rs index cc576b1c45..5da3334e4a 100644 --- a/control_plane/src/storage.rs +++ b/control_plane/src/storage.rs @@ -50,7 +50,7 @@ impl PageServerNode { .unwrap() } - pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool) -> Result<()> { + pub fn init(&self, create_tenant: Option<&str>, enable_auth: bool, repository_format: Option<&str>) -> Result<()> { let mut cmd = Command::new(self.env.pageserver_bin()?); let mut args = vec![ "--init", @@ -65,6 +65,10 @@ impl PageServerNode { args.extend(&["--auth-type", "ZenithJWT"]); } + if let Some(repo_format) = repository_format { + args.extend(&["--repository-format", repo_format]); + } + create_tenant.map(|tenantid| args.extend(&["--create-tenant", tenantid])); let status = cmd .args(args) diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index a58c35756b..539c9dccd0 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bookfile = "^0.3" chrono = "0.4.19" rand = "0.8.3" regex = "1.4.5" diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 914c8858ca..ecc004f7d1 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -20,14 +20,14 @@ use anyhow::{ensure, Result}; use clap::{App, Arg, ArgMatches}; use daemonize::Daemonize; -use pageserver::{branches, logger, page_cache, page_service, PageServerConf}; +use pageserver::{branches, logger, page_cache, page_service, PageServerConf, RepositoryFormat}; use zenith_utils::http_endpoint; const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:64000"; const DEFAULT_HTTP_ENDPOINT_ADDR: &str = "127.0.0.1:9898"; const DEFAULT_GC_HORIZON: u64 = 64 * 1024 * 1024; -const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(100); +const DEFAULT_GC_PERIOD: Duration = Duration::from_secs(10); const DEFAULT_SUPERUSER: &str = "zenith_admin"; @@ -41,6 +41,7 @@ struct CfgFileParams { pg_distrib_dir: Option, auth_validation_public_key_path: Option, auth_type: Option, + repository_format: Option, } impl CfgFileParams { @@ -58,6 +59,7 @@ impl CfgFileParams { pg_distrib_dir: get_arg("postgres-distrib"), 
auth_validation_public_key_path: get_arg("auth-validation-public-key-path"), auth_type: get_arg("auth-type"), + repository_format: get_arg("repository-format"), } } @@ -74,6 +76,7 @@ impl CfgFileParams { .auth_validation_public_key_path .or(other.auth_validation_public_key_path), auth_type: self.auth_type.or(other.auth_type), + repository_format: self.repository_format.or(other.repository_format), } } @@ -133,6 +136,16 @@ impl CfgFileParams { ); } + let repository_format = match self.repository_format.as_ref() { + Some(repo_format_str) if repo_format_str == "rocksdb" => RepositoryFormat::RocksDb, + Some(repo_format_str) if repo_format_str == "layered" => RepositoryFormat::Layered, + Some(repo_format_str) => anyhow::bail!( + "invalid --repository-format '{}', must be 'rocksdb' or 'layered'", + repo_format_str + ), + None => RepositoryFormat::Layered, // default + }; + Ok(PageServerConf { daemonize: false, @@ -148,8 +161,9 @@ impl CfgFileParams { pg_distrib_dir, auth_validation_public_key_path, - auth_type, + + repository_format, }) } } @@ -221,6 +235,12 @@ fn main() -> Result<()> { .takes_value(true) .help("Authentication scheme type. One of: Trust, MD5, ZenithJWT"), ) + .arg( + Arg::with_name("repository-format") + .long("repository-format") + .takes_value(true) + .help("Which repository implementation to use, 'rocksdb' or 'layered'"), + ) .get_matches(); let workdir = Path::new(arg_matches.value_of("workdir").unwrap_or(".zenith")); diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 2739312326..c62074cd04 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -24,7 +24,7 @@ use crate::object_repository::ObjectRepository; use crate::page_cache; use crate::restore_local_repo; use crate::walredo::WalRedoManager; -use crate::{repository::Repository, PageServerConf}; +use crate::{repository::Repository, PageServerConf, RepositoryFormat}; #[derive(Serialize, Deserialize, Clone)] pub struct BranchInfo { @@ -65,8 +65,8 @@ pub fn init_pageserver(conf: &'static PageServerConf, create_tenant: Option<&str pub fn create_repo( conf: &'static PageServerConf, tenantid: ZTenantId, - wal_redo_manager: Arc, -) -> Result { + wal_redo_manager: Arc, +) -> Result> { let repo_dir = conf.tenant_path(&tenantid); if repo_dir.exists() { bail!("repo for {} already exists", tenantid) @@ -96,19 +96,27 @@ pub fn create_repo( // and we failed to run initdb again in the same directory. This has been solved for the // rapid init+start case now, but the general race condition remains if you restart the // server quickly. 
- let storage = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?; + let repo: Arc = + match conf.repository_format { + RepositoryFormat::Layered => Arc::new( + crate::layered_repository::LayeredRepository::new(conf, wal_redo_manager, tenantid), + ), + RepositoryFormat::RocksDb => { + let obj_store = crate::rocksdb_storage::RocksObjectStore::create(conf, &tenantid)?; - let repo = crate::object_repository::ObjectRepository::new( - conf, - std::sync::Arc::new(storage), - wal_redo_manager, - tenantid, - ); + Arc::new(ObjectRepository::new( + conf, + Arc::new(obj_store), + wal_redo_manager, + tenantid, + )) + } + }; // Load data into pageserver // TODO To implement zenith import we need to // move data loading out of create_repo() - bootstrap_timeline(conf, tenantid, tli, &repo)?; + bootstrap_timeline(conf, tenantid, tli, &*repo)?; Ok(repo) } diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs new file mode 100644 index 0000000000..495d2be6ab --- /dev/null +++ b/pageserver/src/layered_repository.rs @@ -0,0 +1,1212 @@ +//! +//! Zenith repository implementation that keeps old data in "snapshot files", and +//! the recent changes in memory. See layered_repository/snapshot_layer.rs and +//! layered_repository/inmemory_layer.rs, respectively. The functions here are +//! responsible for locating the correct layer for the get/put call, tracing +//! timeline branching history as needed. +//! +//! The snapshot files are stored in the .zenith/tenants//timelines/ +//! directory. See layered_repository/README for how the files are managed. +//! In addition to the snapshot files, there is a metadata file in the same +//! directory that contains information about the timeline, in particular its +//! parent timeline, and the last LSN that has been written to disk. +//! + +use anyhow::{bail, Context, Result}; +use bytes::Bytes; +use lazy_static::lazy_static; +use log::*; +use serde::{Deserialize, Serialize}; + +use std::collections::HashMap; +use std::collections::{BTreeSet, HashSet}; +use std::fs; +use std::fs::File; +use std::io::Write; +use std::ops::Bound::Included; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use crate::relish::*; +use crate::repository::{GcResult, History, Repository, Timeline, WALRecord}; +use crate::restore_local_repo::import_timeline_wal; +use crate::walredo::WalRedoManager; +use crate::PageServerConf; +use crate::{ZTenantId, ZTimelineId}; + +use zenith_metrics::{register_histogram_vec, HistogramVec}; +use zenith_utils::bin_ser::BeSer; +use zenith_utils::lsn::{AtomicLsn, Lsn}; +use zenith_utils::seqwait::SeqWait; + +mod inmemory_layer; +mod layer_map; +mod snapshot_layer; +mod storage_layer; + +use inmemory_layer::InMemoryLayer; +use layer_map::LayerMap; +use snapshot_layer::SnapshotLayer; +use storage_layer::Layer; + +// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. +static TIMEOUT: Duration = Duration::from_secs(60); + +// Perform a checkpoint in the GC thread, when the LSN has advanced this much since +// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if +// the page server is restarted. +// +// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB +// would be more appropriate. But a low value forces the code to be exercised more, +// which is good for now to trigger bugs. +static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024; + +// Metrics collected on operations on the storage repository. 
+lazy_static! { + static ref STORAGE_TIME: HistogramVec = register_histogram_vec!( + "pageserver_storage_time", + "Time spent on storage operations", + &["operation"] + ) + .expect("failed to define a metric"); +} + +/// +/// Repository consists of multiple timelines. Keep them in a hash table. +/// +pub struct LayeredRepository { + conf: &'static PageServerConf, + tenantid: ZTenantId, + timelines: Mutex>>, + + walredo_mgr: Arc, +} + +/// Public interface +impl Repository for LayeredRepository { + fn get_timeline(&self, timelineid: ZTimelineId) -> Result> { + let mut timelines = self.timelines.lock().unwrap(); + + Ok(self.get_timeline_locked(timelineid, &mut timelines)?) + } + + fn create_empty_timeline( + &self, + timelineid: ZTimelineId, + start_lsn: Lsn, + ) -> Result> { + let mut timelines = self.timelines.lock().unwrap(); + + // Create the timeline directory, and write initial metadata to file. + std::fs::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?; + + let metadata = TimelineMetadata { + last_valid_lsn: start_lsn, + last_record_lsn: start_lsn, + prev_record_lsn: Lsn(0), + ancestor_timeline: None, + ancestor_lsn: start_lsn, + }; + Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata)?; + + let timeline = LayeredTimeline::new( + self.conf, + metadata, + None, + timelineid, + self.tenantid, + self.walredo_mgr.clone(), + )?; + + let timeline_rc = Arc::new(timeline); + let r = timelines.insert(timelineid, timeline_rc.clone()); + assert!(r.is_none()); + Ok(timeline_rc) + } + + /// Branch a timeline + fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()> { + let src_timeline = self.get_timeline(src)?; + + // Create the metadata file, noting the ancestor of the new timeline. + // There is initially no data in it, but all the read-calls know to look + // into the ancestor. + let metadata = TimelineMetadata { + last_valid_lsn: start_lsn, + last_record_lsn: start_lsn, + prev_record_lsn: src_timeline.get_prev_record_lsn(), + ancestor_timeline: Some(src), + ancestor_lsn: start_lsn, + }; + std::fs::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?; + Self::save_metadata(self.conf, dst, self.tenantid, &metadata)?; + + info!("branched timeline {} from {} at {}", dst, src, start_lsn); + + Ok(()) + } + + /// Public entry point to GC. All the logic is in the private + /// gc_iteration_internal function, this public facade just wraps it for + /// metrics collection. + fn gc_iteration( + &self, + target_timelineid: Option, + horizon: u64, + compact: bool, + ) -> Result { + STORAGE_TIME + .with_label_values(&["gc"]) + .observe_closure_duration(|| self.gc_iteration_internal( + target_timelineid, + horizon, + compact, + )) + } +} + +/// Private functions +impl LayeredRepository { + // Implementation of the public `get_timeline` function. This differs from the public + // interface in that the caller must already hold the mutex on the 'timelines' hashmap. + fn get_timeline_locked( + &self, + timelineid: ZTimelineId, + timelines: &mut HashMap>, + ) -> Result> { + match timelines.get(&timelineid) { + Some(timeline) => Ok(timeline.clone()), + None => { + let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)?; + + // Recurse to look up the ancestor timeline. + // + // TODO: If you have a very deep timeline history, this could become + // expensive. Perhaps delay this until we need to look up a page in + // ancestor. 
+ let ancestor = if let Some(ancestor_timelineid) = metadata.ancestor_timeline { + Some(self.get_timeline_locked(ancestor_timelineid, timelines)?) + } else { + None + }; + + let timeline = LayeredTimeline::new( + self.conf, + metadata, + ancestor, + timelineid, + self.tenantid, + self.walredo_mgr.clone(), + )?; + + // List the snapshot layers on disk, and load them into the layer map + timeline.load_layer_map()?; + + // Load any new WAL after the last checkpoint into memory. + info!( + "Loading WAL for timeline {} starting at {}", + timelineid, + timeline.get_last_record_lsn() + ); + let wal_dir = self + .conf + .timeline_path(&timelineid, &self.tenantid) + .join("wal"); + import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?; + + let timeline_rc = Arc::new(timeline); + timelines.insert(timelineid, timeline_rc.clone()); + Ok(timeline_rc) + } + } + } + + pub fn new( + conf: &'static PageServerConf, + walredo_mgr: Arc, + tenantid: ZTenantId, + ) -> LayeredRepository { + LayeredRepository { + tenantid: tenantid, + conf: conf, + timelines: Mutex::new(HashMap::new()), + walredo_mgr, + } + } + + /// + /// Launch the checkpointer thread in given repository. + /// + pub fn launch_checkpointer_thread(conf: &'static PageServerConf, rc: Arc) { + let _thread = std::thread::Builder::new() + .name("Checkpointer thread".into()) + .spawn(move || { + // FIXME: relaunch it? Panic is not good. + rc.checkpoint_loop(conf).expect("Checkpointer thread died"); + }) + .unwrap(); + } + + /// + /// Checkpointer thread's main loop + /// + fn checkpoint_loop(&self, conf: &'static PageServerConf) -> Result<()> { + loop { + std::thread::sleep(conf.gc_period); + + info!("checkpointer thread for tenant {} waking up", self.tenantid); + + // checkpoint timelines that have accumulated more than CHECKPOINT_INTERVAL + // bytes of WAL since last checkpoint. + { + let timelines = self.timelines.lock().unwrap(); + for (_timelineid, timeline) in timelines.iter() { + let distance = u64::from(timeline.last_valid_lsn.load()) + - u64::from(timeline.last_checkpoint_lsn.load()); + if distance > CHECKPOINT_INTERVAL { + timeline.checkpoint()?; + } + } + // release lock on 'timelines' + } + + // Garbage collect old files that are not needed for PITR anymore + if conf.gc_horizon > 0 { + self.gc_iteration(None, conf.gc_horizon, false).unwrap(); + } + } + } + + /// Save timeline metadata to file + fn save_metadata( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + data: &TimelineMetadata, + ) -> Result<()> { + let path = conf.timeline_path(&timelineid, &tenantid).join("metadata"); + let mut file = File::create(&path)?; + + info!("saving metadata {}", path.display()); + + file.write_all(&TimelineMetadata::ser(data)?)?; + + Ok(()) + } + + fn load_metadata( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + ) -> Result { + let path = conf.timeline_path(&timelineid, &tenantid).join("metadata"); + let data = std::fs::read(&path)?; + + Ok(TimelineMetadata::des(&data)?) + } + + // + // How garbage collection works: + // + // +--bar-------------> + // / + // +----+-----foo----------------> + // / + // ----main--+--------------------------> + // \ + // +-----baz--------> + // + // + // 1. Grab a mutex to prevent new timelines from being created + // 2. Scan all timelines, and on each timeline, make note of the + // all the points where other timelines have been branched off. + // We will refrain from removing page versions at those LSNs. + // 3. 
For each timeline, scan all snapshot files on the timeline. + // Remove all files for which a newer file exists and which + // don't cover any branch point LSNs. + // + // TODO: + // - if a relation has been modified on a child branch, then we + // don't need to keep that in the parent anymore. But currently + // we do. + fn gc_iteration_internal( + &self, + target_timelineid: Option, + horizon: u64, + _compact: bool, + ) -> Result { + let mut totals: GcResult = Default::default(); + let now = Instant::now(); + + // grab mutex to prevent new timelines from being created here. + // TODO: We will hold it for a long time + let mut timelines = self.timelines.lock().unwrap(); + + // Scan all timelines. For each timeline, remember the timeline ID and + // the branch point where it was created. + // + // We scan the directory, not the in-memory hash table, because the hash + // table only contains entries for timelines that have been accessed. We + // need to take all timelines into account, not only the active ones. + let mut timelineids: Vec = Vec::new(); + let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); + let timelines_path = self.conf.timelines_path(&self.tenantid); + for direntry in fs::read_dir(timelines_path)? { + let direntry = direntry?; + if let Some(fname) = direntry.file_name().to_str() { + if let Ok(timelineid) = fname.parse::() { + timelineids.push(timelineid); + + // Read the metadata of this timeline to get its parent timeline. + // + // We read the ancestor information directly from the file, instead + // of calling get_timeline(). We don't want to load the timeline + // into memory just for GC. + // + // FIXME: we open the timeline in the loop below with + // get_timeline_locked() anyway, so maybe we should just do it + // here, too. + let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)?; + if let Some(ancestor_timeline) = metadata.ancestor_timeline { + all_branchpoints.insert((ancestor_timeline, metadata.ancestor_lsn)); + } + } + } + } + + // Ok, we now know all the branch points. Iterate through them. + for timelineid in timelineids { + // If a target timeline was specified, leave the other timelines alone. + // This is a bit inefficient, because we still collect the information for + // all the timelines above. + if let Some(x) = target_timelineid { + if x != timelineid { + continue; + } + } + + let branchpoints: Vec = all_branchpoints + .range(( + Included((timelineid, Lsn(0))), + Included((timelineid, Lsn(u64::MAX))), + )) + .map(|&x| x.1) + .collect(); + + let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?; + let last_lsn = timeline.get_last_valid_lsn(); + + if let Some(cutoff) = last_lsn.checked_sub(horizon) { + let result = timeline.gc_timeline(branchpoints, cutoff)?; + + totals += result; + } + } + + totals.elapsed = now.elapsed(); + Ok(totals) + } +} + +/// Metadata stored on disk for each timeline +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TimelineMetadata { + last_valid_lsn: Lsn, + last_record_lsn: Lsn, + prev_record_lsn: Lsn, + ancestor_timeline: Option, + ancestor_lsn: Lsn, +} + +pub struct LayeredTimeline { + conf: &'static PageServerConf, + + tenantid: ZTenantId, + timelineid: ZTimelineId, + + layers: Mutex, + + // WAL redo manager + walredo_mgr: Arc, + + // What page versions do we hold in the repository? If we get a + // request > last_valid_lsn, we need to wait until we receive all + // the WAL up to the request. The SeqWait provides functions for + // that. 
TODO: If we get a request for an old LSN, such that the + // versions have already been garbage collected away, we should + // throw an error, but we don't track that currently. + // + // last_record_lsn points to the end of last processed WAL record. + // It can lag behind last_valid_lsn, if the WAL receiver has + // received some WAL after the end of last record, but not the + // whole next record yet. In the page cache, we care about + // last_valid_lsn, but if the WAL receiver needs to restart the + // streaming, it needs to restart at the end of last record, so we + // track them separately. last_record_lsn should perhaps be in + // walreceiver.rs instead of here, but it seems convenient to keep + // all three values together. + // + // We also remember the starting point of the previous record in + // 'prev_record_lsn'. It's used to set the xl_prev pointer of the + // first WAL record when the node is started up. But here, we just + // keep track of it. FIXME: last_record_lsn and prev_record_lsn + // should be updated atomically together. + // + last_valid_lsn: SeqWait, + last_record_lsn: AtomicLsn, + prev_record_lsn: AtomicLsn, + + last_checkpoint_lsn: AtomicLsn, + + // Parent timeline that this timeline was branched from, and the LSN + // of the branch point. + ancestor_timeline: Option>, + ancestor_lsn: Lsn, +} + +/// Public interface functions +impl Timeline for LayeredTimeline { + /// Look up given page in the cache. + fn get_page_at_lsn(&self, rel: RelishTag, blknum: u32, lsn: Lsn) -> Result { + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } + let lsn = self.wait_lsn(lsn)?; + + if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? { + layer.get_page_at_lsn(&*self.walredo_mgr, blknum, lsn) + } else { + bail!("relish {} not found at {}", rel, lsn); + } + } + + fn get_page_at_lsn_nowait(&self, rel: RelishTag, blknum: u32, lsn: Lsn) -> Result { + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } + + if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? { + layer.get_page_at_lsn(&*self.walredo_mgr, blknum, lsn) + } else { + bail!("relish {} not found at {}", rel, lsn); + } + } + + fn get_relish_size(&self, rel: RelishTag, lsn: Lsn) -> Result> { + if !rel.is_blocky() { + bail!("invalid get_relish_size request for non-blocky relish {}", rel); + } + + let lsn = self.wait_lsn(lsn)?; + + if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? { + let result = layer.get_relish_size(lsn); + trace!( + "get_relish_size: rel {} at {}/{} -> {:?}", + rel, + self.timelineid, + lsn, + result + ); + result + } else { + Ok(None) + } + } + + fn get_rel_exists(&self, rel: RelishTag, lsn: Lsn) -> Result { + let lsn = self.wait_lsn(lsn)?; + + let result; + if let Some((layer, lsn)) = self.get_layer_for_read(rel, lsn)? { + result = layer.get_rel_exists(lsn)?; + } else { + result = false; + } + + trace!("get_rel_exists: {} at {} -> {}", rel, lsn, result); + Ok(result) + } + + fn list_rels(&self, spcnode: u32, dbnode: u32, lsn: Lsn) -> Result> { + trace!("list_rels called at {}", lsn); + + // List all rels in this timeline, and all its ancestors. 
+ let mut all_rels = HashSet::new(); + let mut timeline = self; + loop { + let rels = timeline.layers.lock().unwrap().list_rels(spcnode, dbnode)?; + + all_rels.extend(rels.iter()); + + if let Some(ancestor) = timeline.ancestor_timeline.as_ref() { + timeline = ancestor; + continue; + } else { + break; + } + } + + // Now we have a list of all rels that appeared anywhere in the history. Filter + // out relations that were dropped. + // + // FIXME: We should pass the LSN argument to the calls above, and avoid scanning + // dropped relations in the first place. + let mut res: Result<()> = Ok(()); + all_rels.retain(|reltag| + match self.get_rel_exists(RelishTag::Relation(*reltag), lsn) { + Ok(exists) => { info!("retain: {} -> {}", *reltag, exists); exists }, + Err(err) => { res = Err(err); false } + } + ); + res?; + + Ok(all_rels) + } + + fn list_nonrels(&self, lsn: Lsn) -> Result> { + info!("list_nonrels called at {}", lsn); + + // List all nonrels in this timeline, and all its ancestors. + let mut all_rels = HashSet::new(); + let mut timeline = self; + loop { + let rels = timeline.layers.lock().unwrap().list_nonrels(lsn)?; + + all_rels.extend(rels.iter()); + + if let Some(ancestor) = timeline.ancestor_timeline.as_ref() { + timeline = ancestor; + continue; + } else { + break; + } + } + + // Now we have a list of all nonrels that appeared anywhere in the history. Filter + // out dropped ones. + // + // FIXME: We should pass the LSN argument to the calls above, and avoid scanning + // dropped relations in the first place. + let mut res: Result<()> = Ok(()); + all_rels.retain(|tag| + match self.get_rel_exists(*tag, lsn) { + Ok(exists) => { info!("retain: {} -> {}", *tag, exists); exists }, + Err(err) => { res = Err(err); false } + } + ); + res?; + + Ok(all_rels) + } + + fn history<'a>(&'a self) -> Result> { + // This is needed by the push/pull functionality. Not implemented yet. + todo!(); + } + + fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } + let layer = self.get_layer_for_write(rel, rec.lsn)?; + layer.put_wal_record(blknum, rec) + } + + fn put_truncation(&self, rel: RelishTag, lsn: Lsn, relsize: u32) -> anyhow::Result<()> { + if !rel.is_blocky() { + bail!("invalid truncation for non-blocky relish {}", rel); + } + + debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn); + + let layer = self.get_layer_for_write(rel, lsn)?; + layer.put_truncation(lsn, relsize) + } + + fn put_page_image( + &self, + rel: RelishTag, + blknum: u32, + lsn: Lsn, + img: Bytes, + _update_meta: bool, + ) -> Result<()> { + if !rel.is_blocky() && blknum != 0 { + bail!( + "invalid request for block {} for non-blocky relish {}", + blknum, + rel + ); + } + + let layer = self.get_layer_for_write(rel, lsn)?; + layer.put_page_image(blknum, lsn, img) + } + + fn put_unlink(&self, rel: RelishTag, lsn: Lsn) -> Result<()> { + trace!("put_unlink: {} at {}", rel, lsn); + + let layer = self.get_layer_for_write(rel, lsn)?; + layer.put_unlink(lsn) + } + + fn put_raw_data( + &self, + _tag: crate::object_key::ObjectTag, + _lsn: Lsn, + _data: &[u8], + ) -> Result<()> { + // FIXME: This doesn't make much sense for the layered storage format, + // it's pretty tightly coupled with the way the object store stores + // things. + bail!("put_raw_data not implemented"); + } + + /// Public entry point for checkpoint(). 
All the logic is in the private + /// checkpoint_internal function, this public facade just wraps it for + /// metrics collection. + fn checkpoint(&self) -> Result<()> { + STORAGE_TIME + .with_label_values(&["checkpoint"]) + .observe_closure_duration(|| self.checkpoint_internal()) + } + + /// Remember that WAL has been received and added to the page cache up to the given LSN + fn advance_last_valid_lsn(&self, lsn: Lsn) { + let old = self.last_valid_lsn.advance(lsn); + + // The last valid LSN cannot move backwards, but when WAL + // receiver is restarted after having only partially processed + // a record, it can call this with an lsn older than previous + // last valid LSN, when it restarts processing that record. + if lsn < old { + // Should never be called with an LSN older than the last + // record LSN, though. + let last_record_lsn = self.last_record_lsn.load(); + if lsn < last_record_lsn { + warn!( + "attempted to move last valid LSN backwards beyond last record LSN (last record {}, new {})", + last_record_lsn, lsn + ); + } + } + } + + fn init_valid_lsn(&self, lsn: Lsn) { + let old = self.last_valid_lsn.advance(lsn); + assert!(old == Lsn(0)); + let old = self.last_record_lsn.fetch_max(lsn); + assert!(old == Lsn(0)); + self.prev_record_lsn.store(Lsn(0)); + } + + fn get_last_valid_lsn(&self) -> Lsn { + self.last_valid_lsn.load() + } + + /// + /// Remember the (end of) last valid WAL record remembered in the page cache. + /// + /// NOTE: this updates last_valid_lsn as well. + /// + fn advance_last_record_lsn(&self, lsn: Lsn) { + // Can't move backwards. + let old = self.last_record_lsn.fetch_max(lsn); + assert!(old <= lsn); + + // Use old value of last_record_lsn as prev_record_lsn + self.prev_record_lsn.fetch_max(old); + + // Also advance last_valid_lsn + let old = self.last_valid_lsn.advance(lsn); + // Can't move backwards. + if lsn < old { + warn!( + "attempted to move last record LSN backwards (was {}, new {})", + old, lsn + ); + } + } + + fn get_last_record_lsn(&self) -> Lsn { + self.last_record_lsn.load() + } + + fn get_prev_record_lsn(&self) -> Lsn { + self.prev_record_lsn.load() + } +} + +impl LayeredTimeline { + /// Open a Timeline handle. + /// + /// Loads the metadata for the timeline into memory, but not the layer map. 
+ fn new( + conf: &'static PageServerConf, + metadata: TimelineMetadata, + ancestor: Option>, + timelineid: ZTimelineId, + tenantid: ZTenantId, + walredo_mgr: Arc, + ) -> Result { + let timeline = LayeredTimeline { + conf, + timelineid, + tenantid, + layers: Mutex::new(LayerMap::default()), + + walredo_mgr, + + last_valid_lsn: SeqWait::new(metadata.last_valid_lsn), + last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0), + prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0), + last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0), + + ancestor_timeline: ancestor, + ancestor_lsn: metadata.ancestor_lsn, + }; + Ok(timeline) + } + + /// + /// Load the list of snapshot files from disk, populating the layer map + /// + fn load_layer_map(&self) -> anyhow::Result<()> { + info!( + "loading layer map for timeline {} into memory", + self.timelineid + ); + let mut layers = self.layers.lock().unwrap(); + let snapfiles = + SnapshotLayer::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?; + + for layer_rc in snapfiles.iter() { + info!( + "found layer {} {}-{} {} on timeline {}", + layer_rc.get_relish_tag(), + layer_rc.get_start_lsn(), + layer_rc.get_end_lsn(), + layer_rc.is_dropped(), + self.timelineid + ); + layers.insert(Arc::clone(layer_rc)); + } + + Ok(()) + } + + /// + /// Get a handle to a Layer for reading. + /// + /// The returned SnapshotFile might be from an ancestor timeline, if the + /// relation hasn't been updated on this timeline yet. + /// + fn get_layer_for_read( + &self, + rel: RelishTag, + lsn: Lsn, + ) -> Result, Lsn)>> { + trace!( + "get_layer_for_read called for {} at {}/{}", + rel, + self.timelineid, + lsn + ); + + // If you requested a page at an older LSN, before the branch point, dig into + // the right ancestor timeline. This can only happen if you launch a read-only + // node with an old LSN, a primary always uses a recent LSN in its requests. + let mut timeline = self; + let mut lsn = lsn; + + while lsn < timeline.ancestor_lsn { + trace!("going into ancestor {} ", timeline.ancestor_lsn); + timeline = &timeline.ancestor_timeline.as_ref().unwrap(); + } + + // Now we have the right starting timeline for our search. + loop { + let layers = timeline.layers.lock().unwrap(); + // + // FIXME: If the relation has been dropped, does this return the right + // thing? The compute node should not normally request dropped relations, + // but if OID wraparound happens the same relfilenode might get reused + // for an unrelated relation. + // + + // Do we have a layer on this timeline? + if let Some(layer) = layers.get(rel, lsn) { + trace!( + "found layer in cache: {} {}-{}", + timeline.timelineid, + layer.get_start_lsn(), + layer.get_end_lsn() + ); + + assert!(layer.get_start_lsn() <= lsn); + + return Ok(Some((layer.clone(), lsn))); + } + + // If not, check if there's a layer on the ancestor timeline + if let Some(ancestor) = &timeline.ancestor_timeline { + lsn = timeline.ancestor_lsn; + timeline = &ancestor.as_ref(); + trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); + continue; + } + return Ok(None); + } + } + + /// + /// Get a handle to the latest layer for appending. + /// + fn get_layer_for_write(&self, rel: RelishTag, lsn: Lsn) -> Result> { + if lsn < self.last_valid_lsn.load() { + bail!("cannot modify relation after advancing last_valid_lsn"); + } + + // Look up the correct layer. + let layers = self.layers.lock().unwrap(); + if let Some(layer) = layers.get(rel, lsn) { + + // If it's writeable, good, return it. 
+ if !layer.is_frozen() { + return Ok(Arc::clone(&layer)); + } + } + + // No (writeable) layer for this relation yet. Create one. + // + // Is this a completely new relation? Or the first modification after branching? + // + + // FIXME: race condition, if another thread creates the layer while + // we're busy looking up the previous one. We should hold the mutex throughout + // this operation, but for that we'll need a version of get_layer_for_read() + // that doesn't try to also grab the mutex. + drop(layers); + + let layer; + if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(rel, lsn)? { + // Create new entry after the previous one. + let lsn; + if prev_layer.get_timeline_id() != self.timelineid { + // First modification on this timeline + lsn = self.ancestor_lsn; + trace!( + "creating file for write for {} at branch point {}/{}", + rel, + self.timelineid, + lsn + ); + } else { + lsn = prev_layer.get_end_lsn(); + trace!( + "creating file for write for {} after previous layer {}/{}", + rel, + self.timelineid, + lsn + ); + } + trace!( + "prev layer is at {}/{} - {}", + prev_layer.get_timeline_id(), + prev_layer.get_start_lsn(), + prev_layer.get_end_lsn() + ); + layer = InMemoryLayer::copy_snapshot( + self.conf, + &*self.walredo_mgr, + &*prev_layer, + self.timelineid, + self.tenantid, + lsn, + )?; + } else { + // New relation. + trace!( + "creating layer for write for new rel {} at {}/{}", + rel, + self.timelineid, + lsn + ); + + layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, rel, lsn)?; + } + + let mut layers = self.layers.lock().unwrap(); + let layer_rc: Arc = Arc::new(layer); + layers.insert(Arc::clone(&layer_rc)); + + Ok(layer_rc) + } + + /// + /// Wait until WAL has been received up to the given LSN. + /// + fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. + if lsn == Lsn(0) { + let last_valid_lsn = self.last_valid_lsn.load(); + trace!( + "walreceiver doesn't work yet last_valid_lsn {}, requested {}", + last_valid_lsn, + lsn + ); + lsn = last_valid_lsn; + } + + self.last_valid_lsn + .wait_for_timeout(lsn, TIMEOUT) + .with_context(|| { + format!( + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn + ) + })?; + + Ok(lsn) + } + + /// + /// Flush to disk all data that was written with the put_* functions + /// + /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't + /// know anything about them here in the repository. + fn checkpoint_internal(&self) -> Result<()> { + let last_valid_lsn = self.last_valid_lsn.load(); + let last_record_lsn = self.last_record_lsn.load(); + let prev_record_lsn = self.prev_record_lsn.load(); + trace!( + "checkpointing timeline {} at {}", + self.timelineid, + last_valid_lsn + ); + + // Grab lock on the layer map. + // + // TODO: We hold it locked throughout the checkpoint operation. That's bad, + // the checkpointing could take many seconds, and any incoming get_page_at_lsn() + // requests will block. + let mut layers = self.layers.lock().unwrap(); + + // Walk through each in-memory, and write any dirty data to disk, + // as snapshot files. + // + // We currently write a new snapshot file for every relation + // that was modified, if there has been any changes at all. + // It would be smarter to only flush out in-memory layers that + // have accumulated a fair amount of changes. 
Note that the + // start and end LSNs of snapshot files belonging to different + // relations don't have to line up, although currently they do + // because of the way this works. So you could have a snapshot + // file covering LSN range 100-200 for one relation, and a + // snapshot file covering 150-250 for another relation. The + // read functions should even cope with snapshot files + // covering overlapping ranges for the same relation, although + // that situation never arises currently. + // + // Note: We aggressively freeze and unload all the layer + // structs. Even if a layer is actively being used. This + // keeps memory usage in check, but is probably too + // aggressive. Some kind of LRU policy would be appropriate. + // + + // It is not possible to modify a BTreeMap while you're iterating + // it. So we have to make a temporary copy, and iterate through that, + // while we modify the original. + let old_layers = layers.inner.clone(); + + // Call freeze() on any unfrozen layers (that is, layers that haven't + // been written to disk yet). + // Call unload() on all frozen layers, to release memory. + for layer in old_layers.values() { + if !layer.is_frozen() { + let new_layers = layer.freeze(last_valid_lsn, &*self.walredo_mgr)?; + + // replace this layer with the new layers that 'freeze' returned + layers.remove(&**layer); + for new_layer in new_layers { + trace!( + "freeze returned layer {} {}-{}", + new_layer.get_relish_tag(), + new_layer.get_start_lsn(), + new_layer.get_end_lsn() + ); + layers.insert(Arc::clone(&new_layer)); + } + } else { + layer.unload()?; + } + } + + // Also save the metadata, with updated last_valid_lsn and last_record_lsn, to a + // file in the timeline dir. The metadata reflects the last_valid_lsn as it was + // when we *started* the checkpoint, so that after crash, the WAL receiver knows + // to restart the streaming from that WAL position. + let ancestor_timelineid = if let Some(x) = &self.ancestor_timeline { + Some(x.timelineid) + } else { + None + }; + let metadata = TimelineMetadata { + last_valid_lsn: last_valid_lsn, + last_record_lsn: last_record_lsn, + prev_record_lsn: prev_record_lsn, + ancestor_timeline: ancestor_timelineid, + ancestor_lsn: self.ancestor_lsn, + }; + LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?; + + self.last_checkpoint_lsn.store(last_valid_lsn); + + Ok(()) + } + + /// + /// Garbage collect snapshot files on a timeline that are no longer needed. + /// + /// The caller specifies how much history is needed with the two arguments: + /// + /// retain_lsns: keep a version of each page at these LSNs + /// cutoff: also keep everything newer than this LSN + /// + /// The 'retain_lsns' list is currently used to prevent removing files that + /// are needed by child timelines. In the future, the user might be able to + /// name additional points in time to retain. The caller is responsible for + /// collecting that information. + /// + /// The 'cutoff' point is used to retain recent versions that might still be + /// needed by read-only nodes. (As of this writing, the caller just passes + /// the latest LSN subtracted by a constant, and doesn't do anything smart + /// to figure out what read-only nodes might actually need.) + /// + /// Currently, we don't make any attempt at removing unneeded page versions + /// within a snapshot file. We can only remove the whole file if it's fully + /// obsolete. 
+ /// + pub fn gc_timeline(&self, retain_lsns: Vec, cutoff: Lsn) -> Result { + let now = Instant::now(); + let mut result: GcResult = Default::default(); + + // Scan all snapshot files in the directory. For each file, if a newer file + // exists, we can remove the old one. + self.checkpoint()?; + + let mut layers = self.layers.lock().unwrap(); + + info!( + "running GC on timeline {}, cutoff {}", + self.timelineid, cutoff + ); + + let mut layers_to_remove: Vec> = Vec::new(); + + // Determine for each file if it needs to be retained + 'outer: for ((rel, _lsn), l) in layers.inner.iter() { + if rel.is_relation() { + result.snapshot_relfiles_total += 1; + } else { + result.snapshot_nonrelfiles_total += 1; + } + + // Is it newer than cutoff point? + if l.get_end_lsn() > cutoff { + info!( + "keeping {} {}-{} because it's newer than cutoff {}", + rel, + l.get_start_lsn(), + l.get_end_lsn(), + cutoff + ); + if rel.is_relation() { + result.snapshot_relfiles_needed_by_cutoff += 1; + } else { + result.snapshot_nonrelfiles_needed_by_cutoff += 1; + } + continue 'outer; + } + + // Is it needed by a child branch? + for retain_lsn in &retain_lsns { + // FIXME: are the bounds inclusive or exclusive? + if l.get_start_lsn() <= *retain_lsn && *retain_lsn <= l.get_end_lsn() { + info!( + "keeping {} {}-{} because it's needed by branch point {}", + rel, + l.get_start_lsn(), + l.get_end_lsn(), + *retain_lsn + ); + if rel.is_relation() { + result.snapshot_relfiles_needed_by_branches += 1; + } else { + result.snapshot_nonrelfiles_needed_by_branches += 1; + } + continue 'outer; + } + } + + // Unless the relation was dropped, is there a later snapshot file for this relation? + if !l.is_dropped() && !layers.newer_layer_exists(l.get_relish_tag(), l.get_end_lsn()) { + if rel.is_relation() { + result.snapshot_relfiles_not_updated += 1; + } else { + result.snapshot_nonrelfiles_not_updated += 1; + } + continue 'outer; + } + + // We didn't find any reason to keep this file, so remove it. + info!( + "garbage collecting {} {}-{} {}", + l.get_relish_tag(), + l.get_start_lsn(), + l.get_end_lsn(), + l.is_dropped() + ); + layers_to_remove.push(Arc::clone(l)); + } + + // Actually delete the layers from disk and remove them from the map. + // (couldn't do this in the loop above, because you cannot modify a collection + // while iterating it. BTreeMap::retain() would be another option) + for doomed_layer in layers_to_remove { + doomed_layer.delete()?; + layers.remove(&*doomed_layer); + + if doomed_layer.is_dropped() { + if doomed_layer.get_relish_tag().is_relation() { + result.snapshot_relfiles_dropped += 1; + } else { + result.snapshot_nonrelfiles_dropped += 1; + } + } else { + if doomed_layer.get_relish_tag().is_relation() { + result.snapshot_relfiles_removed += 1; + } else { + result.snapshot_nonrelfiles_removed += 1; + } + } + } + + result.elapsed = now.elapsed(); + Ok(result) + } +} diff --git a/pageserver/src/layered_repository/README.md b/pageserver/src/layered_repository/README.md new file mode 100644 index 0000000000..db3d7feb79 --- /dev/null +++ b/pageserver/src/layered_repository/README.md @@ -0,0 +1,298 @@ +# Overview + +The on-disk format is based on immutable files. The page server +receives a stream of incoming WAL, parses the WAL records to determine +which pages they apply to, and accumulates the incoming changes in +memory. Every now and then, the accumulated changes are written out to +new files. + +The files are called "snapshot files". Each snapshot file corresponds +to one PostgreSQL relation fork. 
The snapshot files for each timeline
+are stored in the timeline's subdirectory under
+.zenith/tenants/<tenantid>/timelines.
+
+The files are named like this:
+
+    rel_<spcnode>_<dbnode>_<relnode>_<forknum>_<start LSN>_<end LSN>
+
+For example:
+
+    rel_1663_13990_2609_0_000000000169C348_0000000001702000
+
+Some non-relation files are also stored in the repository. For example,
+a CLOG segment would be named like this:
+
+    pg_xact_0000_00000000198B06B0_00000000198C2550
+
+There is no difference in how the relation and non-relation files are
+managed, except that the first part of the file names is different.
+Internally, the relations and non-relation files that are managed in
+the versioned store are together called "relishes".
+
+Each snapshot file contains a full snapshot, that is, a full copy of all
+pages in the relation, as of the "start LSN". It also contains all WAL
+records applicable to the relation between the start and end
+LSNs. With this information, the page server can reconstruct any page
+version of the relation in the LSN range.
+
+If a file has been dropped, the last snapshot file for it is created
+with the _DROPPED suffix, e.g.
+
+    rel_1663_13990_2609_0_000000000169C348_0000000001702000_DROPPED
+
+In addition to the relations, with the "rel_*" prefix, we use the same
+format for storing various smaller files from the PostgreSQL data
+directory. They will use different suffixes, and the naming scheme
+up to the LSN range varies. The Zenith source code uses the term
+"relish" to mean "a relation, or other file that's treated like a
+relation in the storage".
+
+## Notation used in this document
+
+The full path of a snapshot file looks like this:
+
+    .zenith/tenants/941ddc8604413b88b3d208bddf90396c/timelines/4af489b06af8eed9e27a841775616962/rel_1663_13990_2609_0_000000000169C348_0000000001702000
+
+For simplicity, the examples below use a simplified notation for the
+paths. The tenant ID is left out, the timeline ID is replaced with
+the human-readable branch name, and spcnode+dbnode+relnode+forknum with
+a human-readable table name. The LSNs are also shorter. For example, a
+snapshot file for the 'orders' table on the 'main' branch, with LSN range
+100-200, would be:
+
+    main/orders_100_200
+
+
+# Creating snapshot files
+
+Let's start with a simple example: a system that contains one
+branch called 'main' and two tables, 'orders' and 'customers'. The end
+of WAL is currently at LSN 250. In this starting situation, you would
+have two files on disk:
+
+    main/orders_100_200
+    main/customers_100_200
+
+In addition to those files, the recent changes between LSN 200 and the
+end of WAL at 250 are kept in memory. If the page server crashes, the
+latest records between 200-250 need to be re-read from the WAL.
+
+Whenever enough WAL has been accumulated in memory, the page server
+writes out the changes in memory into new snapshot files. This process
+is called "checkpointing" (not to be confused with PostgreSQL
+checkpoints, which are a different thing). The page server only creates
+snapshot files for relations that have been modified since the last
+checkpoint. For example, if the current end of WAL is at LSN 450, the
+last checkpoint happened at LSN 400, and there have been no recent
+changes to the 'customers' table, you would have these files on disk:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+    main/customers_100_200
+
+If the 'customers' table is modified later, a new file is created for it
+at the next checkpoint. The new file will cover the "gap" from the
+last snapshot file, so the LSN ranges are always contiguous:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+    main/customers_100_200
+    main/customers_200_500
+
+## Reading page versions
+
+Whenever a GetPage@LSN request comes in from the compute node, the
+page server needs to reconstruct the requested page, as it was at the
+requested LSN. To do that, the page server first checks the recent
+in-memory layer; if the requested page version is found there, it can
+be returned immediately without looking at the files on
+disk. Otherwise the page server needs to locate the snapshot file that
+contains the requested page version.
+
+For example, if a request comes in for table 'orders' at LSN 250, the
+page server would load the 'main/orders_200_300' file into memory, and
+reconstruct and return the requested page from it, as it was at
+LSN 250. Because the snapshot file consists of a full image of the
+relation at the start LSN and the WAL, reconstructing the page
+involves replaying any WAL records applicable to the page between LSNs
+200-250, starting from the base image at LSN 200.
+
+A request at a file boundary can be satisfied using either file. For
+example, if there are two files on disk:
+
+    main/orders_100_200
+    main/orders_200_300
+
+And a request comes in with LSN 200, either file can be used for it. It
+is better to use the later file, however, because it contains an
+already materialized version of all the pages at LSN 200. Using the
+first file, you would need to apply any WAL records between 100 and
+200 to reconstruct the requested page.
+
+# Multiple branches
+
+Imagine that a child branch is created at LSN 250:
+
+              @250
+    ----main--+-------------------------->
+               \
+                +---child-------------->
+
+
+Then, the 'orders' table is updated differently on the 'main' and
+'child' branches. You now have this situation on disk:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+    main/customers_100_200
+    child/orders_250_300
+    child/orders_300_400
+
+Because the 'customers' table hasn't been modified on the child
+branch, there is no file for it there. If you request a page for it on
+the 'child' branch, the page server will not find any snapshot file
+for it in the 'child' directory, so it will recurse to look into the
+parent 'main' branch instead.
+
+From the 'child' branch's point of view, the history for each relation
+is linear, and the request's LSN identifies unambiguously which file
+you need to look at. For example, the history for the 'orders' table
+on the 'main' branch consists of these files:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+
+And from the 'child' branch's point of view, it consists of these
+files:
+
+    main/orders_100_200
+    main/orders_200_300
+    child/orders_250_300
+    child/orders_300_400
+
+The branch metadata includes the point where the child branch was
+created, LSN 250. If a page request comes with LSN 275, we read the
+page version from the 'child/orders_250_300' file. If the request LSN
+is 225, we read it from the 'main/orders_200_300' file instead. The
+page versions between 250-300 in the 'main/orders_200_300' file are
+ignored when operating on the child branch.
+
+Note: It doesn't make any difference if the child branch is created
+when the end of the main branch was at LSN 250, or later when the tip of
+the main branch had already moved on. The latter case, creating a
+branch at a historic LSN, is how we support PITR in Zenith.
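+
+The following is a minimal sketch of this lookup rule (an editor's
+illustration only, using plain u64 LSNs and simplified, hypothetical
+types rather than the actual pageserver code; see get_layer_for_read()
+in layered_repository.rs for the real implementation):
+
+    // Illustration only; not code from this patch.
+    struct SnapshotFile {
+        start_lsn: u64,
+        end_lsn: u64,
+    }
+
+    struct Branch<'a> {
+        // Snapshot files for one relation on this branch, ordered by start LSN.
+        files: Vec<SnapshotFile>,
+        // Parent branch and the LSN at which this branch was created.
+        ancestor: Option<(&'a Branch<'a>, u64)>,
+    }
+
+    // Find the snapshot file holding the page versions for 'lsn', walking
+    // into the ancestor branch when this branch has no file covering it.
+    fn find_snapshot_file<'a>(
+        mut branch: &'a Branch<'a>,
+        mut lsn: u64,
+    ) -> Option<&'a SnapshotFile> {
+        loop {
+            // Prefer the newest file that covers the LSN: at a boundary
+            // (LSN 200 with files 100-200 and 200-300), the later file
+            // already has materialized page images at its start LSN.
+            if let Some(f) = branch
+                .files
+                .iter()
+                .rev()
+                .find(|f| f.start_lsn <= lsn && lsn <= f.end_lsn)
+            {
+                return Some(f);
+            }
+            // No file on this branch: continue in the parent, clamping the
+            // LSN to the branch point so that the parent's history after
+            // the fork is ignored.
+            match branch.ancestor {
+                Some((parent, branch_lsn)) => {
+                    lsn = lsn.min(branch_lsn);
+                    branch = parent;
+                }
+                None => return None,
+            }
+        }
+    }
+
+The same walk applies when the parent itself has a parent, which is why
+the history looks linear from the point of view of any single branch.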
+
+
+# Garbage collection
+
+In this scheme, we keep creating new snapshot files over time. We also
+need a mechanism to remove old files that are no longer needed,
+because disk space isn't infinite.
+
+What files are still needed? Currently, the page server supports PITR
+and branching from any branch at any LSN that is "recent enough" from
+the tip of the branch. "Recent enough" is defined as an LSN horizon,
+which by default is 64 MB (see DEFAULT_GC_HORIZON). For this
+example, let's assume that the LSN horizon is 150 units.
+
+Let's look at the single-branch scenario again. Imagine that the end
+of the branch is LSN 525, so that the GC horizon is currently at
+525-150 = 375:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+    main/orders_400_500
+    main/customers_100_200
+
+We can remove files 'main/orders_100_200' and 'main/orders_200_300',
+because the end LSNs of those files are older than the GC horizon 375, and
+there are more recent snapshot files for the table. 'main/orders_300_400'
+and 'main/orders_400_500' are still within the horizon, so they must be
+retained. 'main/customers_100_200' is old enough, but it cannot be
+removed because there is no newer snapshot file for the table.
+
+Things get slightly more complicated with multiple branches. All of
+the above still holds, but in addition to recent files we must also
+retain older snapshot files that are still needed by child branches.
+For example, if a child branch is created at LSN 150, and the 'customers'
+table is updated on the branch, you would have these files:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+    main/orders_400_500
+    main/customers_100_200
+    child/customers_150_300
+
+In this situation, the 'main/orders_100_200' file cannot be removed,
+even though it is older than the GC horizon, because it is still
+needed by the child branch. 'main/orders_200_300' can still be
+removed. So after garbage collection, these files would remain:
+
+    main/orders_100_200
+
+    main/orders_300_400
+    main/orders_400_500
+    main/customers_100_200
+    child/customers_150_300
+
+If 'orders' is modified later on the 'child' branch, we will create a
+snapshot file for it on the child:
+
+    main/orders_100_200
+
+    main/orders_300_400
+    main/orders_400_500
+    main/customers_100_200
+    child/customers_150_300
+    child/orders_150_400
+
+After this, the 'main/orders_100_200' file can be removed. It is no
+longer needed by the child branch, because there is a newer snapshot
+file there. TODO: This optimization hasn't been implemented! The GC
+algorithm will currently keep the file on the 'main' branch anyway, for
+as long as the child branch exists.
+
+
+# TODO: On LSN ranges
+
+In principle, each relation can be checkpointed separately, i.e. the
+LSN ranges of the files don't need to line up. So this would be legal:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_300_400
+    main/customers_150_250
+    main/customers_250_500
+
+However, the code currently always checkpoints all relations together.
+So that situation doesn't arise in practice.
+
+It would also be OK to have overlapping LSN ranges for the same relation:
+
+    main/orders_100_200
+    main/orders_200_300
+    main/orders_250_350
+    main/orders_300_400
+
+The code that reads the snapshot files should cope with this, but this
+situation doesn't arise either, because the checkpointing code never
+does that. It could be useful, however, as a transient state when
+garbage collecting around branch points, or explicit recovery
+points.
For example, if we start with this: + + main/orders_100_200 + main/orders_200_300 + main/orders_300_400 + +And there is a branch or explicit recovery point at LSN 150, we could +replace 'main/orders_100_200' with 'main/orders_150_150' to keep a +snapshot only at that exact point that's still needed, removing the +other page versions around it. But such compaction has not been +implemented yet. diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs new file mode 100644 index 0000000000..f9ac9178bb --- /dev/null +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -0,0 +1,534 @@ +//! +//! An in-memory layer stores recently received page versions in memory. The page versions +//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation. +//! + +use crate::layered_repository::storage_layer::Layer; +use crate::layered_repository::storage_layer::PageVersion; +use crate::layered_repository::SnapshotLayer; +use crate::relish::*; +use crate::repository::WALRecord; +use crate::walredo::WalRedoManager; +use crate::PageServerConf; +use crate::{ZTenantId, ZTimelineId}; +use anyhow::{bail, Result}; +use bytes::Bytes; +use log::*; +use std::collections::BTreeMap; +use std::ops::Bound::Included; +use std::sync::{Arc, Mutex}; + +use zenith_utils::lsn::Lsn; + +static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); + +pub struct InMemoryLayer { + conf: &'static PageServerConf, + tenantid: ZTenantId, + timelineid: ZTimelineId, + rel: RelishTag, + + /// + /// This layer contains all the changes from 'start_lsn'. The + /// start is inclusive. There is no end LSN; we only use in-memory + /// layer at the end of a timeline. + /// + start_lsn: Lsn, + + /// The above fields never change. The parts that do change are in 'inner', + /// and protected by mutex. + inner: Mutex, +} + +pub struct InMemoryLayerInner { + /// If this relation was dropped, remember when that happened. + drop_lsn: Option, + + /// + /// All versions of all pages in the layer are are kept here. + /// Indexed by block number and LSN. + /// + page_versions: BTreeMap<(u32, Lsn), PageVersion>, + + /// + /// `relsizes` tracks the size of the relation at different points in time. + /// + relsizes: BTreeMap, +} + +impl Layer for InMemoryLayer { + fn is_frozen(&self) -> bool { + return false; + } + + fn get_timeline_id(&self) -> ZTimelineId { + return self.timelineid; + } + + fn get_relish_tag(&self) -> RelishTag { + return self.rel; + } + + fn get_start_lsn(&self) -> Lsn { + return self.start_lsn; + } + + fn get_end_lsn(&self) -> Lsn { + return Lsn(u64::MAX); + } + + fn is_dropped(&self) -> bool { + let inner = self.inner.lock().unwrap(); + inner.drop_lsn.is_some() + } + + /// Look up given page in the cache. + fn get_page_at_lsn( + &self, + walredo_mgr: &dyn WalRedoManager, + blknum: u32, + lsn: Lsn, + ) -> Result { + // Scan the BTreeMap backwards, starting from the given entry. 
+ let mut records: Vec = Vec::new(); + let mut page_img: Option = None; + let mut need_base_image_lsn: Option = Some(lsn); + + { + let inner = self.inner.lock().unwrap(); + let minkey = (blknum, Lsn(0)); + let maxkey = (blknum, lsn); + let mut iter = inner + .page_versions + .range((Included(&minkey), Included(&maxkey))); + while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { + if let Some(img) = &entry.page_image { + page_img = Some(img.clone()); + need_base_image_lsn = None; + break; + } else if let Some(rec) = &entry.record { + records.push(rec.clone()); + if rec.will_init { + // This WAL record initializes the page, so no need to go further back + need_base_image_lsn = None; + break; + } else { + need_base_image_lsn = Some(*entry_lsn); + } + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); + } + } + + // release lock on 'page_versions' + } + records.reverse(); + + // If we needed a base image to apply the WAL records against, we should have found it in memory. + if let Some(lsn) = need_base_image_lsn { + if records.is_empty() { + // no records, and no base image. This can happen if PostgreSQL extends a relation + // but never writes the page. + // + // Would be nice to detect that situation better. + warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn); + return Ok(ZERO_PAGE.clone()); + } + bail!( + "No base image found for page {} blk {} at {}/{}", + self.rel, + blknum, + self.timelineid, + lsn + ); + } + + // If we have a page image, and no WAL, we're all set + if records.is_empty() { + if let Some(img) = page_img { + trace!( + "found page image for blk {} in {} at {}/{}, no WAL redo required", + blknum, + self.rel, + self.timelineid, + lsn + ); + Ok(img) + } else { + // FIXME: this ought to be an error? + warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn); + Ok(ZERO_PAGE.clone()) + } + } else { + // We need to do WAL redo. + // + // If we don't have a base image, then the oldest WAL record better initialize + // the page + if page_img.is_none() && !records.first().unwrap().will_init { + // FIXME: this ought to be an error? + warn!( + "Base image for page {}/{} at {} not found, but got {} WAL records", + self.rel, + blknum, + lsn, + records.len() + ); + Ok(ZERO_PAGE.clone()) + } else { + if page_img.is_some() { + trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn); + } else { + trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn); + } + let img = walredo_mgr.request_redo(self.rel, blknum, lsn, page_img, records)?; + + self.put_page_image(blknum, lsn, img.clone())?; + + Ok(img) + } + } + } + + /// Get size of the relation at given LSN + fn get_relish_size(&self, lsn: Lsn) -> Result> { + // Scan the BTreeMap backwards, starting from the given entry. + let inner = self.inner.lock().unwrap(); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); + + if let Some((_entry_lsn, entry)) = iter.next_back() { + let result = *entry; + drop(inner); + trace!("get_relish_size: {} at {} -> {}", self.rel, lsn, result); + Ok(Some(result)) + } else { + Ok(None) + } + } + + /// Does this relation exist at given LSN? + fn get_rel_exists(&self, lsn: Lsn) -> Result { + let inner = self.inner.lock().unwrap(); + + // Is the requested LSN after the rel was dropped? 
+ if let Some(drop_lsn) = inner.drop_lsn { + if lsn >= drop_lsn { + return Ok(false); + } + } + + // Otherwise, it exists + Ok(true) + } + + // Write operations + + /// Common subroutine of the public put_wal_record() and put_page_image() functions. + /// Adds the page version to the in-memory tree + fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> { + trace!( + "put_page_version blk {} of {} at {}/{}", + blknum, + self.rel, + self.timelineid, + lsn + ); + let mut inner = self.inner.lock().unwrap(); + + let old = inner.page_versions.insert((blknum, lsn), pv); + + if old.is_some() { + // We already had an entry for this LSN. That's odd.. + warn!( + "Page version of rel {:?} blk {} at {} already exists", + self.rel, blknum, lsn + ); + } + + // Also update the relation size, if this extended the relation. + if self.rel.is_blocky() { + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); + + let oldsize; + if let Some((_entry_lsn, entry)) = iter.next_back() { + oldsize = *entry; + } else { + oldsize = 0; + //bail!("No old size found for {} at {}", self.tag, lsn); + } + if blknum >= oldsize { + trace!( + "enlarging relation {} from {} to {} blocks at {}", + self.rel, + oldsize, + blknum + 1, + lsn + ); + inner.relsizes.insert(lsn, blknum + 1); + } + } + + Ok(()) + } + + /// Remember that the relation was truncated at given LSN + fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()> { + let mut inner = self.inner.lock().unwrap(); + let old = inner.relsizes.insert(lsn, relsize); + + if old.is_some() { + // We already had an entry for this LSN. That's odd.. + warn!("Inserting truncation, but had an entry for the LSN already"); + } + + Ok(()) + } + + /// Remember that the relation was dropped at given LSN + fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()> { + let mut inner = self.inner.lock().unwrap(); + + assert!(inner.drop_lsn.is_none()); + inner.drop_lsn = Some(lsn); + + info!("dropped relation {} at {}", self.rel, lsn); + + Ok(()) + } + + /// + /// Write the this in-memory layer to disk, as a snapshot layer. + /// + /// The cutoff point for the layer that's written to disk is 'end_lsn'. + /// + /// Returns new layers that replace this one. Always returns a + /// SnapshotLayer containing the page versions that were written to disk, + /// but if there were page versions newer than 'end_lsn', also return a new + /// in-memory layer containing those page versions. The caller replaces + /// this layer with the returned layers in the layer map. + /// + fn freeze( + &self, + cutoff_lsn: Lsn, + walredo_mgr: &dyn WalRedoManager, + ) -> Result>> { + info!( + "freezing in memory layer for {} on timeline {} at {}", + self.rel, self.timelineid, cutoff_lsn + ); + + let inner = self.inner.lock().unwrap(); + + // Normally, use the cutoff LSN as the end of the frozen layer. + // But if the relation was dropped, we know that there are no + // more changes coming in for it, and in particular we know that + // there are no changes "in flight" for the LSN anymore, so we use + // the drop LSN instead. The drop-LSN could be ahead of the + // caller-specified LSN! + let dropped = inner.drop_lsn.is_some(); + let end_lsn = + if dropped { + inner.drop_lsn.unwrap() + } else { + cutoff_lsn + }; + + // Divide all the page versions into old and new at the 'end_lsn' cutoff point. 
+ let mut before_page_versions; + let mut before_relsizes; + let mut after_page_versions; + let mut after_relsizes; + if !dropped { + before_relsizes = BTreeMap::new(); + after_relsizes = BTreeMap::new(); + for (lsn, size) in inner.relsizes.iter() { + if *lsn > end_lsn { + after_relsizes.insert(*lsn, *size); + } else { + before_relsizes.insert(*lsn, *size); + } + } + + before_page_versions = BTreeMap::new(); + after_page_versions = BTreeMap::new(); + for ((blknum, lsn), pv) in inner.page_versions.iter() { + if *lsn > end_lsn { + after_page_versions.insert((*blknum, *lsn), pv.clone()); + } else { + before_page_versions.insert((*blknum, *lsn), pv.clone()); + } + } + } else { + before_page_versions = inner.page_versions.clone(); + before_relsizes = inner.relsizes.clone(); + after_relsizes = BTreeMap::new(); + after_page_versions = BTreeMap::new(); + } + + // we can release the lock now. + drop(inner); + + // Write the page versions before the cutoff to disk. + let snapfile = SnapshotLayer::create( + self.conf, + self.timelineid, + self.tenantid, + self.rel, + self.start_lsn, + end_lsn, + dropped, + before_page_versions, + before_relsizes, + )?; + let mut result: Vec> = Vec::new(); + + // If there were any page versions after the cutoff, initialize a new in-memory layer + // to hold them + if !after_relsizes.is_empty() || !after_page_versions.is_empty() { + info!("created new in-mem layer for {} {}-", self.rel, end_lsn); + + let new_layer = Self::copy_snapshot( + self.conf, + walredo_mgr, + &snapfile, + self.timelineid, + self.tenantid, + end_lsn, + )?; + let mut new_inner = new_layer.inner.lock().unwrap(); + new_inner.page_versions.append(&mut after_page_versions); + new_inner.relsizes.append(&mut after_relsizes); + drop(new_inner); + + result.push(Arc::new(new_layer)); + } + result.push(Arc::new(snapfile)); + + Ok(result) + } + + fn delete(&self) -> Result<()> { + // Nothing to do. When the reference is dropped, the memory is released. + Ok(()) + } + + fn unload(&self) -> Result<()> { + // cannot unload in-memory layer. Freeze instead + Ok(()) + } +} + +impl InMemoryLayer { + /// + /// Create a new, empty, in-memory layer + /// + pub fn create( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + rel: RelishTag, + start_lsn: Lsn, + ) -> Result { + trace!( + "initializing new empty InMemoryLayer for writing {} on timeline {} at {}", + rel, + timelineid, + start_lsn + ); + + Ok(InMemoryLayer { + conf, + timelineid, + tenantid, + rel, + start_lsn, + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: None, + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), + }) + } + + /// + /// Initialize a new InMemoryLayer for, by copying the state at the given + /// point in time from given existing layer. + /// + pub fn copy_snapshot( + conf: &'static PageServerConf, + walredo_mgr: &dyn WalRedoManager, + src: &dyn Layer, + timelineid: ZTimelineId, + tenantid: ZTenantId, + lsn: Lsn, + ) -> Result { + trace!( + "initializing new InMemoryLayer for writing {} on timeline {} at {}", + src.get_relish_tag(), + timelineid, + lsn + ); + let mut page_versions = BTreeMap::new(); + let mut relsizes = BTreeMap::new(); + + let size; + if src.get_relish_tag().is_blocky() { + if let Some(sz) = src.get_relish_size(lsn)? 
{ + relsizes.insert(lsn, sz); + size = sz; + } else { + bail!("no size found or {} at {}", src.get_relish_tag(), lsn); + } + } else { + size = 1; + } + + for blknum in 0..size { + let img = src.get_page_at_lsn(walredo_mgr, blknum, lsn)?; + let pv = PageVersion { + page_image: Some(img), + record: None, + }; + page_versions.insert((blknum, lsn), pv); + } + + Ok(InMemoryLayer { + conf, + timelineid, + tenantid, + rel: src.get_relish_tag(), + start_lsn: lsn, + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: None, + page_versions: page_versions, + relsizes: relsizes, + }), + }) + } + + /// debugging function to print out the contents of the layer + #[allow(unused)] + pub fn dump(&self) -> String { + let mut result = format!( + "----- inmemory layer for {} {}-> ----\n", + self.rel, self.start_lsn + ); + + let inner = self.inner.lock().unwrap(); + + for (k, v) in inner.relsizes.iter() { + result += &format!("{}: {}\n", k, v); + } + for (k, v) in inner.page_versions.iter() { + result += &format!( + "blk {} at {}: {}/{}\n", + k.0, + k.1, + v.page_image.is_some(), + v.record.is_some() + ); + } + + result + } +} diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs new file mode 100644 index 0000000000..f0a91bd08b --- /dev/null +++ b/pageserver/src/layered_repository/layer_map.rs @@ -0,0 +1,132 @@ +//! +//! The layer map tracks what layers exist for all the relations in a timeline. +//! +//! When the timeline is first accessed, the server lists of all snapshot files +//! in the timelines/ directory, and populates this map with +//! SnapshotLayers corresponding to each file. When new WAL is received, +//! we create InMemoryLayers to hold the incoming records. Now and then, +//! in the checkpoint() function, the in-memory layers are frozen, forming +//! new snapshot layers and corresponding files are written to disk. +//! + +use crate::layered_repository::storage_layer::Layer; +use crate::relish::*; +use anyhow::Result; +use log::*; +use std::collections::BTreeMap; +use std::collections::HashSet; +use std::ops::Bound::Included; +use std::sync::Arc; +use zenith_utils::lsn::Lsn; + +/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN. +/// It provides a couple of convenience functions over a plain BTreeMap +pub struct LayerMap { + pub inner: BTreeMap<(RelishTag, Lsn), Arc>, +} + +impl LayerMap { + /// + /// Look up using the given rel tag and LSN. This differs from a plain + /// key-value lookup in that if there is any layer that covers the + /// given LSN, or precedes the given LSN, it is returned. In other words, + /// you don't need to know the exact start LSN of the layer. + /// + pub fn get(&self, tag: RelishTag, lsn: Lsn) -> Option> { + let startkey = (tag, Lsn(0)); + let endkey = (tag, lsn); + + if let Some((_k, v)) = self + .inner + .range((Included(startkey), Included(endkey))) + .next_back() + { + Some(Arc::clone(v)) + } else { + None + } + } + + pub fn insert(&mut self, layer: Arc) { + let rel = layer.get_relish_tag(); + let start_lsn = layer.get_start_lsn(); + + self.inner.insert((rel, start_lsn), Arc::clone(&layer)); + } + + pub fn remove(&mut self, layer: &dyn Layer) { + let rel = layer.get_relish_tag(); + let start_lsn = layer.get_start_lsn(); + + self.inner.remove(&(rel, start_lsn)); + } + + pub fn list_rels(&self, spcnode: u32, dbnode: u32) -> Result> { + let mut rels: HashSet = HashSet::new(); + + // Scan the timeline directory to get all rels in this timeline. 
+ for ((rel, _lsn), _l) in self.inner.iter() { + if let RelishTag::Relation(reltag) = rel { + // FIXME: skip if it was dropped before the requested LSN. But there is no + // LSN argument + + if (spcnode == 0 || reltag.spcnode == spcnode) + && (dbnode == 0 || reltag.dbnode == dbnode) + { + rels.insert(*reltag); + } + } + } + Ok(rels) + } + + pub fn list_nonrels(&self, _lsn: Lsn) -> Result> { + let mut rels: HashSet = HashSet::new(); + + // Scan the timeline directory to get all rels in this timeline. + for ((rel, _lsn), _l) in self.inner.iter() { + // FIXME: skip if it was dropped before the requested LSN. + + if let RelishTag::Relation(_) = rel { + } else { + rels.insert(*rel); + } + } + Ok(rels) + } + + /// Is there a newer layer for given relation? + pub fn newer_layer_exists(&self, rel: RelishTag, lsn: Lsn) -> bool { + let startkey = (rel, lsn); + let endkey = (rel, Lsn(u64::MAX)); + + for ((_rel, newer_lsn), layer) in self.inner.range((Included(startkey), Included(endkey))) { + if layer.get_end_lsn() > lsn { + trace!( + "found later layer for rel {}, {} {}-{}", + rel, + lsn, + newer_lsn, + layer.get_end_lsn() + ); + return true; + } else { + trace!( + "found singleton layer for rel {}, {} {}", + rel, lsn, newer_lsn + ); + continue; + } + } + trace!("no later layer found for rel {}, {}", rel, lsn); + false + } +} + +impl Default for LayerMap { + fn default() -> Self { + LayerMap { + inner: BTreeMap::new(), + } + } +} diff --git a/pageserver/src/layered_repository/snapshot_layer.rs b/pageserver/src/layered_repository/snapshot_layer.rs new file mode 100644 index 0000000000..e2936ffd31 --- /dev/null +++ b/pageserver/src/layered_repository/snapshot_layer.rs @@ -0,0 +1,631 @@ +//! +//! A SnapshotLayer represents one snapshot file on disk. One file holds all page +//! version and size information of one relation, in a range of LSN. +//! The name "snapshot file" is a bit of a misnomer because a snapshot file doesn't +//! contain a snapshot at a specific LSN, but rather all the page versions in a range +//! of LSNs. +//! +//! Currently, a snapshot file contains full information needed to reconstruct any +//! page version in the LSN range, without consulting any other snapshot files. When +//! a new snapshot file is created for writing, the full contents of relation are +//! materialized as it is at the beginning of the LSN range. That can be very expensive, +//! we should find a way to store differential files. But this keeps the read-side +//! of things simple. You can find the correct snapshot file based on RelishTag and +//! timeline+LSN, and once you've located it, you have all the data you need to in that +//! file. +//! +//! When a snapshot file needs to be accessed, we slurp the whole file into memory, into +//! the SnapshotLayer struct. See load() and unload() functions. +//! +//! On disk, the snapshot files are stored in timelines/ directory. +//! Currently, there are no subdirectories, and each snapshot file is named like this: +//! +//! _____ +//! +//! For example: +//! +//! 1663_13990_2609_0_000000000169C348_000000000169C349 +//! +//! If a relation is dropped, we add a '_DROPPED' to the end of the filename to indicate that. +//! So the above example would become: +//! +//! 1663_13990_2609_0_000000000169C348_000000000169C349_DROPPED +//! +//! The end LSN indicates when it was dropped in that case, we don't store it in the +//! file contents in any way. +//! +//! A snapshot file is constructed using the 'bookfile' crate. Each file consists of two +//! 
parts: the page versions and the relation sizes. They are stored as separate chapters. +//! +use crate::layered_repository::storage_layer::Layer; +use crate::layered_repository::storage_layer::PageVersion; +use crate::layered_repository::storage_layer::ZERO_PAGE; +use crate::relish::*; +use crate::repository::WALRecord; +use crate::walredo::WalRedoManager; +use crate::PageServerConf; +use crate::{ZTenantId, ZTimelineId}; +use anyhow::{bail, Result}; +use bytes::Bytes; +use log::*; +use std::collections::BTreeMap; +use std::fmt; +use std::fs; +use std::fs::File; +use std::io::Write; +use std::ops::Bound::Included; +use std::path::PathBuf; +use std::sync::{Arc, Mutex, MutexGuard}; + +use bookfile::{Book, BookWriter}; + +use zenith_utils::bin_ser::BeSer; +use zenith_utils::lsn::Lsn; + +// Magic constant to identify a Zenith snapshot file +static SNAPSHOT_FILE_MAGIC: u32 = 0x5A616E01; + +static PAGE_VERSIONS_CHAPTER: u64 = 1; +static REL_SIZES_CHAPTER: u64 = 2; + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] +struct SnapshotFileName { + rel: RelishTag, + start_lsn: Lsn, + end_lsn: Lsn, + dropped: bool, +} + +impl SnapshotFileName { + fn from_str(fname: &str) -> Option { + // Split the filename into parts + // + // _____ + // + // or if it was dropped: + // + // ______DROPPED + // + let rel; + let mut parts; + if let Some(rest) = fname.strip_prefix("rel_") { + parts = rest.split('_'); + rel = RelishTag::Relation(RelTag { + spcnode: parts.next()?.parse::().ok()?, + dbnode: parts.next()?.parse::().ok()?, + relnode: parts.next()?.parse::().ok()?, + forknum: parts.next()?.parse::().ok()?, + }); + } else if let Some(rest) = fname.strip_prefix("pg_xact_") { + parts = rest.split('_'); + rel = RelishTag::Slru { + slru: SlruKind::Clog, + segno: u32::from_str_radix(parts.next()?, 16).ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_multixact_members_") { + parts = rest.split('_'); + rel = RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno: u32::from_str_radix(parts.next()?, 16).ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_multixact_offsets_") { + parts = rest.split('_'); + rel = RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno: u32::from_str_radix(parts.next()?, 16).ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_filenodemap_") { + parts = rest.split('_'); + rel = RelishTag::FileNodeMap { + spcnode: parts.next()?.parse::().ok()?, + dbnode: parts.next()?.parse::().ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_twophase_") { + parts = rest.split('_'); + rel = RelishTag::TwoPhase { + xid: parts.next()?.parse::().ok()?, + }; + } else if let Some(rest) = fname.strip_prefix("pg_control_checkpoint_") { + parts = rest.split('_'); + rel = RelishTag::Checkpoint; + } else if let Some(rest) = fname.strip_prefix("pg_control_") { + parts = rest.split('_'); + rel = RelishTag::ControlFile; + } else { + return None; + } + + let start_lsn = Lsn::from_hex(parts.next()?).ok()?; + let end_lsn = Lsn::from_hex(parts.next()?).ok()?; + + let mut dropped = false; + if let Some(suffix) = parts.next() { + if suffix == "DROPPED" { + dropped = true; + } else { + warn!("unrecognized filename in timeline dir: {}", fname); + return None; + } + } + if parts.next().is_some() { + warn!("unrecognized filename in timeline dir: {}", fname); + return None; + } + + Some(SnapshotFileName { + rel, + start_lsn, + end_lsn, + dropped, + }) + } + + fn to_string(&self) -> String { + let basename = match self.rel { + RelishTag::Relation(reltag) => 
format!( + "rel_{}_{}_{}_{}", + reltag.spcnode, reltag.dbnode, reltag.relnode, reltag.forknum + ), + RelishTag::Slru { + slru: SlruKind::Clog, + segno, + } => format!("pg_xact_{:04X}", segno), + RelishTag::Slru { + slru: SlruKind::MultiXactMembers, + segno, + } => format!("pg_multixact_members_{:04X}", segno), + RelishTag::Slru { + slru: SlruKind::MultiXactOffsets, + segno, + } => format!("pg_multixact_offsets_{:04X}", segno), + RelishTag::FileNodeMap { spcnode, dbnode } => { + format!("pg_filenodemap_{}_{}", spcnode, dbnode) + } + RelishTag::TwoPhase { xid } => format!("pg_twophase_{}", xid), + RelishTag::Checkpoint => format!("pg_control_checkpoint"), + RelishTag::ControlFile => format!("pg_control"), + }; + + format!( + "{}_{:016X}_{:016X}{}", + basename, + u64::from(self.start_lsn), + u64::from(self.end_lsn), + if self.dropped { "_DROPPED" } else { "" } + ) + } +} + +impl fmt::Display for SnapshotFileName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.to_string()) + } +} + +/// +/// SnapshotLayer is the in-memory data structure associated with an +/// on-disk snapshot file. We keep a SnapshotLayer in memory for each +/// file, in the LayerMap. If a layer is in "loaded" state, we have a +/// copy of the file in memory, in 'inner'. Otherwise the struct is +/// just a placeholder for a file that exists on disk, and it needs to +/// be loaded before using it in queries. +/// +pub struct SnapshotLayer { + conf: &'static PageServerConf, + pub tenantid: ZTenantId, + pub timelineid: ZTimelineId, + pub rel: RelishTag, + + // + // This entry contains all the changes from 'start_lsn' to 'end_lsn'. The + // start is inclusive, and end is exclusive. + pub start_lsn: Lsn, + pub end_lsn: Lsn, + + dropped: bool, + + inner: Mutex, +} + +pub struct SnapshotLayerInner { + /// If false, the 'page_versions' and 'relsizes' have not been + /// loaded into memory yet. + loaded: bool, + + /// All versions of all pages in the file are are kept here. + /// Indexed by block number and LSN. + page_versions: BTreeMap<(u32, Lsn), PageVersion>, + + /// `relsizes` tracks the size of the relation at different points in time. + relsizes: BTreeMap, +} + +impl Layer for SnapshotLayer { + fn is_frozen(&self) -> bool { + return true; + } + + fn get_timeline_id(&self) -> ZTimelineId { + return self.timelineid; + } + + fn get_relish_tag(&self) -> RelishTag { + return self.rel; + } + + fn is_dropped(&self) -> bool { + return self.dropped; + } + + fn get_start_lsn(&self) -> Lsn { + return self.start_lsn; + } + + fn get_end_lsn(&self) -> Lsn { + return self.end_lsn; + } + + /// Look up given page in the cache. + fn get_page_at_lsn( + &self, + walredo_mgr: &dyn WalRedoManager, + blknum: u32, + lsn: Lsn, + ) -> Result { + // Scan the BTreeMap backwards, starting from the given entry. 
+ let mut records: Vec = Vec::new(); + let mut page_img: Option = None; + let mut need_base_image_lsn: Option = Some(lsn); + { + let inner = self.load()?; + let minkey = (blknum, Lsn(0)); + let maxkey = (blknum, lsn); + let mut iter = inner + .page_versions + .range((Included(&minkey), Included(&maxkey))); + while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { + if let Some(img) = &entry.page_image { + page_img = Some(img.clone()); + need_base_image_lsn = None; + break; + } else if let Some(rec) = &entry.record { + records.push(rec.clone()); + if rec.will_init { + // This WAL record initializes the page, so no need to go further back + need_base_image_lsn = None; + break; + } else { + need_base_image_lsn = Some(*entry_lsn); + } + } else { + // No base image, and no WAL record. Huh? + bail!("no page image or WAL record for requested page"); + } + } + + // release lock on 'inner' + } + records.reverse(); + + // If we needed a base image to apply the WAL records against, we should have found it in memory. + if let Some(lsn) = need_base_image_lsn { + if records.is_empty() { + // no records, and no base image. This can happen if PostgreSQL extends a relation + // but never writes the page. + // + // Would be nice to detect that situation better. + warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn); + return Ok(ZERO_PAGE.clone()); + } + bail!( + "No base image found for page {} blk {} at {}/{}", + self.rel, + blknum, + self.timelineid, + lsn + ); + } + + // If we have a page image, and no WAL, we're all set + if records.is_empty() { + if let Some(img) = page_img { + trace!( + "found page image for blk {} in {} at {}/{}, no WAL redo required", + blknum, + self.rel, + self.timelineid, + lsn + ); + Ok(img) + } else { + // FIXME: this ought to be an error? + warn!("Page {} blk {} at {} not found", self.rel, blknum, lsn); + Ok(ZERO_PAGE.clone()) + } + } else { + // We need to do WAL redo. + // + // If we don't have a base image, then the oldest WAL record better initialize + // the page + if page_img.is_none() && !records.first().unwrap().will_init { + // FIXME: this ought to be an error? + warn!( + "Base image for page {} blk {} at {} not found, but got {} WAL records", + self.rel, + blknum, + lsn, + records.len() + ); + Ok(ZERO_PAGE.clone()) + } else { + if page_img.is_some() { + trace!("found {} WAL records and a base image for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn); + } else { + trace!("found {} WAL records that will init the page for blk {} in {} at {}/{}, performing WAL redo", records.len(), blknum, self.rel, self.timelineid, lsn); + } + let img = walredo_mgr.request_redo(self.rel, blknum, lsn, page_img, records)?; + + // FIXME: Should we memoize the page image in memory, so that + // we wouldn't need to reconstruct it again, if it's requested again? + //self.put_page_image(blknum, lsn, img.clone())?; + + Ok(img) + } + } + } + + /// Get size of the relation at given LSN + fn get_relish_size(&self, lsn: Lsn) -> Result> { + // Scan the BTreeMap backwards, starting from the given entry. + let inner = self.load()?; + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); + + if let Some((_entry_lsn, entry)) = iter.next_back() { + let result = *entry; + drop(inner); + trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result); + Ok(Some(result)) + } else { + Ok(None) + } + } + + /// Does this relation exist at given LSN? 
+ fn get_rel_exists(&self, lsn: Lsn) -> Result { + // Is the requested LSN after the rel was dropped? + if self.dropped && lsn >= self.end_lsn { + return Ok(false); + } + + // Otherwise, it exists. + Ok(true) + } + + // Unsupported write operations + fn put_page_version(&self, blknum: u32, lsn: Lsn, _pv: PageVersion) -> Result<()> { + panic!( + "cannot modify historical snapshot layer, rel {} blk {} at {}/{}, {}-{}", + self.rel, blknum, self.timelineid, lsn, self.start_lsn, self.end_lsn + ); + } + fn put_truncation(&self, _lsn: Lsn, _relsize: u32) -> anyhow::Result<()> { + bail!("cannot modify historical snapshot layer"); + } + + fn put_unlink(&self, _lsn: Lsn) -> anyhow::Result<()> { + bail!("cannot modify historical snapshot layer"); + } + + fn freeze( + &self, + _end_lsn: Lsn, + _walredo_mgr: &dyn WalRedoManager, + ) -> Result>> { + bail!("cannot freeze historical snapshot layer"); + } + + fn delete(&self) -> Result<()> { + // delete underlying file + fs::remove_file(self.path())?; + Ok(()) + } + + /// + /// Release most of the memory used by this layer. If it's accessed again later, + /// it will need to be loaded back. + /// + fn unload(&self) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + inner.page_versions = BTreeMap::new(); + inner.relsizes = BTreeMap::new(); + inner.loaded = false; + Ok(()) + } +} + +impl SnapshotLayer { + fn path(&self) -> PathBuf { + Self::path_for( + self.conf, + self.timelineid, + self.tenantid, + &SnapshotFileName { + rel: self.rel, + start_lsn: self.start_lsn, + end_lsn: self.end_lsn, + dropped: self.dropped, + }, + ) + } + + fn path_for( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + fname: &SnapshotFileName, + ) -> PathBuf { + conf.timeline_path(&timelineid, &tenantid) + .join(fname.to_string()) + } + + /// Create a new snapshot file, using the given btreemaps containing the page versions and + /// relsizes. + /// + /// This is used to write the in-memory layer to disk. The in-memory layer uses the same + /// data structure with two btreemaps as we do, so passing the btreemaps is currently + /// expedient. + pub fn create( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + rel: RelishTag, + start_lsn: Lsn, + end_lsn: Lsn, + dropped: bool, + page_versions: BTreeMap<(u32, Lsn), PageVersion>, + relsizes: BTreeMap, + ) -> Result { + let snapfile = SnapshotLayer { + conf: conf, + timelineid: timelineid, + tenantid: tenantid, + rel: rel, + start_lsn: start_lsn, + end_lsn, + dropped, + inner: Mutex::new(SnapshotLayerInner { + loaded: true, + page_versions: page_versions, + relsizes: relsizes, + }), + }; + let inner = snapfile.inner.lock().unwrap(); + + // Write the in-memory btreemaps into a file + let path = snapfile.path(); + + // Note: This overwrites any existing file. There shouldn't be any. + // FIXME: throw an error instead? 
+ let file = File::create(&path)?; + let book = BookWriter::new(file, SNAPSHOT_FILE_MAGIC)?; + + // Write out page versions + let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER); + let buf = BTreeMap::ser(&inner.page_versions)?; + chapter.write_all(&buf)?; + let book = chapter.close()?; + + // and relsizes to separate chapter + let mut chapter = book.new_chapter(REL_SIZES_CHAPTER); + let buf = BTreeMap::ser(&inner.relsizes)?; + chapter.write_all(&buf)?; + let book = chapter.close()?; + + book.close()?; + + trace!("saved {}", &path.display()); + + drop(inner); + + Ok(snapfile) + } + + /// + /// Load the contents of the file into memory + /// + fn load(&self) -> Result> { + // quick exit if already loaded + let mut inner = self.inner.lock().unwrap(); + + if inner.loaded { + return Ok(inner); + } + + let path = Self::path_for( + self.conf, + self.timelineid, + self.tenantid, + &SnapshotFileName { + rel: self.rel, + start_lsn: self.start_lsn, + end_lsn: self.end_lsn, + dropped: self.dropped, + }, + ); + + let file = File::open(&path)?; + let book = Book::new(file)?; + + let chapter = book.read_chapter(PAGE_VERSIONS_CHAPTER)?; + let page_versions = BTreeMap::des(&chapter)?; + + let chapter = book.read_chapter(REL_SIZES_CHAPTER)?; + let relsizes = BTreeMap::des(&chapter)?; + + debug!("loaded from {}", &path.display()); + + *inner = SnapshotLayerInner { + loaded: true, + page_versions, + relsizes, + }; + + Ok(inner) + } + + /// Create SnapshotLayers representing all files on dik + /// + // TODO: returning an Iterator would be more idiomatic + pub fn list_snapshot_files( + conf: &'static PageServerConf, + timelineid: ZTimelineId, + tenantid: ZTenantId, + ) -> Result>> { + let path = conf.timeline_path(&timelineid, &tenantid); + + let mut snapfiles: Vec> = Vec::new(); + for direntry in fs::read_dir(path)? 
{ + let fname = direntry?.file_name(); + let fname = fname.to_str().unwrap(); + + if let Some(snapfilename) = SnapshotFileName::from_str(fname) { + let snapfile = SnapshotLayer { + conf, + timelineid, + tenantid, + rel: snapfilename.rel, + start_lsn: snapfilename.start_lsn, + end_lsn: snapfilename.end_lsn, + dropped: snapfilename.dropped, + inner: Mutex::new(SnapshotLayerInner { + loaded: false, + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), + }; + + snapfiles.push(Arc::new(snapfile)); + } + } + return Ok(snapfiles); + } + + /// debugging function to print out the contents of the layer + #[allow(unused)] + pub fn dump(&self) -> String { + let mut result = format!( + "----- snapshot layer for {} {}-{} ----\n", + self.rel, self.start_lsn, self.end_lsn + ); + + let inner = self.inner.lock().unwrap(); + for (k, v) in inner.relsizes.iter() { + result += &format!("{}: {}\n", k, v); + } + //for (k, v) in inner.page_versions.iter() { + // result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some()); + //} + + result + } +} diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs new file mode 100644 index 0000000000..7ba5769e2d --- /dev/null +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -0,0 +1,123 @@ +use crate::relish::RelishTag; +use crate::repository::WALRecord; +use crate::walredo::WalRedoManager; +use crate::ZTimelineId; +use anyhow::Result; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +use zenith_utils::lsn::Lsn; + +pub static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); + +/// +/// Represents a version of a page at a specific LSN. The LSN is the key of the +/// entry in the 'page_versions' hash, it is not duplicated here. +/// +/// A page version can be stored as a full page image, or as WAL record that needs +/// to be applied over the previous page version to reconstruct this version. +/// +/// It's also possible to have both a WAL record and a page image in the same +/// PageVersion. That happens if page version is originally stored as a WAL record +/// but it is later reconstructed by a GetPage@LSN request by performing WAL +/// redo. The get_page_at_lsn() code will store the reconstructed pag image next to +/// the WAL record in that case. TODO: That's pretty accidental, not the result +/// of any grand design. If we want to keep reconstructed page versions around, we +/// probably should have a separate buffer cache so that we could control the +/// replacement policy globally. Or if we keep a reconstructed page image, we +/// could throw away the WAL record. +/// +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PageVersion { + /// an 8kb page image + pub page_image: Option, + /// WAL record to get from previous page version to this one. + pub record: Option, +} + +/// +/// A Layer holds all page versions for one relish, in a range of LSNs. +/// There are two kinds of layers, in-memory and snapshot layers. In-memory +/// layers are used to ingest incoming WAL, and provide fast access +/// to the recent page versions. Snaphot layers are stored on disk, and +/// are immutable. +/// +/// Each layer contains a full snapshot of the relish at the start +/// LSN. In addition to that, it contains WAL (or more page images) +/// needed to recontruct any page version up to the end LSN. +/// +pub trait Layer: Send + Sync { + // These functions identify the relish and the LSN range that this Layer + // holds. 
+ fn get_timeline_id(&self) -> ZTimelineId; + fn get_relish_tag(&self) -> RelishTag; + fn get_start_lsn(&self) -> Lsn; + fn get_end_lsn(&self) -> Lsn; + fn is_dropped(&self) -> bool; + + /// Frozen layers are stored on disk, an cannot accept cannot accept new WAL + /// records, whereas an unfrozen layer can still be modified, but is not + /// durable in case of a crash. Snapshot layers are always frozen, and + /// in-memory layers are always unfrozen. + fn is_frozen(&self) -> bool; + + // Functions that correspond to the Timeline trait functions. + fn get_page_at_lsn( + &self, + walredo_mgr: &dyn WalRedoManager, + blknum: u32, + lsn: Lsn, + ) -> Result; + + fn get_relish_size(&self, lsn: Lsn) -> Result>; + + fn get_rel_exists(&self, lsn: Lsn) -> Result; + + fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()>; + + fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()>; + + fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()>; + + /// Remember new page version, as a WAL record over previous version + fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<()> { + self.put_page_version( + blknum, + rec.lsn, + PageVersion { + page_image: None, + record: Some(rec), + }, + ) + } + + /// Remember new page version, as a full page image + fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> { + self.put_page_version( + blknum, + lsn, + PageVersion { + page_image: Some(img), + record: None, + }, + ) + } + + /// + /// Split off an immutable layer from existing layer. + /// + /// Returns new layers that replace this one. + /// + fn freeze(&self, end_lsn: Lsn, walredo_mgr: &dyn WalRedoManager) + -> Result>>; + + /// Permanently delete this layer + fn delete(&self) -> Result<()>; + + /// Try to release memory used by this layer. This is currently + /// only used by snapshot layers, to free the copy of the file + /// from memory. (TODO: a smarter, more granular caching scheme + /// would be nice) + fn unload(&self) -> Result<()>; +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 8354426c22..8bc8d62fc2 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -9,6 +9,7 @@ use zenith_metrics::{register_int_gauge_vec, IntGaugeVec}; pub mod basebackup; pub mod branches; +pub mod layered_repository; pub mod logger; pub mod object_key; pub mod object_repository; @@ -54,6 +55,14 @@ pub struct PageServerConf { pub auth_type: AuthType, pub auth_validation_public_key_path: Option, + + pub repository_format: RepositoryFormat, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum RepositoryFormat { + Layered, + RocksDb, } impl PageServerConf { diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index db24df861f..b4990eaba1 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -2,11 +2,12 @@ //! page server. 
use crate::branches; +use crate::layered_repository::LayeredRepository; use crate::object_repository::ObjectRepository; use crate::repository::Repository; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::PostgresRedoManager; -use crate::PageServerConf; +use crate::{PageServerConf, RepositoryFormat}; use anyhow::{anyhow, bail, Result}; use lazy_static::lazy_static; use log::info; @@ -27,16 +28,35 @@ pub fn init(conf: &'static PageServerConf) { for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() { let tenantid = ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap(); - let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap(); // Set up a WAL redo manager, for applying WAL records. let walredo_mgr = PostgresRedoManager::new(conf, tenantid); // Set up an object repository, for actual data storage. - let repo = - ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid); + let repo: Arc = match conf.repository_format { + RepositoryFormat::Layered => { + let repo = Arc::new(LayeredRepository::new( + conf, + Arc::new(walredo_mgr), + tenantid, + )); + LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); + repo + } + RepositoryFormat::RocksDb => { + let obj_store = RocksObjectStore::open(conf, &tenantid).unwrap(); + + Arc::new(ObjectRepository::new( + conf, + Arc::new(obj_store), + Arc::new(walredo_mgr), + tenantid, + )) + } + }; + info!("initialized storage for tenant: {}", &tenantid); - m.insert(tenantid, Arc::new(repo)); + m.insert(tenantid, repo); } } @@ -53,7 +73,7 @@ pub fn create_repository_for_tenant( let wal_redo_manager = Arc::new(PostgresRedoManager::new(conf, tenantid)); let repo = branches::create_repo(conf, tenantid, wal_redo_manager)?; - m.insert(tenantid, Arc::new(repo)); + m.insert(tenantid, repo); Ok(()) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2caffb1eb2..ce36dce505 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -693,6 +693,18 @@ impl postgres_backend::Handler for PageServerHandler { RowDescriptor::int8_col(b"control_deleted"), RowDescriptor::int8_col(b"filenodemap_deleted"), RowDescriptor::int8_col(b"dropped"), + RowDescriptor::int8_col(b"snapshot_relfiles_total"), + RowDescriptor::int8_col(b"snapshot_relfiles_needed_by_cutoff"), + RowDescriptor::int8_col(b"snapshot_relfiles_needed_by_branches"), + RowDescriptor::int8_col(b"snapshot_relfiles_not_updated"), + RowDescriptor::int8_col(b"snapshot_relfiles_removed"), + RowDescriptor::int8_col(b"snapshot_relfiles_dropped"), + RowDescriptor::int8_col(b"snapshot_nonrelfiles_total"), + RowDescriptor::int8_col(b"snapshot_nonrelfiles_needed_by_cutoff"), + RowDescriptor::int8_col(b"snapshot_nonrelfiles_needed_by_branches"), + RowDescriptor::int8_col(b"snapshot_nonrelfiles_not_updated"), + RowDescriptor::int8_col(b"snapshot_nonrelfiles_removed"), + RowDescriptor::int8_col(b"snapshot_nonrelfiles_dropped"), RowDescriptor::int8_col(b"elapsed"), ]))? 
.write_message_noflush(&BeMessage::DataRow(&[ @@ -705,6 +717,43 @@ impl postgres_backend::Handler for PageServerHandler { Some(&result.control_deleted.to_string().as_bytes()), Some(&result.filenodemap_deleted.to_string().as_bytes()), Some(&result.dropped.to_string().as_bytes()), + Some(&result.snapshot_relfiles_total.to_string().as_bytes()), + Some( + &result + .snapshot_relfiles_needed_by_cutoff + .to_string() + .as_bytes(), + ), + Some( + &result + .snapshot_relfiles_needed_by_branches + .to_string() + .as_bytes(), + ), + Some(&result.snapshot_relfiles_not_updated.to_string().as_bytes()), + Some(&result.snapshot_relfiles_removed.to_string().as_bytes()), + Some(&result.snapshot_relfiles_dropped.to_string().as_bytes()), + Some(&result.snapshot_nonrelfiles_total.to_string().as_bytes()), + Some( + &result + .snapshot_nonrelfiles_needed_by_cutoff + .to_string() + .as_bytes(), + ), + Some( + &result + .snapshot_nonrelfiles_needed_by_branches + .to_string() + .as_bytes(), + ), + Some( + &result + .snapshot_nonrelfiles_not_updated + .to_string() + .as_bytes(), + ), + Some(&result.snapshot_nonrelfiles_removed.to_string().as_bytes()), + Some(&result.snapshot_nonrelfiles_dropped.to_string().as_bytes()), Some(&result.elapsed.as_millis().to_string().as_bytes()), ]))? .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; diff --git a/pageserver/src/relish.rs b/pageserver/src/relish.rs index 7484e0848f..4c050e4617 100644 --- a/pageserver/src/relish.rs +++ b/pageserver/src/relish.rs @@ -120,7 +120,16 @@ impl RelishTag { // and these don't | RelishTag::ControlFile - | RelishTag::Checkpoint => false, + | RelishTag::Checkpoint => false, + } + } + + // convenience function to check if this relish is a normal relation. + pub const fn is_relation(&self) -> bool { + if let RelishTag::Relation(_) = self { + true + } else { + false } } } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index bb6388a532..501b7a6254 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -5,6 +5,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::collections::HashSet; use std::iter::Iterator; +use std::ops::AddAssign; use std::sync::Arc; use std::time::Duration; use zenith_utils::lsn::Lsn; @@ -56,6 +57,8 @@ pub trait Repository: Send + Sync { /// #[derive(Default)] pub struct GcResult { + // FIXME: These counters make sense for the ObjectRepository. They are not used + // by the LayeredRepository. pub n_relations: u64, pub inspected: u64, pub truncated: u64, @@ -66,9 +69,51 @@ pub struct GcResult { pub control_deleted: u64, // RelishTag::ControlFile pub filenodemap_deleted: u64, // RelishTag::FileNodeMap pub dropped: u64, + + // These are used for the LayeredRepository instead + pub snapshot_relfiles_total: u64, + pub snapshot_relfiles_needed_by_cutoff: u64, + pub snapshot_relfiles_needed_by_branches: u64, + pub snapshot_relfiles_not_updated: u64, + pub snapshot_relfiles_removed: u64, // # of snapshot files removed because they have been made obsolete by newer snapshot files. + pub snapshot_relfiles_dropped: u64, // # of snapshot files removed because the relation was dropped + + pub snapshot_nonrelfiles_total: u64, + pub snapshot_nonrelfiles_needed_by_cutoff: u64, + pub snapshot_nonrelfiles_needed_by_branches: u64, + pub snapshot_nonrelfiles_not_updated: u64, + pub snapshot_nonrelfiles_removed: u64, // # of snapshot files removed because they have been made obsolete by newer snapshot files. 
+ pub snapshot_nonrelfiles_dropped: u64, // # of snapshot files removed because the relation was dropped + pub elapsed: Duration, } +impl AddAssign for GcResult { + fn add_assign(&mut self, other: Self) { + self.n_relations += other.n_relations; + self.truncated += other.truncated; + self.deleted += other.deleted; + self.dropped += other.dropped; + + self.snapshot_relfiles_total += other.snapshot_relfiles_total; + self.snapshot_relfiles_needed_by_cutoff += other.snapshot_relfiles_needed_by_cutoff; + self.snapshot_relfiles_needed_by_branches += other.snapshot_relfiles_needed_by_branches; + self.snapshot_relfiles_not_updated += other.snapshot_relfiles_not_updated; + self.snapshot_relfiles_removed += other.snapshot_relfiles_removed; + self.snapshot_relfiles_dropped += other.snapshot_relfiles_dropped; + + self.snapshot_nonrelfiles_total += other.snapshot_nonrelfiles_total; + self.snapshot_nonrelfiles_needed_by_cutoff += other.snapshot_nonrelfiles_needed_by_cutoff; + self.snapshot_nonrelfiles_needed_by_branches += + other.snapshot_nonrelfiles_needed_by_branches; + self.snapshot_nonrelfiles_not_updated += other.snapshot_nonrelfiles_not_updated; + self.snapshot_nonrelfiles_removed += other.snapshot_nonrelfiles_removed; + self.snapshot_nonrelfiles_dropped += other.snapshot_nonrelfiles_dropped; + + self.elapsed += other.elapsed; + } +} + pub trait Timeline: Send + Sync { //------------------------------------------------------------------------------ // Public GET functions @@ -234,11 +279,12 @@ impl WALRecord { #[cfg(test)] mod tests { use super::*; + use crate::layered_repository::LayeredRepository; use crate::object_repository::ObjectRepository; use crate::object_repository::{ObjectValue, PageEntry, RelationSizeEntry}; use crate::rocksdb_storage::RocksObjectStore; use crate::walredo::{WalRedoError, WalRedoManager}; - use crate::PageServerConf; + use crate::{PageServerConf, RepositoryFormat}; use postgres_ffi::pg_constants; use std::fs; use std::path::PathBuf; @@ -272,10 +318,16 @@ mod tests { buf.freeze() } - fn get_test_repo(test_name: &str) -> Result> { + static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); + + fn get_test_repo( + test_name: &str, + repository_format: RepositoryFormat, + ) -> Result> { let repo_dir = PathBuf::from(format!("../tmp_check/test_{}", test_name)); let _ = fs::remove_dir_all(&repo_dir); - fs::create_dir_all(&repo_dir).unwrap(); + fs::create_dir_all(&repo_dir)?; + fs::create_dir_all(&repo_dir.join("timelines"))?; let conf = PageServerConf { daemonize: false, @@ -288,6 +340,7 @@ mod tests { pg_distrib_dir: "".into(), auth_type: AuthType::Trust, auth_validation_public_key_path: None, + repository_format, }; // Make a static copy of the config. This can never be free'd, but that's // OK in a test. @@ -295,24 +348,47 @@ mod tests { let tenantid = ZTenantId::generate(); fs::create_dir_all(conf.tenant_path(&tenantid)).unwrap(); - let obj_store = RocksObjectStore::create(conf, &tenantid)?; - let walredo_mgr = TestRedoManager {}; - let repo = - ObjectRepository::new(conf, Arc::new(obj_store), Arc::new(walredo_mgr), tenantid); + let repo: Box = match conf.repository_format { + RepositoryFormat::Layered => Box::new(LayeredRepository::new( + conf, + Arc::new(walredo_mgr), + tenantid, + )), + RepositoryFormat::RocksDb => { + let obj_store = RocksObjectStore::create(conf, &tenantid)?; - Ok(Box::new(repo)) + Box::new(ObjectRepository::new( + conf, + Arc::new(obj_store), + Arc::new(walredo_mgr), + tenantid, + )) + } + }; + + Ok(repo) } /// Test get_relsize() and truncation. 
#[test] - fn test_relsize() -> Result<()> { + fn test_relsize_rocksdb() -> Result<()> { + let repo = get_test_repo("test_relsize_rocksdb", RepositoryFormat::RocksDb)?; + test_relsize(&*repo) + } + + #[test] + fn test_relsize_layered() -> Result<()> { + let repo = get_test_repo("test_relsize_layered", RepositoryFormat::Layered)?; + test_relsize(&*repo) + } + + fn test_relsize(repo: &dyn Repository) -> Result<()> { // get_timeline() with non-existent timeline id should fail //repo.get_timeline("11223344556677881122334455667788"); // Create timeline to work on - let repo = get_test_repo("test_relsize")?; let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; @@ -397,14 +473,24 @@ mod tests { /// This isn't very interesting with the RocksDb implementation, as we don't pay /// any attention to Postgres segment boundaries there. #[test] - fn test_large_rel() -> Result<()> { - let repo = get_test_repo("test_large_rel")?; + fn test_large_rel_rocksdb() -> Result<()> { + let repo = get_test_repo("test_large_rel_rocksdb", RepositoryFormat::RocksDb)?; + test_large_rel(&*repo) + } + + #[test] + fn test_large_rel_layered() -> Result<()> { + let repo = get_test_repo("test_large_rel_layered", RepositoryFormat::Layered)?; + test_large_rel(&*repo) + } + + fn test_large_rel(repo: &dyn Repository) -> Result<()> { let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; tline.init_valid_lsn(Lsn(1)); - let mut lsn = 0; + let mut lsn = 1; for blknum in 0..pg_constants::RELSEG_SIZE + 1 { let img = TEST_IMG(&format!("foo blk {} at {}", blknum, Lsn(lsn))); lsn += 1; @@ -450,15 +536,29 @@ mod tests { })) } + #[test] + fn test_branch_rocksdb() -> Result<()> { + let repo = get_test_repo("test_branch_rocksdb", RepositoryFormat::RocksDb)?; + test_branch(&*repo) + } + + #[test] + fn test_branch_layered() -> Result<()> { + let repo = get_test_repo("test_branch_layered", RepositoryFormat::Layered)?; + test_branch(&*repo) + } + /// /// Test branch creation /// - #[test] - fn test_branch() -> Result<()> { - let repo = get_test_repo("test_branch")?; + fn test_branch(repo: &dyn Repository) -> Result<()> { let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; + // Import initial dummy checkpoint record, otherwise the get_timeline() call + // after branching fails below + tline.put_page_image(RelishTag::Checkpoint, 0, Lsn(1), ZERO_PAGE.clone(), false)?; + // Create a relation on the timeline tline.init_valid_lsn(Lsn(1)); tline.put_page_image(TESTREL_A, 0, Lsn(2), TEST_IMG("foo blk 0 at 2"), true)?; @@ -500,8 +600,19 @@ mod tests { } #[test] - fn test_history() -> Result<()> { - let repo = get_test_repo("test_snapshot")?; + fn test_history_rocksdb() -> Result<()> { + let repo = get_test_repo("test_history_rocksdb", RepositoryFormat::RocksDb)?; + test_history(&*repo) + } + #[test] + // TODO: This doesn't work with the layered storage, the functions needed for push/pull + // functionality haven't been implemented yet. 
+ #[ignore] + fn test_history_layered() -> Result<()> { + let repo = get_test_repo("test_history_layered", RepositoryFormat::Layered)?; + test_history(&*repo) + } + fn test_history(repo: &dyn Repository) -> Result<()> { let timelineid = ZTimelineId::from_str("11223344556677881122334455667788").unwrap(); let tline = repo.create_empty_timeline(timelineid, Lsn(0))?; diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index b3523c3bbb..c7f84d7bad 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -132,6 +132,7 @@ pub fn import_timeline_from_postgres_datadir( } // TODO: Scan pg_tblspc + timeline.advance_last_valid_lsn(lsn); timeline.checkpoint()?; Ok(()) @@ -425,12 +426,12 @@ pub fn save_decoded_record( save_xact_record(timeline, lsn, &parsed_xact, decoded)?; // Remove twophase file. see RemoveTwoPhaseFile() in postgres code info!( - "unlink twophaseFile for xid {} parsed_xact.xid {} here", - decoded.xl_xid, parsed_xact.xid + "unlink twophaseFile for xid {} parsed_xact.xid {} here at {}", + decoded.xl_xid, parsed_xact.xid, lsn ); timeline.put_unlink( RelishTag::TwoPhase { - xid: decoded.xl_xid, + xid: parsed_xact.xid, }, lsn, )?; @@ -795,7 +796,13 @@ fn save_clog_truncate_record( // Iterate via SLRU CLOG segments and unlink segments that we're ready to truncate // TODO This implementation is very inefficient - // it scans all non-rels only to find Clog - for obj in timeline.list_nonrels(lsn)? { + // + // We cannot pass 'lsn' to the Timeline.list_nonrels(), or it + // will block waiting for the last valid LSN to advance up to + // it. So we use the previous record's LSN in the get calls + // instead. + let req_lsn = min(timeline.get_last_record_lsn(), lsn); + for obj in timeline.list_nonrels(req_lsn)? { match obj { RelishTag::Slru { slru, segno } => { if slru == SlruKind::Clog { diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 8ae690a251..069f84e2ff 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -8,7 +8,7 @@ use crate::page_cache; use crate::relish::*; use crate::restore_local_repo; use crate::waldecoder::*; -use crate::PageServerConf; +use crate::{PageServerConf, RepositoryFormat}; use anyhow::{Error, Result}; use lazy_static::lazy_static; use log::*; @@ -264,7 +264,11 @@ fn walreceiver_main( )?; if newest_segno - oldest_segno >= 10 { - timeline.checkpoint()?; + // FIXME: The layered repository performs checkpointing in a separate thread, so this + // isn't needed anymore. Remove 'checkpoint' from the Timeline trait altogether? + if conf.repository_format == RepositoryFormat::RocksDb { + timeline.checkpoint()?; + } // TODO: This is where we could remove WAL older than last_rec_lsn. //remove_wal_files(timelineid, pg_constants::WAL_SEGMENT_SIZE, last_rec_lsn)?; diff --git a/test_runner/batch_others/test_gc.py b/test_runner/batch_others/test_gc.py index 9a3f7f3d25..2e58b5096d 100644 --- a/test_runner/batch_others/test_gc.py +++ b/test_runner/batch_others/test_gc.py @@ -14,7 +14,8 @@ pytest_plugins = ("fixtures.zenith_fixtures") # @pytest.mark.skip(reason="""" Current GC test is flaky and overly strict. Since we are migrating to the layered repo format - with different GC implementation let's just silence this test for now. + with different GC implementation let's just silence this test for now. This test only + works with the RocksDB implementation. 
""") def test_gc(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): zenith_cli.run(["branch", "test_gc", "empty"]) diff --git a/test_runner/batch_others/test_snapfiles_gc.py b/test_runner/batch_others/test_snapfiles_gc.py new file mode 100644 index 0000000000..761dc95b31 --- /dev/null +++ b/test_runner/batch_others/test_snapfiles_gc.py @@ -0,0 +1,122 @@ +from contextlib import closing +import psycopg2.extras +import time; + +pytest_plugins = ("fixtures.zenith_fixtures") + +def print_gc_result(row): + print("GC duration {elapsed} ms".format_map(row)); + print(" REL total: {snapshot_relfiles_total}, needed_by_cutoff {snapshot_relfiles_needed_by_cutoff}, needed_by_branches: {snapshot_relfiles_needed_by_branches}, not_updated: {snapshot_relfiles_not_updated}, removed: {snapshot_relfiles_removed}, dropped: {snapshot_relfiles_dropped}".format_map(row)) + print(" NONREL total: {snapshot_nonrelfiles_total}, needed_by_cutoff {snapshot_nonrelfiles_needed_by_cutoff}, needed_by_branches: {snapshot_nonrelfiles_needed_by_branches}, not_updated: {snapshot_nonrelfiles_not_updated}, removed: {snapshot_nonrelfiles_removed}, dropped: {snapshot_nonrelfiles_dropped}".format_map(row)) + + +# +# Test Garbage Collection of old snapshot files +# +# This test is pretty tightly coupled with the current implementation of layered +# storage, in layered_repository.rs. +# +def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin): + zenith_cli.run(["branch", "test_snapfiles_gc", "empty"]) + pg = postgres.create_start('test_snapfiles_gc') + + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + with closing(pageserver.connect()) as psconn: + with psconn.cursor(cursor_factory = psycopg2.extras.DictCursor) as pscur: + + # Get the timeline ID of our branch. We need it for the 'do_gc' command + cur.execute("SHOW zenith.zenith_timeline") + timeline = cur.fetchone()[0] + + # Create a test table + cur.execute("CREATE TABLE foo(x integer)") + + print("Inserting two more rows and running GC") + cur.execute("select relfilenode from pg_class where oid = 'foo'::regclass"); + row = cur.fetchone(); + print("relfilenode is {}", row[0]); + + # Run GC, to clear out any garbage left behind in the catalogs by + # the CREATE TABLE command. We want to have a clean slate with no garbage + # before running the actual tests below, otherwise the counts won't match + # what we expect. + # + # Also run vacuum first to make it less likely that autovacuum or pruning + # kicks in and confuses our numbers. + cur.execute("VACUUM") + + print("Running GC before test") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + row = pscur.fetchone() + print_gc_result(row); + # remember the number of files + snapshot_relfiles_remain = row['snapshot_relfiles_total'] - row['snapshot_relfiles_removed'] + assert snapshot_relfiles_remain > 0 + + # Insert a row. The first insert will also create a metadata entry for the + # relation, with size == 1 block. Hence, bump up the expected relation count. + snapshot_relfiles_remain += 1; + print("Inserting one row and running GC") + cur.execute("INSERT INTO foo VALUES (1)") + pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0") + row = pscur.fetchone() + print_gc_result(row); + assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain + assert row['snapshot_relfiles_removed'] == 0 + assert row['snapshot_relfiles_dropped'] == 0 + + # Insert two more rows and run GC. 
+                    # This should create a new snapshot file with the new contents, and
+                    # remove the old one.
+                    print("Inserting two more rows and running GC")
+                    cur.execute("INSERT INTO foo VALUES (2)")
+                    cur.execute("INSERT INTO foo VALUES (3)")
+
+                    pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
+                    row = pscur.fetchone()
+                    print_gc_result(row)
+                    assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain + 1
+                    assert row['snapshot_relfiles_removed'] == 1
+                    assert row['snapshot_relfiles_dropped'] == 0
+
+                    # Do it again. Should again create a new snapshot file and remove the old one.
+                    print("Inserting two more rows and running GC")
+                    cur.execute("INSERT INTO foo VALUES (2)")
+                    cur.execute("INSERT INTO foo VALUES (3)")
+
+                    pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
+                    row = pscur.fetchone()
+                    print_gc_result(row)
+                    assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain + 1
+                    assert row['snapshot_relfiles_removed'] == 1
+                    assert row['snapshot_relfiles_dropped'] == 0
+
+                    # Run GC again, with no changes in the database. Should not remove anything.
+                    print("Run GC again, with nothing to do")
+                    pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
+                    row = pscur.fetchone()
+                    print_gc_result(row)
+                    assert row['snapshot_relfiles_total'] == snapshot_relfiles_remain
+                    assert row['snapshot_relfiles_removed'] == 0
+                    assert row['snapshot_relfiles_dropped'] == 0
+
+                    #
+                    # Test DROP TABLE: check that relation data and metadata were deleted from storage by GC
+                    #
+                    print("Drop table and run GC again")
+                    cur.execute("DROP TABLE foo")
+
+                    pscur.execute(f"do_gc {pageserver.initial_tenant} {timeline} 0")
+                    row = pscur.fetchone()
+                    print_gc_result(row)
+
+                    # Each relation fork is counted separately, hence 3.
+                    assert row['snapshot_relfiles_dropped'] == 3
+
+                    # The catalog updates also create new snapshot files of the catalogs, which
+                    # are counted as 'removed'
+                    assert row['snapshot_relfiles_removed'] > 0
+
+                    # TODO: perhaps we should count catalog and user relations separately,
+                    # to make this kind of testing more robust
diff --git a/vendor/postgres b/vendor/postgres
index 04cfa326a5..e3175fe60a 160000
--- a/vendor/postgres
+++ b/vendor/postgres
@@ -1 +1 @@
-Subproject commit 04cfa326a543171967c16954306f5a9dd8a470ea
+Subproject commit e3175fe60a996dfb54568855ba93e6134e62f052
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index e3f6464ef0..abaa972cc1 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -61,6 +61,13 @@ fn main() -> Result<()> {
                     .long("enable-auth")
                     .takes_value(false)
                     .help("Enable authentication using ZenithJWT")
+            )
+            .arg(
+                Arg::with_name("repository-format")
+                    .long("repository-format")
+                    .takes_value(true)
+                    .value_name("repository-format")
+                    .help("Choose repository format, 'layered' or 'rocksdb'")
             ),
         )
         .subcommand(
@@ -131,8 +138,8 @@ fn main() -> Result<()> {
             } else {
                 AuthType::Trust
             };
-
-            local_env::init(pageserver_uri, tenantid, auth_type)
+            let repository_format = init_match.value_of("repository-format");
+            local_env::init(pageserver_uri, tenantid, auth_type, repository_format)
                 .with_context(|| "Failed to create config file")?;
         }
 
@@ -151,6 +158,7 @@ fn main() -> Result<()> {
         if let Err(e) = pageserver.init(
             Some(&env.tenantid.to_string()),
             init_match.is_present("enable-auth"),
+            init_match.value_of("repository-format"),
         ) {
            eprintln!("pageserver init failed: {}", e);
            exit(1);
diff --git a/zenith_utils/src/zid.rs b/zenith_utils/src/zid.rs
index c5b4128527..a1cc60ca9e 100644
--- a/zenith_utils/src/zid.rs
+++ b/zenith_utils/src/zid.rs
@@ -126,7 +126,7 @@ macro_rules! zid_newtype {
 /// is separate from PostgreSQL timelines, and doesn't have those
 /// limitations. A zenith timeline is identified by a 128-bit ID, which
 /// is usually printed out as a hex string.
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, Serialize, Deserialize)]
 pub struct ZTimelineId(ZId);
 
 zid_newtype!(ZTimelineId);
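
Note on the new repository-format option: the --repository-format argument added to 'zenith init' above is carried around as a plain string and eventually has to be mapped onto the RepositoryFormat enum that walreceiver.rs compares against. The real definition and parsing live in pageserver/src/lib.rs and control_plane/src/local_env.rs, which are not part of this excerpt; the snippet below is only a minimal, self-contained sketch of such a mapping, and the FromStr impl and main() driver in it are hypothetical.

    // Sketch only: mirrors the 'layered' / 'rocksdb' variants used elsewhere in
    // this patch; the real RepositoryFormat enum lives in pageserver/src/lib.rs.
    use std::str::FromStr;

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum RepositoryFormat {
        Layered,
        RocksDb,
    }

    impl FromStr for RepositoryFormat {
        type Err = String;

        // Map the CLI string from --repository-format to an enum variant.
        fn from_str(s: &str) -> Result<Self, Self::Err> {
            match s {
                "layered" => Ok(RepositoryFormat::Layered),
                "rocksdb" => Ok(RepositoryFormat::RocksDb),
                other => Err(format!("unrecognized repository format '{}'", other)),
            }
        }
    }

    fn main() {
        // What `zenith init --repository-format=layered` would hand down.
        let fmt: RepositoryFormat = "layered".parse().unwrap();
        assert_eq!(fmt, RepositoryFormat::Layered);
    }

With a mapping along these lines, a misspelled format string would be rejected up front rather than silently falling back to the default repository format.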