pageserver - add InMemoryLayer global map (#817)

This commit is contained in:
Patrick Insinger
2021-11-01 12:20:24 -07:00
committed by GitHub
parent 85f8bf97f5
commit cdbbd15eb9
3 changed files with 170 additions and 30 deletions

View File

@@ -53,6 +53,7 @@ use zenith_utils::seqwait::SeqWait;
mod blob;
mod delta_layer;
mod filename;
mod global_layer_map;
mod image_layer;
mod inmemory_layer;
mod interval_tree;

View File

@@ -0,0 +1,112 @@
//!
//! Global registry of open layers.
//!
//! Whenever a new in-memory layer is created to hold incoming WAL, it is registered
//! in [`GLOBAL_LAYER_MAP`], so that we can keep track of the total number of in-memory layers
//! in the system, and know when we need to evict some to release memory.
//!
//! Each layer is assigned a unique ID when it's registered in the global registry.
//! The ID can be used to relocate the layer later, without having to hold locks.
use std::sync::{Arc, RwLock};
use super::inmemory_layer::InMemoryLayer;
use lazy_static::lazy_static;
lazy_static! {
pub static ref GLOBAL_LAYER_MAP: RwLock<OpenLayers> = RwLock::new(OpenLayers::default());
}
// TODO these types can probably be smaller
#[derive(PartialEq, Eq, Clone, Copy)]
pub struct LayerId {
index: usize,
tag: u64, // to avoid ABA problem
}
enum SlotData {
Occupied(Arc<InMemoryLayer>),
/// Vacant slots form a linked list, the value is the index
/// of the next vacant slot in the list.
Vacant(Option<usize>),
}
struct Slot {
tag: u64,
data: SlotData,
}
#[derive(Default)]
pub struct OpenLayers {
slots: Vec<Slot>,
// Head of free-slot list.
next_empty_slot_idx: Option<usize>,
}
impl OpenLayers {
pub fn insert(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
let slot_idx = match self.next_empty_slot_idx {
Some(slot_idx) => slot_idx,
None => {
let idx = self.slots.len();
self.slots.push(Slot {
tag: 0,
data: SlotData::Vacant(None),
});
idx
}
};
let slot = &mut self.slots[slot_idx];
match slot.data {
SlotData::Occupied(_) => unimplemented!(),
SlotData::Vacant(next_empty_slot_idx) => {
self.next_empty_slot_idx = next_empty_slot_idx;
}
}
slot.data = SlotData::Occupied(layer);
LayerId {
index: slot_idx,
tag: slot.tag,
}
}
pub fn get(&self, layer_id: &LayerId) -> Option<Arc<InMemoryLayer>> {
let slot = self.slots.get(layer_id.index)?; // TODO should out of bounds indexes just panic?
if slot.tag != layer_id.tag {
return None;
}
if let SlotData::Occupied(layer) = &slot.data {
Some(Arc::clone(layer))
} else {
None
}
}
// TODO this won't be a public API in the future
pub fn remove(&mut self, layer_id: &LayerId) {
let slot = &mut self.slots[layer_id.index];
if slot.tag != layer_id.tag {
return;
}
match &slot.data {
SlotData::Occupied(_layer) => {
// TODO evict the layer
}
SlotData::Vacant(_) => unimplemented!(),
}
slot.data = SlotData::Vacant(self.next_empty_slot_idx);
self.next_empty_slot_idx = Some(layer_id.index);
slot.tag = slot.tag.wrapping_add(1);
}
}

View File

@@ -21,6 +21,8 @@ use std::sync::Arc;
use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn;
use super::global_layer_map::{LayerId, GLOBAL_LAYER_MAP};
lazy_static! {
static ref NUM_INMEMORY_LAYERS: IntGauge =
register_int_gauge!("pageserver_inmemory_layers", "Number of layers in memory")
@@ -68,7 +70,10 @@ impl LayerMap {
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
let segentry = self.segs.get(tag)?;
segentry.open.as_ref().map(Arc::clone)
segentry
.open_layer_id
.map(|layer_id| GLOBAL_LAYER_MAP.read().unwrap().get(&layer_id))
.flatten()
}
///
@@ -77,7 +82,7 @@ impl LayerMap {
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.update_open(Arc::clone(&layer));
let layer_id = segentry.update_open(Arc::clone(&layer));
let oldest_pending_lsn = layer.get_oldest_pending_lsn();
@@ -89,7 +94,7 @@ impl LayerMap {
// Also add it to the binary heap
let open_layer_entry = OpenLayerEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer,
layer_id,
generation: self.current_generation,
};
self.open_layers.push(open_layer_entry);
@@ -101,17 +106,27 @@ impl LayerMap {
pub fn pop_oldest_open(&mut self) {
// Pop it from the binary heap
let oldest_entry = self.open_layers.pop().unwrap();
let segtag = oldest_entry.layer.get_seg_tag();
// Also remove it from the SegEntry of this segment
let mut segentry = self.segs.get_mut(&segtag).unwrap();
if Arc::ptr_eq(segentry.open.as_ref().unwrap(), &oldest_entry.layer) {
segentry.open = None;
} else {
// We could have already updated segentry.open for
// dropped (non-writeable) layer. This is fine.
assert!(!oldest_entry.layer.is_writeable());
assert!(oldest_entry.layer.is_dropped());
let layer_opt = {
let mut global_map = GLOBAL_LAYER_MAP.write().unwrap();
let layer_opt = global_map.get(&oldest_entry.layer_id);
global_map.remove(&oldest_entry.layer_id);
// TODO it's bad that a ref can still exist after being evicted from global map
layer_opt
};
if let Some(layer) = layer_opt {
let mut segentry = self.segs.get_mut(&layer.get_seg_tag()).unwrap();
// Also remove it from the SegEntry of this segment
if segentry.open_layer_id.unwrap() == oldest_entry.layer_id {
segentry.open_layer_id = None;
} else {
// We could have already updated segentry.open for
// dropped (non-writeable) layer. This is fine.
assert!(!layer.is_writeable());
assert!(layer.is_dropped());
}
}
NUM_INMEMORY_LAYERS.dec();
@@ -200,9 +215,12 @@ impl LayerMap {
/// Return the oldest in-memory layer, along with its generation number.
pub fn peek_oldest_open(&self) -> Option<(Arc<InMemoryLayer>, u64)> {
self.open_layers
.peek()
.map(|oldest_entry| (Arc::clone(&oldest_entry.layer), oldest_entry.generation))
let oldest_entry = self.open_layers.peek()?;
let layer = GLOBAL_LAYER_MAP
.read()
.unwrap()
.get(&oldest_entry.layer_id)?;
Some((layer, oldest_entry.generation))
}
/// Increment the generation number used to stamp open in-memory layers. Layers
@@ -225,8 +243,12 @@ impl LayerMap {
pub fn dump(&self) -> Result<()> {
println!("Begin dump LayerMap");
for (seg, segentry) in self.segs.iter() {
if let Some(open) = &segentry.open {
open.dump()?;
if let Some(open) = &segentry.open_layer_id {
if let Some(layer) = GLOBAL_LAYER_MAP.read().unwrap().get(open) {
layer.dump()?;
} else {
println!("layer not found in global map");
}
}
for layer in segentry.historic.iter() {
@@ -259,7 +281,7 @@ impl IntervalItem for dyn Layer {
/// IntervalTree.
#[derive(Default)]
struct SegEntry {
open: Option<Arc<InMemoryLayer>>,
open_layer_id: Option<LayerId>,
historic: IntervalTree<dyn Layer>,
}
@@ -275,10 +297,10 @@ impl SegEntry {
}
pub fn get(&self, lsn: Lsn) -> Option<Arc<dyn Layer>> {
if let Some(open) = &self.open {
if open.get_start_lsn() <= lsn {
let x: Arc<dyn Layer> = Arc::clone(open) as _;
return Some(x);
if let Some(open_layer_id) = &self.open_layer_id {
let open_layer = GLOBAL_LAYER_MAP.read().unwrap().get(open_layer_id)?;
if open_layer.get_start_lsn() <= lsn {
return Some(open_layer);
}
}
@@ -297,11 +319,16 @@ impl SegEntry {
// Set new open layer for a SegEntry.
// It's ok to rewrite previous open layer,
// but only if it is not writeable anymore.
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) {
if let Some(prev_open) = &self.open {
assert!(!prev_open.is_writeable());
pub fn update_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
if let Some(prev_open_layer_id) = &self.open_layer_id {
if let Some(prev_open_layer) = GLOBAL_LAYER_MAP.read().unwrap().get(prev_open_layer_id)
{
assert!(!prev_open_layer.is_writeable());
}
}
self.open = Some(layer);
let open_layer_id = GLOBAL_LAYER_MAP.write().unwrap().insert(layer);
self.open_layer_id = Some(open_layer_id);
open_layer_id
}
pub fn insert_historic(&mut self, layer: Arc<dyn Layer>) {
@@ -316,9 +343,9 @@ impl SegEntry {
/// recently-added entries (i.e after last call to increment_generation()) from older
/// entries with the same 'oldest_pending_lsn'.
struct OpenLayerEntry {
pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
pub generation: u64,
pub layer: Arc<InMemoryLayer>,
oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
generation: u64,
layer_id: LayerId,
}
impl Ord for OpenLayerEntry {
fn cmp(&self, other: &Self) -> Ordering {