Compare commits

...

1 Commits

Author SHA1 Message Date
Patrick Insinger
32dd786650 pageserver - add naive layer IDs 2021-11-01 00:20:50 -07:00

View File

@@ -17,6 +17,7 @@ use anyhow::Result;
use lazy_static::lazy_static;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;
use zenith_metrics::{register_int_gauge, IntGauge};
use zenith_utils::lsn::Lsn;
@@ -30,6 +31,17 @@ lazy_static! {
.expect("failed to define a metric");
}
static NEXT_LAYER_ID: AtomicU64 = AtomicU64::new(0);
#[derive(PartialEq, Eq, Hash, Clone, Copy)]
pub struct LayerId(u64);
impl LayerId {
fn next() -> LayerId {
Self(NEXT_LAYER_ID.fetch_add(1, atomic::Ordering::Relaxed))
}
}
///
/// LayerMap tracks what layers exist on a timeline.
///
@@ -43,6 +55,8 @@ pub struct LayerMap {
/// contains the oldest WAL record.
open_layers: BinaryHeap<OpenLayerEntry>,
open_layers_by_id: HashMap<LayerId, Arc<InMemoryLayer>>,
/// Generation number, used to distinguish newly inserted entries in the
/// binary heap from older entries during checkpoint.
current_generation: u64,
@@ -71,10 +85,15 @@ impl LayerMap {
segentry.open.as_ref().map(Arc::clone)
}
#[allow(dead_code)]
pub fn get_open_by_id(&self, layer_id: &LayerId) -> Option<Arc<InMemoryLayer>> {
self.open_layers_by_id.get(layer_id).cloned()
}
///
/// Insert an open in-memory layer
///
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) -> LayerId {
let segentry = self.segs.entry(layer.get_seg_tag()).or_default();
segentry.update_open(Arc::clone(&layer));
@@ -86,15 +105,23 @@ impl LayerMap {
// in the middle of a WAL record.
assert!(oldest_pending_lsn.is_aligned());
let id = LayerId::next();
// Also add it to the binary heap
let open_layer_entry = OpenLayerEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer,
layer: Arc::clone(&layer),
generation: self.current_generation,
id,
};
self.open_layers.push(open_layer_entry);
let old_layer = self.open_layers_by_id.insert(id, layer);
assert!(old_layer.is_none());
NUM_INMEMORY_LAYERS.inc();
id
}
/// Remove the oldest in-memory layer
@@ -114,6 +141,8 @@ impl LayerMap {
assert!(oldest_entry.layer.is_dropped());
}
self.open_layers_by_id.remove(&oldest_entry.id).unwrap();
NUM_INMEMORY_LAYERS.dec();
}
@@ -319,6 +348,7 @@ struct OpenLayerEntry {
pub oldest_pending_lsn: Lsn, // copy of layer.get_oldest_pending_lsn()
pub generation: u64,
pub layer: Arc<InMemoryLayer>,
id: LayerId,
}
impl Ord for OpenLayerEntry {
fn cmp(&self, other: &Self) -> Ordering {