From be1386a555b05d453ecc553f4337b1007030afda Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 2 Aug 2021 14:59:26 +0300 Subject: [PATCH] Hold list of all layers in memory. Previously, the LayerMap was only used as a cache to hold the snapshot layers that were loaded into memory. As a result, we often had to scan the filesystem to get list of all the other snapshot files that exist on disk, but hadn't been loaded into memory yet. That was very slow, consuming huge amounts of CPU and causing timeouts in any non-trivial tests. Refactor so that on startup, we scan the directory once and keep the list of layers in memory. --- pageserver/src/layered_repository.rs | 467 ++++++++++-------- .../src/layered_repository/inmemory_layer.rs | 145 +++--- .../src/layered_repository/layer_map.rs | 127 +++++ .../src/layered_repository/snapshot_layer.rs | 385 ++++----------- .../src/layered_repository/storage_layer.rs | 7 +- pageserver/src/object_repository.rs | 276 ++++++----- pageserver/src/page_service.rs | 6 +- pageserver/src/repository.rs | 28 +- test_runner/batch_others/test_snapfiles_gc.py | 5 + zenith_utils/src/postgres_backend.rs | 3 +- 10 files changed, 745 insertions(+), 704 deletions(-) create mode 100644 pageserver/src/layered_repository/layer_map.rs diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 342032c534..5a19da2e07 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -27,7 +27,7 @@ use bytes::Bytes; use log::*; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::collections::{BTreeSet, HashSet}; use std::fs; use std::fs::File; @@ -50,10 +50,12 @@ use zenith_utils::seqwait::SeqWait; mod inmemory_layer; mod snapshot_layer; mod storage_layer; +mod layer_map; use inmemory_layer::InMemoryLayer; use snapshot_layer::SnapshotLayer; use storage_layer::Layer; +use layer_map::LayerMap; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -131,6 +133,101 @@ impl Repository for LayeredRepository { Ok(()) } + + // + // How garbage collection works + // -------- + // + // +--bar-------------> + // / + // +----+-----foo----------------> + // / + // ----main--+--------------------------> + // \ + // +-----baz--------> + // + // + // 1. Grab a mutex to prevent new timelines from being created + // 2. Scan all timelines, and on each timeline, make note of the + // all the points where other timelines have been branched off. + // We will refrain from removing page versions at those LSNs. + // 3. For each timeline, scan all snapshot files on the timeline. + // Remove all files for which a newer file exists and which + // don't cover any branch point LSNs. + // + // TODO: + // - if a relation has been modified on a child branch, then we + // we don't need to keep that in the parent anymore. + // + // - Currently, this is only triggered manually by the 'do_gc' command. + // There is no background thread to do it automatically. + fn gc_iteration(&self, target_timelineid: Option, horizon: u64, _compact: bool) -> Result { + let mut totals: GcResult = Default::default(); + let now = Instant::now(); + + // grab mutex to prevent new timelines from being created here. + // TODO: We will hold it for a long time + let mut timelines = self.timelines.lock().unwrap(); + + // Scan all timelines for the branch points. + let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); + + // Remember timelineid and its last_record_lsn for each timeline + let mut timelineids: Vec = Vec::new(); + + let timelines_path = self.conf.timelines_path(&self.tenantid); + for direntry in fs::read_dir(timelines_path)? { + let direntry = direntry?; + if let Some(fname) = direntry.file_name().to_str() { + if let Ok(timelineid) = fname.parse::() { + // Read the metadata of this timeline to get its parent timeline. + // FIXME: we open the timeline below with get_timeline() anyway. + // shouldn't we fetch the metadata from the in-memory Timeline + // struct? + let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)?; + + timelineids.push(timelineid); + + if let Some(ancestor_timeline) = metadata.ancestor_timeline { + all_branchpoints.insert((ancestor_timeline, metadata.ancestor_lsn)); + } + } + } + } + + // Ok, we now know all the branch points. Iterate through them. + for timelineid in timelineids { + // If a target timeline was specified, leave the other timelines alone. + // This is a bit inefficient, we still collect the information for all + // the timelines above. + if let Some(x) = target_timelineid { + if x != timelineid { + continue; + } + } + + let branchpoints: Vec = all_branchpoints + .range(( + Included((timelineid, Lsn(0))), + Included((timelineid, Lsn(u64::MAX))), + )) + .map(|&x| x.1) + .collect(); + + let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?; + let last_lsn = timeline.get_last_valid_lsn(); + + if let Some(cutoff) = last_lsn.checked_sub(horizon) { + info!("running GC on timeline {}, last_lsn {}, cutoff {}", timelineid, last_lsn, cutoff); + let result = timeline.gc_timeline(branchpoints, cutoff)?; + + totals += result; + } + } + + totals.elapsed = now.elapsed(); + Ok(totals) + } } /// Private functions @@ -162,6 +259,9 @@ impl LayeredRepository { self.walredo_mgr.clone(), )?; + // List the snapshot layers on disk, and load them into the layer map + timeline.load_layer_map()?; + // Load any new WAL after the last checkpoint into memory. info!( "Loading WAL for timeline {} starting at {}", @@ -218,123 +318,6 @@ impl LayeredRepository { Ok(TimelineMetadata::des(&data)?) } - - // - // How garbage collection works - // -------- - // - // +--bar-------------> - // / - // +----+-----foo----------------> - // / - // ----main--+--------------------------> - // \ - // +-----baz--------> - // - // - // 1. Grab a mutex to prevent new timelines from being created - // 2. Scan all timelines, and on each timeline, make note of the - // all the points where other timelines have been branched off. - // We will refrain from removing page versions at those LSNs. - // 3. For each timeline, scan all snapshot files on the timeline. - // Remove all files for which a newer file exists and which - // don't cover any branch point LSNs. - // - // TODO: - // - if a relation has been modified on a child branch, then we - // we don't need to keep that in the parent anymore. - // - // - Currently, this is only triggered manually by the 'do_gc' command. - // There is no background thread to do it automatically. - fn gc_iteration(conf: &'static PageServerConf, tenantid: ZTenantId, horizon: u64) -> Result { - let mut totals: GcResult = Default::default(); - let now = Instant::now(); - - // TODO: grab mutex to prevent new timelines from being created here. - - // Scan all timelines for the branch points. - let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); - - // Remember timelineid and its last_record_lsn for each timeline - let mut timelines: Vec<(ZTimelineId, Lsn)> = Vec::new(); - - let timelines_path = conf.timelines_path(&tenantid); - for direntry in fs::read_dir(timelines_path)? { - let direntry = direntry?; - if let Some(fname) = direntry.file_name().to_str() { - if let Ok(timelineid) = fname.parse::() { - // Read the metadata of this timeline to get its parent timeline. - let metadata = Self::load_metadata(conf, timelineid, tenantid)?; - - timelines.push((timelineid, metadata.last_record_lsn)); - - if let Some(ancestor_timeline) = metadata.ancestor_timeline { - all_branchpoints.insert((ancestor_timeline, metadata.ancestor_lsn)); - } - } - } - } - - // Ok, we now know all the branch points. Iterate through them. - for (timelineid, last_lsn) in timelines { - let branchpoints: Vec = all_branchpoints - .range(( - Included((timelineid, Lsn(0))), - Included((timelineid, Lsn(u64::MAX))), - )) - .map(|&x| x.1) - .collect(); - - if let Some(cutoff) = last_lsn.checked_sub(horizon) { - let result = SnapshotLayer::gc_timeline(conf, timelineid, tenantid, branchpoints, cutoff)?; - - totals += result; - } - } - - totals.elapsed = now.elapsed(); - Ok(totals) - } -} - -/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN. -/// It provides a couple of convenience functions over a plain BTreeMap -struct LayerMap(BTreeMap<(RelishTag, Lsn), Arc>); - -impl LayerMap { - /// - /// Look up using the given rel tag and LSN. This differs from a plain - /// key-value lookup in that if there is any layer that covers the - /// given LSN, or precedes the given LSN, it is returned. In other words, - /// you don't need to know the exact start LSN of the layer. - /// - fn get(&self, tag: RelishTag, lsn: Lsn) -> Option> { - let startkey = (tag, Lsn(0)); - let endkey = (tag, lsn); - - if let Some((_k, v)) = self - .0 - .range((Included(startkey), Included(endkey))) - .next_back() - { - Some(Arc::clone(v)) - } else { - None - } - } - - fn insert(&mut self, layer: Arc) { - let rel = layer.get_relish_tag(); - let start_lsn = layer.get_start_lsn(); - - self.0.insert((rel, start_lsn), Arc::clone(&layer)); - } -} - -impl Default for LayerMap { - fn default() -> Self { - LayerMap(BTreeMap::new()) - } } /// Metadata stored on disk for each timeline @@ -462,15 +445,12 @@ impl Timeline for LayeredTimeline { } fn list_rels(&self, spcnode: u32, dbnode: u32, _lsn: Lsn) -> Result> { - // SnapshotLayer::list_rels works by scanning the directory on disk. Make sure - // we have a file on disk for each relation. - self.checkpoint()?; // List all rels in this timeline, and all its ancestors. let mut all_rels = HashSet::new(); let mut timeline = self; loop { - let rels = SnapshotLayer::list_rels(self.conf, timeline.timelineid, timeline.tenantid, spcnode, dbnode)?; + let rels = timeline.layers.lock().unwrap().list_rels(spcnode, dbnode)?; // FIXME: We should filter out relations that don't exist at the given LSN. all_rels.extend(rels.iter()); @@ -493,11 +473,7 @@ impl Timeline for LayeredTimeline { let mut all_rels = HashSet::new(); let mut timeline = self; loop { - // SnapshotFile::list_rels works by scanning the directory on disk. Make sure - // we have a file on disk for each relation. - timeline.checkpoint()?; - - let rels = SnapshotLayer::list_nonrels(self.conf, timeline.timelineid, timeline.tenantid, lsn)?; + let rels = timeline.layers.lock().unwrap().list_nonrels(lsn)?; // FIXME: We should filter out relishes that don't exist at the given LSN. all_rels.extend(rels.iter()); @@ -518,23 +494,6 @@ impl Timeline for LayeredTimeline { todo!(); } - fn gc_iteration(&self, horizon: u64, _compact: bool) -> Result { - // In the layered repository, event to GC a single timeline, - // we have to scan all the timelines to determine what child - // timelines there are, so that we know to retain snapshot - // files that are still needed by the children. So we just do - // GC on the whole repository. - // - // FIXME: This makes writing repeatable tests harder, if - // activity on other timelines can affect the counters that - // we return - - // But do flush the in-memory layers to disk first. - self.checkpoint()?; - - LayeredRepository::gc_iteration(self.conf, self.tenantid, horizon) - } - fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { if !rel.is_blocky() && blknum != 0 { bail!( @@ -633,12 +592,28 @@ impl Timeline for LayeredTimeline { // probably too aggressive. Some kind of LRU policy would be // appropriate. // + + // Call freeze() for any unfrozen layers (that is, layers that + // haven't been written to disk yet) + // Call unload() for all layers, to release memory. + // + // FIXME: We do this by creating a whole new LayerMap. I couldn't + // figure out the borrowing rules to remove and insert entries + // to the old LayerMap while iterating through it. let old_layers = std::mem::take(&mut *layers); - for old_layer in old_layers.0.values() { - if !old_layer.is_frozen() { - if let Some(new_layer) = old_layer.freeze(last_valid_lsn)? { + for layer in old_layers.inner.values() { + if !layer.is_frozen() { + let new_layers = layer.freeze(last_valid_lsn, &*self.walredo_mgr)?; + + for new_layer in new_layers { + info!("freeze returned {} {}-{}", new_layer.get_relish_tag(), new_layer.get_start_lsn(), new_layer.get_end_lsn()); layers.insert(Arc::clone(&new_layer)); } + } else { + // FIXME: if this fails, we're in trouble because we already + // swapped the 'layers' with the empty one. Hence panic on error. + layer.unload().expect("could not unload layer from memory"); + layers.insert(Arc::clone(layer)); } } @@ -786,15 +761,13 @@ impl LayeredTimeline { } loop { - let mut layers = timeline.layers.lock().unwrap(); + let layers = timeline.layers.lock().unwrap(); // // FIXME: If the relation has been dropped, does this return the right // thing? The compute node should not normally request dropped relations, // but if OID wraparound happens the same relfilenode might get reused // for an unrelated relation. // - let mut best_candidate = None; - let mut best_end_lsn = Lsn(0); // First, see if we have loaded a layer in the cache ready. if let Some(layer) = layers.get(rel, lsn) { @@ -826,43 +799,19 @@ impl LayeredTimeline { // So if we find a layer in cache with end-LSN before the request LSN, remember // that, but fall through to check if there is a newer snapshot file on disk before // returning it. - if layer.get_end_lsn() >= lsn { - return Ok(Some((layer.clone(), lsn))); - } - best_candidate = Some(layer.clone()); - best_end_lsn = layer.get_end_lsn(); + // FIXME: obsolete comment, the layer map contains all layers now + return Ok(Some((layer.clone(), lsn))); } - // Proceed to check if there is a (better) snapshot file on disk. - if let Some(layer) = - SnapshotLayer::load(timeline.conf, timeline.timelineid, timeline.tenantid, rel, best_end_lsn, lsn)? - { - trace!( - "found snapshot file on disk: {}-{}", - layer.get_start_lsn(), - layer.get_end_lsn() - ); - let layer_rc: Arc = Arc::new(layer); - layers.insert(Arc::clone(&layer_rc)); - - return Ok(Some((layer_rc, lsn))); - } else { - // No (better) snapshot files for this relation on this timeline. If we found - // something in cache, return that. - if let Some(result) = best_candidate { - return Ok(Some((result, lsn))); - } - - // If we got nothing on this timeline, check if there's a layer on the ancestor - // timeline - if let Some(ancestor) = &timeline.ancestor_timeline { - lsn = timeline.ancestor_lsn; - timeline = &ancestor.as_ref(); - trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); - continue; - } - return Ok(None); + // If we got nothing on this timeline, check if there's a layer on the ancestor + // timeline + if let Some(ancestor) = &timeline.ancestor_timeline { + lsn = timeline.ancestor_lsn; + timeline = &ancestor.as_ref(); + trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); + continue; } + return Ok(None); } } @@ -938,26 +887,7 @@ impl LayeredTimeline { lsn ); - // Scan the directory for latest existing file. - // FIXME: if this is truly a new rel, none should exist right? - let start_lsn; - if let Some((_start, end, dropped)) = SnapshotLayer::find_latest_snapshot_file( - self.conf, - self.timelineid, - self.tenantid, - rel, - Lsn(0), - Lsn(u64::MAX), - )? { - if dropped { - start_lsn = lsn; - } else { - start_lsn = end; - } - } else { - start_lsn = lsn; - } - layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, rel, start_lsn)?; + layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, rel, lsn)?; } let mut layers = self.layers.lock().unwrap(); @@ -967,6 +897,22 @@ impl LayeredTimeline { Ok(layer_rc) } + /// + /// + /// + fn load_layer_map(&self) -> anyhow::Result<()> { + info!("loading layer map for timeline {} into memory", self.timelineid); + let mut layers = self.layers.lock().unwrap(); + let snapfiles = SnapshotLayer::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?; + + for layer_rc in snapfiles.iter() { + info!("found layer {} {}-{} {} on timeline {}", layer_rc.get_relish_tag(), layer_rc.get_start_lsn(), layer_rc.get_end_lsn(), layer_rc.is_dropped(), self.timelineid ); + layers.insert(Arc::clone(layer_rc)); + } + + Ok(()) + } + /// /// Wait until WAL has been received up to the given LSN. /// @@ -994,4 +940,117 @@ impl LayeredTimeline { Ok(lsn) } + + + /// + /// Garbage collect snapshot files on a timeline that are no longer needed. + /// + /// The caller specifies how much history is needed with the two arguments: + /// + /// retain_lsns: keep page a version of each page at these LSNs + /// cutoff: also keep everything newer than this LSN + /// + /// The 'retain_lsns' lists is currently used to prevent removing files that + /// are needed by child timelines. In the future, the user might be able to + /// name additional points in time to retain. The caller is responsible for + /// collecting that information. + /// + /// The 'cutoff' point is used to retain recent versions that might still be + /// needed by read-only nodes. (As of this writing, the caller just passes + /// the latest LSN subtracted by a constant, and doesn't do anything smart + /// to figure out what read-only nodes might actually need.) + /// + /// Currently, we don't make any attempt at removing unneeded page versions + /// within a snapshot file. We can only remove the whole file if it's fully + /// obsolete. + /// + pub fn gc_timeline(&self, retain_lsns: Vec, cutoff: Lsn) -> Result { + let now = Instant::now(); + let mut result: GcResult = Default::default(); + + // Scan all snapshot files in the directory. For each file, if a newer file + // exists, we can remove the old one. + self.checkpoint()?; + + let mut layers = self.layers.lock().unwrap(); + + info!("running GC on timeline {}, cutoff {}", self.timelineid, cutoff); + + let mut layers_to_remove: Vec> = Vec::new(); + + // Determine for each file if it needs to be retained + 'outer: for ((rel, _lsn), l) in layers.inner.iter() { + if rel.is_relation() { + result.snapshot_relfiles_total += 1; + } else { + result.snapshot_nonrelfiles_total += 1; + } + + // Is it newer than cutoff point? + if l.get_end_lsn() > cutoff { + info!("keeping {} {}-{} because it's newer than cutoff {}", + rel, l.get_start_lsn(), l.get_end_lsn(), cutoff); + if rel.is_relation() { + result.snapshot_relfiles_needed_by_cutoff += 1; + } else { + result.snapshot_nonrelfiles_needed_by_cutoff += 1; + } + continue 'outer; + } + + // Is it needed by a child branch? + for retain_lsn in &retain_lsns { + // FIXME: are the bounds inclusive or exclusive? + if l.get_start_lsn() <= *retain_lsn && *retain_lsn <= l.get_end_lsn() { + info!("keeping {} {}-{} because it's needed by branch point {}", + rel, l.get_start_lsn(), l.get_end_lsn(), *retain_lsn); + if rel.is_relation() { + result.snapshot_relfiles_needed_by_branches += 1; + } else { + result.snapshot_nonrelfiles_needed_by_branches += 1; + } + continue 'outer; + } + } + + // Unless the relation was dropped, is there a later snapshot file for this relation? + if !l.is_dropped() && !layers.newer_layer_exists(l.get_relish_tag(), l.get_end_lsn()) { + if rel.is_relation() { + result.snapshot_relfiles_not_updated += 1; + } else { + result.snapshot_nonrelfiles_not_updated += 1; + } + continue 'outer; + } + + // We didn't find any reason to keep this file, so remove it. + info!("garbage collecting {} {}-{} {}", l.get_relish_tag(), l.get_start_lsn(), l.get_end_lsn(), l.is_dropped()); + layers_to_remove.push(Arc::clone(l)); + } + + // Actually delete the layers from disk and remove them from the map. + // (couldn't do this in the loop above, because you cannot modify a collection + // while iterating it. BTreeMap::retain() would be another option) + for doomed_layer in layers_to_remove { + doomed_layer.delete()?; + layers.remove(&*doomed_layer); + + if doomed_layer.is_dropped() { + if doomed_layer.get_relish_tag().is_relation() { + result.snapshot_relfiles_dropped += 1; + } else { + result.snapshot_nonrelfiles_dropped += 1; + } + } else { + if doomed_layer.get_relish_tag().is_relation() { + result.snapshot_relfiles_removed += 1; + } else { + result.snapshot_nonrelfiles_removed += 1; + } + } + } + + result.elapsed = now.elapsed(); + Ok(result) + } } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index dd57525b10..f2dd94afc8 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -35,22 +35,24 @@ pub struct InMemoryLayer { /// start_lsn: Lsn, - // FIXME: the three mutex-protected fields below should probably be protected - // by a single mutex. + inner: Mutex, +} + +pub struct InMemoryLayerInner { /// If this relation was dropped, remember when that happened. Lsn(0) means /// it hasn't been dropped - drop_lsn: Mutex, + drop_lsn: Lsn, /// /// All versions of all pages in the layer are are kept here. /// Indexed by block number and LSN. /// - page_versions: Mutex>, + page_versions: BTreeMap<(u32, Lsn), PageVersion>, /// /// `relsizes` tracks the size of the relation at different points in time. /// - relsizes: Mutex>, + relsizes: BTreeMap, } impl Layer for InMemoryLayer { @@ -74,6 +76,11 @@ impl Layer for InMemoryLayer { return Lsn(u64::MAX); } + fn is_dropped(&self) -> bool { + let inner = self.inner.lock().unwrap(); + return inner.drop_lsn != Lsn(0); + } + /// Look up given page in the cache. fn get_page_at_lsn( &self, @@ -85,11 +92,12 @@ impl Layer for InMemoryLayer { let mut records: Vec = Vec::new(); let mut page_img: Option = None; let mut need_base_image_lsn: Option = Some(lsn); + { - let page_versions = self.page_versions.lock().unwrap(); + let inner = self.inner.lock().unwrap(); let minkey = (blknum, Lsn(0)); let maxkey = (blknum, lsn); - let mut iter = page_versions.range((Included(&minkey), Included(&maxkey))); + let mut iter = inner. page_versions.range((Included(&minkey), Included(&maxkey))); while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { if let Some(img) = &entry.page_image { page_img = Some(img.clone()); @@ -182,12 +190,12 @@ impl Layer for InMemoryLayer { /// Get size of the relation at given LSN fn get_rel_size(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let inner = self.inner.lock().unwrap(); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); if let Some((_entry_lsn, entry)) = iter.next_back() { let result = *entry; - drop(relsizes); + drop(inner); trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result); Ok(result) } else { @@ -198,9 +206,9 @@ impl Layer for InMemoryLayer { /// Does this relation exist at given LSN? fn get_rel_exists(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); + let inner = self.inner.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); let result = if let Some((_entry_lsn, _entry)) = iter.next_back() { true @@ -222,25 +230,21 @@ impl Layer for InMemoryLayer { self.timelineid, lsn ); - { - let mut page_versions = self.page_versions.lock().unwrap(); - let old = page_versions.insert((blknum, lsn), pv); + let mut inner = self.inner.lock().unwrap(); - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!( - "Page version of rel {:?} blk {} at {} already exists", - self.rel, blknum, lsn - ); - } + let old = inner.page_versions.insert((blknum, lsn), pv); - // release lock on 'page_versions' + if old.is_some() { + // We already had an entry for this LSN. That's odd.. + warn!( + "Page version of rel {:?} blk {} at {} already exists", + self.rel, blknum, lsn + ); } // Also update the relation size, if this extended the relation. if self.rel.is_blocky() { - let mut relsizes = self.relsizes.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); let oldsize; if let Some((_entry_lsn, entry)) = iter.next_back() { @@ -257,7 +261,7 @@ impl Layer for InMemoryLayer { blknum + 1, lsn ); - relsizes.insert(lsn, blknum + 1); + inner.relsizes.insert(lsn, blknum + 1); } } @@ -266,8 +270,8 @@ impl Layer for InMemoryLayer { /// Remember that the relation was truncated at given LSN fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()> { - let mut relsizes = self.relsizes.lock().unwrap(); - let old = relsizes.insert(lsn, relsize); + let mut inner = self.inner.lock().unwrap(); + let old = inner.relsizes.insert(lsn, relsize); if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -279,10 +283,10 @@ impl Layer for InMemoryLayer { /// Remember that the relation was truncated at given LSN fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()> { - let mut drop_lsn = self.drop_lsn.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); - assert!(*drop_lsn == Lsn(0)); - *drop_lsn = lsn; + assert!(inner.drop_lsn == Lsn(0)); + inner.drop_lsn = lsn; info!("dropped relation {} at {}", self.rel, lsn); @@ -296,17 +300,15 @@ impl Layer for InMemoryLayer { /// If there were page versions newer than 'end_lsn', a new in-memory /// layer is returned with those page versions. Otherwise returns None. /// - fn freeze(&self, end_lsn: Lsn) -> Result>> { + fn freeze(&self, end_lsn: Lsn, walredo_mgr: &dyn WalRedoManager) -> Result>> { info!( "freezing in memory layer for {} on timeline {} at {}", self.rel, self.timelineid, end_lsn ); - let page_versions = self.page_versions.lock().unwrap(); - let relsizes = self.relsizes.lock().unwrap(); - let drop_lsn = self.drop_lsn.lock().unwrap(); - - let dropped = *drop_lsn != Lsn(0); + let inner = self.inner.lock().unwrap(); + + let dropped = inner.drop_lsn != Lsn(0); // Divide all the page versions into old and new at the 'end_lsn' cutoff point. let mut old_page_versions = BTreeMap::new(); @@ -315,7 +317,7 @@ impl Layer for InMemoryLayer { let mut new_page_versions = BTreeMap::new(); if !dropped { - for (lsn, size) in relsizes.iter() { + for (lsn, size) in inner.relsizes.iter() { if *lsn > end_lsn { new_relsizes.insert(*lsn, *size); } else { @@ -323,7 +325,7 @@ impl Layer for InMemoryLayer { } } - for ((blknum, lsn), pv) in page_versions.iter() { + for ((blknum, lsn), pv) in inner.page_versions.iter() { if *lsn > end_lsn { new_page_versions.insert((*blknum, *lsn), pv.clone()); } else { @@ -333,14 +335,14 @@ impl Layer for InMemoryLayer { } let end_lsn = if dropped { - assert!(*drop_lsn < end_lsn); - *drop_lsn + assert!(inner.drop_lsn < end_lsn); + inner.drop_lsn } else { end_lsn }; // Write the old page versions to disk. - let _snapfile = SnapshotLayer::create( + let snapfile = SnapshotLayer::create( self.conf, self.timelineid, self.tenantid, @@ -351,24 +353,34 @@ impl Layer for InMemoryLayer { old_page_versions, old_relsizes, )?; + let mut result: Vec> = Vec::new(); // If there were any "new" page versions, initialize a new in-memory layer to hold // them if !new_relsizes.is_empty() || !new_page_versions.is_empty() { - let new_layer: Arc = Arc::new(InMemoryLayer { - conf: self.conf, - timelineid: self.timelineid, - tenantid: self.tenantid, - rel: self.rel, - start_lsn: end_lsn, - drop_lsn: Mutex::new(Lsn(0)), - page_versions: Mutex::new(new_page_versions), - relsizes: Mutex::new(new_relsizes), - }); - Ok(Some(new_layer)) - } else { - Ok(None) + info!("created new in-mem layer for {} {}-", self.rel, end_lsn); + + let new_layer = Self::copy_snapshot(self.conf, walredo_mgr, &snapfile, self.timelineid, self.tenantid, end_lsn)?; + let mut new_inner = new_layer.inner.lock().unwrap(); + new_inner.page_versions.append(&mut new_page_versions); + new_inner.relsizes.append(&mut new_relsizes); + drop(new_inner); + + result.push(Arc::new(new_layer)); } + result.push(Arc::new(snapfile)); + + Ok(result) + } + + fn delete(&self) -> Result<()> { + // Nothing to do. When the reference is dropped, the memory is released. + Ok(()) + } + + fn unload(&self) -> Result<()> { + // cannot unload in-memory layer. Freeze instead + Ok(()) } } @@ -396,9 +408,11 @@ impl InMemoryLayer { tenantid, rel, start_lsn, - drop_lsn: Mutex::new(Lsn(0)), - page_versions: Mutex::new(BTreeMap::new()), - relsizes: Mutex::new(BTreeMap::new()), + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: Lsn(0), + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), }) } @@ -446,9 +460,11 @@ impl InMemoryLayer { tenantid, rel: src.get_relish_tag(), start_lsn: lsn, - drop_lsn: Mutex::new(Lsn(0)), - page_versions: Mutex::new(page_versions), - relsizes: Mutex::new(relsizes), + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: Lsn(0), + page_versions: page_versions, + relsizes: relsizes, + }), }) } @@ -460,13 +476,12 @@ impl InMemoryLayer { self.rel, self.start_lsn ); - let relsizes = self.relsizes.lock().unwrap(); - let page_versions = self.page_versions.lock().unwrap(); + let inner = self.inner.lock().unwrap(); - for (k, v) in relsizes.iter() { + for (k, v) in inner.relsizes.iter() { result += &format!("{}: {}\n", k, v); } - for (k, v) in page_versions.iter() { + for (k, v) in inner.page_versions.iter() { result += &format!( "blk {} at {}: {}/{}\n", k.0, diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs new file mode 100644 index 0000000000..f7bffa297e --- /dev/null +++ b/pageserver/src/layered_repository/layer_map.rs @@ -0,0 +1,127 @@ +/// +/// The layer map tracks what layers exist for all the relations in a timeline. +/// +/// When the timeline is first accessed, the server lists of all snapshot files +/// in the timelines/ directory, and populates this map with +/// SnapshotLayers corresponding to each file. When new WAL is received, +/// we create InMemoryLayers to hold the incoming records. Now and then, +/// in the checkpoint() function, the in-memory layers are frozen, forming +/// new snapshot layers and corresponding files are written to disk. +/// + +use anyhow::Result; +use log::*; +use crate::relish::*; +use crate::layered_repository::storage_layer::Layer; +use std::collections::HashSet; +use std::collections::BTreeMap; +use std::ops::Bound::Included; +use zenith_utils::lsn::Lsn; +use std::sync::Arc; + +/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN. +/// It provides a couple of convenience functions over a plain BTreeMap +pub struct LayerMap { + pub inner: BTreeMap<(RelishTag, Lsn), Arc>, +} + +impl LayerMap { + /// + /// Look up using the given rel tag and LSN. This differs from a plain + /// key-value lookup in that if there is any layer that covers the + /// given LSN, or precedes the given LSN, it is returned. In other words, + /// you don't need to know the exact start LSN of the layer. + /// + pub fn get(&self, tag: RelishTag, lsn: Lsn) -> Option> { + let startkey = (tag, Lsn(0)); + let endkey = (tag, lsn); + + if let Some((_k, v)) = self + .inner + .range((Included(startkey), Included(endkey))) + .next_back() + { + Some(Arc::clone(v)) + } else { + None + } + } + + pub fn insert(&mut self, layer: Arc) { + let rel = layer.get_relish_tag(); + let start_lsn = layer.get_start_lsn(); + + self.inner.insert((rel, start_lsn), Arc::clone(&layer)); + } + + pub fn remove(&mut self, layer: &dyn Layer) { + let rel = layer.get_relish_tag(); + let start_lsn = layer.get_start_lsn(); + + self.inner.remove(&(rel, start_lsn)); + } + + pub fn list_rels(&self, + spcnode: u32, + dbnode: u32, + ) -> Result> { + let mut rels: HashSet = HashSet::new(); + + // Scan the timeline directory to get all rels in this timeline. + for ((rel, _lsn), _l) in self.inner.iter() { + if let RelishTag::Relation(reltag) = rel { + // FIXME: skip if it was dropped before the requested LSN. But there is no + // LSN argument + + if (spcnode == 0 || reltag.spcnode == spcnode) + && (dbnode == 0 || reltag.dbnode == dbnode) + { + rels.insert(*reltag); + } + } + } + Ok(rels) + } + + pub fn list_nonrels(&self, _lsn: Lsn) -> Result> { + let mut rels: HashSet = HashSet::new(); + + // Scan the timeline directory to get all rels in this timeline. + for ((rel, _lsn), _l) in self.inner.iter() { + // FIXME: skip if it was dropped before the requested LSN. + + if let RelishTag::Relation(_) = rel { + } else { + rels.insert(*rel); + } + } + Ok(rels) + } + + /// Is there a newer layer for given relation? + pub fn newer_layer_exists(&self, rel: RelishTag, lsn: Lsn) -> bool { + let startkey = (rel, lsn); + let endkey = (rel, Lsn(u64::MAX)); + + for ((_rel, newer_lsn), layer) in self + .inner + .range((Included(startkey), Included(endkey))) + { + if layer.get_end_lsn() > lsn { + info!("found later layer for rel {}, {} {}-{}", rel, lsn, newer_lsn, layer.get_end_lsn()); + return true; + } else { + info!("found singleton layer for rel {}, {} {}", rel, lsn, newer_lsn); + continue; + } + } + info!("no later layer found for rel {}, {}", rel, lsn); + false + } +} + +impl Default for LayerMap { + fn default() -> Self { + LayerMap { inner: BTreeMap::new() } + } +} diff --git a/pageserver/src/layered_repository/snapshot_layer.rs b/pageserver/src/layered_repository/snapshot_layer.rs index 575a337706..11076a53b7 100644 --- a/pageserver/src/layered_repository/snapshot_layer.rs +++ b/pageserver/src/layered_repository/snapshot_layer.rs @@ -41,22 +41,21 @@ use crate::layered_repository::storage_layer::Layer; use crate::layered_repository::storage_layer::PageVersion; use crate::layered_repository::storage_layer::ZERO_PAGE; use crate::relish::*; -use crate::repository::{GcResult, WALRecord}; +use crate::repository::WALRecord; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::{ZTimelineId, ZTenantId}; use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use log::*; -use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::collections::BTreeMap; use std::fmt; use std::fs; use std::fs::File; use std::io::Write; -use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::ops::Bound::Included; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::sync::{Arc, Mutex, MutexGuard}; use bookfile::{Book, BookWriter}; @@ -205,8 +204,10 @@ impl fmt::Display for SnapshotFileName { /// /// SnapshotLayer is the in-memory data structure associated with an on-disk snapshot file. -/// It is also used to accumulate new changes at the tip of a branch; end_lsn is u64::MAX -/// in that case. +/// We hold a SnapshotLayer in memory for each file, in the LayerMap. If a layer is in +/// "loaded" state, we have a copy of the file in memory, in 'inner'. Otherwise the struct +/// is just a placeholder for a file that exists in memory, and it needs to be loaded +/// before using it in queries. /// pub struct SnapshotLayer { conf: &'static PageServerConf, @@ -222,16 +223,24 @@ pub struct SnapshotLayer { dropped: bool, + inner: Mutex +} + +pub struct SnapshotLayerInner { + // If false, the 'page_versions' and 'relsizes' have not been loaded into memory + // yet. + loaded: bool, + /// /// All versions of all pages in the file are are kept here. /// Indexed by block number and LSN. /// - page_versions: Mutex>, + page_versions: BTreeMap<(u32, Lsn), PageVersion>, /// /// `relsizes` tracks the size of the relation at different points in time. /// - relsizes: Mutex>, + relsizes: BTreeMap, } impl Layer for SnapshotLayer { @@ -247,6 +256,10 @@ impl Layer for SnapshotLayer { return self.rel; } + fn is_dropped(&self) -> bool { + return self.dropped; + } + fn get_start_lsn(&self) -> Lsn { return self.start_lsn; } @@ -267,10 +280,10 @@ impl Layer for SnapshotLayer { let mut page_img: Option = None; let mut need_base_image_lsn: Option = Some(lsn); { - let page_versions = self.page_versions.lock().unwrap(); + let inner = self.load()?; let minkey = (blknum, Lsn(0)); let maxkey = (blknum, lsn); - let mut iter = page_versions.range((Included(&minkey), Included(&maxkey))); + let mut iter = inner.page_versions.range((Included(&minkey), Included(&maxkey))); while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { if let Some(img) = &entry.page_image { page_img = Some(img.clone()); @@ -291,7 +304,7 @@ impl Layer for SnapshotLayer { } } - // release lock on 'page_versions' + // release lock on 'inner' } records.reverse(); @@ -365,12 +378,12 @@ impl Layer for SnapshotLayer { /// Get size of the relation at given LSN fn get_rel_size(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let inner = self.load()?; + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); if let Some((_entry_lsn, entry)) = iter.next_back() { let result = *entry; - drop(relsizes); + drop(inner); trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result); Ok(result) } else { @@ -382,9 +395,9 @@ impl Layer for SnapshotLayer { /// Does this relation exist at given LSN? fn get_rel_exists(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); + let inner = self.load()?; - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); let result = if let Some((_entry_lsn, _entry)) = iter.next_back() { true @@ -409,9 +422,23 @@ impl Layer for SnapshotLayer { bail!("cannot modify historical snapshot layer"); } - fn freeze(&self, _end_lsn: Lsn) -> Result>> { + fn freeze(&self, _end_lsn: Lsn, _walredo_mgr: &dyn WalRedoManager) -> Result>> { bail!("cannot freeze historical snapshot layer"); } + + fn delete(&self) -> Result<()> { + // delete underlying file + fs::remove_file(self.path())?; + Ok(()) + } + + fn unload(&self) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + inner.page_versions = BTreeMap::new(); + inner.relsizes = BTreeMap::new(); + inner.loaded = false; + Ok(()) + } } impl SnapshotLayer { @@ -463,20 +490,16 @@ impl SnapshotLayer { start_lsn: start_lsn, end_lsn, dropped, - page_versions: Mutex::new(page_versions), - relsizes: Mutex::new(relsizes), + inner: Mutex::new(SnapshotLayerInner { + loaded: true, + page_versions: page_versions, + relsizes: relsizes, + }), }; + let inner = snapfile.inner.lock().unwrap(); - snapfile.save()?; - Ok(snapfile) - } - - /// Write the in-memory btreemaps into files - fn save(&self) -> Result<()> { - let path = self.path(); - - let page_versions = self.page_versions.lock().unwrap(); - let relsizes = self.relsizes.lock().unwrap(); + // Write the in-memory btreemaps into files + let path = snapfile.path(); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? @@ -486,13 +509,13 @@ impl SnapshotLayer { // Write out page versions let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER); - let buf = BTreeMap::ser(&page_versions)?; + let buf = BTreeMap::ser(&inner.page_versions)?; chapter.write_all(&buf)?; let book = chapter.close()?; // and relsizes to separate chapter let mut chapter = book.new_chapter(REL_SIZES_CHAPTER); - let buf = BTreeMap::ser(&relsizes)?; + let buf = BTreeMap::ser(&inner.relsizes)?; chapter.write_all(&buf)?; let book = chapter.close()?; @@ -500,83 +523,29 @@ impl SnapshotLayer { trace!("saved {}", &path.display()); - Ok(()) + drop(inner); + + Ok(snapfile) } - /// - /// Find the snapshot file with latest LSN that covers the given 'lsn', or is before it. - /// - pub fn find_latest_snapshot_file( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - rel: RelishTag, - earliest_lsn: Lsn, - lsn: Lsn, - ) -> Result> { - // Scan the timeline directory to get all rels in this timeline. - let mut result_start_lsn = Lsn(0); - let mut result_end_lsn = Lsn(0); - let mut result_dropped = false; - for fname in Self::list_snapshot_files(conf, timelineid, tenantid)? { - if fname.end_lsn <= earliest_lsn { - continue; - } + fn load(&self) -> Result> { - if fname.rel == rel && fname.start_lsn <= lsn && fname.end_lsn > result_end_lsn { - result_start_lsn = fname.start_lsn; - result_end_lsn = fname.end_lsn; - result_dropped = fname.dropped; - } - } - if result_start_lsn != Lsn(0) { - Ok(Some((result_start_lsn, result_end_lsn, result_dropped))) - } else { - Ok(None) - } - } + // quick exit if already loaded + let mut inner = self.inner.lock().unwrap(); - /// - /// Load the state for one relation back into memory. - /// - /// Returns the latest snapshot file that before the given 'lsn', but newer than 'earliest_lsn' - /// - pub fn load( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - rel: RelishTag, - earliest_lsn: Lsn, - lsn: Lsn, - ) -> Result> { - if let Some((start_lsn, end_lsn, dropped)) = - Self::find_latest_snapshot_file(conf, timelineid, tenantid, rel, earliest_lsn, lsn)? - { - let snap = Self::load_path(conf, timelineid, tenantid, rel, start_lsn, end_lsn, dropped)?; - Ok(Some(snap)) - } else { - Ok(None) + if inner.loaded { + return Ok(inner); } - } - fn load_path( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - rel: RelishTag, - start_lsn: Lsn, - end_lsn: Lsn, - dropped: bool, - ) -> Result { let path = Self::path_for( - conf, - timelineid, - tenantid, + self.conf, + self.timelineid, + self.tenantid, &SnapshotFileName { - rel, - start_lsn, - end_lsn, - dropped, + rel: self.rel, + start_lsn: self.start_lsn, + end_lsn: self.end_lsn, + dropped: self.dropped, }, ); @@ -597,200 +566,46 @@ impl SnapshotLayer { debug!("loaded from {}", &path.display()); - Ok(SnapshotLayer { - conf, - timelineid, - tenantid, - rel, - start_lsn, - end_lsn, - dropped, - page_versions: Mutex::new(page_versions), - relsizes: Mutex::new(relsizes), - }) - } + *inner = SnapshotLayerInner { + loaded: true, + page_versions, + relsizes, + }; - pub fn list_rels( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - spcnode: u32, - dbnode: u32, - ) -> Result> { - let mut rels: HashSet = HashSet::new(); - - // Scan the timeline directory to get all rels in this timeline. - for snapfiles in Self::list_snapshot_files(conf, timelineid, tenantid)? { - if let RelishTag::Relation(reltag) = snapfiles.rel { - // FIXME: skip if it was dropped before the requested LSN. But there is no - // LSN argument - - if (spcnode == 0 || reltag.spcnode == spcnode) - && (dbnode == 0 || reltag.dbnode == dbnode) - { - rels.insert(reltag); - } - } - } - Ok(rels) - } - - pub fn list_nonrels( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - _lsn: Lsn, - ) -> Result> { - let mut rels: HashSet = HashSet::new(); - - // Scan the timeline directory to get all rels in this timeline. - for snapfile in Self::list_snapshot_files(conf, timelineid, tenantid)? { - // FIXME: skip if it was dropped before the requested LSN. - - if let RelishTag::Relation(_) = snapfile.rel { - } else { - rels.insert(snapfile.rel); - } - } - Ok(rels) - } - - /// - /// Garbage collect snapshot files on a timeline that are no longer needed. - /// - /// The caller specifies how much history is needed with the two arguments: - /// - /// retain_lsns: keep page a version of each page at these LSNs - /// cutoff: also keep everything newer than this LSN - /// - /// The 'retain_lsns' lists is currently used to prevent removing files that - /// are needed by child timelines. In the future, the user might be able to - /// name additional points in time to retain. The caller is responsible for - /// collecting that information. - /// - /// The 'cutoff' point is used to retain recent versions that might still be - /// needed by read-only nodes. (As of this writing, the caller just passes - /// the latest LSN subtracted by a constant, and doesn't do anything smart - /// to figure out what read-only nodes might actually need.) - /// - /// Currently, we don't make any attempt at removing unneeded page versions - /// within a snapshot file. We can only remove the whole file if it's fully - /// obsolete. - /// - pub fn gc_timeline( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - retain_lsns: Vec, - cutoff: Lsn, - ) -> Result { - let now = Instant::now(); - let mut result: GcResult = Default::default(); - - // Scan all snapshot files in the directory. For each file, if a newer file - // exists, we can remove the old one. - - // For convenience and speed, slurp the list of files in the directory into memory first. - let mut snapfiles: BTreeSet = BTreeSet::new(); - - for fname in Self::list_snapshot_files(conf, timelineid, tenantid)? { - snapfiles.insert(fname.clone()); - - if fname.rel.is_relation() { - result.snapshot_relfiles_total += 1; - } else { - result.snapshot_nonrelfiles_total += 1; - } - } - - // Now determine for each file if it needs to be retained - 'outer: for snapfile in &snapfiles { - // Is it newer than cutoff point? - if snapfile.end_lsn > cutoff { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_needed_by_cutoff += 1; - } else { - result.snapshot_nonrelfiles_needed_by_cutoff += 1; - } - continue 'outer; - } - - // Is it needed by a child branch? - for retain_lsn in &retain_lsns { - // FIXME: are the bounds inclusive or exclusive? - if snapfile.start_lsn <= *retain_lsn && *retain_lsn <= snapfile.end_lsn { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_needed_by_branches += 1; - } else { - result.snapshot_nonrelfiles_needed_by_branches += 1; - } - continue 'outer; - } - } - - // Unless the relation was dropped, is there a later snapshot file for this relation? - if !snapfile.dropped { - let mut found_later_file = false; - if let Some(other_snapfile) = - snapfiles.range((Excluded(snapfile), Unbounded)).next() - { - if other_snapfile.rel != snapfile.rel { - // walked past the files for this rel. So there is no later file. - } else { - // found a later file. - found_later_file = true; - } - } - - if !found_later_file { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_not_updated += 1; - } else { - result.snapshot_nonrelfiles_not_updated += 1; - } - continue 'outer; - } - } - - // We didn't find any reason to keep this file, so remove it. - let path = Self::path_for(conf, timelineid, tenantid, snapfile); - info!("garbage collecting {}", path.display()); - fs::remove_file(path)?; - - if snapfile.dropped { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_dropped += 1; - } else { - result.snapshot_nonrelfiles_dropped += 1; - } - } else { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_removed += 1; - } else { - result.snapshot_nonrelfiles_removed += 1; - } - } - } - - result.elapsed = now.elapsed(); - Ok(result) + Ok(inner) } // TODO: returning an Iterator would be more idiomatic - fn list_snapshot_files( + pub fn list_snapshot_files( conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId, - ) -> Result> { + ) -> Result>> { let path = conf.timeline_path(&timelineid, &tenantid); - let mut snapfiles = Vec::new(); + let mut snapfiles: Vec> = Vec::new(); for direntry in fs::read_dir(path)? { let fname = direntry?.file_name(); let fname = fname.to_str().unwrap(); if let Some(snapfilename) = SnapshotFileName::from_str(fname) { - snapfiles.push(snapfilename); + + let snapfile = SnapshotLayer { + conf, + timelineid, + tenantid, + rel: snapfilename.rel, + start_lsn: snapfilename.start_lsn, + end_lsn: snapfilename.end_lsn, + dropped: snapfilename.dropped, + inner: Mutex::new(SnapshotLayerInner { + loaded: false, + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), + }; + + snapfiles.push(Arc::new(snapfile)); } } return Ok(snapfiles); @@ -804,13 +619,11 @@ impl SnapshotLayer { self.rel, self.start_lsn, self.end_lsn ); - let relsizes = self.relsizes.lock().unwrap(); - //let page_versions = self.page_versions.lock().unwrap(); - - for (k, v) in relsizes.iter() { + let inner = self.inner.lock().unwrap(); + for (k, v) in inner.relsizes.iter() { result += &format!("{}: {}\n", k, v); } - //for (k, v) in page_versions.iter() { + //for (k, v) in inner.page_versions.iter() { // result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some()); //} diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 1586bfcae1..8531563d61 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -33,6 +33,7 @@ pub trait Layer: Send + Sync { fn get_relish_tag(&self) -> RelishTag; fn get_start_lsn(&self) -> Lsn; fn get_end_lsn(&self) -> Lsn; + fn is_dropped(&self) -> bool; fn get_page_at_lsn( &self, @@ -76,5 +77,9 @@ pub trait Layer: Send + Sync { ) } - fn freeze(&self, end_lsn: Lsn) -> Result>>; + fn freeze(&self, end_lsn: Lsn, walredo_mgr: &dyn WalRedoManager) -> Result>>; + + fn delete(&self) -> Result<()>; + + fn unload(&self) -> Result<()>; } diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 4000973d6d..145706e931 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -165,6 +165,21 @@ impl Repository for ObjectRepository { Ok(()) } + + fn gc_iteration(&self, timelineid: Option, horizon: u64, compact: bool) -> Result { + if let Some(timelineid) = timelineid { + let timelines = self.timelines.lock().unwrap(); + + // FIXME: If the timeline isn't opened yet, don't open it just for GC. + if let Some(timeline) = timelines.get(&timelineid) { + return timeline.gc_iteration(horizon, compact); + } + } else { + // FIXME: the object repository doesn't support GC on all timelines. Should + // iterate all the timelines here + } + return Ok(GcResult::default()) + } } /// @@ -671,136 +686,6 @@ impl Timeline for ObjectTimeline { Ok(Box::new(ObjectHistory { lsn, iter })) } - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { - let last_lsn = self.get_last_valid_lsn(); - let mut result: GcResult = Default::default(); - - // checked_sub() returns None on overflow. - if let Some(horizon) = last_lsn.checked_sub(horizon) { - // WAL is large enough to perform GC - let now = Instant::now(); - let mut prepared_horizon = Lsn(u64::MAX); - // Iterate through all objects in timeline - for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { - result.inspected += 1; - match obj { - // Prepared transactions - ObjectTag::Buffer(RelishTag::TwoPhase { xid }, _blknum) => { - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - prepared_horizon = Lsn::min(lsn, prepared_horizon); - if self.get_tx_status(xid, horizon)? - != pg_constants::TRANSACTION_STATUS_IN_PROGRESS - { - self.obj_store.unlink(&key, lsn)?; - result.prep_deleted += 1; - } - } - } - ObjectTag::RelationMetadata(_) => { - // Do not need to reconstruct page images, - // just delete all old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - let content = vers.1; - match ObjectValue::des(&content[..])? { - ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - result.dropped += 1; - } - _ => (), // preserve last version - } - last_version = false; - result.truncated += 1; - result.n_relations += 1; - } else { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } - } - ObjectTag::Buffer(rel, blknum) => { - if rel.is_blocky() { - // Reconstruct page at horizon unless relation was dropped - // and delete all older versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - result.truncated += 1; - last_version = false; - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - if rel_size > blknum { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(rel, blknum, lsn)?; - continue; - } - debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); - } else { - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); - continue; - } - debug!("Relation {} was dropped at {}", rel, lsn); - } - // relation was dropped or truncated so this block can be removed - } - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } else { - // versioned always materialized objects: no need to reconstruct pages - - // Remove old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - // preserve last version - last_version = false; - } else { - // delete deteriorated version - self.obj_store.unlink(&key, lsn)?; - result.chkp_deleted += 1; - } - } - } - } - _ => (), // do nothing - } - } - result.elapsed = now.elapsed(); - info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", - result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); - if compact { - self.obj_store.compact(); - } - } - Ok(result) - } } impl ObjectTimeline { @@ -993,6 +878,137 @@ impl ObjectTimeline { self.obj_store.put(&key, Lsn(0), &ObjectValue::ser(&val)?) } + + fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { + let last_lsn = self.get_last_valid_lsn(); + let mut result: GcResult = Default::default(); + + // checked_sub() returns None on overflow. + if let Some(horizon) = last_lsn.checked_sub(horizon) { + // WAL is large enough to perform GC + let now = Instant::now(); + let mut prepared_horizon = Lsn(u64::MAX); + // Iterate through all objects in timeline + for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { + result.inspected += 1; + match obj { + // Prepared transactions + ObjectTag::Buffer(RelishTag::TwoPhase { xid }, _blknum) => { + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + prepared_horizon = Lsn::min(lsn, prepared_horizon); + if self.get_tx_status(xid, horizon)? + != pg_constants::TRANSACTION_STATUS_IN_PROGRESS + { + self.obj_store.unlink(&key, lsn)?; + result.prep_deleted += 1; + } + } + } + ObjectTag::RelationMetadata(_) => { + // Do not need to reconstruct page images, + // just delete all old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + let content = vers.1; + match ObjectValue::des(&content[..])? { + ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + result.dropped += 1; + } + _ => (), // preserve last version + } + last_version = false; + result.truncated += 1; + result.n_relations += 1; + } else { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } + } + } + ObjectTag::Buffer(rel, blknum) => { + if rel.is_blocky() { + // Reconstruct page at horizon unless relation was dropped + // and delete all older versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + result.truncated += 1; + last_version = false; + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + if rel_size > blknum { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(rel, blknum, lsn)?; + continue; + } + debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); + } else { + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); + continue; + } + debug!("Relation {} was dropped at {}", rel, lsn); + } + // relation was dropped or truncated so this block can be removed + } + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } + } else { + // versioned always materialized objects: no need to reconstruct pages + + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + // preserve last version + last_version = false; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + result.chkp_deleted += 1; + } + } + } + } + _ => (), // do nothing + } + } + result.elapsed = now.elapsed(); + info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", + result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); + if compact { + self.obj_store.compact(); + } + } + Ok(result) + } } struct ObjectHistory<'a> { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 90c3e5514b..35dc993c96 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -537,10 +537,9 @@ impl postgres_backend::Handler for PageServerHandler { .map(|h| h.as_str().parse()) .unwrap_or(Ok(self.conf.gc_horizon))?; - let timeline = - page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; + let repo = page_cache::get_repository_for_tenant(&tenantid)?; - let result = timeline.gc_iteration(gc_horizon, true)?; + let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"n_relations"), @@ -613,6 +612,7 @@ impl postgres_backend::Handler for PageServerHandler { ]))? .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { + error!("received unknown command from client"); bail!("unknown command"); } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index adfcc981f0..41e091077b 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -31,6 +31,20 @@ pub trait Repository: Send + Sync { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; + /// perform one garbage collection iteration. + /// garbage collection is periodically performed by gc thread, + /// but it can be explicitly requested through page server api. + /// + /// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval). + /// `compact` parameter is used to force compaction of storage. + /// some storage implementation are based on lsm tree and require periodic merge (compaction). + /// usually storage implementation determines itself when compaction should be performed. + /// but for gc tests it way be useful to force compaction just after completion of gc iteration + /// to make sure that all detected garbage is removed. + /// so right now `compact` is set to true when gc explicitly requested through page srver api, + /// and is st to false in gc threads which infinitely repeats gc iterations in loop. + fn gc_iteration(&self, timelineid: Option, horizon: u64, compact: bool) -> Result; + // TODO get timelines? //fn get_stats(&self) -> RepositoryStats; } @@ -187,20 +201,6 @@ pub trait Timeline: Send + Sync { // TODO ordering guarantee? fn history<'a>(&'a self) -> Result>; - /// Perform one garbage collection iteration. - /// Garbage collection is periodically performed by GC thread, - /// but it can be explicitly requested through page server API. - /// - /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). - /// `compact` parameter is used to force compaction of storage. - /// Some storage implementation are based on LSM tree and require periodic merge (compaction). - /// Usually storage implementation determines itself when compaction should be performed. - /// But for GC tests it way be useful to force compaction just after completion of GC iteration - /// to make sure that all detected garbage is removed. - /// So right now `compact` is set to true when GC explicitly requested through page srver API, - /// and is st to false in GC threads which infinitely repeats GC iterations in loop. - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result; - // Check transaction status fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result { let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; diff --git a/test_runner/batch_others/test_snapfiles_gc.py b/test_runner/batch_others/test_snapfiles_gc.py index 1cd05d1aa8..761dc95b31 100644 --- a/test_runner/batch_others/test_snapfiles_gc.py +++ b/test_runner/batch_others/test_snapfiles_gc.py @@ -32,6 +32,11 @@ def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin): # Create a test table cur.execute("CREATE TABLE foo(x integer)") + print("Inserting two more rows and running GC") + cur.execute("select relfilenode from pg_class where oid = 'foo'::regclass"); + row = cur.fetchone(); + print("relfilenode is {}", row[0]); + # Run GC, to clear out any garbage left behind in the catalogs by # the CREATE TABLE command. We want to have a clean slate with no garbage # before running the actual tests below, otherwise the counts won't match diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 8c54b31e1e..142cc22f93 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -231,10 +231,11 @@ impl PostgresBackend { } Some(FeMessage::Query(m)) => { - trace!("got query {:?}", m.body); + info!("got query {:?}", m.body); // xxx distinguish fatal and recoverable errors? if let Err(e) = handler.process_query(self, m.body) { let errmsg = format!("{}", e); + error!("process_query errored: {}", errmsg); self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; } self.write_message(&BeMessage::ReadyForQuery)?;