diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 117bd5c792..ba2b10abfe 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -53,6 +53,7 @@ use zenith_utils::seqwait::SeqWait; mod blob; mod delta_layer; mod filename; +mod global_layer_map; mod image_layer; mod inmemory_layer; mod interval_tree; diff --git a/pageserver/src/layered_repository/global_layer_map.rs b/pageserver/src/layered_repository/global_layer_map.rs new file mode 100644 index 0000000000..195bb679be --- /dev/null +++ b/pageserver/src/layered_repository/global_layer_map.rs @@ -0,0 +1,112 @@ +//! +//! Global registry of open layers. +//! +//! Whenever a new in-memory layer is created to hold incoming WAL, it is registered +//! in [`GLOBAL_LAYER_MAP`], so that we can keep track of the total number of in-memory layers +//! in the system, and know when we need to evict some to release memory. +//! +//! Each layer is assigned a unique ID when it's registered in the global registry. +//! The ID can be used to relocate the layer later, without having to hold locks. + +use std::sync::{Arc, RwLock}; + +use super::inmemory_layer::InMemoryLayer; + +use lazy_static::lazy_static; + +lazy_static! { + pub static ref GLOBAL_LAYER_MAP: RwLock = RwLock::new(OpenLayers::default()); +} + +// TODO these types can probably be smaller +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct LayerId { + index: usize, + tag: u64, // to avoid ABA problem +} + +enum SlotData { + Occupied(Arc), + /// Vacant slots form a linked list, the value is the index + /// of the next vacant slot in the list. + Vacant(Option), +} + +struct Slot { + tag: u64, + data: SlotData, +} + +#[derive(Default)] +pub struct OpenLayers { + slots: Vec, + + // Head of free-slot list. + next_empty_slot_idx: Option, +} + +impl OpenLayers { + pub fn insert(&mut self, layer: Arc) -> LayerId { + let slot_idx = match self.next_empty_slot_idx { + Some(slot_idx) => slot_idx, + None => { + let idx = self.slots.len(); + self.slots.push(Slot { + tag: 0, + data: SlotData::Vacant(None), + }); + idx + } + }; + + let slot = &mut self.slots[slot_idx]; + + match slot.data { + SlotData::Occupied(_) => unimplemented!(), + SlotData::Vacant(next_empty_slot_idx) => { + self.next_empty_slot_idx = next_empty_slot_idx; + } + } + + slot.data = SlotData::Occupied(layer); + + LayerId { + index: slot_idx, + tag: slot.tag, + } + } + + pub fn get(&self, layer_id: &LayerId) -> Option> { + let slot = self.slots.get(layer_id.index)?; // TODO should out of bounds indexes just panic? + if slot.tag != layer_id.tag { + return None; + } + + if let SlotData::Occupied(layer) = &slot.data { + Some(Arc::clone(layer)) + } else { + None + } + } + + // TODO this won't be a public API in the future + pub fn remove(&mut self, layer_id: &LayerId) { + let slot = &mut self.slots[layer_id.index]; + + if slot.tag != layer_id.tag { + return; + } + + match &slot.data { + SlotData::Occupied(_layer) => { + // TODO evict the layer + } + SlotData::Vacant(_) => unimplemented!(), + } + + slot.data = SlotData::Vacant(self.next_empty_slot_idx); + self.next_empty_slot_idx = Some(layer_id.index); + + slot.tag = slot.tag.wrapping_add(1); + } +} diff --git a/pageserver/src/layered_repository/layer_map.rs b/pageserver/src/layered_repository/layer_map.rs index c5643e896b..d7d4bc6cee 100644 --- a/pageserver/src/layered_repository/layer_map.rs +++ b/pageserver/src/layered_repository/layer_map.rs @@ -21,6 +21,8 @@ use std::sync::Arc; use zenith_metrics::{register_int_gauge, IntGauge}; use zenith_utils::lsn::Lsn; +use super::global_layer_map::{LayerId, GLOBAL_LAYER_MAP}; + lazy_static! { static ref NUM_INMEMORY_LAYERS: IntGauge = register_int_gauge!("pageserver_inmemory_layers", "Number of layers in memory") @@ -68,7 +70,10 @@ impl LayerMap { pub fn get_open(&self, tag: &SegmentTag) -> Option> { let segentry = self.segs.get(tag)?; - segentry.open.as_ref().map(Arc::clone) + segentry + .open_layer_id + .map(|layer_id| GLOBAL_LAYER_MAP.read().unwrap().get(&layer_id)) + .flatten() } /// @@ -77,7 +82,7 @@ impl LayerMap { pub fn insert_open(&mut self, layer: Arc) { let segentry = self.segs.entry(layer.get_seg_tag()).or_default(); - segentry.update_open(Arc::clone(&layer)); + let layer_id = segentry.update_open(Arc::clone(&layer)); let oldest_pending_lsn = layer.get_oldest_pending_lsn(); @@ -89,7 +94,7 @@ impl LayerMap { // Also add it to the binary heap let open_layer_entry = OpenLayerEntry { oldest_pending_lsn: layer.get_oldest_pending_lsn(), - layer, + layer_id, generation: self.current_generation, }; self.open_layers.push(open_layer_entry); @@ -101,17 +106,27 @@ impl LayerMap { pub fn pop_oldest_open(&mut self) { // Pop it from the binary heap let oldest_entry = self.open_layers.pop().unwrap(); - let segtag = oldest_entry.layer.get_seg_tag(); - // Also remove it from the SegEntry of this segment - let mut segentry = self.segs.get_mut(&segtag).unwrap(); - if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) { - segentry.open = None; - } else { - // We could have already updated segentry.open for - // dropped (non-writeable) layer. This is fine. - assert!(!oldest_entry.layer.is_writeable()); - assert!(oldest_entry.layer.is_dropped()); + let layer_opt = { + let mut global_map = GLOBAL_LAYER_MAP.write().unwrap(); + let layer_opt = global_map.get(&oldest_entry.layer_id); + global_map.remove(&oldest_entry.layer_id); + // TODO it's bad that a ref can still exist after being evicted from global map + layer_opt + }; + + if let Some(layer) = layer_opt { + let mut segentry = self.segs.get_mut(&layer.get_seg_tag()).unwrap(); + + // Also remove it from the SegEntry of this segment + if segentry.open_layer_id.unwrap() == oldest_entry.layer_id { + segentry.open_layer_id = None; + } else { + // We could have already updated segentry.open for + // dropped (non-writeable) layer. This is fine. + assert!(!layer.is_writeable()); + assert!(layer.is_dropped()); + } } NUM_INMEMORY_LAYERS.dec(); @@ -200,9 +215,12 @@ impl LayerMap { /// Return the oldest in-memory layer, along with its generation number. pub fn peek_oldest_open(&self) -> Option<(Arc, u64)> { - self.open_layers - .peek() - .map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation)) + let oldest_entry = self.open_layers.peek()?; + let layer = GLOBAL_LAYER_MAP + .read() + .unwrap() + .get(&oldest_entry.layer_id)?; + Some((layer, oldest_entry.generation)) } /// Increment the generation number used to stamp open in-memory layers. Layers @@ -225,8 +243,12 @@ impl LayerMap { pub fn dump(&self) -> Result<()> { println!("Begin dump LayerMap"); for (seg, segentry) in self.segs.iter() { - if let Some(open) = &segentry.open { - open.dump()?; + if let Some(open) = &segentry.open_layer_id { + if let Some(layer) = GLOBAL_LAYER_MAP.read().unwrap().get(open) { + layer.dump()?; + } else { + println!("layer not found in global map"); + } } for layer in segentry.historic.iter() { @@ -259,7 +281,7 @@ impl IntervalItem for dyn Layer { /// IntervalTree. #[derive(Default)] struct SegEntry { - open: Option>, + open_layer_id: Option, historic: IntervalTree, } @@ -275,10 +297,10 @@ impl SegEntry { } pub fn get(&self, lsn: Lsn) -> Option> { - if let Some(open) = &self.open { - if open.get_start_lsn() <= lsn { - let x: Arc = Arc::clone(open) as _; - return Some(x); + if let Some(open_layer_id) = &self.open_layer_id { + let open_layer = GLOBAL_LAYER_MAP.read().unwrap().get(open_layer_id)?; + if open_layer.get_start_lsn() <= lsn { + return Some(open_layer); } } @@ -297,11 +319,16 @@ impl SegEntry { // Set new open layer for a SegEntry. // It's ok to rewrite previous open layer, // but only if it is not writeable anymore. - pub fn update_open(&mut self, layer: Arc) { - if let Some(prev_open) = &self.open { - assert!(!prev_open.is_writeable()); + pub fn update_open(&mut self, layer: Arc) -> LayerId { + if let Some(prev_open_layer_id) = &self.open_layer_id { + if let Some(prev_open_layer) = GLOBAL_LAYER_MAP.read().unwrap().get(prev_open_layer_id) + { + assert!(!prev_open_layer.is_writeable()); + } } - self.open = Some(layer); + let open_layer_id = GLOBAL_LAYER_MAP.write().unwrap().insert(layer); + self.open_layer_id = Some(open_layer_id); + open_layer_id } pub fn insert_historic(&mut self, layer: Arc) { @@ -316,9 +343,9 @@ impl SegEntry { /// recently-added entries (i.e after last call to increment_generation()) from older /// entries with the same 'oldest_pending_lsn'. struct OpenLayerEntry { - pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn() - pub generation: u64, - pub layer: Arc, + oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn() + generation: u64, + layer_id: LayerId, } impl Ord for OpenLayerEntry { fn cmp(&self, other: &Self) -> Ordering {