diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 342032c534..5a19da2e07 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -27,7 +27,7 @@ use bytes::Bytes; use log::*; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::collections::{BTreeSet, HashSet}; use std::fs; use std::fs::File; @@ -50,10 +50,12 @@ use zenith_utils::seqwait::SeqWait; mod inmemory_layer; mod snapshot_layer; mod storage_layer; +mod layer_map; use inmemory_layer::InMemoryLayer; use snapshot_layer::SnapshotLayer; use storage_layer::Layer; +use layer_map::LayerMap; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -131,6 +133,101 @@ impl Repository for LayeredRepository { Ok(()) } + + // + // How garbage collection works + // -------- + // + // +--bar-------------> + // / + // +----+-----foo----------------> + // / + // ----main--+--------------------------> + // \ + // +-----baz--------> + // + // + // 1. Grab a mutex to prevent new timelines from being created + // 2. Scan all timelines, and on each timeline, make note of the + // all the points where other timelines have been branched off. + // We will refrain from removing page versions at those LSNs. + // 3. For each timeline, scan all snapshot files on the timeline. + // Remove all files for which a newer file exists and which + // don't cover any branch point LSNs. + // + // TODO: + // - if a relation has been modified on a child branch, then we + // we don't need to keep that in the parent anymore. + // + // - Currently, this is only triggered manually by the 'do_gc' command. + // There is no background thread to do it automatically. + fn gc_iteration(&self, target_timelineid: Option, horizon: u64, _compact: bool) -> Result { + let mut totals: GcResult = Default::default(); + let now = Instant::now(); + + // grab mutex to prevent new timelines from being created here. + // TODO: We will hold it for a long time + let mut timelines = self.timelines.lock().unwrap(); + + // Scan all timelines for the branch points. + let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); + + // Remember timelineid and its last_record_lsn for each timeline + let mut timelineids: Vec = Vec::new(); + + let timelines_path = self.conf.timelines_path(&self.tenantid); + for direntry in fs::read_dir(timelines_path)? { + let direntry = direntry?; + if let Some(fname) = direntry.file_name().to_str() { + if let Ok(timelineid) = fname.parse::() { + // Read the metadata of this timeline to get its parent timeline. + // FIXME: we open the timeline below with get_timeline() anyway. + // shouldn't we fetch the metadata from the in-memory Timeline + // struct? + let metadata = Self::load_metadata(self.conf, timelineid, self.tenantid)?; + + timelineids.push(timelineid); + + if let Some(ancestor_timeline) = metadata.ancestor_timeline { + all_branchpoints.insert((ancestor_timeline, metadata.ancestor_lsn)); + } + } + } + } + + // Ok, we now know all the branch points. Iterate through them. + for timelineid in timelineids { + // If a target timeline was specified, leave the other timelines alone. + // This is a bit inefficient, we still collect the information for all + // the timelines above. + if let Some(x) = target_timelineid { + if x != timelineid { + continue; + } + } + + let branchpoints: Vec = all_branchpoints + .range(( + Included((timelineid, Lsn(0))), + Included((timelineid, Lsn(u64::MAX))), + )) + .map(|&x| x.1) + .collect(); + + let timeline = self.get_timeline_locked(timelineid, &mut *timelines)?; + let last_lsn = timeline.get_last_valid_lsn(); + + if let Some(cutoff) = last_lsn.checked_sub(horizon) { + info!("running GC on timeline {}, last_lsn {}, cutoff {}", timelineid, last_lsn, cutoff); + let result = timeline.gc_timeline(branchpoints, cutoff)?; + + totals += result; + } + } + + totals.elapsed = now.elapsed(); + Ok(totals) + } } /// Private functions @@ -162,6 +259,9 @@ impl LayeredRepository { self.walredo_mgr.clone(), )?; + // List the snapshot layers on disk, and load them into the layer map + timeline.load_layer_map()?; + // Load any new WAL after the last checkpoint into memory. info!( "Loading WAL for timeline {} starting at {}", @@ -218,123 +318,6 @@ impl LayeredRepository { Ok(TimelineMetadata::des(&data)?) } - - // - // How garbage collection works - // -------- - // - // +--bar-------------> - // / - // +----+-----foo----------------> - // / - // ----main--+--------------------------> - // \ - // +-----baz--------> - // - // - // 1. Grab a mutex to prevent new timelines from being created - // 2. Scan all timelines, and on each timeline, make note of the - // all the points where other timelines have been branched off. - // We will refrain from removing page versions at those LSNs. - // 3. For each timeline, scan all snapshot files on the timeline. - // Remove all files for which a newer file exists and which - // don't cover any branch point LSNs. - // - // TODO: - // - if a relation has been modified on a child branch, then we - // we don't need to keep that in the parent anymore. - // - // - Currently, this is only triggered manually by the 'do_gc' command. - // There is no background thread to do it automatically. - fn gc_iteration(conf: &'static PageServerConf, tenantid: ZTenantId, horizon: u64) -> Result { - let mut totals: GcResult = Default::default(); - let now = Instant::now(); - - // TODO: grab mutex to prevent new timelines from being created here. - - // Scan all timelines for the branch points. - let mut all_branchpoints: BTreeSet<(ZTimelineId, Lsn)> = BTreeSet::new(); - - // Remember timelineid and its last_record_lsn for each timeline - let mut timelines: Vec<(ZTimelineId, Lsn)> = Vec::new(); - - let timelines_path = conf.timelines_path(&tenantid); - for direntry in fs::read_dir(timelines_path)? { - let direntry = direntry?; - if let Some(fname) = direntry.file_name().to_str() { - if let Ok(timelineid) = fname.parse::() { - // Read the metadata of this timeline to get its parent timeline. - let metadata = Self::load_metadata(conf, timelineid, tenantid)?; - - timelines.push((timelineid, metadata.last_record_lsn)); - - if let Some(ancestor_timeline) = metadata.ancestor_timeline { - all_branchpoints.insert((ancestor_timeline, metadata.ancestor_lsn)); - } - } - } - } - - // Ok, we now know all the branch points. Iterate through them. - for (timelineid, last_lsn) in timelines { - let branchpoints: Vec = all_branchpoints - .range(( - Included((timelineid, Lsn(0))), - Included((timelineid, Lsn(u64::MAX))), - )) - .map(|&x| x.1) - .collect(); - - if let Some(cutoff) = last_lsn.checked_sub(horizon) { - let result = SnapshotLayer::gc_timeline(conf, timelineid, tenantid, branchpoints, cutoff)?; - - totals += result; - } - } - - totals.elapsed = now.elapsed(); - Ok(totals) - } -} - -/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN. -/// It provides a couple of convenience functions over a plain BTreeMap -struct LayerMap(BTreeMap<(RelishTag, Lsn), Arc>); - -impl LayerMap { - /// - /// Look up using the given rel tag and LSN. This differs from a plain - /// key-value lookup in that if there is any layer that covers the - /// given LSN, or precedes the given LSN, it is returned. In other words, - /// you don't need to know the exact start LSN of the layer. - /// - fn get(&self, tag: RelishTag, lsn: Lsn) -> Option> { - let startkey = (tag, Lsn(0)); - let endkey = (tag, lsn); - - if let Some((_k, v)) = self - .0 - .range((Included(startkey), Included(endkey))) - .next_back() - { - Some(Arc::clone(v)) - } else { - None - } - } - - fn insert(&mut self, layer: Arc) { - let rel = layer.get_relish_tag(); - let start_lsn = layer.get_start_lsn(); - - self.0.insert((rel, start_lsn), Arc::clone(&layer)); - } -} - -impl Default for LayerMap { - fn default() -> Self { - LayerMap(BTreeMap::new()) - } } /// Metadata stored on disk for each timeline @@ -462,15 +445,12 @@ impl Timeline for LayeredTimeline { } fn list_rels(&self, spcnode: u32, dbnode: u32, _lsn: Lsn) -> Result> { - // SnapshotLayer::list_rels works by scanning the directory on disk. Make sure - // we have a file on disk for each relation. - self.checkpoint()?; // List all rels in this timeline, and all its ancestors. let mut all_rels = HashSet::new(); let mut timeline = self; loop { - let rels = SnapshotLayer::list_rels(self.conf, timeline.timelineid, timeline.tenantid, spcnode, dbnode)?; + let rels = timeline.layers.lock().unwrap().list_rels(spcnode, dbnode)?; // FIXME: We should filter out relations that don't exist at the given LSN. all_rels.extend(rels.iter()); @@ -493,11 +473,7 @@ impl Timeline for LayeredTimeline { let mut all_rels = HashSet::new(); let mut timeline = self; loop { - // SnapshotFile::list_rels works by scanning the directory on disk. Make sure - // we have a file on disk for each relation. - timeline.checkpoint()?; - - let rels = SnapshotLayer::list_nonrels(self.conf, timeline.timelineid, timeline.tenantid, lsn)?; + let rels = timeline.layers.lock().unwrap().list_nonrels(lsn)?; // FIXME: We should filter out relishes that don't exist at the given LSN. all_rels.extend(rels.iter()); @@ -518,23 +494,6 @@ impl Timeline for LayeredTimeline { todo!(); } - fn gc_iteration(&self, horizon: u64, _compact: bool) -> Result { - // In the layered repository, event to GC a single timeline, - // we have to scan all the timelines to determine what child - // timelines there are, so that we know to retain snapshot - // files that are still needed by the children. So we just do - // GC on the whole repository. - // - // FIXME: This makes writing repeatable tests harder, if - // activity on other timelines can affect the counters that - // we return - - // But do flush the in-memory layers to disk first. - self.checkpoint()?; - - LayeredRepository::gc_iteration(self.conf, self.tenantid, horizon) - } - fn put_wal_record(&self, rel: RelishTag, blknum: u32, rec: WALRecord) -> Result<()> { if !rel.is_blocky() && blknum != 0 { bail!( @@ -633,12 +592,28 @@ impl Timeline for LayeredTimeline { // probably too aggressive. Some kind of LRU policy would be // appropriate. // + + // Call freeze() for any unfrozen layers (that is, layers that + // haven't been written to disk yet) + // Call unload() for all layers, to release memory. + // + // FIXME: We do this by creating a whole new LayerMap. I couldn't + // figure out the borrowing rules to remove and insert entries + // to the old LayerMap while iterating through it. let old_layers = std::mem::take(&mut *layers); - for old_layer in old_layers.0.values() { - if !old_layer.is_frozen() { - if let Some(new_layer) = old_layer.freeze(last_valid_lsn)? { + for layer in old_layers.inner.values() { + if !layer.is_frozen() { + let new_layers = layer.freeze(last_valid_lsn, &*self.walredo_mgr)?; + + for new_layer in new_layers { + info!("freeze returned {} {}-{}", new_layer.get_relish_tag(), new_layer.get_start_lsn(), new_layer.get_end_lsn()); layers.insert(Arc::clone(&new_layer)); } + } else { + // FIXME: if this fails, we're in trouble because we already + // swapped the 'layers' with the empty one. Hence panic on error. + layer.unload().expect("could not unload layer from memory"); + layers.insert(Arc::clone(layer)); } } @@ -786,15 +761,13 @@ impl LayeredTimeline { } loop { - let mut layers = timeline.layers.lock().unwrap(); + let layers = timeline.layers.lock().unwrap(); // // FIXME: If the relation has been dropped, does this return the right // thing? The compute node should not normally request dropped relations, // but if OID wraparound happens the same relfilenode might get reused // for an unrelated relation. // - let mut best_candidate = None; - let mut best_end_lsn = Lsn(0); // First, see if we have loaded a layer in the cache ready. if let Some(layer) = layers.get(rel, lsn) { @@ -826,43 +799,19 @@ impl LayeredTimeline { // So if we find a layer in cache with end-LSN before the request LSN, remember // that, but fall through to check if there is a newer snapshot file on disk before // returning it. - if layer.get_end_lsn() >= lsn { - return Ok(Some((layer.clone(), lsn))); - } - best_candidate = Some(layer.clone()); - best_end_lsn = layer.get_end_lsn(); + // FIXME: obsolete comment, the layer map contains all layers now + return Ok(Some((layer.clone(), lsn))); } - // Proceed to check if there is a (better) snapshot file on disk. - if let Some(layer) = - SnapshotLayer::load(timeline.conf, timeline.timelineid, timeline.tenantid, rel, best_end_lsn, lsn)? - { - trace!( - "found snapshot file on disk: {}-{}", - layer.get_start_lsn(), - layer.get_end_lsn() - ); - let layer_rc: Arc = Arc::new(layer); - layers.insert(Arc::clone(&layer_rc)); - - return Ok(Some((layer_rc, lsn))); - } else { - // No (better) snapshot files for this relation on this timeline. If we found - // something in cache, return that. - if let Some(result) = best_candidate { - return Ok(Some((result, lsn))); - } - - // If we got nothing on this timeline, check if there's a layer on the ancestor - // timeline - if let Some(ancestor) = &timeline.ancestor_timeline { - lsn = timeline.ancestor_lsn; - timeline = &ancestor.as_ref(); - trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); - continue; - } - return Ok(None); + // If we got nothing on this timeline, check if there's a layer on the ancestor + // timeline + if let Some(ancestor) = &timeline.ancestor_timeline { + lsn = timeline.ancestor_lsn; + timeline = &ancestor.as_ref(); + trace!("recursing into ancestor at {}/{}", timeline.timelineid, lsn); + continue; } + return Ok(None); } } @@ -938,26 +887,7 @@ impl LayeredTimeline { lsn ); - // Scan the directory for latest existing file. - // FIXME: if this is truly a new rel, none should exist right? - let start_lsn; - if let Some((_start, end, dropped)) = SnapshotLayer::find_latest_snapshot_file( - self.conf, - self.timelineid, - self.tenantid, - rel, - Lsn(0), - Lsn(u64::MAX), - )? { - if dropped { - start_lsn = lsn; - } else { - start_lsn = end; - } - } else { - start_lsn = lsn; - } - layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, rel, start_lsn)?; + layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, rel, lsn)?; } let mut layers = self.layers.lock().unwrap(); @@ -967,6 +897,22 @@ impl LayeredTimeline { Ok(layer_rc) } + /// + /// + /// + fn load_layer_map(&self) -> anyhow::Result<()> { + info!("loading layer map for timeline {} into memory", self.timelineid); + let mut layers = self.layers.lock().unwrap(); + let snapfiles = SnapshotLayer::list_snapshot_files(self.conf, self.timelineid, self.tenantid)?; + + for layer_rc in snapfiles.iter() { + info!("found layer {} {}-{} {} on timeline {}", layer_rc.get_relish_tag(), layer_rc.get_start_lsn(), layer_rc.get_end_lsn(), layer_rc.is_dropped(), self.timelineid ); + layers.insert(Arc::clone(layer_rc)); + } + + Ok(()) + } + /// /// Wait until WAL has been received up to the given LSN. /// @@ -994,4 +940,117 @@ impl LayeredTimeline { Ok(lsn) } + + + /// + /// Garbage collect snapshot files on a timeline that are no longer needed. + /// + /// The caller specifies how much history is needed with the two arguments: + /// + /// retain_lsns: keep page a version of each page at these LSNs + /// cutoff: also keep everything newer than this LSN + /// + /// The 'retain_lsns' lists is currently used to prevent removing files that + /// are needed by child timelines. In the future, the user might be able to + /// name additional points in time to retain. The caller is responsible for + /// collecting that information. + /// + /// The 'cutoff' point is used to retain recent versions that might still be + /// needed by read-only nodes. (As of this writing, the caller just passes + /// the latest LSN subtracted by a constant, and doesn't do anything smart + /// to figure out what read-only nodes might actually need.) + /// + /// Currently, we don't make any attempt at removing unneeded page versions + /// within a snapshot file. We can only remove the whole file if it's fully + /// obsolete. + /// + pub fn gc_timeline(&self, retain_lsns: Vec, cutoff: Lsn) -> Result { + let now = Instant::now(); + let mut result: GcResult = Default::default(); + + // Scan all snapshot files in the directory. For each file, if a newer file + // exists, we can remove the old one. + self.checkpoint()?; + + let mut layers = self.layers.lock().unwrap(); + + info!("running GC on timeline {}, cutoff {}", self.timelineid, cutoff); + + let mut layers_to_remove: Vec> = Vec::new(); + + // Determine for each file if it needs to be retained + 'outer: for ((rel, _lsn), l) in layers.inner.iter() { + if rel.is_relation() { + result.snapshot_relfiles_total += 1; + } else { + result.snapshot_nonrelfiles_total += 1; + } + + // Is it newer than cutoff point? + if l.get_end_lsn() > cutoff { + info!("keeping {} {}-{} because it's newer than cutoff {}", + rel, l.get_start_lsn(), l.get_end_lsn(), cutoff); + if rel.is_relation() { + result.snapshot_relfiles_needed_by_cutoff += 1; + } else { + result.snapshot_nonrelfiles_needed_by_cutoff += 1; + } + continue 'outer; + } + + // Is it needed by a child branch? + for retain_lsn in &retain_lsns { + // FIXME: are the bounds inclusive or exclusive? + if l.get_start_lsn() <= *retain_lsn && *retain_lsn <= l.get_end_lsn() { + info!("keeping {} {}-{} because it's needed by branch point {}", + rel, l.get_start_lsn(), l.get_end_lsn(), *retain_lsn); + if rel.is_relation() { + result.snapshot_relfiles_needed_by_branches += 1; + } else { + result.snapshot_nonrelfiles_needed_by_branches += 1; + } + continue 'outer; + } + } + + // Unless the relation was dropped, is there a later snapshot file for this relation? + if !l.is_dropped() && !layers.newer_layer_exists(l.get_relish_tag(), l.get_end_lsn()) { + if rel.is_relation() { + result.snapshot_relfiles_not_updated += 1; + } else { + result.snapshot_nonrelfiles_not_updated += 1; + } + continue 'outer; + } + + // We didn't find any reason to keep this file, so remove it. + info!("garbage collecting {} {}-{} {}", l.get_relish_tag(), l.get_start_lsn(), l.get_end_lsn(), l.is_dropped()); + layers_to_remove.push(Arc::clone(l)); + } + + // Actually delete the layers from disk and remove them from the map. + // (couldn't do this in the loop above, because you cannot modify a collection + // while iterating it. BTreeMap::retain() would be another option) + for doomed_layer in layers_to_remove { + doomed_layer.delete()?; + layers.remove(&*doomed_layer); + + if doomed_layer.is_dropped() { + if doomed_layer.get_relish_tag().is_relation() { + result.snapshot_relfiles_dropped += 1; + } else { + result.snapshot_nonrelfiles_dropped += 1; + } + } else { + if doomed_layer.get_relish_tag().is_relation() { + result.snapshot_relfiles_removed += 1; + } else { + result.snapshot_nonrelfiles_removed += 1; + } + } + } + + result.elapsed = now.elapsed(); + Ok(result) + } } diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index dd57525b10..f2dd94afc8 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -35,22 +35,24 @@ pub struct InMemoryLayer { /// start_lsn: Lsn, - // FIXME: the three mutex-protected fields below should probably be protected - // by a single mutex. + inner: Mutex, +} + +pub struct InMemoryLayerInner { /// If this relation was dropped, remember when that happened. Lsn(0) means /// it hasn't been dropped - drop_lsn: Mutex, + drop_lsn: Lsn, /// /// All versions of all pages in the layer are are kept here. /// Indexed by block number and LSN. /// - page_versions: Mutex>, + page_versions: BTreeMap<(u32, Lsn), PageVersion>, /// /// `relsizes` tracks the size of the relation at different points in time. /// - relsizes: Mutex>, + relsizes: BTreeMap, } impl Layer for InMemoryLayer { @@ -74,6 +76,11 @@ impl Layer for InMemoryLayer { return Lsn(u64::MAX); } + fn is_dropped(&self) -> bool { + let inner = self.inner.lock().unwrap(); + return inner.drop_lsn != Lsn(0); + } + /// Look up given page in the cache. fn get_page_at_lsn( &self, @@ -85,11 +92,12 @@ impl Layer for InMemoryLayer { let mut records: Vec = Vec::new(); let mut page_img: Option = None; let mut need_base_image_lsn: Option = Some(lsn); + { - let page_versions = self.page_versions.lock().unwrap(); + let inner = self.inner.lock().unwrap(); let minkey = (blknum, Lsn(0)); let maxkey = (blknum, lsn); - let mut iter = page_versions.range((Included(&minkey), Included(&maxkey))); + let mut iter = inner. page_versions.range((Included(&minkey), Included(&maxkey))); while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { if let Some(img) = &entry.page_image { page_img = Some(img.clone()); @@ -182,12 +190,12 @@ impl Layer for InMemoryLayer { /// Get size of the relation at given LSN fn get_rel_size(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let inner = self.inner.lock().unwrap(); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); if let Some((_entry_lsn, entry)) = iter.next_back() { let result = *entry; - drop(relsizes); + drop(inner); trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result); Ok(result) } else { @@ -198,9 +206,9 @@ impl Layer for InMemoryLayer { /// Does this relation exist at given LSN? fn get_rel_exists(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); + let inner = self.inner.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); let result = if let Some((_entry_lsn, _entry)) = iter.next_back() { true @@ -222,25 +230,21 @@ impl Layer for InMemoryLayer { self.timelineid, lsn ); - { - let mut page_versions = self.page_versions.lock().unwrap(); - let old = page_versions.insert((blknum, lsn), pv); + let mut inner = self.inner.lock().unwrap(); - if old.is_some() { - // We already had an entry for this LSN. That's odd.. - warn!( - "Page version of rel {:?} blk {} at {} already exists", - self.rel, blknum, lsn - ); - } + let old = inner.page_versions.insert((blknum, lsn), pv); - // release lock on 'page_versions' + if old.is_some() { + // We already had an entry for this LSN. That's odd.. + warn!( + "Page version of rel {:?} blk {} at {} already exists", + self.rel, blknum, lsn + ); } // Also update the relation size, if this extended the relation. if self.rel.is_blocky() { - let mut relsizes = self.relsizes.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); let oldsize; if let Some((_entry_lsn, entry)) = iter.next_back() { @@ -257,7 +261,7 @@ impl Layer for InMemoryLayer { blknum + 1, lsn ); - relsizes.insert(lsn, blknum + 1); + inner.relsizes.insert(lsn, blknum + 1); } } @@ -266,8 +270,8 @@ impl Layer for InMemoryLayer { /// Remember that the relation was truncated at given LSN fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()> { - let mut relsizes = self.relsizes.lock().unwrap(); - let old = relsizes.insert(lsn, relsize); + let mut inner = self.inner.lock().unwrap(); + let old = inner.relsizes.insert(lsn, relsize); if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -279,10 +283,10 @@ impl Layer for InMemoryLayer { /// Remember that the relation was truncated at given LSN fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()> { - let mut drop_lsn = self.drop_lsn.lock().unwrap(); + let mut inner = self.inner.lock().unwrap(); - assert!(*drop_lsn == Lsn(0)); - *drop_lsn = lsn; + assert!(inner.drop_lsn == Lsn(0)); + inner.drop_lsn = lsn; info!("dropped relation {} at {}", self.rel, lsn); @@ -296,17 +300,15 @@ impl Layer for InMemoryLayer { /// If there were page versions newer than 'end_lsn', a new in-memory /// layer is returned with those page versions. Otherwise returns None. /// - fn freeze(&self, end_lsn: Lsn) -> Result>> { + fn freeze(&self, end_lsn: Lsn, walredo_mgr: &dyn WalRedoManager) -> Result>> { info!( "freezing in memory layer for {} on timeline {} at {}", self.rel, self.timelineid, end_lsn ); - let page_versions = self.page_versions.lock().unwrap(); - let relsizes = self.relsizes.lock().unwrap(); - let drop_lsn = self.drop_lsn.lock().unwrap(); - - let dropped = *drop_lsn != Lsn(0); + let inner = self.inner.lock().unwrap(); + + let dropped = inner.drop_lsn != Lsn(0); // Divide all the page versions into old and new at the 'end_lsn' cutoff point. let mut old_page_versions = BTreeMap::new(); @@ -315,7 +317,7 @@ impl Layer for InMemoryLayer { let mut new_page_versions = BTreeMap::new(); if !dropped { - for (lsn, size) in relsizes.iter() { + for (lsn, size) in inner.relsizes.iter() { if *lsn > end_lsn { new_relsizes.insert(*lsn, *size); } else { @@ -323,7 +325,7 @@ impl Layer for InMemoryLayer { } } - for ((blknum, lsn), pv) in page_versions.iter() { + for ((blknum, lsn), pv) in inner.page_versions.iter() { if *lsn > end_lsn { new_page_versions.insert((*blknum, *lsn), pv.clone()); } else { @@ -333,14 +335,14 @@ impl Layer for InMemoryLayer { } let end_lsn = if dropped { - assert!(*drop_lsn < end_lsn); - *drop_lsn + assert!(inner.drop_lsn < end_lsn); + inner.drop_lsn } else { end_lsn }; // Write the old page versions to disk. - let _snapfile = SnapshotLayer::create( + let snapfile = SnapshotLayer::create( self.conf, self.timelineid, self.tenantid, @@ -351,24 +353,34 @@ impl Layer for InMemoryLayer { old_page_versions, old_relsizes, )?; + let mut result: Vec> = Vec::new(); // If there were any "new" page versions, initialize a new in-memory layer to hold // them if !new_relsizes.is_empty() || !new_page_versions.is_empty() { - let new_layer: Arc = Arc::new(InMemoryLayer { - conf: self.conf, - timelineid: self.timelineid, - tenantid: self.tenantid, - rel: self.rel, - start_lsn: end_lsn, - drop_lsn: Mutex::new(Lsn(0)), - page_versions: Mutex::new(new_page_versions), - relsizes: Mutex::new(new_relsizes), - }); - Ok(Some(new_layer)) - } else { - Ok(None) + info!("created new in-mem layer for {} {}-", self.rel, end_lsn); + + let new_layer = Self::copy_snapshot(self.conf, walredo_mgr, &snapfile, self.timelineid, self.tenantid, end_lsn)?; + let mut new_inner = new_layer.inner.lock().unwrap(); + new_inner.page_versions.append(&mut new_page_versions); + new_inner.relsizes.append(&mut new_relsizes); + drop(new_inner); + + result.push(Arc::new(new_layer)); } + result.push(Arc::new(snapfile)); + + Ok(result) + } + + fn delete(&self) -> Result<()> { + // Nothing to do. When the reference is dropped, the memory is released. + Ok(()) + } + + fn unload(&self) -> Result<()> { + // cannot unload in-memory layer. Freeze instead + Ok(()) } } @@ -396,9 +408,11 @@ impl InMemoryLayer { tenantid, rel, start_lsn, - drop_lsn: Mutex::new(Lsn(0)), - page_versions: Mutex::new(BTreeMap::new()), - relsizes: Mutex::new(BTreeMap::new()), + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: Lsn(0), + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), }) } @@ -446,9 +460,11 @@ impl InMemoryLayer { tenantid, rel: src.get_relish_tag(), start_lsn: lsn, - drop_lsn: Mutex::new(Lsn(0)), - page_versions: Mutex::new(page_versions), - relsizes: Mutex::new(relsizes), + inner: Mutex::new(InMemoryLayerInner { + drop_lsn: Lsn(0), + page_versions: page_versions, + relsizes: relsizes, + }), }) } @@ -460,13 +476,12 @@ impl InMemoryLayer { self.rel, self.start_lsn ); - let relsizes = self.relsizes.lock().unwrap(); - let page_versions = self.page_versions.lock().unwrap(); + let inner = self.inner.lock().unwrap(); - for (k, v) in relsizes.iter() { + for (k, v) in inner.relsizes.iter() { result += &format!("{}: {}\n", k, v); } - for (k, v) in page_versions.iter() { + for (k, v) in inner.page_versions.iter() { result += &format!( "blk {} at {}: {}/{}\n", k.0, diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs new file mode 100644 index 0000000000..f7bffa297e --- /dev/null +++ b/pageserver/src/layered_repository/layer_map.rs @@ -0,0 +1,127 @@ +/// +/// The layer map tracks what layers exist for all the relations in a timeline. +/// +/// When the timeline is first accessed, the server lists of all snapshot files +/// in the timelines/ directory, and populates this map with +/// SnapshotLayers corresponding to each file. When new WAL is received, +/// we create InMemoryLayers to hold the incoming records. Now and then, +/// in the checkpoint() function, the in-memory layers are frozen, forming +/// new snapshot layers and corresponding files are written to disk. +/// + +use anyhow::Result; +use log::*; +use crate::relish::*; +use crate::layered_repository::storage_layer::Layer; +use std::collections::HashSet; +use std::collections::BTreeMap; +use std::ops::Bound::Included; +use zenith_utils::lsn::Lsn; +use std::sync::Arc; + +/// LayerMap is a BTreeMap keyed by RelishTag and the layer's start LSN. +/// It provides a couple of convenience functions over a plain BTreeMap +pub struct LayerMap { + pub inner: BTreeMap<(RelishTag, Lsn), Arc>, +} + +impl LayerMap { + /// + /// Look up using the given rel tag and LSN. This differs from a plain + /// key-value lookup in that if there is any layer that covers the + /// given LSN, or precedes the given LSN, it is returned. In other words, + /// you don't need to know the exact start LSN of the layer. + /// + pub fn get(&self, tag: RelishTag, lsn: Lsn) -> Option> { + let startkey = (tag, Lsn(0)); + let endkey = (tag, lsn); + + if let Some((_k, v)) = self + .inner + .range((Included(startkey), Included(endkey))) + .next_back() + { + Some(Arc::clone(v)) + } else { + None + } + } + + pub fn insert(&mut self, layer: Arc) { + let rel = layer.get_relish_tag(); + let start_lsn = layer.get_start_lsn(); + + self.inner.insert((rel, start_lsn), Arc::clone(&layer)); + } + + pub fn remove(&mut self, layer: &dyn Layer) { + let rel = layer.get_relish_tag(); + let start_lsn = layer.get_start_lsn(); + + self.inner.remove(&(rel, start_lsn)); + } + + pub fn list_rels(&self, + spcnode: u32, + dbnode: u32, + ) -> Result> { + let mut rels: HashSet = HashSet::new(); + + // Scan the timeline directory to get all rels in this timeline. + for ((rel, _lsn), _l) in self.inner.iter() { + if let RelishTag::Relation(reltag) = rel { + // FIXME: skip if it was dropped before the requested LSN. But there is no + // LSN argument + + if (spcnode == 0 || reltag.spcnode == spcnode) + && (dbnode == 0 || reltag.dbnode == dbnode) + { + rels.insert(*reltag); + } + } + } + Ok(rels) + } + + pub fn list_nonrels(&self, _lsn: Lsn) -> Result> { + let mut rels: HashSet = HashSet::new(); + + // Scan the timeline directory to get all rels in this timeline. + for ((rel, _lsn), _l) in self.inner.iter() { + // FIXME: skip if it was dropped before the requested LSN. + + if let RelishTag::Relation(_) = rel { + } else { + rels.insert(*rel); + } + } + Ok(rels) + } + + /// Is there a newer layer for given relation? + pub fn newer_layer_exists(&self, rel: RelishTag, lsn: Lsn) -> bool { + let startkey = (rel, lsn); + let endkey = (rel, Lsn(u64::MAX)); + + for ((_rel, newer_lsn), layer) in self + .inner + .range((Included(startkey), Included(endkey))) + { + if layer.get_end_lsn() > lsn { + info!("found later layer for rel {}, {} {}-{}", rel, lsn, newer_lsn, layer.get_end_lsn()); + return true; + } else { + info!("found singleton layer for rel {}, {} {}", rel, lsn, newer_lsn); + continue; + } + } + info!("no later layer found for rel {}, {}", rel, lsn); + false + } +} + +impl Default for LayerMap { + fn default() -> Self { + LayerMap { inner: BTreeMap::new() } + } +} diff --git a/pageserver/src/layered_repository/snapshot_layer.rs b/pageserver/src/layered_repository/snapshot_layer.rs index 575a337706..11076a53b7 100644 --- a/pageserver/src/layered_repository/snapshot_layer.rs +++ b/pageserver/src/layered_repository/snapshot_layer.rs @@ -41,22 +41,21 @@ use crate::layered_repository::storage_layer::Layer; use crate::layered_repository::storage_layer::PageVersion; use crate::layered_repository::storage_layer::ZERO_PAGE; use crate::relish::*; -use crate::repository::{GcResult, WALRecord}; +use crate::repository::WALRecord; use crate::walredo::WalRedoManager; use crate::PageServerConf; use crate::{ZTimelineId, ZTenantId}; use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use log::*; -use std::collections::{BTreeMap, BTreeSet, HashSet}; +use std::collections::BTreeMap; use std::fmt; use std::fs; use std::fs::File; use std::io::Write; -use std::ops::Bound::{Excluded, Included, Unbounded}; +use std::ops::Bound::Included; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; -use std::time::Instant; +use std::sync::{Arc, Mutex, MutexGuard}; use bookfile::{Book, BookWriter}; @@ -205,8 +204,10 @@ impl fmt::Display for SnapshotFileName { /// /// SnapshotLayer is the in-memory data structure associated with an on-disk snapshot file. -/// It is also used to accumulate new changes at the tip of a branch; end_lsn is u64::MAX -/// in that case. +/// We hold a SnapshotLayer in memory for each file, in the LayerMap. If a layer is in +/// "loaded" state, we have a copy of the file in memory, in 'inner'. Otherwise the struct +/// is just a placeholder for a file that exists in memory, and it needs to be loaded +/// before using it in queries. /// pub struct SnapshotLayer { conf: &'static PageServerConf, @@ -222,16 +223,24 @@ pub struct SnapshotLayer { dropped: bool, + inner: Mutex +} + +pub struct SnapshotLayerInner { + // If false, the 'page_versions' and 'relsizes' have not been loaded into memory + // yet. + loaded: bool, + /// /// All versions of all pages in the file are are kept here. /// Indexed by block number and LSN. /// - page_versions: Mutex>, + page_versions: BTreeMap<(u32, Lsn), PageVersion>, /// /// `relsizes` tracks the size of the relation at different points in time. /// - relsizes: Mutex>, + relsizes: BTreeMap, } impl Layer for SnapshotLayer { @@ -247,6 +256,10 @@ impl Layer for SnapshotLayer { return self.rel; } + fn is_dropped(&self) -> bool { + return self.dropped; + } + fn get_start_lsn(&self) -> Lsn { return self.start_lsn; } @@ -267,10 +280,10 @@ impl Layer for SnapshotLayer { let mut page_img: Option = None; let mut need_base_image_lsn: Option = Some(lsn); { - let page_versions = self.page_versions.lock().unwrap(); + let inner = self.load()?; let minkey = (blknum, Lsn(0)); let maxkey = (blknum, lsn); - let mut iter = page_versions.range((Included(&minkey), Included(&maxkey))); + let mut iter = inner.page_versions.range((Included(&minkey), Included(&maxkey))); while let Some(((_blknum, entry_lsn), entry)) = iter.next_back() { if let Some(img) = &entry.page_image { page_img = Some(img.clone()); @@ -291,7 +304,7 @@ impl Layer for SnapshotLayer { } } - // release lock on 'page_versions' + // release lock on 'inner' } records.reverse(); @@ -365,12 +378,12 @@ impl Layer for SnapshotLayer { /// Get size of the relation at given LSN fn get_rel_size(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let inner = self.load()?; + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); if let Some((_entry_lsn, entry)) = iter.next_back() { let result = *entry; - drop(relsizes); + drop(inner); trace!("get_relsize: {} at {} -> {}", self.rel, lsn, result); Ok(result) } else { @@ -382,9 +395,9 @@ impl Layer for SnapshotLayer { /// Does this relation exist at given LSN? fn get_rel_exists(&self, lsn: Lsn) -> Result { // Scan the BTreeMap backwards, starting from the given entry. - let relsizes = self.relsizes.lock().unwrap(); + let inner = self.load()?; - let mut iter = relsizes.range((Included(&Lsn(0)), Included(&lsn))); + let mut iter = inner.relsizes.range((Included(&Lsn(0)), Included(&lsn))); let result = if let Some((_entry_lsn, _entry)) = iter.next_back() { true @@ -409,9 +422,23 @@ impl Layer for SnapshotLayer { bail!("cannot modify historical snapshot layer"); } - fn freeze(&self, _end_lsn: Lsn) -> Result>> { + fn freeze(&self, _end_lsn: Lsn, _walredo_mgr: &dyn WalRedoManager) -> Result>> { bail!("cannot freeze historical snapshot layer"); } + + fn delete(&self) -> Result<()> { + // delete underlying file + fs::remove_file(self.path())?; + Ok(()) + } + + fn unload(&self) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + inner.page_versions = BTreeMap::new(); + inner.relsizes = BTreeMap::new(); + inner.loaded = false; + Ok(()) + } } impl SnapshotLayer { @@ -463,20 +490,16 @@ impl SnapshotLayer { start_lsn: start_lsn, end_lsn, dropped, - page_versions: Mutex::new(page_versions), - relsizes: Mutex::new(relsizes), + inner: Mutex::new(SnapshotLayerInner { + loaded: true, + page_versions: page_versions, + relsizes: relsizes, + }), }; + let inner = snapfile.inner.lock().unwrap(); - snapfile.save()?; - Ok(snapfile) - } - - /// Write the in-memory btreemaps into files - fn save(&self) -> Result<()> { - let path = self.path(); - - let page_versions = self.page_versions.lock().unwrap(); - let relsizes = self.relsizes.lock().unwrap(); + // Write the in-memory btreemaps into files + let path = snapfile.path(); // Note: This overwrites any existing file. There shouldn't be any. // FIXME: throw an error instead? @@ -486,13 +509,13 @@ impl SnapshotLayer { // Write out page versions let mut chapter = book.new_chapter(PAGE_VERSIONS_CHAPTER); - let buf = BTreeMap::ser(&page_versions)?; + let buf = BTreeMap::ser(&inner.page_versions)?; chapter.write_all(&buf)?; let book = chapter.close()?; // and relsizes to separate chapter let mut chapter = book.new_chapter(REL_SIZES_CHAPTER); - let buf = BTreeMap::ser(&relsizes)?; + let buf = BTreeMap::ser(&inner.relsizes)?; chapter.write_all(&buf)?; let book = chapter.close()?; @@ -500,83 +523,29 @@ impl SnapshotLayer { trace!("saved {}", &path.display()); - Ok(()) + drop(inner); + + Ok(snapfile) } - /// - /// Find the snapshot file with latest LSN that covers the given 'lsn', or is before it. - /// - pub fn find_latest_snapshot_file( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - rel: RelishTag, - earliest_lsn: Lsn, - lsn: Lsn, - ) -> Result> { - // Scan the timeline directory to get all rels in this timeline. - let mut result_start_lsn = Lsn(0); - let mut result_end_lsn = Lsn(0); - let mut result_dropped = false; - for fname in Self::list_snapshot_files(conf, timelineid, tenantid)? { - if fname.end_lsn <= earliest_lsn { - continue; - } + fn load(&self) -> Result> { - if fname.rel == rel && fname.start_lsn <= lsn && fname.end_lsn > result_end_lsn { - result_start_lsn = fname.start_lsn; - result_end_lsn = fname.end_lsn; - result_dropped = fname.dropped; - } - } - if result_start_lsn != Lsn(0) { - Ok(Some((result_start_lsn, result_end_lsn, result_dropped))) - } else { - Ok(None) - } - } + // quick exit if already loaded + let mut inner = self.inner.lock().unwrap(); - /// - /// Load the state for one relation back into memory. - /// - /// Returns the latest snapshot file that before the given 'lsn', but newer than 'earliest_lsn' - /// - pub fn load( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - rel: RelishTag, - earliest_lsn: Lsn, - lsn: Lsn, - ) -> Result> { - if let Some((start_lsn, end_lsn, dropped)) = - Self::find_latest_snapshot_file(conf, timelineid, tenantid, rel, earliest_lsn, lsn)? - { - let snap = Self::load_path(conf, timelineid, tenantid, rel, start_lsn, end_lsn, dropped)?; - Ok(Some(snap)) - } else { - Ok(None) + if inner.loaded { + return Ok(inner); } - } - fn load_path( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - rel: RelishTag, - start_lsn: Lsn, - end_lsn: Lsn, - dropped: bool, - ) -> Result { let path = Self::path_for( - conf, - timelineid, - tenantid, + self.conf, + self.timelineid, + self.tenantid, &SnapshotFileName { - rel, - start_lsn, - end_lsn, - dropped, + rel: self.rel, + start_lsn: self.start_lsn, + end_lsn: self.end_lsn, + dropped: self.dropped, }, ); @@ -597,200 +566,46 @@ impl SnapshotLayer { debug!("loaded from {}", &path.display()); - Ok(SnapshotLayer { - conf, - timelineid, - tenantid, - rel, - start_lsn, - end_lsn, - dropped, - page_versions: Mutex::new(page_versions), - relsizes: Mutex::new(relsizes), - }) - } + *inner = SnapshotLayerInner { + loaded: true, + page_versions, + relsizes, + }; - pub fn list_rels( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - spcnode: u32, - dbnode: u32, - ) -> Result> { - let mut rels: HashSet = HashSet::new(); - - // Scan the timeline directory to get all rels in this timeline. - for snapfiles in Self::list_snapshot_files(conf, timelineid, tenantid)? { - if let RelishTag::Relation(reltag) = snapfiles.rel { - // FIXME: skip if it was dropped before the requested LSN. But there is no - // LSN argument - - if (spcnode == 0 || reltag.spcnode == spcnode) - && (dbnode == 0 || reltag.dbnode == dbnode) - { - rels.insert(reltag); - } - } - } - Ok(rels) - } - - pub fn list_nonrels( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - _lsn: Lsn, - ) -> Result> { - let mut rels: HashSet = HashSet::new(); - - // Scan the timeline directory to get all rels in this timeline. - for snapfile in Self::list_snapshot_files(conf, timelineid, tenantid)? { - // FIXME: skip if it was dropped before the requested LSN. - - if let RelishTag::Relation(_) = snapfile.rel { - } else { - rels.insert(snapfile.rel); - } - } - Ok(rels) - } - - /// - /// Garbage collect snapshot files on a timeline that are no longer needed. - /// - /// The caller specifies how much history is needed with the two arguments: - /// - /// retain_lsns: keep page a version of each page at these LSNs - /// cutoff: also keep everything newer than this LSN - /// - /// The 'retain_lsns' lists is currently used to prevent removing files that - /// are needed by child timelines. In the future, the user might be able to - /// name additional points in time to retain. The caller is responsible for - /// collecting that information. - /// - /// The 'cutoff' point is used to retain recent versions that might still be - /// needed by read-only nodes. (As of this writing, the caller just passes - /// the latest LSN subtracted by a constant, and doesn't do anything smart - /// to figure out what read-only nodes might actually need.) - /// - /// Currently, we don't make any attempt at removing unneeded page versions - /// within a snapshot file. We can only remove the whole file if it's fully - /// obsolete. - /// - pub fn gc_timeline( - conf: &'static PageServerConf, - timelineid: ZTimelineId, - tenantid: ZTenantId, - retain_lsns: Vec, - cutoff: Lsn, - ) -> Result { - let now = Instant::now(); - let mut result: GcResult = Default::default(); - - // Scan all snapshot files in the directory. For each file, if a newer file - // exists, we can remove the old one. - - // For convenience and speed, slurp the list of files in the directory into memory first. - let mut snapfiles: BTreeSet = BTreeSet::new(); - - for fname in Self::list_snapshot_files(conf, timelineid, tenantid)? { - snapfiles.insert(fname.clone()); - - if fname.rel.is_relation() { - result.snapshot_relfiles_total += 1; - } else { - result.snapshot_nonrelfiles_total += 1; - } - } - - // Now determine for each file if it needs to be retained - 'outer: for snapfile in &snapfiles { - // Is it newer than cutoff point? - if snapfile.end_lsn > cutoff { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_needed_by_cutoff += 1; - } else { - result.snapshot_nonrelfiles_needed_by_cutoff += 1; - } - continue 'outer; - } - - // Is it needed by a child branch? - for retain_lsn in &retain_lsns { - // FIXME: are the bounds inclusive or exclusive? - if snapfile.start_lsn <= *retain_lsn && *retain_lsn <= snapfile.end_lsn { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_needed_by_branches += 1; - } else { - result.snapshot_nonrelfiles_needed_by_branches += 1; - } - continue 'outer; - } - } - - // Unless the relation was dropped, is there a later snapshot file for this relation? - if !snapfile.dropped { - let mut found_later_file = false; - if let Some(other_snapfile) = - snapfiles.range((Excluded(snapfile), Unbounded)).next() - { - if other_snapfile.rel != snapfile.rel { - // walked past the files for this rel. So there is no later file. - } else { - // found a later file. - found_later_file = true; - } - } - - if !found_later_file { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_not_updated += 1; - } else { - result.snapshot_nonrelfiles_not_updated += 1; - } - continue 'outer; - } - } - - // We didn't find any reason to keep this file, so remove it. - let path = Self::path_for(conf, timelineid, tenantid, snapfile); - info!("garbage collecting {}", path.display()); - fs::remove_file(path)?; - - if snapfile.dropped { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_dropped += 1; - } else { - result.snapshot_nonrelfiles_dropped += 1; - } - } else { - if snapfile.rel.is_relation() { - result.snapshot_relfiles_removed += 1; - } else { - result.snapshot_nonrelfiles_removed += 1; - } - } - } - - result.elapsed = now.elapsed(); - Ok(result) + Ok(inner) } // TODO: returning an Iterator would be more idiomatic - fn list_snapshot_files( + pub fn list_snapshot_files( conf: &'static PageServerConf, timelineid: ZTimelineId, tenantid: ZTenantId, - ) -> Result> { + ) -> Result>> { let path = conf.timeline_path(&timelineid, &tenantid); - let mut snapfiles = Vec::new(); + let mut snapfiles: Vec> = Vec::new(); for direntry in fs::read_dir(path)? { let fname = direntry?.file_name(); let fname = fname.to_str().unwrap(); if let Some(snapfilename) = SnapshotFileName::from_str(fname) { - snapfiles.push(snapfilename); + + let snapfile = SnapshotLayer { + conf, + timelineid, + tenantid, + rel: snapfilename.rel, + start_lsn: snapfilename.start_lsn, + end_lsn: snapfilename.end_lsn, + dropped: snapfilename.dropped, + inner: Mutex::new(SnapshotLayerInner { + loaded: false, + page_versions: BTreeMap::new(), + relsizes: BTreeMap::new(), + }), + }; + + snapfiles.push(Arc::new(snapfile)); } } return Ok(snapfiles); @@ -804,13 +619,11 @@ impl SnapshotLayer { self.rel, self.start_lsn, self.end_lsn ); - let relsizes = self.relsizes.lock().unwrap(); - //let page_versions = self.page_versions.lock().unwrap(); - - for (k, v) in relsizes.iter() { + let inner = self.inner.lock().unwrap(); + for (k, v) in inner.relsizes.iter() { result += &format!("{}: {}\n", k, v); } - //for (k, v) in page_versions.iter() { + //for (k, v) in inner.page_versions.iter() { // result += &format!("blk {} at {}: {}/{}\n", k.0, k.1, v.page_image.is_some(), v.record.is_some()); //} diff --git a/pageserver/src/layered_repository/storage_layer.rs b/pageserver/src/layered_repository/storage_layer.rs index 1586bfcae1..8531563d61 100644 --- a/pageserver/src/layered_repository/storage_layer.rs +++ b/pageserver/src/layered_repository/storage_layer.rs @@ -33,6 +33,7 @@ pub trait Layer: Send + Sync { fn get_relish_tag(&self) -> RelishTag; fn get_start_lsn(&self) -> Lsn; fn get_end_lsn(&self) -> Lsn; + fn is_dropped(&self) -> bool; fn get_page_at_lsn( &self, @@ -76,5 +77,9 @@ pub trait Layer: Send + Sync { ) } - fn freeze(&self, end_lsn: Lsn) -> Result>>; + fn freeze(&self, end_lsn: Lsn, walredo_mgr: &dyn WalRedoManager) -> Result>>; + + fn delete(&self) -> Result<()>; + + fn unload(&self) -> Result<()>; } diff --git a/pageserver/src/object_repository.rs b/pageserver/src/object_repository.rs index 4000973d6d..145706e931 100644 --- a/pageserver/src/object_repository.rs +++ b/pageserver/src/object_repository.rs @@ -165,6 +165,21 @@ impl Repository for ObjectRepository { Ok(()) } + + fn gc_iteration(&self, timelineid: Option, horizon: u64, compact: bool) -> Result { + if let Some(timelineid) = timelineid { + let timelines = self.timelines.lock().unwrap(); + + // FIXME: If the timeline isn't opened yet, don't open it just for GC. + if let Some(timeline) = timelines.get(&timelineid) { + return timeline.gc_iteration(horizon, compact); + } + } else { + // FIXME: the object repository doesn't support GC on all timelines. Should + // iterate all the timelines here + } + return Ok(GcResult::default()) + } } /// @@ -671,136 +686,6 @@ impl Timeline for ObjectTimeline { Ok(Box::new(ObjectHistory { lsn, iter })) } - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { - let last_lsn = self.get_last_valid_lsn(); - let mut result: GcResult = Default::default(); - - // checked_sub() returns None on overflow. - if let Some(horizon) = last_lsn.checked_sub(horizon) { - // WAL is large enough to perform GC - let now = Instant::now(); - let mut prepared_horizon = Lsn(u64::MAX); - // Iterate through all objects in timeline - for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { - result.inspected += 1; - match obj { - // Prepared transactions - ObjectTag::Buffer(RelishTag::TwoPhase { xid }, _blknum) => { - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - prepared_horizon = Lsn::min(lsn, prepared_horizon); - if self.get_tx_status(xid, horizon)? - != pg_constants::TRANSACTION_STATUS_IN_PROGRESS - { - self.obj_store.unlink(&key, lsn)?; - result.prep_deleted += 1; - } - } - } - ObjectTag::RelationMetadata(_) => { - // Do not need to reconstruct page images, - // just delete all old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - let content = vers.1; - match ObjectValue::des(&content[..])? { - ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - result.dropped += 1; - } - _ => (), // preserve last version - } - last_version = false; - result.truncated += 1; - result.n_relations += 1; - } else { - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } - } - ObjectTag::Buffer(rel, blknum) => { - if rel.is_blocky() { - // Reconstruct page at horizon unless relation was dropped - // and delete all older versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - result.truncated += 1; - last_version = false; - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - if rel_size > blknum { - // preserve and materialize last version before deleting all preceeding - self.get_page_at_lsn_nowait(rel, blknum, lsn)?; - continue; - } - debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); - } else { - if let Some(rel_size) = - self.relsize_get_nowait(rel, last_lsn)? - { - debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); - continue; - } - debug!("Relation {} was dropped at {}", rel, lsn); - } - // relation was dropped or truncated so this block can be removed - } - self.obj_store.unlink(&key, lsn)?; - result.deleted += 1; - } - } else { - // versioned always materialized objects: no need to reconstruct pages - - // Remove old versions over horizon - let mut last_version = true; - let key = ObjectKey { - timeline: self.timelineid, - tag: obj, - }; - for vers in self.obj_store.object_versions(&key, horizon)? { - let lsn = vers.0; - if last_version { - // preserve last version - last_version = false; - } else { - // delete deteriorated version - self.obj_store.unlink(&key, lsn)?; - result.chkp_deleted += 1; - } - } - } - } - _ => (), // do nothing - } - } - result.elapsed = now.elapsed(); - info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", - result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); - if compact { - self.obj_store.compact(); - } - } - Ok(result) - } } impl ObjectTimeline { @@ -993,6 +878,137 @@ impl ObjectTimeline { self.obj_store.put(&key, Lsn(0), &ObjectValue::ser(&val)?) } + + fn gc_iteration(&self, horizon: u64, compact: bool) -> Result { + let last_lsn = self.get_last_valid_lsn(); + let mut result: GcResult = Default::default(); + + // checked_sub() returns None on overflow. + if let Some(horizon) = last_lsn.checked_sub(horizon) { + // WAL is large enough to perform GC + let now = Instant::now(); + let mut prepared_horizon = Lsn(u64::MAX); + // Iterate through all objects in timeline + for obj in self.obj_store.list_objects(self.timelineid, last_lsn)? { + result.inspected += 1; + match obj { + // Prepared transactions + ObjectTag::Buffer(RelishTag::TwoPhase { xid }, _blknum) => { + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + prepared_horizon = Lsn::min(lsn, prepared_horizon); + if self.get_tx_status(xid, horizon)? + != pg_constants::TRANSACTION_STATUS_IN_PROGRESS + { + self.obj_store.unlink(&key, lsn)?; + result.prep_deleted += 1; + } + } + } + ObjectTag::RelationMetadata(_) => { + // Do not need to reconstruct page images, + // just delete all old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + let content = vers.1; + match ObjectValue::des(&content[..])? { + ObjectValue::RelationSize(RelationSizeEntry::Unlink) => { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + result.dropped += 1; + } + _ => (), // preserve last version + } + last_version = false; + result.truncated += 1; + result.n_relations += 1; + } else { + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } + } + } + ObjectTag::Buffer(rel, blknum) => { + if rel.is_blocky() { + // Reconstruct page at horizon unless relation was dropped + // and delete all older versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + result.truncated += 1; + last_version = false; + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + if rel_size > blknum { + // preserve and materialize last version before deleting all preceeding + self.get_page_at_lsn_nowait(rel, blknum, lsn)?; + continue; + } + debug!("Drop last block {} of relation {} at {} because it is beyond relation size {}", blknum, rel, lsn, rel_size); + } else { + if let Some(rel_size) = + self.relsize_get_nowait(rel, last_lsn)? + { + debug!("Preserve block {} of relation {} at {} because relation has size {} at {}", blknum, rel, lsn, rel_size, last_lsn); + continue; + } + debug!("Relation {} was dropped at {}", rel, lsn); + } + // relation was dropped or truncated so this block can be removed + } + self.obj_store.unlink(&key, lsn)?; + result.deleted += 1; + } + } else { + // versioned always materialized objects: no need to reconstruct pages + + // Remove old versions over horizon + let mut last_version = true; + let key = ObjectKey { + timeline: self.timelineid, + tag: obj, + }; + for vers in self.obj_store.object_versions(&key, horizon)? { + let lsn = vers.0; + if last_version { + // preserve last version + last_version = false; + } else { + // delete deteriorated version + self.obj_store.unlink(&key, lsn)?; + result.chkp_deleted += 1; + } + } + } + } + _ => (), // do nothing + } + } + result.elapsed = now.elapsed(); + info!("Garbage collection completed in {:?}: {} relations inspected, {} object inspected, {} version histories truncated, {} versions deleted, {} relations dropped", + result.elapsed, result.n_relations, result.inspected, result.truncated, result.deleted, result.dropped); + if compact { + self.obj_store.compact(); + } + } + Ok(result) + } } struct ObjectHistory<'a> { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 90c3e5514b..35dc993c96 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -537,10 +537,9 @@ impl postgres_backend::Handler for PageServerHandler { .map(|h| h.as_str().parse()) .unwrap_or(Ok(self.conf.gc_horizon))?; - let timeline = - page_cache::get_repository_for_tenant(&tenantid)?.get_timeline(timelineid)?; + let repo = page_cache::get_repository_for_tenant(&tenantid)?; - let result = timeline.gc_iteration(gc_horizon, true)?; + let result = repo.gc_iteration(Some(timelineid), gc_horizon, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"n_relations"), @@ -613,6 +612,7 @@ impl postgres_backend::Handler for PageServerHandler { ]))? .write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { + error!("received unknown command from client"); bail!("unknown command"); } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index adfcc981f0..41e091077b 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -31,6 +31,20 @@ pub trait Repository: Send + Sync { /// Branch a timeline fn branch_timeline(&self, src: ZTimelineId, dst: ZTimelineId, start_lsn: Lsn) -> Result<()>; + /// perform one garbage collection iteration. + /// garbage collection is periodically performed by gc thread, + /// but it can be explicitly requested through page server api. + /// + /// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval). + /// `compact` parameter is used to force compaction of storage. + /// some storage implementation are based on lsm tree and require periodic merge (compaction). + /// usually storage implementation determines itself when compaction should be performed. + /// but for gc tests it way be useful to force compaction just after completion of gc iteration + /// to make sure that all detected garbage is removed. + /// so right now `compact` is set to true when gc explicitly requested through page srver api, + /// and is st to false in gc threads which infinitely repeats gc iterations in loop. + fn gc_iteration(&self, timelineid: Option, horizon: u64, compact: bool) -> Result; + // TODO get timelines? //fn get_stats(&self) -> RepositoryStats; } @@ -187,20 +201,6 @@ pub trait Timeline: Send + Sync { // TODO ordering guarantee? fn history<'a>(&'a self) -> Result>; - /// Perform one garbage collection iteration. - /// Garbage collection is periodically performed by GC thread, - /// but it can be explicitly requested through page server API. - /// - /// `horizon` specifies delta from last LSN to preserve all object versions (PITR interval). - /// `compact` parameter is used to force compaction of storage. - /// Some storage implementation are based on LSM tree and require periodic merge (compaction). - /// Usually storage implementation determines itself when compaction should be performed. - /// But for GC tests it way be useful to force compaction just after completion of GC iteration - /// to make sure that all detected garbage is removed. - /// So right now `compact` is set to true when GC explicitly requested through page srver API, - /// and is st to false in GC threads which infinitely repeats GC iterations in loop. - fn gc_iteration(&self, horizon: u64, compact: bool) -> Result; - // Check transaction status fn get_tx_status(&self, xid: TransactionId, lsn: Lsn) -> anyhow::Result { let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE; diff --git a/test_runner/batch_others/test_snapfiles_gc.py b/test_runner/batch_others/test_snapfiles_gc.py index 1cd05d1aa8..761dc95b31 100644 --- a/test_runner/batch_others/test_snapfiles_gc.py +++ b/test_runner/batch_others/test_snapfiles_gc.py @@ -32,6 +32,11 @@ def test_snapfiles_gc(zenith_cli, pageserver, postgres, pg_bin): # Create a test table cur.execute("CREATE TABLE foo(x integer)") + print("Inserting two more rows and running GC") + cur.execute("select relfilenode from pg_class where oid = 'foo'::regclass"); + row = cur.fetchone(); + print("relfilenode is {}", row[0]); + # Run GC, to clear out any garbage left behind in the catalogs by # the CREATE TABLE command. We want to have a clean slate with no garbage # before running the actual tests below, otherwise the counts won't match diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs index 8c54b31e1e..142cc22f93 100644 --- a/zenith_utils/src/postgres_backend.rs +++ b/zenith_utils/src/postgres_backend.rs @@ -231,10 +231,11 @@ impl PostgresBackend { } Some(FeMessage::Query(m)) => { - trace!("got query {:?}", m.body); + info!("got query {:?}", m.body); // xxx distinguish fatal and recoverable errors? if let Err(e) = handler.process_query(self, m.body) { let errmsg = format!("{}", e); + error!("process_query errored: {}", errmsg); self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?; } self.write_message(&BeMessage::ReadyForQuery)?;