From a389c2ed7f189075f93671673637e372195ad00a Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 16 Aug 2021 14:54:20 +0300 Subject: [PATCH] WIP: Track oldest open layer --- pageserver/src/layered_repository.rs | 79 ++++++++------- .../src/layered_repository/inmemory_layer.rs | 63 +++++++----- .../src/layered_repository/layer_map.rs | 98 ++++++++++--------- 3 files changed, 137 insertions(+), 103 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index cc78086468..e6ff4ec32c 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -54,14 +54,14 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); -// Perform a checkpoint in the GC thread, when the LSN has advanced this much since -// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if -// the page server is restarted. +// Flush out an inmemory layer, if it's holding WAL older than +// this. This puts a backstop on how much WAL needs to be re-digested +// if the page server is restarted. // // FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB // would be more appropriate. But a low value forces the code to be exercised more, // which is good for now to trigger bugs. -static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024; +static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024; // Metrics collected on operations on the storage repository. lazy_static! { @@ -261,11 +261,11 @@ impl LayeredRepository { { let timelines = self.timelines.lock().unwrap(); for (_timelineid, timeline) in timelines.iter() { - let distance = u64::from(timeline.last_valid_lsn.load()) - - u64::from(timeline.last_checkpoint_lsn.load()); - if distance > CHECKPOINT_INTERVAL { - timeline.checkpoint()?; - } + STORAGE_TIME + .with_label_values(&["checkpoint_timed"]) + .observe_closure_duration( + || timeline.checkpoint_internal(false) + )? } // release lock on 'timelines' } @@ -456,7 +456,7 @@ pub struct LayeredTimeline { last_record_lsn: AtomicLsn, prev_record_lsn: AtomicLsn, - last_checkpoint_lsn: AtomicLsn, + oldest_pending_lsn: AtomicLsn, // Parent timeline that this timeline was branched from, and the LSN // of the branch point. @@ -774,8 +774,8 @@ impl Timeline for LayeredTimeline { /// metrics collection. fn checkpoint(&self) -> Result<()> { STORAGE_TIME - .with_label_values(&["checkpoint"]) - .observe_closure_duration(|| self.checkpoint_internal()) + .with_label_values(&["checkpoint_force"]) + .observe_closure_duration(|| self.checkpoint_internal(true)) } /// Remember that WAL has been received and added to the page cache up to the given LSN @@ -867,7 +867,7 @@ impl LayeredTimeline { last_valid_lsn: SeqWait::new(metadata.last_valid_lsn), last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0), prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0), - last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0), + oldest_pending_lsn: AtomicLsn::new(metadata.last_valid_lsn.0), ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn, @@ -1003,23 +1003,23 @@ impl LayeredTimeline { let layer; if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? { // Create new entry after the previous one. - let lsn; + let start_lsn; if prev_layer.get_timeline_id() != self.timelineid { // First modification on this timeline - lsn = self.ancestor_lsn; + start_lsn = self.ancestor_lsn; trace!( "creating file for write for {} at branch point {}/{}", seg, self.timelineid, - lsn + start_lsn ); } else { - lsn = prev_layer.get_end_lsn(); + start_lsn = prev_layer.get_end_lsn(); trace!( "creating file for write for {} after previous layer {}/{}", seg, self.timelineid, - lsn + start_lsn ); } trace!( @@ -1034,6 +1034,7 @@ impl LayeredTimeline { &*prev_layer, self.timelineid, self.tenantid, + start_lsn, lsn, )?; } else { @@ -1045,7 +1046,7 @@ impl LayeredTimeline { lsn ); - layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?; + layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?; } let mut layers = self.layers.lock().unwrap(); @@ -1088,7 +1089,7 @@ impl LayeredTimeline { /// /// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't /// know anything about them here in the repository. - fn checkpoint_internal(&self) -> Result<()> { + fn checkpoint_internal(&self, force: bool) -> Result<()> { let last_valid_lsn = self.last_valid_lsn.load(); let last_record_lsn = self.last_record_lsn.load(); let prev_record_lsn = self.prev_record_lsn.load(); @@ -1130,14 +1131,26 @@ impl LayeredTimeline { // Call freeze() on any unfrozen layers (that is, layers that haven't // been written to disk yet). // Call unload() on all frozen layers, to release memory. - let mut iter = layers.iter_open_layers(); - while let Some(layer) = iter.next() { - let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?; + + let mut oldest_pending_lsn = last_valid_lsn; + + while let Some(oldest_layer) = layers.get_oldest_open_layer() { + + oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn(); + let distance = last_valid_lsn.0 - oldest_pending_lsn.0; + if !force && distance < OLDEST_INMEM_DISTANCE { + info!("the oldest layer is now {} which is {} bytes behind last_valid_lsn", + oldest_layer.get_seg_tag(), distance); + break; + } + + let (new_historic, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?; // replace this layer with the new layers that 'freeze' returned - // (removes it if new_open is None) - iter.replace(new_open); - + layers.pop_oldest(); + if let Some(n) = new_open { + layers.insert_open(n); + } if let Some(historic) = new_historic { trace!( "freeze returned layer {} {}-{}", @@ -1145,7 +1158,7 @@ impl LayeredTimeline { historic.get_start_lsn(), historic.get_end_lsn() ); - iter.insert_historic(historic); + layers.insert_historic(historic); } } @@ -1171,7 +1184,7 @@ impl LayeredTimeline { }; LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?; - self.last_checkpoint_lsn.store(last_valid_lsn); + self.oldest_pending_lsn.store(oldest_pending_lsn); Ok(()) } @@ -1202,12 +1215,6 @@ impl LayeredTimeline { let now = Instant::now(); let mut result: GcResult = Default::default(); - // Scan all snapshot files in the directory. For each file, if a newer file - // exists, we can remove the old one. - self.checkpoint()?; - - let mut layers = self.layers.lock().unwrap(); - info!( "running GC on timeline {}, cutoff {}", self.timelineid, cutoff @@ -1215,9 +1222,13 @@ impl LayeredTimeline { let mut layers_to_remove: Vec> = Vec::new(); + // Scan all snapshot files in the directory. For each file, if a newer file + // exists, we can remove the old one. + // // Determine for each file if it needs to be retained // FIXME: also scan open in-memory layers. Normally we cannot remove the // latest layer of any seg, but if it was unlinked it's possible + let mut layers = self.layers.lock().unwrap(); 'outer: for l in layers.iter_historic_layers() { let seg = l.get_seg_tag(); diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index cc356efaed..7f7492f4a5 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -32,6 +32,8 @@ pub struct InMemoryLayer { /// start_lsn: Lsn, + oldest_pending_lsn: Lsn, + /// The above fields never change. The parts that do change are in 'inner', /// and protected by mutex. inner: Mutex, @@ -164,6 +166,11 @@ impl Layer for InMemoryLayer { } impl InMemoryLayer { + + pub fn get_oldest_pending_lsn(&self) -> Lsn { + self.oldest_pending_lsn + } + /// /// Create a new, empty, in-memory layer /// @@ -173,6 +180,7 @@ impl InMemoryLayer { tenantid: ZTenantId, seg: SegmentTag, start_lsn: Lsn, + oldest_pending_lsn: Lsn, ) -> Result { trace!( "initializing new empty InMemoryLayer for writing {} on timeline {} at {}", @@ -187,6 +195,7 @@ impl InMemoryLayer { tenantid, seg, start_lsn, + oldest_pending_lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, page_versions: BTreeMap::new(), @@ -314,13 +323,14 @@ impl InMemoryLayer { src: &dyn Layer, timelineid: ZTimelineId, tenantid: ZTenantId, - lsn: Lsn, + start_lsn: Lsn, + oldest_pending_lsn: Lsn, ) -> Result { trace!( "initializing new InMemoryLayer for writing {} on timeline {} at {}", src.get_seg_tag(), timelineid, - lsn + start_lsn ); let mut page_versions = BTreeMap::new(); let mut segsizes = BTreeMap::new(); @@ -331,8 +341,8 @@ impl InMemoryLayer { let startblk; let size; if seg.rel.is_blocky() { - size = src.get_seg_size(lsn)?; - segsizes.insert(lsn, size); + size = src.get_seg_size(start_lsn)?; + segsizes.insert(start_lsn, size); startblk = seg.segno * RELISH_SEG_SIZE; } else { size = 1; @@ -340,13 +350,13 @@ impl InMemoryLayer { } for blknum in startblk..(startblk + size) { - let img = timeline.materialize_page(seg, blknum, lsn, src)?; + let img = timeline.materialize_page(seg, blknum, start_lsn, src)?; let pv = PageVersion { page_image: Some(img), record: None, }; mem_used += pv.get_mem_size(); - page_versions.insert((blknum, lsn), pv); + page_versions.insert((blknum, start_lsn), pv); } Ok(InMemoryLayer { @@ -354,7 +364,8 @@ impl InMemoryLayer { timelineid, tenantid, seg: src.get_seg_tag(), - start_lsn: lsn, + start_lsn, + oldest_pending_lsn, inner: Mutex::new(InMemoryLayerInner { drop_lsn: None, page_versions: page_versions, @@ -451,26 +462,28 @@ impl InMemoryLayer { // If there were any "new" page versions, initialize a new in-memory layer to hold // them - let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() { - info!("created new in-mem layer for {} {}-", self.seg, end_lsn); + let new_open = + if !after_segsizes.is_empty() || !after_page_versions.is_empty() { + info!("created new in-mem layer for {} {}-", self.seg, end_lsn); - let new_open = Self::copy_snapshot( - self.conf, - timeline, - &snapfile, - self.timelineid, - self.tenantid, - end_lsn, - )?; - let mut new_inner = new_open.inner.lock().unwrap(); - new_inner.page_versions.append(&mut after_page_versions); - new_inner.segsizes.append(&mut after_segsizes); - drop(new_inner); + let new_open = Self::copy_snapshot( + self.conf, + timeline, + &snapfile, + self.timelineid, + self.tenantid, + end_lsn, + end_lsn, + )?; + let mut new_inner = new_open.inner.lock().unwrap(); + new_inner.page_versions.append(&mut after_page_versions); + new_inner.segsizes.append(&mut after_segsizes); + drop(new_inner); - Some(Arc::new(new_open)) - } else { - None - }; + Some(Arc::new(new_open)) + } else { + None + }; let new_historic = Some(Arc::new(snapfile)); diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 0cf9f93419..3cca2f68ca 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -15,19 +15,23 @@ use crate::relish::*; use anyhow::Result; use log::*; use std::collections::HashSet; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BinaryHeap, BTreeMap, HashMap}; use std::ops::Bound::Included; +use std::cmp::Ordering; use std::sync::Arc; use zenith_utils::lsn::Lsn; /// -/// LayerMap tracks what layers exist or a timeline. The last layer that is +/// LayerMap tracks what layers exist on a timeline. The last layer that is /// open for writes is always an InMemoryLayer, and is tracked separately /// because there can be only one for each segment. The older layers, /// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN. /// pub struct LayerMap { segs: HashMap, + + // FIXME: explain this + open_segs: BinaryHeap, } struct SegEntry { @@ -35,6 +39,31 @@ struct SegEntry { pub historic: BTreeMap>, } +struct OpenSegEntry { + pub oldest_pending_lsn: Lsn, + pub layer: Arc, +} +impl Ord for OpenSegEntry { + fn cmp(&self, other: &Self) -> Ordering { + // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here + // to get that. + other.oldest_pending_lsn.cmp(&self.oldest_pending_lsn) + } +} +impl PartialOrd for OpenSegEntry { + fn partial_cmp(&self, other: &Self) -> Option { + // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here + // to get that. + other.oldest_pending_lsn.partial_cmp(&self.oldest_pending_lsn) + } +} +impl PartialEq for OpenSegEntry { + fn eq(&self, other: &Self) -> bool { + self.oldest_pending_lsn.eq(&other.oldest_pending_lsn) + } +} +impl Eq for OpenSegEntry {} + impl LayerMap { /// /// Look up using the given segment tag and LSN. This differs from a plain @@ -88,14 +117,29 @@ impl LayerMap { if let Some(_old) = &segentry.open { // FIXME: shouldn't exist, but check } - segentry.open = Some(layer); + segentry.open = Some(Arc::clone(&layer)); } else { let segentry = SegEntry { - open: Some(layer), + open: Some(Arc::clone(&layer)), historic: BTreeMap::new(), }; self.segs.insert(tag, segentry); } + + let opensegentry = OpenSegEntry { + oldest_pending_lsn: layer.get_oldest_pending_lsn(), + layer: layer, + }; + self.open_segs.push(opensegentry); + } + + // replace given open layer with other layers. + pub fn pop_oldest(&mut self) { + let opensegentry = self.open_segs.pop().unwrap(); + let segtag = opensegentry.layer.get_seg_tag(); + + let mut segentry = self.segs.get_mut(&segtag).unwrap(); + segentry.open = None; } /// @@ -196,10 +240,11 @@ impl LayerMap { false } - pub fn iter_open_layers(&mut self) -> OpenLayerIter { - OpenLayerIter { - last: None, - segiter: self.segs.iter_mut(), + pub fn get_oldest_open_layer(&mut self) -> Option> { + if let Some(opensegentry) = self.open_segs.peek() { + Some(Arc::clone(&opensegentry.layer)) + } else { + None } } @@ -215,46 +260,11 @@ impl Default for LayerMap { fn default() -> Self { LayerMap { segs: HashMap::new(), + open_segs: BinaryHeap::new(), } } } -pub struct OpenLayerIter<'a> { - last: Option<&'a mut SegEntry>, - - segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>, -} - -impl<'a> OpenLayerIter<'a> { - pub fn replace(&mut self, replacement: Option>) { - let segentry = self.last.as_mut().unwrap(); - segentry.open = replacement; - } - - pub fn insert_historic(&mut self, new_layer: Arc) { - let start_lsn = new_layer.get_start_lsn(); - - let segentry = self.last.as_mut().unwrap(); - segentry.historic.insert(start_lsn, new_layer); - } -} - -impl<'a> Iterator for OpenLayerIter<'a> { - type Item = Arc; - - fn next(&mut self) -> std::option::Option<::Item> { - while let Some((_seg, entry)) = self.segiter.next() { - if let Some(open) = &entry.open { - let op = Arc::clone(&open); - self.last = Some(entry); - return Some(op); - } - } - self.last = None; - None - } -} - pub struct HistoricLayerIter<'a> { segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>, iter: Option>>,