diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 24b428b7a8..267e30d706 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -11,7 +11,7 @@ //! parent timeline, and the last LSN that has been written to disk. //! -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{anyhow, bail, ensure, Context, Result}; use bookfile::Book; use bytes::Bytes; use lazy_static::lazy_static; @@ -53,6 +53,7 @@ mod delta_layer; mod filename; mod image_layer; mod inmemory_layer; +mod interval_tree; mod layer_map; mod storage_layer; @@ -783,6 +784,7 @@ impl Timeline for LayeredTimeline { rel ); } + ensure!(rec.lsn.is_aligned(), "unaligned record LSN"); let seg = SegmentTag::from_blknum(rel, blknum); let delta_size = self.perform_write_op(seg, rec.lsn, |layer| { @@ -796,6 +798,7 @@ impl Timeline for LayeredTimeline { if !rel.is_blocky() { bail!("invalid truncation for non-blocky relish {}", rel); } + ensure!(lsn.is_aligned(), "unaligned record LSN"); debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn); @@ -886,6 +889,7 @@ impl Timeline for LayeredTimeline { rel ); } + ensure!(lsn.is_aligned(), "unaligned record LSN"); let seg = SegmentTag::from_blknum(rel, blknum); @@ -1382,7 +1386,7 @@ impl LayeredTimeline { } // Finally, replace the frozen in-memory layer with the new on-disk layers - layers.remove_historic(frozen.as_ref()); + layers.remove_historic(frozen.clone()); // If we created a successor InMemoryLayer, its predecessor is // currently the frozen layer. We need to update the predecessor @@ -1559,7 +1563,7 @@ impl LayeredTimeline { // Check if this layer serves as a tombstone for this timeline // We have to do this separately from timeline check below, // because LayerMap of this timeline is already locked. 
- let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn); + let mut is_tombstone = layers.layer_exists_at_lsn(l.get_seg_tag(), prior_lsn)?; if is_tombstone { info!( "earlier layer exists at {} in {}", @@ -1639,7 +1643,7 @@ impl LayeredTimeline { // while iterating it. BTreeMap::retain() would be another option) for doomed_layer in layers_to_remove { doomed_layer.delete()?; - layers.remove_historic(&*doomed_layer); + layers.remove_historic(doomed_layer.clone()); match ( doomed_layer.is_dropped(), diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index ef00400dd2..040f799b01 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -15,11 +15,13 @@ use crate::{ZTenantId, ZTimelineId}; use anyhow::{bail, Result}; use bytes::Bytes; use log::*; +use std::cmp::min; use std::collections::BTreeMap; use std::ops::Bound::Included; use std::path::PathBuf; use std::sync::{Arc, Mutex}; +use zenith_utils::accum::Accum; use zenith_utils::lsn::Lsn; pub struct InMemoryLayer { @@ -506,6 +508,7 @@ impl InMemoryLayer { ) -> Result { let seg = src.get_seg_tag(); + assert!(oldest_pending_lsn.is_aligned()); assert!(oldest_pending_lsn >= start_lsn); trace!( @@ -588,9 +591,11 @@ impl InMemoryLayer { // at the 'cutoff_lsn' point. 
let mut before_segsizes = BTreeMap::new(); let mut after_segsizes = BTreeMap::new(); + let mut after_oldest_lsn: Accum = Accum(None); for (lsn, size) in inner.segsizes.iter() { if *lsn > cutoff_lsn { after_segsizes.insert(*lsn, *size); + after_oldest_lsn.accum(min, *lsn); } else { before_segsizes.insert(*lsn, *size); } } @@ -601,6 +606,7 @@ impl InMemoryLayer { for ((blknum, lsn), pv) in inner.page_versions.iter() { if *lsn > cutoff_lsn { after_page_versions.insert((*blknum, *lsn), pv.clone()); + after_oldest_lsn.accum(min, *lsn); } else { before_page_versions.insert((*blknum, *lsn), pv.clone()); } @@ -630,7 +636,7 @@ impl InMemoryLayer { self.timelineid, self.tenantid, cutoff_lsn + 1, - cutoff_lsn + 1, + after_oldest_lsn.0.unwrap(), )?; let new_inner = new_open.inner.get_mut().unwrap(); diff --git a/pageserver/src/layered_repository/interval_tree.rs b/pageserver/src/layered_repository/interval_tree.rs new file mode 100644 index 0000000000..562d32d4bf --- /dev/null +++ b/pageserver/src/layered_repository/interval_tree.rs @@ -0,0 +1,475 @@ +/// +/// IntervalTree is a data structure for holding intervals. It is generic +/// to make unit testing possible, but the only real user of it is the layer map. +/// +/// It's inspired by the "segment tree" or a "statistic tree" as described in +/// https://en.wikipedia.org/wiki/Segment_tree. However, we use a B-tree to hold +/// the points instead of a binary tree. This is called an "interval tree" instead +/// of "segment tree" because the term "segment" is already used in Zenith to mean +/// something else. To add to the confusion, there is another data structure known +/// as "interval tree" out there (see https://en.wikipedia.org/wiki/Interval_tree), +/// for storing intervals, but this isn't that. +/// +/// The basic idea is to have a B-tree of "interesting Points". At each Point, +/// there is a list of intervals that contain the point.
The Points are formed +/// from the start bounds of each interval; there is a Point for each distinct +/// start bound. +/// +/// Operations: +/// +/// To find intervals that contain a given point, you search the b-tree to find +/// the nearest Point <= search key. Then you just return the list of intervals. +/// +/// To insert an interval, find the Point with start key equal to the inserted item. +/// If the Point doesn't exist yet, create it, by copying all the items from the +/// previous Point that cover the new Point. Then walk right, inserting the new +/// interval to all the Points that are contained by the new interval (including the +/// newly created Point). +/// +/// To remove an interval, you scan the tree for all the Points that are contained by +/// the removed interval, and remove it from the list in each Point. +/// +/// Requirements and assumptions: +/// +/// - Can store overlapping items +/// - But there are not many overlapping items +/// - The interval bounds don't change after it is added to the tree +/// - Intervals are uniquely identified by pointer equality. You must not insert the +/// same interval object twice, and `remove` uses pointer equality to remove the right +/// interval. It is OK to have two intervals with the same bounds, however. +/// +use std::collections::BTreeMap; +use std::fmt::Debug; +use std::ops::Range; +use std::sync::Arc; + +pub struct IntervalTree +where + I: IntervalItem, +{ + points: BTreeMap>, +} + +struct Point { + /// All intervals that contain this point, in no particular order. + /// + /// We assume that there aren't a lot of overlapping intervals, so that this vector + /// never grows very large. If that assumption doesn't hold, we could keep this ordered + /// by the end bound, to speed up `search`. But as long as there are only a few elements, + /// a linear search is OK.
+ elements: Vec>, +} + +/// Abstraction for an interval that can be stored in the tree +/// +/// The start bound is inclusive and the end bound is exclusive. End must be greater +/// than start. +pub trait IntervalItem { + type Key: Ord + Copy + Debug + Sized; + + fn start_key(&self) -> Self::Key; + fn end_key(&self) -> Self::Key; + + fn bounds(&self) -> Range { + self.start_key()..self.end_key() + } +} + +impl IntervalTree +where + I: IntervalItem, +{ + /// Return an element that contains 'key', or precedes it. + /// + /// If there are multiple candidates, returns the one with the highest 'end' key. + pub fn search(&self, key: I::Key) -> Option> { + // Find the greatest point that precedes or is equal to the search key. If there is + // none, returns None. + let (_, p) = self.points.range(..=key).next_back()?; + + // Find the element with the highest end key at this point + let highest_item = p + .elements + .iter() + .reduce(|a, b| { + // starting with Rust 1.53, could use `std::cmp::max_by_key` here + if a.end_key() > b.end_key() { + a + } else { + b + } + }) + .unwrap(); + Some(Arc::clone(highest_item)) + } + + /// Iterate over all items with start bound >= 'key' + pub fn iter_newer(&self, key: I::Key) -> IntervalIter { + IntervalIter { + point_iter: self.points.range(key..), + elem_iter: None, + } + } + + /// Iterate over all items + pub fn iter(&self) -> IntervalIter { + IntervalIter { + point_iter: self.points.range(..), + elem_iter: None, + } + } + + pub fn insert(&mut self, item: Arc) { + let start_key = item.start_key(); + let end_key = item.end_key(); + assert!(start_key < end_key); + let bounds = start_key..end_key; + + // Find the starting point and walk forward from there + let mut found_start_point = false; + let iter = self.points.range_mut(bounds); + for (point_key, point) in iter { + if *point_key == start_key { + found_start_point = true; + // It is an error to insert the same item into the tree twice.
+ assert!( + point + .elements + .iter() + .find(|x| Arc::ptr_eq(x, &item)) + .is_none(), + "interval is already in the tree" + ); + } + point.elements.push(Arc::clone(&item)); + } + if !found_start_point { + // Create a new Point for the starting point + + // Look at the previous point, and copy over elements that overlap with this + // new point + let mut new_elements: Vec> = Vec::new(); + if let Some((_, prev_point)) = self.points.range(..start_key).next_back() { + let overlapping_prev_elements = prev_point + .elements + .iter() + .filter(|x| x.bounds().contains(&start_key)) + .cloned(); + + new_elements.extend(overlapping_prev_elements); + } + new_elements.push(item); + + let new_point = Point { + elements: new_elements, + }; + self.points.insert(start_key, new_point); + } + } + + pub fn remove(&mut self, item: &Arc) { + // range search points + let start_key = item.start_key(); + let end_key = item.end_key(); + let bounds = start_key..end_key; + + let mut points_to_remove: Vec = Vec::new(); + let mut found_start_point = false; + for (point_key, point) in self.points.range_mut(bounds) { + if *point_key == start_key { + found_start_point = true; + } + let len_before = point.elements.len(); + point.elements.retain(|other| !Arc::ptr_eq(other, &item)); + let len_after = point.elements.len(); + assert_eq!(len_after + 1, len_before); + if len_after == 0 { + points_to_remove.push(*point_key); + } + } + assert!(found_start_point); + + for k in points_to_remove { + self.points.remove(&k).unwrap(); + } + } +} + +pub struct IntervalIter<'a, I: ?Sized> +where + I: IntervalItem, +{ + point_iter: std::collections::btree_map::Range<'a, I::Key, Point>, + elem_iter: Option<(I::Key, std::slice::Iter<'a, Arc>)>, +} + +impl<'a, I> Iterator for IntervalIter<'a, I> +where + I: IntervalItem + ?Sized, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + // Iterate over all elements in all the points in 'point_iter'. 
To avoid + // returning the same element twice, we only return each element at its + // starting point. + loop { + // Return next remaining element from the current point + if let Some((point_key, elem_iter)) = &mut self.elem_iter { + for elem in elem_iter { + if elem.start_key() == *point_key { + return Some(Arc::clone(elem)); + } + } + } + // No more elements at this point. Move to next point. + if let Some((point_key, point)) = self.point_iter.next() { + self.elem_iter = Some((*point_key, point.elements.iter())); + continue; + } else { + // No more points, all done + return None; + } + } + } +} + +impl Default for IntervalTree +where + I: IntervalItem, +{ + fn default() -> Self { + IntervalTree { + points: BTreeMap::new(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fmt; + + #[derive(Debug)] + struct MockItem { + start_key: u32, + end_key: u32, + val: String, + } + impl IntervalItem for MockItem { + type Key = u32; + + fn start_key(&self) -> u32 { + self.start_key + } + fn end_key(&self) -> u32 { + self.end_key + } + } + impl MockItem { + fn new(start_key: u32, end_key: u32) -> Self { + MockItem { + start_key, + end_key, + val: format!("{}-{}", start_key, end_key), + } + } + fn new_str(start_key: u32, end_key: u32, val: &str) -> Self { + MockItem { + start_key, + end_key, + val: format!("{}-{}: {}", start_key, end_key, val), + } + } + } + impl fmt::Display for MockItem { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.val) + } + } + fn assert_search( + tree: &IntervalTree, + key: u32, + expected: &[&str], + ) -> Option> { + if let Some(v) = tree.search(key) { + let vstr = v.to_string(); + + if expected.is_empty() { + panic!("search with {} returned {}, expected None", key, v); + } + + if !expected.contains(&vstr.as_str()) { + panic!( + "search with {} returned {}, expected one of: {:?}", + key, v, expected + ); + } + Some(v) + } else { + if !expected.is_empty() { + panic!( + "search with {} returned 
None, expected one of {:?}", + key, expected + ); + } + None + } + } + + fn assert_contents(tree: &IntervalTree, expected: &[&str]) { + let mut contents: Vec = tree.iter().map(|e| e.to_string()).collect(); + contents.sort(); + assert_eq!(contents, expected); + } + + fn dump_tree(tree: &IntervalTree) { + for (point_key, point) in tree.points.iter() { + print!("{}:", point_key); + for e in point.elements.iter() { + print!(" {}", e); + } + println!(); + } + } + + #[test] + fn test_interval_tree_simple() { + let mut tree: IntervalTree = IntervalTree::default(); + + // Simple, non-overlapping ranges. + tree.insert(Arc::new(MockItem::new(10, 11))); + tree.insert(Arc::new(MockItem::new(11, 12))); + tree.insert(Arc::new(MockItem::new(12, 13))); + tree.insert(Arc::new(MockItem::new(18, 19))); + tree.insert(Arc::new(MockItem::new(17, 18))); + tree.insert(Arc::new(MockItem::new(15, 16))); + + assert_search(&tree, 9, &[]); + assert_search(&tree, 10, &["10-11"]); + assert_search(&tree, 11, &["11-12"]); + assert_search(&tree, 12, &["12-13"]); + assert_search(&tree, 13, &["12-13"]); + assert_search(&tree, 14, &["12-13"]); + assert_search(&tree, 15, &["15-16"]); + assert_search(&tree, 16, &["15-16"]); + assert_search(&tree, 17, &["17-18"]); + assert_search(&tree, 18, &["18-19"]); + assert_search(&tree, 19, &["18-19"]); + assert_search(&tree, 20, &["18-19"]); + + // remove a few entries and search around them again + tree.remove(&assert_search(&tree, 10, &["10-11"]).unwrap()); // first entry + tree.remove(&assert_search(&tree, 12, &["12-13"]).unwrap()); // entry in the middle + tree.remove(&assert_search(&tree, 18, &["18-19"]).unwrap()); // last entry + assert_search(&tree, 9, &[]); + assert_search(&tree, 10, &[]); + assert_search(&tree, 11, &["11-12"]); + assert_search(&tree, 12, &["11-12"]); + assert_search(&tree, 14, &["11-12"]); + assert_search(&tree, 15, &["15-16"]); + assert_search(&tree, 17, &["17-18"]); + assert_search(&tree, 18, &["17-18"]); + } + + #[test] + fn 
test_interval_tree_overlap() { + let mut tree: IntervalTree = IntervalTree::default(); + + // Overlapping items + tree.insert(Arc::new(MockItem::new(22, 24))); + tree.insert(Arc::new(MockItem::new(23, 25))); + let x24_26 = Arc::new(MockItem::new(24, 26)); + tree.insert(Arc::clone(&x24_26)); + let x26_28 = Arc::new(MockItem::new(26, 28)); + tree.insert(Arc::clone(&x26_28)); + tree.insert(Arc::new(MockItem::new(25, 27))); + + assert_search(&tree, 22, &["22-24"]); + assert_search(&tree, 23, &["22-24", "23-25"]); + assert_search(&tree, 24, &["23-25", "24-26"]); + assert_search(&tree, 25, &["24-26", "25-27"]); + assert_search(&tree, 26, &["25-27", "26-28"]); + assert_search(&tree, 27, &["26-28"]); + assert_search(&tree, 28, &["26-28"]); + assert_search(&tree, 29, &["26-28"]); + + tree.remove(&x24_26); + tree.remove(&x26_28); + assert_search(&tree, 23, &["22-24", "23-25"]); + assert_search(&tree, 24, &["23-25"]); + assert_search(&tree, 25, &["25-27"]); + assert_search(&tree, 26, &["25-27"]); + assert_search(&tree, 27, &["25-27"]); + assert_search(&tree, 28, &["25-27"]); + assert_search(&tree, 29, &["25-27"]); + } + + #[test] + fn test_interval_tree_nested() { + let mut tree: IntervalTree = IntervalTree::default(); + + // Items containing other items + tree.insert(Arc::new(MockItem::new(31, 39))); + tree.insert(Arc::new(MockItem::new(32, 34))); + tree.insert(Arc::new(MockItem::new(33, 35))); + tree.insert(Arc::new(MockItem::new(30, 40))); + + assert_search(&tree, 30, &["30-40"]); + assert_search(&tree, 31, &["30-40", "31-39"]); + assert_search(&tree, 32, &["30-40", "32-34", "31-39"]); + assert_search(&tree, 33, &["30-40", "32-34", "33-35", "31-39"]); + assert_search(&tree, 34, &["30-40", "33-35", "31-39"]); + assert_search(&tree, 35, &["30-40", "31-39"]); + assert_search(&tree, 36, &["30-40", "31-39"]); + assert_search(&tree, 37, &["30-40", "31-39"]); + assert_search(&tree, 38, &["30-40", "31-39"]); + assert_search(&tree, 39, &["30-40"]); + assert_search(&tree, 40, 
&["30-40"]); + assert_search(&tree, 41, &["30-40"]); + } + + #[test] + fn test_interval_tree_duplicates() { + let mut tree: IntervalTree = IntervalTree::default(); + + // Duplicate keys + let item_a = Arc::new(MockItem::new_str(55, 56, "a")); + tree.insert(Arc::clone(&item_a)); + let item_b = Arc::new(MockItem::new_str(55, 56, "b")); + tree.insert(Arc::clone(&item_b)); + let item_c = Arc::new(MockItem::new_str(55, 56, "c")); + tree.insert(Arc::clone(&item_c)); + let item_d = Arc::new(MockItem::new_str(54, 56, "d")); + tree.insert(Arc::clone(&item_d)); + let item_e = Arc::new(MockItem::new_str(55, 57, "e")); + tree.insert(Arc::clone(&item_e)); + + dump_tree(&tree); + + assert_search( + &tree, + 55, + &["55-56: a", "55-56: b", "55-56: c", "54-56: d", "55-57: e"], + ); + tree.remove(&item_b); + dump_tree(&tree); + + assert_contents(&tree, &["54-56: d", "55-56: a", "55-56: c", "55-57: e"]); + + tree.remove(&item_d); + dump_tree(&tree); + assert_contents(&tree, &["55-56: a", "55-56: c", "55-57: e"]); + } + + #[test] + #[should_panic] + fn test_interval_tree_insert_twice() { + let mut tree: IntervalTree = IntervalTree::default(); + + // Inserting the same item twice is not cool + let item = Arc::new(MockItem::new(1, 2)); + tree.insert(Arc::clone(&item)); + tree.insert(Arc::clone(&item)); // fails assertion + } +} diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index 5f89e249bf..c5643e896b 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -9,13 +9,14 @@ //! new image and delta layers and corresponding files are written to disk. //! 
+use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree}; use crate::layered_repository::storage_layer::{Layer, SegmentTag}; use crate::layered_repository::InMemoryLayer; use crate::relish::*; use anyhow::Result; use lazy_static::lazy_static; use std::cmp::Ordering; -use std::collections::{BTreeMap, BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap}; use std::sync::Arc; use zenith_metrics::{register_int_gauge, IntGauge}; use zenith_utils::lsn::Lsn; @@ -78,6 +79,13 @@ impl LayerMap { segentry.update_open(Arc::clone(&layer)); + let oldest_pending_lsn = layer.get_oldest_pending_lsn(); + + // After a crash and restart, 'oldest_pending_lsn' of the oldest in-memory + // layer becomes the WAL streaming starting point, so it better not point + // in the middle of a WAL record. + assert!(oldest_pending_lsn.is_aligned()); + // Also add it to the binary heap let open_layer_entry = OpenLayerEntry { oldest_pending_lsn: layer.get_oldest_pending_lsn(), @@ -124,12 +132,11 @@ impl LayerMap { /// /// This should be called when the corresponding file on disk has been deleted. /// - pub fn remove_historic(&mut self, layer: &dyn Layer) { + pub fn remove_historic(&mut self, layer: Arc) { let tag = layer.get_seg_tag(); - let start_lsn = layer.get_start_lsn(); if let Some(segentry) = self.segs.get_mut(&tag) { - segentry.historic.remove(&start_lsn); + segentry.historic.remove(&layer); } NUM_ONDISK_LAYERS.dec(); } @@ -147,7 +154,7 @@ impl LayerMap { if (request_rel.spcnode == 0 || reltag.spcnode == request_rel.spcnode) && (request_rel.dbnode == 0 || reltag.dbnode == request_rel.dbnode) { - if let Some(exists) = segentry.exists_at_lsn(lsn) { + if let Some(exists) = segentry.exists_at_lsn(lsn)? { rels.insert(seg.rel, exists); } } @@ -155,7 +162,7 @@ impl LayerMap { } _ => { if tag == None { - if let Some(exists) = segentry.exists_at_lsn(lsn) { + if let Some(exists) = segentry.exists_at_lsn(lsn)? 
{ rels.insert(seg.rel, exists); } } @@ -183,12 +190,12 @@ impl LayerMap { /// used for garbage collection, to determine if some alive layer /// exists at the lsn. If so, we shouldn't delete a newer dropped layer /// to avoid incorrectly making it visible. - pub fn layer_exists_at_lsn(&self, seg: SegmentTag, lsn: Lsn) -> bool { - if let Some(segentry) = self.segs.get(&seg) { - segentry.exists_at_lsn(lsn).unwrap_or(false) + pub fn layer_exists_at_lsn(&self, seg: SegmentTag, lsn: Lsn) -> Result { + Ok(if let Some(segentry) = self.segs.get(&seg) { + segentry.exists_at_lsn(lsn)?.unwrap_or(false) } else { false - } + }) } /// Return the oldest in-memory layer, along with its generation number. @@ -208,7 +215,7 @@ impl LayerMap { pub fn iter_historic_layers(&self) -> HistoricLayerIter { HistoricLayerIter { - segiter: self.segs.iter(), + seg_iter: self.segs.iter(), iter: None, } } @@ -222,7 +229,7 @@ impl LayerMap { open.dump()?; } - for (_, layer) in segentry.historic.iter() { + for layer in segentry.historic.iter() { layer.dump()?; } } @@ -231,34 +238,40 @@ impl LayerMap { } } +impl IntervalItem for dyn Layer { + type Key = Lsn; + + fn start_key(&self) -> Lsn { + self.get_start_lsn() + } + fn end_key(&self) -> Lsn { + self.get_end_lsn() + } +} + /// /// Per-segment entry in the LayerMap::segs hash map. Holds all the layers /// associated with the segment. /// /// The last layer that is open for writes is always an InMemoryLayer, /// and is kept in a separate field, because there can be only one for -/// each segment. The older layers, stored on disk, are kept in a -/// BTreeMap keyed by the layer's start LSN. +/// each segment. The older layers, stored on disk, are kept in an +/// IntervalTree. #[derive(Default)] struct SegEntry { - pub open: Option>, - pub historic: BTreeMap>, + open: Option>, + historic: IntervalTree, } impl SegEntry { /// Does the segment exist at given LSN? /// Return None if object is not found in this SegEntry. 
- fn exists_at_lsn(&self, lsn: Lsn) -> Option { - if let Some(layer) = &self.open { - if layer.get_start_lsn() <= lsn && lsn <= layer.get_end_lsn() { - let exists = layer.get_seg_exists(lsn).ok()?; - return Some(exists); - } - } else if let Some((_, layer)) = self.historic.range(..=lsn).next_back() { - let exists = layer.get_seg_exists(lsn).ok()?; - return Some(exists); + fn exists_at_lsn(&self, lsn: Lsn) -> Result> { + if let Some(layer) = self.get(lsn) { + Ok(Some(layer.get_seg_exists(lsn)?)) + } else { + Ok(None) } - None } pub fn get(&self, lsn: Lsn) -> Option> { @@ -269,29 +282,16 @@ impl SegEntry { } } - if let Some((_start_lsn, layer)) = self.historic.range(..=lsn).next_back() { - Some(Arc::clone(layer)) - } else { - None - } + self.historic.search(lsn) } pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool { // We only check on-disk layers, because // in-memory layers are not durable - for (_newer_lsn, layer) in self.historic.range(lsn..) { - // Ignore incremental layers. - if layer.is_incremental() { - continue; - } - if layer.get_end_lsn() > lsn { - return true; - } else { - continue; - } - } - false + self.historic + .iter_newer(lsn) + .any(|layer| !layer.is_incremental()) } // Set new open layer for a SegEntry. 
@@ -305,9 +305,7 @@ impl SegEntry { } pub fn insert_historic(&mut self, layer: Arc) { - let start_lsn = layer.get_start_lsn(); - - self.historic.insert(start_lsn, layer); + self.historic.insert(layer); } } @@ -346,8 +344,8 @@ impl Eq for OpenLayerEntry {} /// Iterator returned by LayerMap::iter_historic_layers() pub struct HistoricLayerIter<'a> { - segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>, - iter: Option>>, + seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>, + iter: Option>, } impl<'a> Iterator for HistoricLayerIter<'a> { @@ -357,11 +355,11 @@ impl<'a> Iterator for HistoricLayerIter<'a> { loop { if let Some(x) = &mut self.iter { if let Some(x) = x.next() { - return Some(Arc::clone(&*x.1)); + return Some(Arc::clone(&x)); } } - if let Some(seg) = self.segiter.next() { - self.iter = Some(seg.1.historic.iter()); + if let Some((_tag, segentry)) = self.seg_iter.next() { + self.iter = Some(segentry.historic.iter()); continue; } else { return None; @@ -416,14 +414,14 @@ mod tests { let mut layers = LayerMap::default(); let gen1 = layers.increment_generation(); - layers.insert_open(dummy_inmem_layer(conf, 0, Lsn(100), Lsn(100))); - layers.insert_open(dummy_inmem_layer(conf, 1, Lsn(100), Lsn(200))); - layers.insert_open(dummy_inmem_layer(conf, 2, Lsn(100), Lsn(120))); - layers.insert_open(dummy_inmem_layer(conf, 3, Lsn(100), Lsn(110))); + layers.insert_open(dummy_inmem_layer(conf, 0, Lsn(0x100), Lsn(0x100))); + layers.insert_open(dummy_inmem_layer(conf, 1, Lsn(0x100), Lsn(0x200))); + layers.insert_open(dummy_inmem_layer(conf, 2, Lsn(0x100), Lsn(0x120))); + layers.insert_open(dummy_inmem_layer(conf, 3, Lsn(0x100), Lsn(0x110))); let gen2 = layers.increment_generation(); - layers.insert_open(dummy_inmem_layer(conf, 4, Lsn(100), Lsn(110))); - layers.insert_open(dummy_inmem_layer(conf, 5, Lsn(100), Lsn(100))); + layers.insert_open(dummy_inmem_layer(conf, 4, Lsn(0x100), Lsn(0x110))); + layers.insert_open(dummy_inmem_layer(conf, 5, 
Lsn(0x100), Lsn(0x100))); // A helper function (closure) to pop the next oldest open entry from the layer map, // and assert that it is what we'd expect @@ -434,12 +432,12 @@ mod tests { layers.pop_oldest_open(); }; - assert_pop_layer(0, gen1); // 100 - assert_pop_layer(5, gen2); // 100 - assert_pop_layer(3, gen1); // 110 - assert_pop_layer(4, gen2); // 110 - assert_pop_layer(2, gen1); // 120 - assert_pop_layer(1, gen1); // 200 + assert_pop_layer(0, gen1); // 0x100 + assert_pop_layer(5, gen2); // 0x100 + assert_pop_layer(3, gen1); // 0x110 + assert_pop_layer(4, gen2); // 0x110 + assert_pop_layer(2, gen1); // 0x120 + assert_pop_layer(1, gen1); // 0x200 Ok(()) } diff --git a/test_runner/batch_others/test_old_request_lsn.py b/test_runner/batch_others/test_old_request_lsn.py index fafa37fe5a..bb28bdd83f 100644 --- a/test_runner/batch_others/test_old_request_lsn.py +++ b/test_runner/batch_others/test_old_request_lsn.py @@ -46,7 +46,7 @@ def test_old_request_lsn(zenith_cli, pageserver: ZenithPageserver, postgres: Pos from pg_settings where name = 'shared_buffers' ''') row = cur.fetchone() - print("shared_buffers is {}, table size {}", row[0], row[1]); + print(f'shared_buffers is {row[0]}, table size {row[1]}'); assert int(row[0]) < int(row[1]) cur.execute('VACUUM foo'); diff --git a/zenith_utils/src/accum.rs b/zenith_utils/src/accum.rs new file mode 100644 index 0000000000..d3ad61e514 --- /dev/null +++ b/zenith_utils/src/accum.rs @@ -0,0 +1,33 @@ +/// A helper to "accumulate" a value similar to `Iterator::reduce`, but lets you +/// feed the accumulated values by calling the 'accum' function, instead of having an +/// iterator. 
+/// +/// For example, to calculate the smallest value among some integers: +/// +/// ``` +/// use zenith_utils::accum::Accum; +/// +/// let values = [1, 2, 3]; +/// +/// let mut min_value: Accum = Accum(None); +/// for new_value in &values { +/// min_value.accum(std::cmp::min, *new_value); +/// } +/// +/// assert_eq!(min_value.0.unwrap(), 1); +/// ``` +pub struct Accum(pub Option); +impl Accum { + pub fn accum(&mut self, func: F, new_value: T) + where + F: FnOnce(T, T) -> T, + { + // If there is no previous value, just store the new value. + // Otherwise call the function to decide which one to keep. + self.0 = Some(if let Some(accum) = self.0 { + func(accum, new_value) + } else { + new_value + }); + } +} diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs index 302069494c..142c65e6f1 100644 --- a/zenith_utils/src/lib.rs +++ b/zenith_utils/src/lib.rs @@ -31,3 +31,6 @@ pub mod sock_split; // common log initialisation routine pub mod logging; + +// Misc +pub mod accum;