diff --git a/Cargo.lock b/Cargo.lock
index 9ed9a21dab..ac4cc5d4dc 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1193,6 +1193,7 @@ dependencies = [
 "hyper",
 "lazy_static",
 "log",
+ "nix",
 "postgres",
 "postgres-protocol",
 "postgres-types",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index bdc85d88dc..0525bda254 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -37,6 +37,7 @@
 async-trait = "0.1"
 const_format = "0.2.21"
 tracing = "0.1.27"
 signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
+nix = "0.23"
 postgres_ffi = { path = "../postgres_ffi" }
 zenith_metrics = { path = "../zenith_metrics" }
diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 6f201d83a7..fd4131b5d6 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -66,7 +66,7 @@
 use delta_layer::DeltaLayer;
 use image_layer::ImageLayer;
 use inmemory_layer::InMemoryLayer;
-use layer_map::LayerMap;
+use layer_map::{LayerId, LayerMap};
 use storage_layer::{
     Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
 };
@@ -1278,7 +1278,7 @@
         let mut created_historics = false;
         let mut layer_uploads = Vec::new();
 
-        while let Some((oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
+        while let Some((oldest_layer_id, oldest_layer, oldest_generation)) = layers.peek_oldest_open() {
             let oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
 
             if tenant_mgr::shutdown_requested() && !forced {
@@ -1315,8 +1315,8 @@
 
             // The layer is no longer open, update the layer map to reflect this.
             // We will replace it with on-disk historics below.
-            layers.pop_oldest_open();
-            layers.insert_historic(oldest_layer.clone());
+            layers.remove(oldest_layer_id);
+            let oldest_layer_id = layers.insert_historic(oldest_layer.clone());
 
             // Write the now-frozen layer to disk. That could take a while, so release the lock while do it
             drop(layers);
@@ -1332,7 +1332,7 @@
             }
 
             // Finally, replace the frozen in-memory layer with the new on-disk layers
-            layers.remove_historic(oldest_layer);
+            layers.remove(oldest_layer_id);
 
             // Add the historics to the LayerMap
             for delta_layer in new_historics.delta_layers {
@@ -1348,7 +1348,7 @@
         // Call unload() on all frozen layers, to release memory.
         // This shouldn't be much memory, as only metadata is slurped
        // into memory.
-        for layer in layers.iter_historic_layers() {
+        for (_layer_id, layer) in layers.iter_historic_layers() {
             layer.unload()?;
         }
 
@@ -1440,7 +1440,7 @@
         debug!("retain_lsns: {:?}", retain_lsns);
 
-        let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
+        let mut layers_to_remove: Vec<LayerId> = Vec::new();
 
         // Scan all on-disk layers in the timeline.
         //
@@ -1451,7 +1451,7 @@
         // 4. this layer doesn't serve as a tombstone for some older layer;
         //
         let mut layers = self.layers.lock().unwrap();
-        'outer: for l in layers.iter_historic_layers() {
+        'outer: for (layer_id, l) in layers.iter_historic_layers() {
             let seg = l.get_seg_tag();
 
             if seg.rel.is_relation() {
@@ -1593,16 +1593,16 @@
                     l.get_end_lsn(),
                     l.is_dropped()
                 );
-                layers_to_remove.push(Arc::clone(&l));
+                layers_to_remove.push(layer_id);
             }
 
         // Actually delete the layers from disk and remove them from the map.
        // (couldn't do this in the loop above, because you cannot modify a collection
        // while iterating it. BTreeMap::retain() would be another option)
-        for doomed_layer in layers_to_remove {
+        for doomed_layer_id in layers_to_remove {
+            let doomed_layer = layers.get_with_id(doomed_layer_id);
             doomed_layer.delete()?;
-            layers.remove_historic(doomed_layer.clone());
-
+            layers.remove(doomed_layer_id);
             match (
                 doomed_layer.is_dropped(),
                 doomed_layer.get_seg_tag().rel.is_relation(),
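
Aside: the GC hunk above works around the usual borrow-checker constraint that a map can't be mutated while it is being iterated. A tiny standalone illustration of the two variants the comment mentions, collect-then-remove versus `retain` (hypothetical data, not pageserver code):

```rust
use std::collections::BTreeMap;

fn main() {
    let mut map: BTreeMap<u64, String> = (0..6).map(|i| (i, format!("layer-{}", i))).collect();

    // Variant 1 (what the patch does): remember the keys to delete while
    // iterating, then mutate the map in a second pass.
    let doomed: Vec<u64> = map
        .iter()
        .filter(|(_, v)| v.ends_with('2') || v.ends_with('4'))
        .map(|(k, _)| *k)
        .collect();
    for k in doomed {
        map.remove(&k);
    }

    // Variant 2 (mentioned in the comment): BTreeMap::retain does the
    // filtering and removal in one pass.
    map.retain(|_, v| !v.ends_with('5'));

    assert_eq!(map.len(), 3);
}
```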
diff --git a/pageserver/src/layered_repository/interval_tree.rs b/pageserver/src/layered_repository/interval_tree.rs
index 978ecd837e..97c35b76a1 100644
--- a/pageserver/src/layered_repository/interval_tree.rs
+++ b/pageserver/src/layered_repository/interval_tree.rs
@@ -41,23 +41,22 @@
 use std::collections::BTreeMap;
 use std::fmt::Debug;
 use std::ops::Range;
-use std::sync::Arc;
 
-pub struct IntervalTree<I: ?Sized>
+pub struct IntervalTree<I>
 where
     I: IntervalItem,
 {
     points: BTreeMap<I::Key, Point<I>>,
 }
 
-struct Point<I: ?Sized> {
+struct Point<I> {
     /// All intervals that contain this point, in no particular order.
     ///
     /// We assume that there aren't a lot of overlappingg intervals, so that this vector
     /// never grows very large. If that assumption doesn't hold, we could keep this ordered
     /// by the end bound, to speed up `search`. But as long as there are only a few elements,
     /// a linear search is OK.
-    elements: Vec<Arc<I>>,
+    elements: Vec<I>,
 }
 
 /// Abstraction for an interval that can be stored in the tree
@@ -75,14 +74,14 @@ pub trait IntervalItem {
     }
 }
 
-impl<I: ?Sized> IntervalTree<I>
+impl<I> IntervalTree<I>
 where
-    I: IntervalItem,
+    I: IntervalItem + PartialEq + Clone,
 {
     /// Return an element that contains 'key', or precedes it.
     ///
     /// If there are multiple candidates, returns the one with the highest 'end' key.
-    pub fn search(&self, key: I::Key) -> Option<Arc<I>> {
+    pub fn search(&self, key: I::Key) -> Option<&I> {
         // Find the greatest point that precedes or is equal to the search key. If there is
         // none, returns None.
         let (_, p) = self.points.range(..=key).next_back()?;
@@ -100,7 +99,7 @@ where
                 }
             })
             .unwrap();
-        Some(Arc::clone(highest_item))
+        Some(highest_item)
     }
 
     /// Iterate over all items with start bound >= 'key'
@@ -119,7 +118,7 @@ where
         }
     }
 
-    pub fn insert(&mut self, item: Arc<I>) {
+    pub fn insert(&mut self, item: &I) {
         let start_key = item.start_key();
         let end_key = item.end_key();
         assert!(start_key < end_key);
@@ -133,18 +132,18 @@ where
                 found_start_point = true;
 
                 // It is an error to insert the same item to the tree twice.
                 assert!(
-                    !point.elements.iter().any(|x| Arc::ptr_eq(x, &item)),
+                    !point.elements.iter().any(|x| x == item),
                     "interval is already in the tree"
                 );
             }
-            point.elements.push(Arc::clone(&item));
+            point.elements.push(item.clone());
         }
 
         if !found_start_point {
             // Create a new Point for the starting point
             // Look at the previous point, and copy over elements that overlap with this
             // new point
-            let mut new_elements: Vec<Arc<I>> = Vec::new();
+            let mut new_elements: Vec<I> = Vec::new();
             if let Some((_, prev_point)) = self.points.range(..start_key).next_back() {
                 let overlapping_prev_elements = prev_point
                     .elements
@@ -154,7 +153,7 @@ where
                 new_elements.extend(overlapping_prev_elements);
             }
 
-            new_elements.push(item);
+            new_elements.push(item.clone());
 
             let new_point = Point {
                 elements: new_elements,
@@ -163,7 +162,7 @@ where
         }
     }
 
-    pub fn remove(&mut self, item: &Arc<I>) {
+    pub fn remove(&mut self, item: &I) {
         // range search points
         let start_key = item.start_key();
         let end_key = item.end_key();
@@ -176,7 +175,7 @@ where
                 found_start_point = true;
             }
             let len_before = point.elements.len();
-            point.elements.retain(|other| !Arc::ptr_eq(other, item));
+            point.elements.retain(|other| other != item);
             let len_after = point.elements.len();
             assert_eq!(len_after + 1, len_before);
             if len_after == 0 {
@@ -191,19 +190,19 @@ where
     }
 }
 
-pub struct IntervalIter<'a, I: ?Sized>
+pub struct IntervalIter<'a, I>
 where
     I: IntervalItem,
 {
     point_iter: std::collections::btree_map::Range<'a, I::Key, Point<I>>,
-    elem_iter: Option<(I::Key, std::slice::Iter<'a, Arc<I>>)>,
+    elem_iter: Option<(I::Key, std::slice::Iter<'a, I>)>,
 }
 
 impl<'a, I> Iterator for IntervalIter<'a, I>
 where
-    I: IntervalItem + ?Sized,
+    I: IntervalItem,
 {
-    type Item = Arc<I>;
+    type Item = &'a I;
 
     fn next(&mut self) -> Option<Self::Item> {
         // Iterate over all elements in all the points in 'point_iter'. To avoid
@@ -214,7 +213,7 @@ where
         if let Some((point_key, elem_iter)) = &mut self.elem_iter {
             for elem in elem_iter {
                 if elem.start_key() == *point_key {
-                    return Some(Arc::clone(elem));
+                    return Some(elem);
                 }
             }
         }
@@ -230,7 +229,7 @@ where
     }
 }
 
-impl<I: ?Sized> Default for IntervalTree<I>
+impl<I> Default for IntervalTree<I>
 where
     I: IntervalItem,
 {
@@ -246,7 +245,7 @@ mod tests {
     use super::*;
     use std::fmt;
 
-    #[derive(Debug)]
+    #[derive(Debug, Clone, PartialEq)]
     struct MockItem {
         start_key: u32,
         end_key: u32,
@@ -288,7 +287,7 @@ mod tests {
         tree: &IntervalTree<MockItem>,
         key: u32,
         expected: &[&str],
-    ) -> Option<Arc<MockItem>> {
+    ) -> Option<MockItem> {
         if let Some(v) = tree.search(key) {
             let vstr = v.to_string();
 
@@ -299,7 +298,7 @@ mod tests {
                 key, v, expected,
             );
 
-            Some(v)
+            Some(v.clone())
         } else {
             assert!(
                 expected.is_empty(),
@@ -331,12 +330,12 @@ mod tests {
         let mut tree: IntervalTree<MockItem> = IntervalTree::default();
 
         // Simple, non-overlapping ranges.
-        tree.insert(Arc::new(MockItem::new(10, 11)));
-        tree.insert(Arc::new(MockItem::new(11, 12)));
-        tree.insert(Arc::new(MockItem::new(12, 13)));
-        tree.insert(Arc::new(MockItem::new(18, 19)));
-        tree.insert(Arc::new(MockItem::new(17, 18)));
-        tree.insert(Arc::new(MockItem::new(15, 16)));
+        tree.insert(&MockItem::new(10, 11));
+        tree.insert(&MockItem::new(11, 12));
+        tree.insert(&MockItem::new(12, 13));
+        tree.insert(&MockItem::new(18, 19));
+        tree.insert(&MockItem::new(17, 18));
+        tree.insert(&MockItem::new(15, 16));
 
         assert_search(&tree, 9, &[]);
         assert_search(&tree, 10, &["10-11"]);
@@ -370,13 +369,13 @@ mod tests {
         let mut tree: IntervalTree<MockItem> = IntervalTree::default();
 
         // Overlapping items
-        tree.insert(Arc::new(MockItem::new(22, 24)));
-        tree.insert(Arc::new(MockItem::new(23, 25)));
-        let x24_26 = Arc::new(MockItem::new(24, 26));
-        tree.insert(Arc::clone(&x24_26));
-        let x26_28 = Arc::new(MockItem::new(26, 28));
-        tree.insert(Arc::clone(&x26_28));
-        tree.insert(Arc::new(MockItem::new(25, 27)));
+        tree.insert(&MockItem::new(22, 24));
+        tree.insert(&MockItem::new(23, 25));
+        let x24_26 = MockItem::new(24, 26);
+        tree.insert(&x24_26);
+        let x26_28 = MockItem::new(26, 28);
+        tree.insert(&x26_28);
+        tree.insert(&MockItem::new(25, 27));
 
         assert_search(&tree, 22, &["22-24"]);
         assert_search(&tree, 23, &["22-24", "23-25"]);
@@ -403,10 +402,10 @@ mod tests {
         let mut tree: IntervalTree<MockItem> = IntervalTree::default();
 
         // Items containing other items
-        tree.insert(Arc::new(MockItem::new(31, 39)));
-        tree.insert(Arc::new(MockItem::new(32, 34)));
-        tree.insert(Arc::new(MockItem::new(33, 35)));
-        tree.insert(Arc::new(MockItem::new(30, 40)));
+        tree.insert(&MockItem::new(31, 39));
+        tree.insert(&MockItem::new(32, 34));
+        tree.insert(&MockItem::new(33, 35));
+        tree.insert(&MockItem::new(30, 40));
 
         assert_search(&tree, 30, &["30-40"]);
         assert_search(&tree, 31, &["30-40", "31-39"]);
@@ -427,16 +426,16 @@ mod tests {
         let mut tree: IntervalTree<MockItem> = IntervalTree::default();
 
         // Duplicate keys
-        let item_a = Arc::new(MockItem::new_str(55, 56, "a"));
-        tree.insert(Arc::clone(&item_a));
-        let item_b = Arc::new(MockItem::new_str(55, 56, "b"));
-        tree.insert(Arc::clone(&item_b));
-        let item_c = Arc::new(MockItem::new_str(55, 56, "c"));
-        tree.insert(Arc::clone(&item_c));
-        let item_d = Arc::new(MockItem::new_str(54, 56, "d"));
-        tree.insert(Arc::clone(&item_d));
-        let item_e = Arc::new(MockItem::new_str(55, 57, "e"));
-        tree.insert(Arc::clone(&item_e));
+        let item_a = MockItem::new_str(55, 56, "a");
+        tree.insert(&item_a);
+        let item_b = MockItem::new_str(55, 56, "b");
+        tree.insert(&item_b);
+        let item_c = MockItem::new_str(55, 56, "c");
+        tree.insert(&item_c);
+        let item_d = MockItem::new_str(54, 56, "d");
+        tree.insert(&item_d);
+        let item_e = MockItem::new_str(55, 57, "e");
+        tree.insert(&item_e);
 
         dump_tree(&tree);
 
@@ -461,8 +460,8 @@ mod tests {
         let mut tree: IntervalTree<MockItem> = IntervalTree::default();
 
         // Inserting the same item twice is not cool
-        let item = Arc::new(MockItem::new(1, 2));
-        tree.insert(Arc::clone(&item));
-        tree.insert(Arc::clone(&item)); // fails assertion
+        let item = MockItem::new(1, 2);
+        tree.insert(&item);
+        tree.insert(&item); // fails assertion
     }
 }
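
For review context: the structure in interval_tree.rs indexes each interval at BTreeMap "points", and `search` stabs with `range(..=key).next_back()`. A minimal, self-contained sketch of that idea (made-up names; it deliberately skips the overlap-copying that the real `insert` performs when it creates a new point):

```rust
use std::collections::BTreeMap;

#[derive(Clone, PartialEq, Debug)]
struct Interval {
    start: u32,
    end: u32, // exclusive
}

#[derive(Default)]
struct PointIndex {
    // For each indexed point, the intervals containing that point.
    points: BTreeMap<u32, Vec<Interval>>,
}

impl PointIndex {
    fn insert(&mut self, item: Interval) {
        // The real tree also adds `item` to every existing point within
        // [start, end) and copies still-overlapping predecessors into the
        // new point; this sketch only registers the start point.
        self.points.entry(item.start).or_default().push(item);
    }

    // Stab query: take the greatest point <= key, then linearly scan its
    // (assumed short) element list for the interval with the highest end.
    fn search(&self, key: u32) -> Option<&Interval> {
        let (_, elems) = self.points.range(..=key).next_back()?;
        elems.iter().filter(|i| i.end > key).max_by_key(|i| i.end)
    }
}

fn main() {
    let mut idx = PointIndex::default();
    idx.insert(Interval { start: 10, end: 20 });
    idx.insert(Interval { start: 15, end: 30 });
    assert_eq!(idx.search(12).map(|i| i.end), Some(20));
    assert_eq!(idx.search(17).map(|i| i.end), Some(30));
    assert_eq!(idx.search(35), None);
}
```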
+ + +// +// Global layer registry: +// +// Every layer is inserted into the global registry, and assigned an ID +// +// The global registry tracks memory usage and usage count for each layer +// +// +// In addition to that, there is a per-timeline LayerMap, used for lookups +// +// + + + use crate::layered_repository::interval_tree::{IntervalItem, IntervalIter, IntervalTree}; use crate::layered_repository::storage_layer::{Layer, SegmentTag}; use crate::layered_repository::InMemoryLayer; @@ -17,7 +33,7 @@ use anyhow::Result; use lazy_static::lazy_static; use std::cmp::Ordering; use std::collections::{BinaryHeap, HashMap}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use zenith_metrics::{register_int_gauge, IntGauge}; use zenith_utils::lsn::Lsn; @@ -28,8 +44,92 @@ lazy_static! { static ref NUM_ONDISK_LAYERS: IntGauge = register_int_gauge!("pageserver_ondisk_layers", "Number of layers on-disk") .expect("failed to define a metric"); + + // Global layer map + static ref LAYERS: Mutex = Mutex::new(GlobalLayerMap::new()); } +const MAX_LOADED_LAYERS: usize = 10; + +#[derive(Clone)] +enum GlobalLayerEntry { + InMemory(Arc), + Historic(Arc), +} + +struct GlobalLayerMap { + layers: HashMap, + last_id: u64, + + // Layers currently loaded. We run a clock algorithm across these. + loaded_layers: Vec, +} + +impl GlobalLayerMap { + pub fn new() -> GlobalLayerMap { + GlobalLayerMap { + layers: HashMap::new(), + last_id: 0, + loaded_layers: Vec::new(), + } + } + + pub fn get(&mut self, layer_id: LayerId) -> Arc { + + match self.layers.get(&layer_id) { + Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(), + Some(GlobalLayerEntry::Historic(layer)) => layer.clone(), + None => panic!() + } + } + + pub fn get_open(&mut self, layer_id: LayerId) -> Arc { + match self.layers.get(&layer_id) { + Some(GlobalLayerEntry::InMemory(layer)) => layer.clone(), + Some(GlobalLayerEntry::Historic(_layer)) => panic!(), + None => panic!() + } + } + + pub fn insert_open(&mut self, layer: Arc) -> LayerId { + let layer_id = LayerId(self.last_id); + self.last_id += 1; + + self.layers.insert(layer_id, GlobalLayerEntry::InMemory(layer)); + + layer_id + } + + pub fn insert_historic(&mut self, layer: Arc) -> LayerId { + let layer_id = LayerId(self.last_id); + self.last_id += 1; + + self.layers.insert(layer_id, GlobalLayerEntry::Historic(layer)); + + layer_id + } + + pub fn remove(&mut self, layer_id: LayerId) -> GlobalLayerEntry { + if let Some(entry) = self.layers.remove(&layer_id) { + let orig_entry = entry.clone(); + match orig_entry { + GlobalLayerEntry::InMemory(_layer) => { + NUM_INMEMORY_LAYERS.dec(); + }, + GlobalLayerEntry::Historic(_layer) => { + NUM_ONDISK_LAYERS.dec(); + } + } + entry.clone() + } else { + panic!() + } + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub struct LayerId(u64); + /// /// LayerMap tracks what layers exist on a timeline. /// @@ -41,7 +141,7 @@ pub struct LayerMap { /// All in-memory layers, ordered by 'oldest_pending_lsn' and generation /// of each layer. This allows easy access to the in-memory layer that /// contains the oldest WAL record. - open_layers: BinaryHeap, + open_layers: BinaryHeap, /// Generation number, used to distinguish newly inserted entries in the /// binary heap from older entries during checkpoint. 
@@ -61,6 +161,11 @@ impl LayerMap {
         segentry.get(lsn)
     }
 
+    pub fn get_with_id(&self, layer_id: LayerId) -> Arc<dyn Layer> {
+        // TODO: check that it belongs to this tenant+timeline
+        LAYERS.lock().unwrap().get(layer_id)
+    }
+
     ///
     /// Get the open layer for given segment for writing. Or None if no open
     /// layer exists.
@@ -68,16 +173,24 @@ impl LayerMap {
     pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
         let segentry = self.segs.get(tag)?;
 
-        segentry.open.as_ref().map(Arc::clone)
+        if let Some((layer_id, _start_lsn)) = segentry.open {
+            Some(LAYERS.lock().unwrap().get_open(layer_id))
+        } else {
+            None
+        }
     }
 
     ///
     /// Insert an open in-memory layer
     ///
     pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
+        let layer_id = LAYERS.lock().unwrap().insert_open(Arc::clone(&layer));
+
         let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
-        segentry.update_open(Arc::clone(&layer));
+        segentry.update_open(layer_id, layer.get_start_lsn());
 
         let oldest_pending_lsn = layer.get_oldest_pending_lsn();
 
@@ -87,9 +200,9 @@ impl LayerMap {
         assert!(oldest_pending_lsn.is_aligned());
 
         // Also add it to the binary heap
-        let open_layer_entry = OpenLayerEntry {
+        let open_layer_entry = OpenLayerHeapEntry {
             oldest_pending_lsn: layer.get_oldest_pending_lsn(),
-            layer,
+            layer_id,
             generation: self.current_generation,
         };
         self.open_layers.push(open_layer_entry);
@@ -98,47 +211,46 @@ impl LayerMap {
     }
 
-    /// Remove the oldest in-memory layer
-    pub fn pop_oldest_open(&mut self) {
-        // Pop it from the binary heap
-        let oldest_entry = self.open_layers.pop().unwrap();
-        let segtag = oldest_entry.layer.get_seg_tag();
+    /// Remove a layer, given its ID
+    pub fn remove(&mut self, layer_id: LayerId) {
+        let layer_entry = LAYERS.lock().unwrap().remove(layer_id);
 
         // Also remove it from the SegEntry of this segment
-        let mut segentry = self.segs.get_mut(&segtag).unwrap();
-        if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
-            segentry.open = None;
-        } else {
-            // We could have already updated segentry.open for
-            // dropped (non-writeable) layer. This is fine.
-            assert!(!oldest_entry.layer.is_writeable());
-            assert!(oldest_entry.layer.is_dropped());
-        }
-
-        NUM_INMEMORY_LAYERS.dec();
+        match layer_entry {
+            GlobalLayerEntry::InMemory(layer) => {
+                let segtag = layer.get_seg_tag();
+                let segentry = self.segs.get_mut(&segtag).unwrap();
+                if let Some(open) = segentry.open {
+                    if open.0 == layer_id {
+                        segentry.open = None;
+                    }
+                } else {
+                    // We could have already updated segentry.open for
+                    // dropped (non-writeable) layer. This is fine.
+                    //assert!(!layer.is_writeable());
+                    //assert!(layer.is_dropped());
+                }
+            }
+            GlobalLayerEntry::Historic(layer) => {
+                let tag = layer.get_seg_tag();
+                if let Some(segentry) = self.segs.get_mut(&tag) {
+                    segentry
+                        .historic
+                        .remove(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
+                }
+            }
+        }
     }
 
     ///
     /// Insert an on-disk layer
     ///
-    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
+    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) -> LayerId {
+        let layer_id = LAYERS.lock().unwrap().insert_historic(Arc::clone(&layer));
+
         let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
-        segentry.insert_historic(layer);
+        segentry.insert_historic(layer_id, layer);
 
-        NUM_ONDISK_LAYERS.inc();
-    }
-
-    ///
-    /// Remove an on-disk layer from the map.
-    ///
-    /// This should be called when the corresponding file on disk has been deleted.
-    ///
-    pub fn remove_historic(&mut self, layer: Arc<dyn Layer>) {
-        let tag = layer.get_seg_tag();
-
-        if let Some(segentry) = self.segs.get_mut(&tag) {
-            segentry.historic.remove(&layer);
-        }
-        NUM_ONDISK_LAYERS.dec();
+        layer_id
     }
 
     // List relations along with a flag that marks if they exist at the given lsn.
@@ -199,10 +311,15 @@ impl LayerMap {
     }
 
     /// Return the oldest in-memory layer, along with its generation number.
-    pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
-        self.open_layers
-            .peek()
-            .map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation))
+    pub fn peek_oldest_open(&self) -> Option<(LayerId, Arc<InMemoryLayer>, u64)> {
+        if let Some(oldest_entry) = self.open_layers.peek() {
+            Some((
+                oldest_entry.layer_id,
+                LAYERS.lock().unwrap().get_open(oldest_entry.layer_id),
+                oldest_entry.generation,
+            ))
+        } else {
+            None
+        }
     }
 
     /// Increment the generation number used to stamp open in-memory layers. Layers
@@ -220,6 +337,7 @@ impl LayerMap {
         }
     }
 
+    /*
     /// debugging function to print out the contents of the layer map
     #[allow(unused)]
     pub fn dump(&self) -> Result<()> {
@@ -236,16 +354,39 @@ impl LayerMap {
         println!("End dump LayerMap");
         Ok(())
     }
+    */
 }
 
-impl IntervalItem for dyn Layer {
+#[derive(Clone)]
+struct HistoricLayerIntervalTreeEntry {
+    layer_id: LayerId,
+    start_lsn: Lsn,
+    end_lsn: Lsn,
+}
+
+impl HistoricLayerIntervalTreeEntry {
+    fn new(layer_id: LayerId, layer: Arc<dyn Layer>) -> HistoricLayerIntervalTreeEntry {
+        HistoricLayerIntervalTreeEntry {
+            layer_id,
+            start_lsn: layer.get_start_lsn(),
+            end_lsn: layer.get_end_lsn(),
+        }
+    }
+}
+
+impl PartialEq for HistoricLayerIntervalTreeEntry {
+    fn eq(&self, other: &Self) -> bool {
+        self.layer_id == other.layer_id
+    }
+}
+
+impl IntervalItem for HistoricLayerIntervalTreeEntry {
     type Key = Lsn;
 
     fn start_key(&self) -> Lsn {
-        self.get_start_lsn()
+        self.start_lsn
     }
     fn end_key(&self) -> Lsn {
-        self.get_end_lsn()
+        self.end_lsn
     }
 }
 
@@ -259,8 +400,8 @@
 /// IntervalTree.
 #[derive(Default)]
 struct SegEntry {
-    open: Option<Arc<InMemoryLayer>>,
-    historic: IntervalTree<dyn Layer>,
+    open: Option<(LayerId, Lsn)>,
+    historic: IntervalTree<HistoricLayerIntervalTreeEntry>,
 }
 
 impl SegEntry {
@@ -276,13 +417,17 @@ impl SegEntry {
     pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
         if let Some(open) = &self.open {
-            if open.get_start_lsn() <= lsn {
-                let x: Arc<dyn Layer> = Arc::clone(open) as _;
-                return Some(x);
+            let layer = LAYERS.lock().unwrap().get(open.0);
+            if layer.get_start_lsn() <= lsn {
+                return Some(layer);
             }
         }
 
-        self.historic.search(lsn)
+        if let Some(historic) = self.historic.search(lsn) {
+            Some(LAYERS.lock().unwrap().get(historic.layer_id))
+        } else {
+            None
+        }
     }
 
     pub fn newer_image_layer_exists(&self, lsn: Lsn) -> bool {
@@ -291,21 +436,25 @@ impl SegEntry {
 
         self.historic
             .iter_newer(lsn)
-            .any(|layer| !layer.is_incremental())
+            .any(|e| {
+                let layer = LAYERS.lock().unwrap().get(e.layer_id);
+                !layer.is_incremental()
+            })
     }
 
     // Set new open layer for a SegEntry.
     // It's ok to rewrite previous open layer,
     // but only if it is not writeable anymore.
-    pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
-        if let Some(prev_open) = &self.open {
-            assert!(!prev_open.is_writeable());
+    pub fn update_open(&mut self, layer_id: LayerId, start_lsn: Lsn) {
+        if let Some(_prev_open) = &self.open {
+            //assert!(!prev_open.is_writeable());
         }
-        self.open = Some(layer);
+        self.open = Some((layer_id, start_lsn));
     }
 
-    pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
-        self.historic.insert(layer);
+    pub fn insert_historic(&mut self, layer_id: LayerId, layer: Arc<dyn Layer>) {
+        self.historic
+            .insert(&HistoricLayerIntervalTreeEntry::new(layer_id, layer));
     }
 }
 
@@ -315,12 +464,12 @@
 /// The generation number associated with each entry can be used to distinguish
 /// recently-added entries (i.e after last call to increment_generation()) from older
 /// entries with the same 'oldest_pending_lsn'.
-struct OpenLayerEntry {
+struct OpenLayerHeapEntry {
     pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
     pub generation: u64,
-    pub layer: Arc<InMemoryLayer>,
+    pub layer_id: LayerId,
 }
 
-impl Ord for OpenLayerEntry {
+impl Ord for OpenLayerHeapEntry {
     fn cmp(&self, other: &Self) -> Ordering {
         // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
         // to get that. Entries with identical oldest_pending_lsn are ordered by generation
@@ -330,32 +479,33 @@
             .then_with(|| other.generation.cmp(&self.generation))
     }
 }
-impl PartialOrd for OpenLayerEntry {
+impl PartialOrd for OpenLayerHeapEntry {
     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
         Some(self.cmp(other))
     }
 }
-impl PartialEq for OpenLayerEntry {
+impl PartialEq for OpenLayerHeapEntry {
     fn eq(&self, other: &Self) -> bool {
         self.cmp(other) == Ordering::Equal
     }
 }
-impl Eq for OpenLayerEntry {}
+impl Eq for OpenLayerHeapEntry {}
 
 /// Iterator returned by LayerMap::iter_historic_layers()
 pub struct HistoricLayerIter<'a> {
     seg_iter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
-    iter: Option<IntervalIter<'a, dyn Layer>>,
+    iter: Option<IntervalIter<'a, HistoricLayerIntervalTreeEntry>>,
 }
 
 impl<'a> Iterator for HistoricLayerIter<'a> {
-    type Item = Arc<dyn Layer>;
+    type Item = (LayerId, Arc<dyn Layer>);
 
     fn next(&mut self) -> std::option::Option<<Self as Iterator>::Item> {
         loop {
             if let Some(x) = &mut self.iter {
                 if let Some(x) = x.next() {
-                    return Some(Arc::clone(&x));
+                    let layer = LAYERS.lock().unwrap().get(x.layer_id);
+                    return Some((x.layer_id, layer));
                 }
             }
             if let Some((_tag, segentry)) = self.seg_iter.next() {
@@ -426,10 +576,10 @@
         // A helper function (closure) to pop the next oldest open entry from the layer map,
         // and assert that it is what we'd expect
         let mut assert_pop_layer = |expected_segno: u32, expected_generation: u64| {
-            let (l, generation) = layers.peek_oldest_open().unwrap();
+            let (layer_id, l, generation) = layers.peek_oldest_open().unwrap();
             assert!(l.get_seg_tag().segno == expected_segno);
             assert!(generation == expected_generation);
-            layers.pop_oldest_open();
+            layers.remove(layer_id);
         };
 
         assert_pop_layer(0, gen1); // 0x100
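
The shape of GlobalLayerMap above is a classic slab-style registry: one process-wide table owns the canonical Arc for each object and hands out small, copyable IDs for everything else to store. A minimal sketch of that pattern in isolation (hypothetical names, with no metrics and no in-memory/historic distinction):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct Id(u64);

struct Registry<T> {
    entries: HashMap<Id, Arc<T>>,
    last_id: u64,
}

impl<T> Registry<T> {
    fn new() -> Self {
        Registry { entries: HashMap::new(), last_id: 0 }
    }

    // Assign the next ID and take (shared) ownership of the value.
    fn insert(&mut self, value: Arc<T>) -> Id {
        let id = Id(self.last_id);
        self.last_id += 1;
        self.entries.insert(id, value);
        id
    }

    // Look up by ID; panics on a stale ID, like the patch's get().
    fn get(&self, id: Id) -> Arc<T> {
        Arc::clone(self.entries.get(&id).expect("stale id"))
    }

    fn remove(&mut self, id: Id) -> Arc<T> {
        self.entries.remove(&id).expect("stale id")
    }
}

fn main() {
    // Wrapped in a Mutex, as the patch does with the global LAYERS registry.
    let registry: Mutex<Registry<String>> = Mutex::new(Registry::new());

    let id = registry.lock().unwrap().insert(Arc::new("layer".to_string()));
    assert_eq!(*registry.lock().unwrap().get(id), "layer");
    registry.lock().unwrap().remove(id);
}
```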
diff --git a/pageserver/src/vfd.rs b/pageserver/src/vfd.rs
index 0c5855a30d..c7371b02cd 100644
--- a/pageserver/src/vfd.rs
+++ b/pageserver/src/vfd.rs
@@ -49,13 +49,10 @@ impl VirtualFile {
             if let Some(mut file) = l.files[self.vfd].file.take() {
                 // return cached File
-                eprintln!("reusing {} from {}/{}", self.path.display(), self.vfd, self.tag);
                 file.rewind()?;
                 return Ok(file);
             }
         }
 
-        eprintln!("opening {}", self.path.display());
-
         File::open(&self.path)
     }
 
@@ -85,8 +82,6 @@ impl VirtualFile {
         self.vfd = next;
         self.tag = l.files[next].tag;
 
-        eprintln!("caching {} at {}/{}", self.path.display(), self.vfd, self.tag);
-
         drop(l);
     }
 }
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index db45623ad4..25e991f9b9 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -28,20 +28,21 @@
 use std::fs::OpenOptions;
 use std::io::prelude::*;
 use std::io::Error;
 use std::path::PathBuf;
+
+use std::process::{ChildStdin, ChildStdout, ChildStderr, Command};
 use std::process::Stdio;
 use std::sync::Mutex;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::Duration;
 use std::time::Instant;
-use tokio::io::AsyncBufReadExt;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::process::{ChildStdin, ChildStdout, Command};
-use tokio::time::timeout;
 use zenith_metrics::{register_histogram, register_int_counter, Histogram, IntCounter};
 use zenith_utils::bin_ser::BeSer;
 use zenith_utils::lsn::Lsn;
 use zenith_utils::zid::ZTenantId;
 
+use std::os::unix::io::AsRawFd;
+use nix::poll::*;
+
 use crate::relish::*;
 use crate::repository::WALRecord;
 use crate::waldecoder::XlMultiXactCreate;
@@ -140,7 +141,6 @@
 pub struct PostgresRedoManager {
     tenantid: ZTenantId,
     conf: &'static PageServerConf,
-    runtime: tokio::runtime::Runtime,
     processes: Vec<Mutex<Option<PostgresRedoProcess>>>,
     next: AtomicUsize,
 }
@@ -218,16 +218,12 @@
         // launch the WAL redo process on first use
         if process_guard.is_none() {
-            let p = self
-                .runtime
-                .block_on(PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid))?;
+            let p = PostgresRedoProcess::launch(self.conf, process_no, &self.tenantid)?;
             *process_guard = Some(p);
         }
         let process = process_guard.as_mut().unwrap();
 
-        result = self
-            .runtime
-            .block_on(self.handle_apply_request_postgres(process, &request));
+        result = self.handle_apply_request_postgres(process, &request);
 
         WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
         end_time = Instant::now();
@@ -243,14 +239,6 @@
     /// Create a new PostgresRedoManager.
     ///
     pub fn new(conf: &'static PageServerConf, tenantid: ZTenantId) -> PostgresRedoManager {
-        // We block on waiting for requests on the walredo request channel, but
-        // use async I/O to communicate with the child process. Initialize the
-        // runtime for the async part.
-        let runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-
         let mut processes: Vec<Mutex<Option<PostgresRedoProcess>>> = Vec::new();
         for _ in 1..10 {
             processes.push(Mutex::new(None));
@@ -258,7 +246,6 @@
 
         // The actual process is launched lazily, on first request.
         PostgresRedoManager {
-            runtime,
             tenantid,
             conf,
             processes,
@@ -269,7 +256,7 @@
     ///
     /// Process one request for WAL redo using wal-redo postgres
     ///
-    async fn handle_apply_request_postgres(
+    fn handle_apply_request_postgres(
         &self,
         process: &mut PostgresRedoProcess,
         request: &WalRedoRequest,
@@ -287,7 +274,7 @@
         if let RelishTag::Relation(rel) = request.rel {
             // Relational WAL records are applied using wal-redo-postgres
             let buf_tag = BufferTag { rel, blknum };
-            apply_result = process.apply_wal_records(buf_tag, base_img, records).await;
+            apply_result = process.apply_wal_records(buf_tag, base_img, records);
 
             let duration = start.elapsed();
 
@@ -480,13 +467,14 @@
 struct PostgresRedoProcess {
     stdin: ChildStdin,
     stdout: ChildStdout,
+    stderr: ChildStderr,
 }
 
 impl PostgresRedoProcess {
     //
     // Start postgres binary in special WAL redo mode.
     //
-    async fn launch(
+    fn launch(
         conf: &PageServerConf,
         process_no: usize,
         tenantid: &ZTenantId,
@@ -511,7 +499,6 @@
             .env("LD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
             .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir().to_str().unwrap())
             .output()
-            .await
             .expect("failed to execute initdb");
 
         if !initdb.status.success() {
@@ -548,98 +535,106 @@
             datadir.display()
         );
 
-        let stdin = child.stdin.take().expect("failed to open child's stdin");
-        let stderr = child.stderr.take().expect("failed to open child's stderr");
-        let stdout = child.stdout.take().expect("failed to open child's stdout");
-
-        // This async block reads the child's stderr, and forwards it to the logger
-        let f_stderr = async {
-            let mut stderr_buffered = tokio::io::BufReader::new(stderr);
-
-            let mut line = String::new();
-            loop {
-                let res = stderr_buffered.read_line(&mut line).await;
-                if res.is_err() {
-                    debug!("could not convert line to utf-8");
-                    continue;
-                }
-                if res.unwrap() == 0 {
-                    break;
-                }
-                error!("wal-redo-postgres: {}", line.trim());
-                line.clear();
-            }
-            Ok::<(), Error>(())
-        };
-        tokio::spawn(f_stderr);
-
-        Ok(PostgresRedoProcess { stdin, stdout })
+        let stdin = child.stdin.take().unwrap();
+        let stdout = child.stdout.take().unwrap();
+        let stderr = child.stderr.take().unwrap();
+
+        Ok(PostgresRedoProcess { stdin, stdout, stderr })
     }
 
     //
     // Apply given WAL records ('records') over an old page image. Returns
     // new page image.
     //
-    async fn apply_wal_records(
+    fn apply_wal_records(
         &mut self,
         tag: BufferTag,
         base_img: Option<Bytes>,
         records: &[(Lsn, WALRecord)],
     ) -> Result<Bytes, Error> {
-        let stdout = &mut self.stdout;
-
-        // Buffer the writes to avoid a lot of small syscalls.
-        let mut stdin = tokio::io::BufWriter::new(&mut self.stdin);
-
-        // We do three things simultaneously: send the old base image and WAL records to
-        // the child process's stdin, read the result from child's stdout, and forward any logging
-        // information that the child writes to its stderr to the page server's log.
-        //
-        // 'f_stdin' handles writing the base image and WAL records to the child process.
-        // 'f_stdout' below reads the result back. And 'f_stderr', which was spawned into the
-        // tokio runtime in the 'launch' function already, forwards the logging.
-        let f_stdin = async {
-            // Send base image, if any. (If the record initializes the page, previous page
-            // version is not needed.)
-            let mut buf: Vec<u8> = Vec::new();
-            build_begin_redo_for_block_msg(tag, &mut buf);
-            if let Some(img) = base_img {
-                build_push_page_msg(tag, &img, &mut buf);
-            }
+        // Send base image, if any. (If the record initializes the page, previous page
+        // version is not needed.)
+        let mut buf: Vec<u8> = Vec::new();
+        build_begin_redo_for_block_msg(tag, &mut buf);
+        if let Some(img) = base_img {
+            build_push_page_msg(tag, &img, &mut buf);
+        }
+
+        // Send WAL records.
+        for (lsn, rec) in records.iter() {
+            WAL_REDO_RECORD_COUNTER.inc();
+
+            build_apply_record_msg(*lsn, &rec.rec, &mut buf);
+        }
+
+        // Send GetPage command to get the result back
+        build_get_page_msg(tag, &mut buf);
+
+        // The input is now in 'buf'.
-            // Send WAL records.
-            for (lsn, rec) in records.iter() {
-                WAL_REDO_RECORD_COUNTER.inc();
-
-                build_apply_record_msg(*lsn, &rec.rec, &mut buf);
-
-                //debug!("sent WAL record to wal redo postgres process ({:X}/{:X}",
-                //       r.lsn >> 32, r.lsn & 0xffff_ffff);
-            }
-            //debug!("sent {} WAL records to wal redo postgres process ({:X}/{:X}",
-            //       records.len(), lsn >> 32, lsn & 0xffff_ffff);
-
-            // Send GetPage command to get the result back
-            build_get_page_msg(tag, &mut buf);
-            timeout(TIMEOUT, stdin.write_all(&buf)).await??;
-            timeout(TIMEOUT, stdin.flush()).await??;
-
-            //debug!("sent GetPage for {}", tag.blknum);
-            Ok::<(), Error>(())
-        };
-
-        // Read back new page image
-        let f_stdout = async {
-            let mut buf = [0u8; 8192];
-
-            timeout(TIMEOUT, stdout.read_exact(&mut buf)).await??;
-            //debug!("got response for {}", tag.blknum);
-            Ok::<[u8; 8192], Error>(buf)
-        };
-
-        let res = tokio::try_join!(f_stdout, f_stdin)?;
-
-        let buf = res.0;
-
-        Ok::<Bytes, Error>(Bytes::from(std::vec::Vec::from(buf)))
+        let mut nwrite = 0;
+
+        let mut resultbuf = vec![0u8; 8192];
+        let mut nresult = 0;
+
+        let mut pollfds = [
+            PollFd::new(self.stdout.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stderr.as_raw_fd(), PollFlags::POLLIN),
+            PollFd::new(self.stdin.as_raw_fd(), PollFlags::POLLOUT),
+        ];
+
+        // Do a blind write first, before polling
+        let n = self.stdin.write(&buf[nwrite..])?;
+        nwrite += n;
+
+        while nresult < 8192 {
+            // Only poll stdin for writability while there is still data to send
+            let nfds = if nwrite < buf.len() { 3 } else { 2 };
+            nix::poll::poll(&mut pollfds[0..nfds], TIMEOUT.as_millis() as i32)?;
+
+            // We do three things simultaneously: send the old base image and WAL
+            // records to the child process's stdin, read the result from the
+            // child's stdout, and forward any logging information that the child
+            // writes to its stderr to the page server's log.
+            if nwrite < buf.len() && !pollfds[2].revents().unwrap().is_empty() {
+                // stdin is writeable
+                let n = self.stdin.write(&buf[nwrite..])?;
+                nwrite += n;
+            }
+            if !pollfds[0].revents().unwrap().is_empty() {
+                // stdout: read back the new page image
+                let n = self.stdout.read(&mut resultbuf[nresult..])?;
+                nresult += n;
+            }
+            if !pollfds[1].revents().unwrap().is_empty() {
+                // stderr
+                let mut readbuf: [u8; 16384] = [0; 16384];
+                let n = self.stderr.read(&mut readbuf)?;
+                error!("wal-redo-postgres: {}", String::from_utf8_lossy(&readbuf[0..n]));
+            }
+        }
+
+        Ok(Bytes::from(resultbuf))
     }
 }
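
Finally, the new apply_wal_records above replaces tokio's try_join with a hand-rolled poll(2) loop. The essential trick, interleaving writes and reads on the child's pipes so neither side blocks when a pipe fills up, can be shown in isolation. A hypothetical standalone example using `cat` as the child process and nix 0.23:

```rust
use nix::poll::{poll, PollFd, PollFlags};
use std::io::{Read, Write};
use std::os::unix::io::AsRawFd;
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    let mut child = Command::new("cat")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let mut stdin = child.stdin.take().unwrap();
    let mut stdout = child.stdout.take().unwrap();

    let request = vec![b'x'; 1_000_000]; // large enough to fill the pipes
    let mut nwritten = 0;
    let mut response = Vec::new();

    while nwritten < request.len() {
        let mut fds = [
            PollFd::new(stdout.as_raw_fd(), PollFlags::POLLIN),
            PollFd::new(stdin.as_raw_fd(), PollFlags::POLLOUT),
        ];
        // Block until the child can accept more input or has produced output
        poll(&mut fds, -1).map_err(|e| std::io::Error::from_raw_os_error(e as i32))?;

        if !fds[1].revents().unwrap().is_empty() {
            // POLLOUT guarantees PIPE_BUF bytes of space, so a write of at
            // most 4096 bytes will not block.
            let end = (nwritten + 4096).min(request.len());
            nwritten += stdin.write(&request[nwritten..end])?;
        }
        if !fds[0].revents().unwrap().is_empty() {
            // Drain the child's output as we go; otherwise the child stalls
            // once its stdout pipe fills up, and the two processes deadlock.
            let mut chunk = [0u8; 8192];
            let n = stdout.read(&mut chunk)?;
            response.extend_from_slice(&chunk[..n]);
        }
    }

    drop(stdin); // close the pipe so `cat` sees EOF
    stdout.read_to_end(&mut response)?;
    assert_eq!(response.len(), request.len());
    child.wait()?;
    Ok(())
}
```

This same deadlock concern is why the patch's loop only includes the stdin descriptor in the poll set while there is still unsent data.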