WIP: Track oldest open layer

This commit is contained in:
Heikki Linnakangas
2021-08-16 14:54:20 +03:00
parent e35a5aa550
commit a389c2ed7f
3 changed files with 137 additions and 103 deletions

View File

@@ -54,14 +54,14 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
// Perform a checkpoint in the GC thread, when the LSN has advanced this much since
// last checkpoint. This puts a backstop on how much WAL needs to be re-digested if
// the page server is restarted.
// Flush out an inmemory layer, if it's holding WAL older than
// this. This puts a backstop on how much WAL needs to be re-digested
// if the page server is restarted.
//
// FIXME: This current value is very low. I would imagine something like 1 GB or 10 GB
// would be more appropriate. But a low value forces the code to be exercised more,
// which is good for now to trigger bugs.
static CHECKPOINT_INTERVAL: u64 = 16 * 1024 * 1024;
static OLDEST_INMEM_DISTANCE: u64 = 16 * 1024 * 1024;
// Metrics collected on operations on the storage repository.
lazy_static! {
@@ -261,11 +261,11 @@ impl LayeredRepository {
{
let timelines = self.timelines.lock().unwrap();
for (_timelineid, timeline) in timelines.iter() {
let distance = u64::from(timeline.last_valid_lsn.load())
- u64::from(timeline.last_checkpoint_lsn.load());
if distance > CHECKPOINT_INTERVAL {
timeline.checkpoint()?;
}
STORAGE_TIME
.with_label_values(&["checkpoint_timed"])
.observe_closure_duration(
|| timeline.checkpoint_internal(false)
)?
}
// release lock on 'timelines'
}
@@ -456,7 +456,7 @@ pub struct LayeredTimeline {
last_record_lsn: AtomicLsn,
prev_record_lsn: AtomicLsn,
last_checkpoint_lsn: AtomicLsn,
oldest_pending_lsn: AtomicLsn,
// Parent timeline that this timeline was branched from, and the LSN
// of the branch point.
@@ -774,8 +774,8 @@ impl Timeline for LayeredTimeline {
/// metrics collection.
fn checkpoint(&self) -> Result<()> {
STORAGE_TIME
.with_label_values(&["checkpoint"])
.observe_closure_duration(|| self.checkpoint_internal())
.with_label_values(&["checkpoint_force"])
.observe_closure_duration(|| self.checkpoint_internal(true))
}
/// Remember that WAL has been received and added to the page cache up to the given LSN
@@ -867,7 +867,7 @@ impl LayeredTimeline {
last_valid_lsn: SeqWait::new(metadata.last_valid_lsn),
last_record_lsn: AtomicLsn::new(metadata.last_record_lsn.0),
prev_record_lsn: AtomicLsn::new(metadata.prev_record_lsn.0),
last_checkpoint_lsn: AtomicLsn::new(metadata.last_valid_lsn.0),
oldest_pending_lsn: AtomicLsn::new(metadata.last_valid_lsn.0),
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn,
@@ -1003,23 +1003,23 @@ impl LayeredTimeline {
let layer;
if let Some((prev_layer, _prev_lsn)) = self.get_layer_for_read(seg, lsn)? {
// Create new entry after the previous one.
let lsn;
let start_lsn;
if prev_layer.get_timeline_id() != self.timelineid {
// First modification on this timeline
lsn = self.ancestor_lsn;
start_lsn = self.ancestor_lsn;
trace!(
"creating file for write for {} at branch point {}/{}",
seg,
self.timelineid,
lsn
start_lsn
);
} else {
lsn = prev_layer.get_end_lsn();
start_lsn = prev_layer.get_end_lsn();
trace!(
"creating file for write for {} after previous layer {}/{}",
seg,
self.timelineid,
lsn
start_lsn
);
}
trace!(
@@ -1034,6 +1034,7 @@ impl LayeredTimeline {
&*prev_layer,
self.timelineid,
self.tenantid,
start_lsn,
lsn,
)?;
} else {
@@ -1045,7 +1046,7 @@ impl LayeredTimeline {
lsn
);
layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn)?;
layer = InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, seg, lsn, lsn)?;
}
let mut layers = self.layers.lock().unwrap();
@@ -1088,7 +1089,7 @@ impl LayeredTimeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL. We don't
/// know anything about them here in the repository.
fn checkpoint_internal(&self) -> Result<()> {
fn checkpoint_internal(&self, force: bool) -> Result<()> {
let last_valid_lsn = self.last_valid_lsn.load();
let last_record_lsn = self.last_record_lsn.load();
let prev_record_lsn = self.prev_record_lsn.load();
@@ -1130,14 +1131,26 @@ impl LayeredTimeline {
// Call freeze() on any unfrozen layers (that is, layers that haven't
// been written to disk yet).
// Call unload() on all frozen layers, to release memory.
let mut iter = layers.iter_open_layers();
while let Some(layer) = iter.next() {
let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?;
let mut oldest_pending_lsn = last_valid_lsn;
while let Some(oldest_layer) = layers.get_oldest_open_layer() {
oldest_pending_lsn = oldest_layer.get_oldest_pending_lsn();
let distance = last_valid_lsn.0 - oldest_pending_lsn.0;
if !force && distance < OLDEST_INMEM_DISTANCE {
info!("the oldest layer is now {} which is {} bytes behind last_valid_lsn",
oldest_layer.get_seg_tag(), distance);
break;
}
let (new_historic, new_open) = oldest_layer.freeze(last_valid_lsn, &self)?;
// replace this layer with the new layers that 'freeze' returned
// (removes it if new_open is None)
iter.replace(new_open);
layers.pop_oldest();
if let Some(n) = new_open {
layers.insert_open(n);
}
if let Some(historic) = new_historic {
trace!(
"freeze returned layer {} {}-{}",
@@ -1145,7 +1158,7 @@ impl LayeredTimeline {
historic.get_start_lsn(),
historic.get_end_lsn()
);
iter.insert_historic(historic);
layers.insert_historic(historic);
}
}
@@ -1171,7 +1184,7 @@ impl LayeredTimeline {
};
LayeredRepository::save_metadata(self.conf, self.timelineid, self.tenantid, &metadata)?;
self.last_checkpoint_lsn.store(last_valid_lsn);
self.oldest_pending_lsn.store(oldest_pending_lsn);
Ok(())
}
@@ -1202,12 +1215,6 @@ impl LayeredTimeline {
let now = Instant::now();
let mut result: GcResult = Default::default();
// Scan all snapshot files in the directory. For each file, if a newer file
// exists, we can remove the old one.
self.checkpoint()?;
let mut layers = self.layers.lock().unwrap();
info!(
"running GC on timeline {}, cutoff {}",
self.timelineid, cutoff
@@ -1215,9 +1222,13 @@ impl LayeredTimeline {
let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();
// Scan all snapshot files in the directory. For each file, if a newer file
// exists, we can remove the old one.
//
// Determine for each file if it needs to be retained
// FIXME: also scan open in-memory layers. Normally we cannot remove the
// latest layer of any seg, but if it was unlinked it's possible
let mut layers = self.layers.lock().unwrap();
'outer: for l in layers.iter_historic_layers() {
let seg = l.get_seg_tag();

View File

@@ -32,6 +32,8 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
/// The above fields never change. The parts that do change are in 'inner',
/// and protected by mutex.
inner: Mutex<InMemoryLayerInner>,
@@ -164,6 +166,11 @@ impl Layer for InMemoryLayer {
}
impl InMemoryLayer {
/// Returns the LSN of the oldest WAL record still pending in this layer.
///
/// This field is set at construction time and never changes afterwards
/// (it lives outside the mutex-protected `inner`), so no locking is needed.
pub fn get_oldest_pending_lsn(&self) -> Lsn {
    self.oldest_pending_lsn
}
///
/// Create a new, empty, in-memory layer
///
@@ -173,6 +180,7 @@ impl InMemoryLayer {
tenantid: ZTenantId,
seg: SegmentTag,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!(
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
@@ -187,6 +195,7 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
oldest_pending_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: BTreeMap::new(),
@@ -314,13 +323,14 @@ impl InMemoryLayer {
src: &dyn Layer,
timelineid: ZTimelineId,
tenantid: ZTenantId,
lsn: Lsn,
start_lsn: Lsn,
oldest_pending_lsn: Lsn,
) -> Result<InMemoryLayer> {
trace!(
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
src.get_seg_tag(),
timelineid,
lsn
start_lsn
);
let mut page_versions = BTreeMap::new();
let mut segsizes = BTreeMap::new();
@@ -331,8 +341,8 @@ impl InMemoryLayer {
let startblk;
let size;
if seg.rel.is_blocky() {
size = src.get_seg_size(lsn)?;
segsizes.insert(lsn, size);
size = src.get_seg_size(start_lsn)?;
segsizes.insert(start_lsn, size);
startblk = seg.segno * RELISH_SEG_SIZE;
} else {
size = 1;
@@ -340,13 +350,13 @@ impl InMemoryLayer {
}
for blknum in startblk..(startblk + size) {
let img = timeline.materialize_page(seg, blknum, lsn, src)?;
let img = timeline.materialize_page(seg, blknum, start_lsn, src)?;
let pv = PageVersion {
page_image: Some(img),
record: None,
};
mem_used += pv.get_mem_size();
page_versions.insert((blknum, lsn), pv);
page_versions.insert((blknum, start_lsn), pv);
}
Ok(InMemoryLayer {
@@ -354,7 +364,8 @@ impl InMemoryLayer {
timelineid,
tenantid,
seg: src.get_seg_tag(),
start_lsn: lsn,
start_lsn,
oldest_pending_lsn,
inner: Mutex::new(InMemoryLayerInner {
drop_lsn: None,
page_versions: page_versions,
@@ -451,26 +462,28 @@ impl InMemoryLayer {
// If there were any "new" page versions, initialize a new in-memory layer to hold
// them
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
let new_open =
if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
let new_open = Self::copy_snapshot(
self.conf,
timeline,
&snapfile,
self.timelineid,
self.tenantid,
end_lsn,
)?;
let mut new_inner = new_open.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.segsizes.append(&mut after_segsizes);
drop(new_inner);
let new_open = Self::copy_snapshot(
self.conf,
timeline,
&snapfile,
self.timelineid,
self.tenantid,
end_lsn,
end_lsn,
)?;
let mut new_inner = new_open.inner.lock().unwrap();
new_inner.page_versions.append(&mut after_page_versions);
new_inner.segsizes.append(&mut after_segsizes);
drop(new_inner);
Some(Arc::new(new_open))
} else {
None
};
Some(Arc::new(new_open))
} else {
None
};
let new_historic = Some(Arc::new(snapfile));

View File

@@ -15,19 +15,23 @@ use crate::relish::*;
use anyhow::Result;
use log::*;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BinaryHeap, BTreeMap, HashMap};
use std::ops::Bound::Included;
use std::cmp::Ordering;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
///
/// LayerMap tracks what layers exist or a timeline. The last layer that is
/// LayerMap tracks what layers exist on a timeline. The last layer that is
/// open for writes is always an InMemoryLayer, and is tracked separately
/// because there can be only one for each segment. The older layers,
/// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN.
///
pub struct LayerMap {
    segs: HashMap<SegmentTag, SegEntry>,

    // All currently-open (in-memory) layers, ordered by each layer's
    // `oldest_pending_lsn`. `OpenSegEntry`'s `Ord` impl reverses the
    // comparison, turning std's max-heap into a min-heap, so the layer
    // holding the oldest un-flushed WAL is always at the top. This lets
    // checkpointing flush layers oldest-first without scanning `segs`.
    open_segs: BinaryHeap<OpenSegEntry>,
}
struct SegEntry {
@@ -35,6 +39,31 @@ struct SegEntry {
pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
}
/// Heap entry for `LayerMap::open_segs`.
struct OpenSegEntry {
    // Copy of `layer.get_oldest_pending_lsn()`, cached here so heap
    // ordering doesn't need to dereference the layer on every comparison.
    pub oldest_pending_lsn: Lsn,
    pub layer: Arc<InMemoryLayer>,
}
impl Ord for OpenSegEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // std's BinaryHeap pops the *largest* element first. Reversing the
        // natural comparison on `oldest_pending_lsn` makes the heap behave
        // as a min-heap, so the entry with the oldest pending WAL is on top.
        self.oldest_pending_lsn
            .cmp(&other.oldest_pending_lsn)
            .reverse()
    }
}
impl PartialOrd for OpenSegEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord` rather than duplicating the reversed comparison:
        // this keeps the two impls consistent by construction and is the
        // canonical pattern (clippy: non_canonical_partial_ord_impl).
        Some(self.cmp(other))
    }
}
impl PartialEq for OpenSegEntry {
    fn eq(&self, other: &Self) -> bool {
        // Equality mirrors the ordering: only the heap key participates,
        // the layer itself is ignored.
        self.oldest_pending_lsn == other.oldest_pending_lsn
    }
}
// `Eq` is a supertrait of `Ord`; the key comparison above is total.
impl Eq for OpenSegEntry {}
impl LayerMap {
///
/// Look up using the given segment tag and LSN. This differs from a plain
@@ -88,14 +117,29 @@ impl LayerMap {
if let Some(_old) = &segentry.open {
// FIXME: shouldn't exist, but check
}
segentry.open = Some(layer);
segentry.open = Some(Arc::clone(&layer));
} else {
let segentry = SegEntry {
open: Some(layer),
open: Some(Arc::clone(&layer)),
historic: BTreeMap::new(),
};
self.segs.insert(tag, segentry);
}
let opensegentry = OpenSegEntry {
oldest_pending_lsn: layer.get_oldest_pending_lsn(),
layer: layer,
};
self.open_segs.push(opensegentry);
}
/// Remove the open layer with the oldest pending LSN from the map:
/// pop it off the `open_segs` heap and clear its segment's `open` slot.
///
/// Panics if there are no open layers, or if the heap and `segs` have
/// gotten out of sync; callers are expected to have found the layer via
/// `get_oldest_open_layer` first.
pub fn pop_oldest(&mut self) {
    let opensegentry = self.open_segs.pop().unwrap();
    let segtag = opensegentry.layer.get_seg_tag();
    // `mut` on the binding is unnecessary: we assign through the
    // `&mut SegEntry` reference, not rebind it.
    let segentry = self.segs.get_mut(&segtag).unwrap();
    segentry.open = None;
}
///
@@ -196,10 +240,11 @@ impl LayerMap {
false
}
pub fn iter_open_layers(&mut self) -> OpenLayerIter {
OpenLayerIter {
last: None,
segiter: self.segs.iter_mut(),
/// Peek at the open layer holding the oldest pending WAL, without
/// removing it from the heap. Returns `None` if there are no open layers.
pub fn get_oldest_open_layer(&mut self) -> Option<Arc<InMemoryLayer>> {
    // `Option::map` replaces the manual if-let/Some/None dance.
    self.open_segs
        .peek()
        .map(|opensegentry| Arc::clone(&opensegentry.layer))
}
@@ -215,46 +260,11 @@ impl Default for LayerMap {
fn default() -> Self {
LayerMap {
segs: HashMap::new(),
open_segs: BinaryHeap::new(),
}
}
}
pub struct OpenLayerIter<'a> {
last: Option<&'a mut SegEntry>,
segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>,
}
impl<'a> OpenLayerIter<'a> {
pub fn replace(&mut self, replacement: Option<Arc<InMemoryLayer>>) {
let segentry = self.last.as_mut().unwrap();
segentry.open = replacement;
}
pub fn insert_historic(&mut self, new_layer: Arc<SnapshotLayer>) {
let start_lsn = new_layer.get_start_lsn();
let segentry = self.last.as_mut().unwrap();
segentry.historic.insert(start_lsn, new_layer);
}
}
impl<'a> Iterator for OpenLayerIter<'a> {
type Item = Arc<InMemoryLayer>;
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
while let Some((_seg, entry)) = self.segiter.next() {
if let Some(open) = &entry.open {
let op = Arc::clone(&open);
self.last = Some(entry);
return Some(op);
}
}
self.last = None;
None
}
}
pub struct HistoricLayerIter<'a> {
segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,