diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index c792171aa0..ff1a2a247d 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -54,14 +54,14 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
 
 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
 static TIMEOUT: Duration = Duration::from_secs(60);
 
-// Perform a checkpoint in the GC thread, when the LSN has advanced this much since
-// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if
-// the page server is restarted.
+// Flush out an in-memory layer if it's holding WAL older than this.
+// This puts a backstop on how much WAL needs to be re-digested if the
+// page server crashes.
 //
 // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
 // would be more appropriate. But a low value forces the code to be exercised more,
 // which is good for now to trigger bugs.
-static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024;
+static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
 
 // Metrics collected on operations on the storage repository.
 lazy_static! {
@@ -103,9 +103,8 @@ impl Repository for LayeredRepository {
         std::fs::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
 
         let metadata = TimelineMetadata {
-            last_valid_lsn: start_lsn,
-            last_record_lsn: start_lsn,
-            prev_record_lsn: Lsn(0),
+            disk_consistent_lsn: start_lsn,
+            prev_record_lsn: None,
             ancestor_timeline: None,
             ancestor_lsn: start_lsn,
         };
@@ -134,9 +133,8 @@ impl Repository for LayeredRepository {
         // There is initially no data in it, but all the read-calls know to look
         // into the ancestor.
         let metadata = TimelineMetadata {
-            last_valid_lsn: start_lsn,
-            last_record_lsn: start_lsn,
-            prev_record_lsn: src_timeline.get_prev_record_lsn(),
+            disk_consistent_lsn: start_lsn,
+            prev_record_lsn: Some(src_timeline.get_prev_record_lsn()), // FIXME not atomic with start_lsn
             ancestor_timeline: Some(src),
             ancestor_lsn: start_lsn,
         };
@@ -212,7 +210,7 @@ impl LayeredRepository {
             .conf
             .timeline_path(&timelineid, &self.tenantid)
             .join("wal");
-        import_timeline_wal(&wal_dir, &timeline, timeline.get_last_record_lsn())?;
+        import_timeline_wal(&wal_dir, &timeline, timeline.last_record_lsn.load())?;
 
         let timeline_rc = Arc::new(timeline);
         timelines.insert(timelineid, timeline_rc.clone());
@@ -261,11 +259,9 @@ impl LayeredRepository {
         {
             let timelines = self.timelines.lock().unwrap();
             for (_timelineid, timeline) in timelines.iter() {
-                let distance = u64::from(timeline.last_valid_lsn.load())
-                    - u64::from(timeline.last_checkpoint_lsn.load());
-                if distance > CHECKPOINT_INTERVAL {
-                    timeline.checkpoint()?;
-                }
+                STORAGE_TIME
+                    .with_label_values(&["checkpoint_timed"])
+                    .observe_closure_duration(|| timeline.checkpoint_internal(false))?
             }
             // release lock on 'timelines'
         }
@@ -333,7 +329,7 @@ impl LayeredRepository {
         &self,
         target_timelineid: Option<ZTimelineId>,
         horizon: u64,
-        _compact: bool,
+        compact: bool,
     ) -> Result<GcResult> {
         let mut totals: GcResult = Default::default();
         let now = Instant::now();
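The effect of the new constant is easiest to see as a small decision function. The sketch below is a simplified, standalone model of the per-layer check that the rewritten checkpoint code applies (not the pageserver's actual code; a plain `u64` stands in for the `Lsn` newtype, and the real check lives inside `checkpoint_internal`, shown further down):

```rust
// Stand-in for zenith_utils::lsn::Lsn, which wraps a u64.
type Lsn = u64;

const OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;

/// Decide whether an in-memory layer whose oldest pending WAL record is at
/// `oldest_pending_lsn` must be flushed to disk. A forced checkpoint flushes
/// everything; a timed checkpoint only flushes layers that have fallen more
/// than OLDEST_INMEM_DISTANCE behind the last valid LSN.
fn needs_flush(force: bool, last_valid_lsn: Lsn, oldest_pending_lsn: Lsn) -> bool {
    let distance = last_valid_lsn - oldest_pending_lsn;
    force || distance >= OLDEST_INMEM_DISTANCE
}

fn main() {
    // A layer 100 MB behind must be flushed even without 'force'.
    assert!(needs_flush(false, 100 * 1024 * 1024, 0));
    // A recent layer is left in memory by a timed checkpoint...
    assert!(!needs_flush(false, 1024, 0));
    // ...but a forced checkpoint (explicit Timeline::checkpoint) flushes it.
    assert!(needs_flush(true, 1024, 0));
}
```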
@@ -397,6 +393,13 @@ impl LayeredRepository {
 
             let last_lsn = timeline.get_last_valid_lsn();
             if let Some(cutoff) = last_lsn.checked_sub(horizon) {
+                // If GC was explicitly requested by the admin, force flush all in-memory
+                // layers to disk first, so that they too can be garbage collected. That's
+                // used in tests, so we want the results to be as deterministic as possible.
+                if compact {
+                    timeline.checkpoint()?;
+                }
+
                 let result = timeline.gc_timeline(branchpoints, cutoff)?;
                 totals += result;
 
@@ -409,11 +412,24 @@
 }
 
 /// Metadata stored on disk for each timeline
+///
+/// The fields correspond to the values we hold in memory, in LayeredTimeline.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct TimelineMetadata {
-    last_valid_lsn: Lsn,
-    last_record_lsn: Lsn,
-    prev_record_lsn: Lsn,
+    disk_consistent_lsn: Lsn,
+
+    // This is only set if we know it. We track it in memory when the page
+    // server is running, but we only track the value corresponding to
+    // 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
+    // lot. We only store it in the metadata file when we flush *all* the
+    // in-memory data so that 'last_record_lsn' is the same as
+    // 'disk_consistent_lsn'. That's OK, because after page server restart, as
+    // soon as we reprocess at least one record, we will have a valid
+    // 'prev_record_lsn' value in memory again. This is only really needed when
+    // doing a clean shutdown, so that there is no more WAL beyond
+    // 'disk_consistent_lsn'.
+    prev_record_lsn: Option<Lsn>,
+
     ancestor_timeline: Option<ZTimelineId>,
     ancestor_lsn: Lsn,
 }
@@ -456,7 +472,14 @@ pub struct LayeredTimeline {
     last_record_lsn: AtomicLsn,
     prev_record_lsn: AtomicLsn,
 
-    last_checkpoint_lsn: AtomicLsn,
+    // All WAL records have been processed and stored durably in files on
+    // local disk, up to this LSN. On crash and restart, we need to re-process
+    // the WAL starting from this point.
+    //
+    // Some later WAL records might have been processed and also flushed to disk
+    // already, so don't be surprised to see some, but there's no guarantee on
+    // them yet.
+    disk_consistent_lsn: AtomicLsn,
 
     // Parent timeline that this timeline was branched from, and the LSN
     // of the branch point.
@@ -774,8 +797,8 @@ impl Timeline for LayeredTimeline {
     /// metrics collection.
     fn checkpoint(&self) -> Result<()> {
         STORAGE_TIME
-            .with_label_values(&["checkpoint"])
-            .observe_closure_duration(|| self.checkpoint_internal())
+            .with_label_values(&["checkpoint_force"])
+            .observe_closure_duration(|| self.checkpoint_internal(true))
     }
 
     /// Remember that WAL has been received and added to the page cache up to the given LSN
@@ -864,10 +887,13 @@ impl LayeredTimeline {
 
             walredo_mgr,
 
-            last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
-            last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
-            prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0),
-            last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0),
+            // initialize in-memory 'last_valid_lsn' and 'last_record_lsn' from
+            // 'disk_consistent_lsn'.
+            last_valid_lsn: SeqWait::new(metadata.disk_consistent_lsn),
+            last_record_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
+
+            prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.unwrap_or(Lsn(0)).0),
+            disk_consistent_lsn: AtomicLsn::new(metadata.disk_consistent_lsn.0),
 
             ancestor_timeline: ancestor,
             ancestor_lsn: metadata.ancestor_lsn,
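The new metadata fields obey one simple rule: `prev_record_lsn` may only be persisted when everything up to `last_record_lsn` is already on disk. A small standalone sketch of that rule, assuming `u64` in place of `Lsn`; the real decision appears later in `checkpoint_internal`:

```rust
// u64 stands in for the Lsn type.
type Lsn = u64;

/// A 'prev_record_lsn' is only meaningful on disk if every in-memory change
/// has been flushed, i.e. the metadata's disk_consistent_lsn has caught up
/// with last_record_lsn. Otherwise None is stored and the value is recovered
/// by reprocessing WAL after restart.
fn ondisk_prev_record_lsn(
    disk_consistent_lsn: Lsn,
    last_record_lsn: Lsn,
    prev_record_lsn: Lsn,
) -> Option<Lsn> {
    if disk_consistent_lsn == last_record_lsn {
        Some(prev_record_lsn)
    } else {
        None
    }
}

fn main() {
    // Fully flushed (e.g. clean shutdown): safe to persist prev_record_lsn.
    assert_eq!(ondisk_prev_record_lsn(200, 200, 180), Some(180));
    // Some WAL is still only in memory: persist None.
    assert_eq!(ondisk_prev_record_lsn(100, 200, 180), None);
}
```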
@@ -1003,23 +1029,23 @@ impl LayeredTimeline {
         let layer;
         if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? {
             // Create new entry after the previous one.
-            let lsn;
+            let start_lsn;
             if prev_layer.get_timeline_id() != self.timelineid {
                 // First modification on this timeline
-                lsn = self.ancestor_lsn;
+                start_lsn = self.ancestor_lsn;
                 trace!(
                     "creating file for write for {} at branch point {}/{}",
                     seg,
                     self.timelineid,
-                    lsn
+                    start_lsn
                 );
             } else {
-                lsn = prev_layer.get_end_lsn();
+                start_lsn = prev_layer.get_end_lsn();
                 trace!(
                     "creating file for write for {} after previous layer {}/{}",
                     seg,
                     self.timelineid,
-                    lsn
+                    start_lsn
                 );
             }
             trace!(
@@ -1034,6 +1060,7 @@ impl LayeredTimeline {
                 &*prev_layer,
                 self.timelineid,
                 self.tenantid,
+                start_lsn,
                 lsn,
             )?;
         } else {
@@ -1045,7 +1072,8 @@ impl LayeredTimeline {
                 lsn
             );
 
-            layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?;
+            layer =
+                InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
         }
 
         let mut layers = self.layers.lock().unwrap();
@@ -1088,7 +1116,8 @@ impl LayeredTimeline {
     ///
     /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
     /// know anything about them here in the repository.
-    fn checkpoint_internal(&self) -> Result<()> {
+    fn checkpoint_internal(&self, force: bool) -> Result<()> {
+        // FIXME: these should be fetched atomically.
         let last_valid_lsn = self.last_valid_lsn.load();
         let last_record_lsn = self.last_record_lsn.load();
         let prev_record_lsn = self.prev_record_lsn.load();
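The `lsn` to `start_lsn` rename earlier in this hunk is clearer when the choice is written out on its own: a new in-memory layer starts either at the branch point (first modification on this timeline) or where the previous layer of the same timeline ends. A simplified sketch with hypothetical plain parameters in place of the real `Layer` trait:

```rust
type Lsn = u64;

/// Pick the start LSN for a new in-memory layer that continues after the
/// previous layer. `prev_layer_is_same_timeline` corresponds to the
/// `prev_layer.get_timeline_id() == self.timelineid` check in the diff.
fn new_layer_start_lsn(
    prev_layer_is_same_timeline: bool,
    ancestor_lsn: Lsn,
    prev_layer_end_lsn: Lsn,
) -> Lsn {
    if prev_layer_is_same_timeline {
        // Continue right where the previous layer on this timeline ends.
        prev_layer_end_lsn
    } else {
        // First modification after branching: start at the branch point.
        ancestor_lsn
    }
}

fn main() {
    assert_eq!(new_layer_start_lsn(false, 1000, 0), 1000);
    assert_eq!(new_layer_start_lsn(true, 1000, 4200), 4200);
}
```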
@@ -1105,71 +1134,80 @@ impl LayeredTimeline {
         // requests will block.
         let mut layers = self.layers.lock().unwrap();
 
-        // Walk through each in-memory layer, and write any dirty data to disk,
-        // as snapshot files.
-        //
-        // We currently write a new snapshot file for every relation
-        // that was modified, if there has been any changes at all.
-        // It would be smarter to only flush out in-memory layers that
-        // have accumulated a fair amount of changes. Note that the
-        // start and end LSNs of snapshot files belonging to different
-        // relations don't have to line up, although currently they do
-        // because of the way this works. So you could have a snapshot
-        // file covering LSN range 100-200 for one relation, and a
-        // snapshot file covering 150-250 for another relation. The
-        // read functions should even cope with snapshot files
-        // covering overlapping ranges for the same relation, although
-        // that situation never arises currently.
-        //
-        // Note: We aggressively freeze and unload all the layer
-        // structs. Even if a layer is actively being used. This
-        // keeps memory usage in check, but is probably too
-        // aggressive. Some kind of LRU policy would be appropriate.
+        // Take the in-memory layer with the oldest WAL record. If it's older
+        // than the threshold, write it out to disk as a new snapshot file.
+        // Repeat until all remaining in-memory layers are within the threshold.
         //
+        // That's necessary to limit the amount of WAL that needs to be kept
+        // in the safekeepers, and that needs to be reprocessed on page server
+        // crash. TODO: It's not a great policy for keeping memory usage in
+        // check, though. We should also aim at flushing layers that consume
+        // a lot of memory and/or aren't receiving many updates anymore.
+        let mut disk_consistent_lsn = last_record_lsn;
+        while let Some(oldest_layer) = layers.peek_oldest_open() {
+            // Does this layer need freezing?
+            let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
+            let distance = last_valid_lsn.0 - oldest_pending_lsn.0;
+            if !force && distance < OLDEST_INMEM_DISTANCE {
+                info!(
+                    "the oldest layer is now {} which is {} bytes behind last_valid_lsn",
+                    oldest_layer.get_seg_tag(),
+                    distance
+                );
+                disk_consistent_lsn = oldest_pending_lsn;
+                break;
+            }
 
-        // Call freeze() on any unfrozen layers (that is, layers that haven't
-        // been written to disk yet).
-        // Call unload() on all frozen layers, to release memory.
-        let mut iter = layers.iter_open_layers();
-        while let Some(layer) = iter.next() {
-            let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?;
+            // freeze it
+            let (new_historic, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
 
             // replace this layer with the new layers that 'freeze' returned
-            // (removes it if new_open is None)
-            iter.replace(new_open);
-
-            trace!(
-                "freeze returned layer {} {}-{}",
-                new_historic.get_seg_tag(),
-                new_historic.get_start_lsn(),
-                new_historic.get_end_lsn()
-            );
-            iter.insert_historic(new_historic);
+            layers.pop_oldest_open();
+            if let Some(n) = new_open {
+                layers.insert_open(n);
+            }
+            layers.insert_historic(new_historic);
         }
 
+        // Call unload() on all frozen layers, to release memory.
+        // TODO: On-disk layers shouldn't consume much memory to begin with,
+        // so this shouldn't be necessary. But currently the SnapshotLayer
+        // code slurps the whole file into memory, so they do in fact consume
+        // a lot of memory.
         for layer in layers.iter_historic_layers() {
             layer.unload()?;
        }
 
-        // Also save the metadata, with updated last_valid_lsn and last_record_lsn, to a
-        // file in the timeline dir. The metadata reflects the last_valid_lsn as it was
-        // when we *started* the checkpoint, so that after crash, the WAL receiver knows
-        // to restart the streaming from that WAL position.
-        let ancestor_timelineid = if let Some(x) = &self.ancestor_timeline {
-            Some(x.timelineid)
-        } else {
-            None
+        // Save the metadata, with updated 'disk_consistent_lsn', to a
+        // file in the timeline dir. After crash, we will restart WAL
+        // streaming and processing from that point.
+
+        // We can only save a valid 'prev_record_lsn' value on disk if we
+        // flushed *all* in-memory changes to disk. We only track
+        // 'prev_record_lsn' in memory for the latest processed record, so we
+        // don't remember what the correct value that corresponds to some old
+        // LSN is. But if we flush everything, then the value corresponding to
+        // the current 'last_record_lsn' is correct and we can store it on disk.
+        let ondisk_prev_record_lsn = {
+            if disk_consistent_lsn == last_record_lsn {
+                Some(prev_record_lsn)
+            } else {
+                None
+            }
         };
+
+        let ancestor_timelineid = self.ancestor_timeline.as_ref().map(|x| x.timelineid);
+
         let metadata = TimelineMetadata {
-            last_valid_lsn: last_valid_lsn,
-            last_record_lsn: last_record_lsn,
-            prev_record_lsn: prev_record_lsn,
+            disk_consistent_lsn: disk_consistent_lsn,
+            prev_record_lsn: ondisk_prev_record_lsn,
             ancestor_timeline: ancestor_timelineid,
             ancestor_lsn: self.ancestor_lsn,
         };
 
         LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?;
 
-        self.last_checkpoint_lsn.store(last_valid_lsn);
+        // Also update the in-memory copy
+        self.disk_consistent_lsn.store(disk_consistent_lsn);
 
         Ok(())
     }
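A compact standalone model of the freeze loop above, and of how `disk_consistent_lsn` falls out of it. It uses `std::cmp::Reverse` over bare LSNs instead of the real `OpenSegEntry`/`Arc<InMemoryLayer>` machinery, and "freezing" is reduced to popping from the heap:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

type Lsn = u64;
const OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;

/// Flush old in-memory layers (modelled as bare LSNs) and return the
/// resulting disk_consistent_lsn. Reverse turns the max-heap into a
/// min-heap, so peek() always yields the layer holding the oldest
/// pending WAL record.
fn checkpoint(
    open_layers: &mut BinaryHeap<Reverse<Lsn>>,
    last_record_lsn: Lsn,
    last_valid_lsn: Lsn,
    force: bool,
) -> Lsn {
    let mut disk_consistent_lsn = last_record_lsn;
    while let Some(&Reverse(oldest_pending_lsn)) = open_layers.peek() {
        let distance = last_valid_lsn - oldest_pending_lsn;
        if !force && distance < OLDEST_INMEM_DISTANCE {
            // Everything left is recent enough to stay in memory; it bounds
            // what is consistent on disk.
            disk_consistent_lsn = oldest_pending_lsn;
            break;
        }
        // "freeze": write the layer out and drop it from the open set.
        open_layers.pop();
    }
    disk_consistent_lsn
}

fn main() {
    let mut open = BinaryHeap::from(vec![Reverse(10u64), Reverse(50_000_000u64)]);
    let lsn = checkpoint(&mut open, 60_000_000, 60_000_000, false);
    // The old layer was flushed; the recent one still holds WAL from 50_000_000 on.
    assert_eq!(lsn, 50_000_000);
    assert_eq!(open.len(), 1);
}
```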
@@ -1200,12 +1238,6 @@ impl LayeredTimeline {
         let now = Instant::now();
         let mut result: GcResult = Default::default();
-        // Scan all snapshot files in the directory. For each file, if a newer file
-        // exists, we can remove the old one.
-        self.checkpoint()?;
-
-        let mut layers = self.layers.lock().unwrap();
-
         info!(
             "running GC on timeline {}, cutoff {}",
             self.timelineid, cutoff
         );
@@ -1213,9 +1245,13 @@
 
         let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();
 
+        // Scan all snapshot files in the directory. For each file, if a newer file
+        // exists, we can remove the old one.
+        //
         // Determine for each file if it needs to be retained
         // FIXME: also scan open in-memory layers. Normally we cannot remove the
         // latest layer of any seg, but if it was unlinked it's possible
+        let mut layers = self.layers.lock().unwrap();
         'outer: for l in layers.iter_historic_layers() {
             let seg = l.get_seg_tag();
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs
index 7eac0d9ce9..2b89d694f1 100644
--- a/pageserver/src/layered_repository/inmemory_layer.rs
+++ b/pageserver/src/layered_repository/inmemory_layer.rs
@@ -32,6 +32,8 @@ pub struct InMemoryLayer {
     ///
     start_lsn: Lsn,
 
+    oldest_pending_lsn: Lsn,
+
     /// The above fields never change. The parts that do change are in 'inner',
     /// and protected by mutex.
     inner: Mutex<InMemoryLayerInner>,
@@ -164,6 +166,11 @@ impl Layer for InMemoryLayer {
 }
 
 impl InMemoryLayer {
+    /// Return the LSN of the oldest page version stored in this layer
+    pub fn get_oldest_pending_lsn(&self) -> Lsn {
+        self.oldest_pending_lsn
+    }
+
     ///
     /// Create a new, empty, in-memory layer
     ///
@@ -173,6 +180,7 @@ impl InMemoryLayer {
         tenantid: ZTenantId,
         seg: SegmentTag,
         start_lsn: Lsn,
+        oldest_pending_lsn: Lsn,
     ) -> Result<InMemoryLayer> {
         trace!(
             "initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
@@ -187,6 +195,7 @@ impl InMemoryLayer {
             tenantid,
             seg,
             start_lsn,
+            oldest_pending_lsn,
             inner: Mutex::new(InMemoryLayerInner {
                 drop_lsn: None,
                 page_versions: BTreeMap::new(),
@@ -302,13 +311,14 @@ impl InMemoryLayer {
         src: &dyn Layer,
         timelineid: ZTimelineId,
         tenantid: ZTenantId,
-        lsn: Lsn,
+        start_lsn: Lsn,
+        oldest_pending_lsn: Lsn,
     ) -> Result<InMemoryLayer> {
         trace!(
             "initializing new InMemoryLayer for writing {} on timeline {} at {}",
             src.get_seg_tag(),
             timelineid,
-            lsn
+            start_lsn
         );
         let mut page_versions = BTreeMap::new();
         let mut segsizes = BTreeMap::new();
@@ -318,8 +328,8 @@ impl InMemoryLayer {
         let startblk;
         let size;
         if seg.rel.is_blocky() {
-            size = src.get_seg_size(lsn)?;
-            segsizes.insert(lsn, size);
+            size = src.get_seg_size(start_lsn)?;
+            segsizes.insert(start_lsn, size);
             startblk = seg.segno * RELISH_SEG_SIZE;
         } else {
             size = 1;
@@ -327,12 +337,12 @@ impl InMemoryLayer {
         }
 
         for blknum in startblk..(startblk + size) {
-            let img = timeline.materialize_page(seg, blknum, lsn, src)?;
+            let img = timeline.materialize_page(seg, blknum, start_lsn, src)?;
             let pv = PageVersion {
                 page_image: Some(img),
                 record: None,
            };
-            page_versions.insert((blknum, lsn), pv);
+            page_versions.insert((blknum, start_lsn), pv);
         }
 
         Ok(InMemoryLayer {
@@ -340,7 +350,8 @@ impl InMemoryLayer {
             timelineid,
             tenantid,
             seg: src.get_seg_tag(),
-            start_lsn: lsn,
+            start_lsn,
+            oldest_pending_lsn,
             inner: Mutex::new(InMemoryLayerInner {
                 drop_lsn: None,
                 page_versions: page_versions,
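`InMemoryLayer` now carries two LSNs with different jobs: `start_lsn` is where the layer's coverage begins (for a layer opened by `copy_snapshot` that data is copied from an older layer and is already durable), while `oldest_pending_lsn` marks the oldest WAL that only this layer holds, which is what the flush ordering cares about. A simplified illustration with a hypothetical struct, not the real one:

```rust
type Lsn = u64;

/// Hypothetical, stripped-down stand-in for InMemoryLayer, keeping only the
/// two LSNs of interest.
struct OpenLayer {
    /// Where this layer's coverage begins. For a layer opened by
    /// copy_snapshot() this equals the end LSN of the previous layer.
    start_lsn: Lsn,
    /// Oldest WAL record held only by this layer; copied-over snapshot data
    /// below this point is already durable elsewhere.
    oldest_pending_lsn: Lsn,
}

impl OpenLayer {
    fn get_oldest_pending_lsn(&self) -> Lsn {
        self.oldest_pending_lsn
    }
}

fn main() {
    // A brand-new layer, as from InMemoryLayer::create(): both LSNs coincide.
    let fresh = OpenLayer { start_lsn: 100, oldest_pending_lsn: 100 };
    // A layer opened by copy_snapshot(): coverage starts at the previous
    // layer's end (100), but the first genuinely new record arrives at 150.
    let copied = OpenLayer { start_lsn: 100, oldest_pending_lsn: 150 };
    assert_eq!(fresh.get_oldest_pending_lsn(), 100);
    assert!(copied.oldest_pending_lsn >= copied.start_lsn);
}
```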
@@ -434,10 +445,17 @@ impl InMemoryLayer {
             before_segsizes,
         )?;
 
+        trace!(
+            "freeze: created snapshot layer {} {}-{}",
+            snapfile.get_seg_tag(),
+            snapfile.get_start_lsn(),
+            snapfile.get_end_lsn()
+        );
+
         // If there were any "new" page versions, initialize a new in-memory layer to hold
         // them
         let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
-            info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
+            trace!("freeze: created new in-mem layer {} {}-", self.seg, end_lsn);
 
             let new_open = Self::copy_snapshot(
                 self.conf,
@@ -446,6 +464,7 @@ impl InMemoryLayer {
                 self.timelineid,
                 self.tenantid,
                 end_lsn,
+                end_lsn,
             )?;
             let mut new_inner = new_open.inner.lock().unwrap();
             new_inner.page_versions.append(&mut after_page_versions);
diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs
index 0cf9f93419..a5d7b5ac7d 100644
--- a/pageserver/src/layered_repository/layer_map.rs
+++ b/pageserver/src/layered_repository/layer_map.rs
@@ -14,31 +14,71 @@ use crate::layered_repository::{InMemoryLayer, SnapshotLayer};
 use crate::relish::*;
 use anyhow::Result;
 use log::*;
+use std::cmp::Ordering;
 use std::collections::HashSet;
-use std::collections::{BTreeMap, HashMap};
+use std::collections::{BTreeMap, BinaryHeap, HashMap};
 use std::ops::Bound::Included;
 use std::sync::Arc;
 use zenith_utils::lsn::Lsn;
 
 ///
-/// LayerMap tracks what layers exist on a timeline. The last layer that is
-/// open for writes is always an InMemoryLayer, and is tracked separately
-/// because there can be only one for each segment. The older layers,
-/// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN.
+/// LayerMap tracks what layers exist on a timeline.
 ///
 pub struct LayerMap {
+    /// All the layers keyed by segment tag
     segs: HashMap<SegmentTag, SegEntry>,
+
+    /// All in-memory layers, ordered by 'oldest_pending_lsn' of each layer.
+    /// This allows easy access to the in-memory layer that contains the
+    /// oldest WAL record.
+    open_segs: BinaryHeap<OpenSegEntry>,
 }
 
 ///
+/// Per-segment entry in the LayerMap.segs hash map
+///
+/// The last layer that is open for writes is always an InMemoryLayer,
+/// and is kept in a separate field, because there can be only one for
+/// each segment. The older layers, stored on disk, are kept in a
+/// BTreeMap keyed by the layer's start LSN.
 struct SegEntry {
     pub open: Option<Arc<InMemoryLayer>>,
     pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
 }
 
+/// Entry held in LayerMap.open_segs, with boilerplate comparison
+/// routines to implement a min-heap ordered by 'oldest_pending_lsn'
+struct OpenSegEntry {
+    pub oldest_pending_lsn: Lsn,
+    pub layer: Arc<InMemoryLayer>,
+}
+impl Ord for OpenSegEntry {
+    fn cmp(&self, other: &Self) -> Ordering {
+        // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
+        // to get that.
+        other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn)
+    }
+}
+impl PartialOrd for OpenSegEntry {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
+        // to get that.
+        other
+            .oldest_pending_lsn
+            .partial_cmp(&self.oldest_pending_lsn)
+    }
+}
+impl PartialEq for OpenSegEntry {
+    fn eq(&self, other: &Self) -> bool {
+        self.oldest_pending_lsn.eq(&other.oldest_pending_lsn)
+    }
+}
+impl Eq for OpenSegEntry {}
+
 impl LayerMap {
     ///
-    /// Look up using the given segment tag and LSN. This differs from a plain
-    /// key-value lookup in that if there is any layer that covers the
+    /// Look up a layer using the given segment tag and LSN. This differs from a
+    /// plain key-value lookup in that if there is any layer that covers the
     /// given LSN, or precedes the given LSN, it is returned. In other words,
     /// you don't need to know the exact start LSN of the layer.
     ///
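Because `std::collections::BinaryHeap` is a max-heap, `OpenSegEntry` inverts the comparison so that `peek()` surfaces the smallest `oldest_pending_lsn`. The same trick on a bare type, as a runnable sketch; the standard library's `std::cmp::Reverse` wrapper would be an equivalent, boilerplate-free alternative:

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(PartialEq, Eq)]
struct Entry {
    oldest_pending_lsn: u64,
}

impl Ord for Entry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Compare other against self instead of self against other: this
        // inverts the order, so the max-heap surfaces the smallest LSN first.
        other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn)
    }
}

impl PartialOrd for Entry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut heap = BinaryHeap::new();
    for lsn in [300u64, 100, 200] {
        heap.push(Entry { oldest_pending_lsn: lsn });
    }
    // The oldest (smallest) LSN comes out first.
    assert_eq!(heap.pop().unwrap().oldest_pending_lsn, 100);
    assert_eq!(heap.pop().unwrap().oldest_pending_lsn, 200);
    assert_eq!(heap.pop().unwrap().oldest_pending_lsn, 300);
}
```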
@@ -88,14 +128,29 @@
         if let Some(_old) = &segentry.open {
             // FIXME: shouldn't exist, but check
         }
-        segentry.open = Some(layer);
+        segentry.open = Some(Arc::clone(&layer));
     } else {
         let segentry = SegEntry {
-            open: Some(layer),
+            open: Some(Arc::clone(&layer)),
             historic: BTreeMap::new(),
         };
         self.segs.insert(tag, segentry);
     }
+
+        let opensegentry = OpenSegEntry {
+            oldest_pending_lsn: layer.get_oldest_pending_lsn(),
+            layer: layer,
+        };
+        self.open_segs.push(opensegentry);
+    }
+
+    /// Remove the oldest in-memory layer
+    pub fn pop_oldest_open(&mut self) {
+        let opensegentry = self.open_segs.pop().unwrap();
+        let segtag = opensegentry.layer.get_seg_tag();
+
+        let mut segentry = self.segs.get_mut(&segtag).unwrap();
+        segentry.open = None;
     }
 
     ///
@@ -196,10 +251,12 @@
         false
     }
 
-    pub fn iter_open_layers(&mut self) -> OpenLayerIter {
-        OpenLayerIter {
-            last: None,
-            segiter: self.segs.iter_mut(),
+    /// Return the oldest in-memory layer.
+    pub fn peek_oldest_open(&self) -> Option<Arc<InMemoryLayer>> {
+        if let Some(opensegentry) = self.open_segs.peek() {
+            Some(Arc::clone(&opensegentry.layer))
+        } else {
+            None
         }
     }
 
@@ -215,46 +272,11 @@ impl Default for LayerMap {
     fn default() -> Self {
         LayerMap {
             segs: HashMap::new(),
+            open_segs: BinaryHeap::new(),
         }
     }
 }
 
-pub struct OpenLayerIter<'a> {
-    last: Option<&'a mut SegEntry>,
-
-    segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>,
-}
-
-impl<'a> OpenLayerIter<'a> {
-    pub fn replace(&mut self, replacement: Option<Arc<InMemoryLayer>>) {
-        let segentry = self.last.as_mut().unwrap();
-        segentry.open = replacement;
-    }
-
-    pub fn insert_historic(&mut self, new_layer: Arc<SnapshotLayer>) {
-        let start_lsn = new_layer.get_start_lsn();
-
-        let segentry = self.last.as_mut().unwrap();
-        segentry.historic.insert(start_lsn, new_layer);
-    }
-}
-
-impl<'a> Iterator for OpenLayerIter<'a> {
-    type Item = Arc<InMemoryLayer>;
-
-    fn next(&mut self) -> std::option::Option<<Self as Iterator>::Item> {
-        while let Some((_seg, entry)) = self.segiter.next() {
-            if let Some(open) = &entry.open {
-                let op = Arc::clone(&open);
-                self.last = Some(entry);
-                return Some(op);
-            }
-        }
-        self.last = None;
-        None
-    }
-}
-
 pub struct HistoricLayerIter<'a> {
     segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
     iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,
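To see how the two containers cooperate, here is a stripped-down model of the new open-layer bookkeeping: `insert_open` records a layer both per segment and in the heap, `peek_oldest_open` and `pop_oldest_open` drive oldest-first flushing, and popping clears the per-segment open slot, mirroring the real `pop_oldest_open`. All types here (the string segment tag, bare LSNs) are placeholders for illustration:

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

type Lsn = u64;
type SegTag = &'static str; // placeholder for the real SegmentTag

#[derive(Default)]
struct OpenLayers {
    /// Which segment currently has an open layer (stand-in for SegEntry.open).
    open_by_seg: HashMap<SegTag, Lsn>,
    /// Min-heap over (oldest_pending_lsn, segment): oldest entry on top.
    heap: BinaryHeap<Reverse<(Lsn, SegTag)>>,
}

impl OpenLayers {
    fn insert_open(&mut self, seg: SegTag, oldest_pending_lsn: Lsn) {
        self.open_by_seg.insert(seg, oldest_pending_lsn);
        self.heap.push(Reverse((oldest_pending_lsn, seg)));
    }
    fn peek_oldest_open(&self) -> Option<(Lsn, SegTag)> {
        self.heap.peek().map(|r| r.0)
    }
    fn pop_oldest_open(&mut self) {
        if let Some(Reverse((_lsn, seg))) = self.heap.pop() {
            // The segment no longer has an open layer.
            self.open_by_seg.remove(seg);
        }
    }
}

fn main() {
    let mut layers = OpenLayers::default();
    layers.insert_open("rel_a", 500);
    layers.insert_open("rel_b", 100);
    assert_eq!(layers.peek_oldest_open(), Some((100, "rel_b")));
    layers.pop_oldest_open();
    assert_eq!(layers.peek_oldest_open(), Some((500, "rel_a")));
    assert!(!layers.open_by_seg.contains_key("rel_b"));
}
```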