mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 11:40:38 +00:00
Handle last "open" layer specially in LayerMap.
There can be only one "open" layer for each segment. That's the last one, implemented by InMemoryLayer. That's the only one where new records can be appended to. Much of the code needed to distinguish between the last open layer and other layers anyway, so make the distinction explicit in LayerMap.
This commit is contained in:
@@ -896,7 +896,7 @@ impl LayeredTimeline {
|
||||
layer_rc.is_dropped(),
|
||||
self.timelineid
|
||||
);
|
||||
layers.insert(Arc::clone(layer_rc));
|
||||
layers.insert_historic(Arc::clone(layer_rc));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -942,7 +942,7 @@ impl LayeredTimeline {
|
||||
//
|
||||
|
||||
// Do we have a layer on this timeline?
|
||||
if let Some(layer) = layers.get(seg, lsn) {
|
||||
if let Some(layer) = layers.get(&seg, lsn) {
|
||||
trace!(
|
||||
"found layer in cache: {} {}-{}",
|
||||
timeline.timelineid,
|
||||
@@ -974,18 +974,19 @@ impl LayeredTimeline {
|
||||
///
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<dyn Layer>> {
|
||||
fn get_layer_for_write(&self, seg: SegmentTag, lsn: Lsn) -> Result<Arc<InMemoryLayer>> {
|
||||
let layers = self.layers.lock().unwrap();
|
||||
|
||||
if lsn < self.last_valid_lsn.load() {
|
||||
bail!("cannot modify relation after advancing last_valid_lsn");
|
||||
}
|
||||
|
||||
// Look up the correct layer.
|
||||
let layers = self.layers.lock().unwrap();
|
||||
if let Some(layer) = layers.get(seg, lsn) {
|
||||
// If it's writeable, good, return it.
|
||||
if !layer.is_frozen() {
|
||||
return Ok(Arc::clone(&layer));
|
||||
// Do we have a layer open for writing already?
|
||||
if let Some(layer) = layers.get_open(&seg) {
|
||||
if layer.get_start_lsn() > lsn {
|
||||
bail!("unexpected open layer in the future");
|
||||
}
|
||||
return Ok(layer);
|
||||
}
|
||||
|
||||
// No (writeable) layer for this relation yet. Create one.
|
||||
@@ -1048,8 +1049,8 @@ impl LayeredTimeline {
|
||||
}
|
||||
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let layer_rc: Arc<dyn Layer> = Arc::new(layer);
|
||||
layers.insert(Arc::clone(&layer_rc));
|
||||
let layer_rc: Arc<InMemoryLayer> = Arc::new(layer);
|
||||
layers.insert_open(Arc::clone(&layer_rc));
|
||||
|
||||
Ok(layer_rc)
|
||||
}
|
||||
@@ -1126,34 +1127,32 @@ impl LayeredTimeline {
|
||||
// aggressive. Some kind of LRU policy would be appropriate.
|
||||
//
|
||||
|
||||
// It is not possible to modify a BTreeMap while you're iterating
|
||||
// it. So we have to make a temporary copy, and iterate through that,
|
||||
// while we modify the original.
|
||||
let old_layers = layers.inner.clone();
|
||||
|
||||
// Call freeze() on any unfrozen layers (that is, layers that haven't
|
||||
// been written to disk yet).
|
||||
// Call unload() on all frozen layers, to release memory.
|
||||
for layer in old_layers.values() {
|
||||
if !layer.is_frozen() {
|
||||
let new_layers = layer.freeze(last_valid_lsn, &self)?;
|
||||
let mut iter = layers.iter_open_layers();
|
||||
while let Some(layer) = iter.next() {
|
||||
let (new_historic, new_open) = layer.freeze(last_valid_lsn, &self)?;
|
||||
|
||||
// replace this layer with the new layers that 'freeze' returned
|
||||
layers.remove(&**layer);
|
||||
for new_layer in new_layers {
|
||||
trace!(
|
||||
"freeze returned layer {} {}-{}",
|
||||
new_layer.get_seg_tag(),
|
||||
new_layer.get_start_lsn(),
|
||||
new_layer.get_end_lsn()
|
||||
);
|
||||
layers.insert(Arc::clone(&new_layer));
|
||||
}
|
||||
} else {
|
||||
layer.unload()?;
|
||||
// replace this layer with the new layers that 'freeze' returned
|
||||
// (removes it if new_open is None)
|
||||
iter.replace(new_open);
|
||||
|
||||
if let Some(historic) = new_historic {
|
||||
trace!(
|
||||
"freeze returned layer {} {}-{}",
|
||||
historic.get_seg_tag(),
|
||||
historic.get_start_lsn(),
|
||||
historic.get_end_lsn()
|
||||
);
|
||||
iter.insert_historic(historic);
|
||||
}
|
||||
}
|
||||
|
||||
for layer in layers.iter_historic_layers() {
|
||||
layer.unload()?;
|
||||
}
|
||||
|
||||
// Also save the metadata, with updated last_valid_lsn and last_record_lsn, to a
|
||||
// file in the timeline dir. The metadata reflects the last_valid_lsn as it was
|
||||
// when we *started* the checkpoint, so that after crash, the WAL receiver knows
|
||||
@@ -1214,10 +1213,14 @@ impl LayeredTimeline {
|
||||
self.timelineid, cutoff
|
||||
);
|
||||
|
||||
let mut layers_to_remove: Vec<Arc<dyn Layer>> = Vec::new();
|
||||
let mut layers_to_remove: Vec<Arc<SnapshotLayer>> = Vec::new();
|
||||
|
||||
// Determine for each file if it needs to be retained
|
||||
'outer: for ((seg, _lsn), l) in layers.inner.iter() {
|
||||
// FIXME: also scan open in-memory layers. Normally we cannot remove the
|
||||
// latest layer of any seg, but if it was unlinked it's possible
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
let seg = l.get_seg_tag();
|
||||
|
||||
if seg.rel.is_relation() {
|
||||
result.snapshot_relfiles_total += 1;
|
||||
} else {
|
||||
@@ -1279,7 +1282,7 @@ impl LayeredTimeline {
|
||||
l.get_end_lsn(),
|
||||
l.is_dropped()
|
||||
);
|
||||
layers_to_remove.push(Arc::clone(l));
|
||||
layers_to_remove.push(Arc::clone(&l));
|
||||
}
|
||||
|
||||
// Actually delete the layers from disk and remove them from the map.
|
||||
@@ -1287,7 +1290,7 @@ impl LayeredTimeline {
|
||||
// while iterating it. BTreeMap::retain() would be another option)
|
||||
for doomed_layer in layers_to_remove {
|
||||
doomed_layer.delete()?;
|
||||
layers.remove(&*doomed_layer);
|
||||
layers.remove_historic(&*doomed_layer);
|
||||
|
||||
if doomed_layer.is_dropped() {
|
||||
if doomed_layer.get_seg_tag().rel.is_relation() {
|
||||
|
||||
@@ -2,15 +2,16 @@
|
||||
//! An in-memory layer stores recently received page versions in memory. The page versions
|
||||
//! are held in a BTreeMap, and there's another BTreeMap to track the size of the relation.
|
||||
//!
|
||||
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag, RELISH_SEG_SIZE,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::layered_repository::SnapshotLayer;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
use anyhow::{bail, Result};
|
||||
use bytes::Bytes;
|
||||
use log::*;
|
||||
use std::collections::BTreeMap;
|
||||
use std::ops::Bound::Included;
|
||||
@@ -53,10 +54,6 @@ pub struct InMemoryLayerInner {
|
||||
}
|
||||
|
||||
impl Layer for InMemoryLayer {
|
||||
fn is_frozen(&self) -> bool {
|
||||
return false;
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
return self.timelineid;
|
||||
}
|
||||
@@ -159,12 +156,69 @@ impl Layer for InMemoryLayer {
|
||||
// Otherwise, it exists
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
///
|
||||
/// Create a new, empty, in-memory layer
|
||||
///
|
||||
pub fn create(
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
seg: SegmentTag,
|
||||
start_lsn: Lsn,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!(
|
||||
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
|
||||
seg,
|
||||
timelineid,
|
||||
start_lsn
|
||||
);
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
conf,
|
||||
timelineid,
|
||||
tenantid,
|
||||
seg,
|
||||
start_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
segsizes: BTreeMap::new(),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
// Write operations
|
||||
|
||||
/// Remember new page version, as a WAL record over previous version
|
||||
pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<()> {
|
||||
self.put_page_version(
|
||||
blknum,
|
||||
rec.lsn,
|
||||
PageVersion {
|
||||
page_image: None,
|
||||
record: Some(rec),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Remember new page version, as a full page image
|
||||
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||
self.put_page_version(
|
||||
blknum,
|
||||
lsn,
|
||||
PageVersion {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
|
||||
/// Adds the page version to the in-memory tree
|
||||
fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> {
|
||||
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()> {
|
||||
assert!(self.seg.blknum_in_seg(blknum));
|
||||
|
||||
trace!(
|
||||
@@ -215,7 +269,7 @@ impl Layer for InMemoryLayer {
|
||||
}
|
||||
|
||||
/// Remember that the relation was truncated at given LSN
|
||||
fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> {
|
||||
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
let old = inner.segsizes.insert(lsn, segsize);
|
||||
|
||||
@@ -227,8 +281,8 @@ impl Layer for InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Remember that the relation was dropped at given LSN
|
||||
fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
/// Remember that the segment was dropped at given LSN
|
||||
pub fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
|
||||
assert!(inner.drop_lsn.is_none());
|
||||
@@ -239,6 +293,63 @@ impl Layer for InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Initialize a new InMemoryLayer for, by copying the state at the given
|
||||
/// point in time from given existing layer.
|
||||
///
|
||||
pub fn copy_snapshot(
|
||||
conf: &'static PageServerConf,
|
||||
timeline: &LayeredTimeline,
|
||||
src: &dyn Layer,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
lsn: Lsn,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!(
|
||||
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
|
||||
src.get_seg_tag(),
|
||||
timelineid,
|
||||
lsn
|
||||
);
|
||||
let mut page_versions = BTreeMap::new();
|
||||
let mut segsizes = BTreeMap::new();
|
||||
|
||||
let seg = src.get_seg_tag();
|
||||
|
||||
let startblk;
|
||||
let size;
|
||||
if seg.rel.is_blocky() {
|
||||
size = src.get_seg_size(lsn)?;
|
||||
segsizes.insert(lsn, size);
|
||||
startblk = seg.segno * RELISH_SEG_SIZE;
|
||||
} else {
|
||||
size = 1;
|
||||
startblk = 0;
|
||||
}
|
||||
|
||||
for blknum in startblk..(startblk + size) {
|
||||
let img = timeline.materialize_page(seg, blknum, lsn, src)?;
|
||||
let pv = PageVersion {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
};
|
||||
page_versions.insert((blknum, lsn), pv);
|
||||
}
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
conf,
|
||||
timelineid,
|
||||
tenantid,
|
||||
seg: src.get_seg_tag(),
|
||||
start_lsn: lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: page_versions,
|
||||
segsizes: segsizes,
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Write the this in-memory layer to disk, as a snapshot layer.
|
||||
///
|
||||
@@ -250,12 +361,12 @@ impl Layer for InMemoryLayer {
|
||||
/// in-memory layer containing those page versions. The caller replaces
|
||||
/// this layer with the returned layers in the layer map.
|
||||
///
|
||||
fn freeze(
|
||||
pub fn freeze(
|
||||
&self,
|
||||
cutoff_lsn: Lsn,
|
||||
// This is needed just to call materialize_page()
|
||||
timeline: &LayeredTimeline,
|
||||
) -> Result<Vec<Arc<dyn Layer>>> {
|
||||
) -> Result<(Option<Arc<SnapshotLayer>>, Option<Arc<InMemoryLayer>>)> {
|
||||
info!(
|
||||
"freezing in memory layer for {} on timeline {} at {}",
|
||||
self.seg, self.timelineid, cutoff_lsn
|
||||
@@ -323,14 +434,13 @@ impl Layer for InMemoryLayer {
|
||||
before_page_versions,
|
||||
before_segsizes,
|
||||
)?;
|
||||
let mut result: Vec<Arc<dyn Layer>> = Vec::new();
|
||||
|
||||
// If there were any page versions after the cutoff, initialize a new in-memory layer
|
||||
// to hold them
|
||||
if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
||||
// If there were any "new" page versions, initialize a new in-memory layer to hold
|
||||
// them
|
||||
let new_open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
|
||||
info!("created new in-mem layer for {} {}-", self.seg, end_lsn);
|
||||
|
||||
let new_layer = Self::copy_snapshot(
|
||||
let new_open = Self::copy_snapshot(
|
||||
self.conf,
|
||||
timeline,
|
||||
&snapfile,
|
||||
@@ -338,116 +448,19 @@ impl Layer for InMemoryLayer {
|
||||
self.tenantid,
|
||||
end_lsn,
|
||||
)?;
|
||||
let mut new_inner = new_layer.inner.lock().unwrap();
|
||||
let mut new_inner = new_open.inner.lock().unwrap();
|
||||
new_inner.page_versions.append(&mut after_page_versions);
|
||||
new_inner.segsizes.append(&mut after_segsizes);
|
||||
drop(new_inner);
|
||||
|
||||
result.push(Arc::new(new_layer));
|
||||
}
|
||||
result.push(Arc::new(snapfile));
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn delete(&self) -> Result<()> {
|
||||
// Nothing to do. When the reference is dropped, the memory is released.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn unload(&self) -> Result<()> {
|
||||
// cannot unload in-memory layer. Freeze instead
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl InMemoryLayer {
|
||||
///
|
||||
/// Create a new, empty, in-memory layer
|
||||
///
|
||||
pub fn create(
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
seg: SegmentTag,
|
||||
start_lsn: Lsn,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!(
|
||||
"initializing new empty InMemoryLayer for writing {} on timeline {} at {}",
|
||||
seg,
|
||||
timelineid,
|
||||
start_lsn
|
||||
);
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
conf,
|
||||
timelineid,
|
||||
tenantid,
|
||||
seg,
|
||||
start_lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: BTreeMap::new(),
|
||||
segsizes: BTreeMap::new(),
|
||||
}),
|
||||
})
|
||||
}
|
||||
|
||||
///
|
||||
/// Initialize a new InMemoryLayer for, by copying the state at the given
|
||||
/// point in time from given existing layer.
|
||||
///
|
||||
pub fn copy_snapshot(
|
||||
conf: &'static PageServerConf,
|
||||
timeline: &LayeredTimeline,
|
||||
src: &dyn Layer,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
lsn: Lsn,
|
||||
) -> Result<InMemoryLayer> {
|
||||
trace!(
|
||||
"initializing new InMemoryLayer for writing {} on timeline {} at {}",
|
||||
src.get_seg_tag(),
|
||||
timelineid,
|
||||
lsn
|
||||
);
|
||||
let mut page_versions = BTreeMap::new();
|
||||
let mut segsizes = BTreeMap::new();
|
||||
|
||||
let seg = src.get_seg_tag();
|
||||
|
||||
let startblk;
|
||||
let size;
|
||||
if seg.rel.is_blocky() {
|
||||
size = src.get_seg_size(lsn)?;
|
||||
segsizes.insert(lsn, size);
|
||||
startblk = seg.segno * RELISH_SEG_SIZE;
|
||||
Some(Arc::new(new_open))
|
||||
} else {
|
||||
size = 1;
|
||||
startblk = 0;
|
||||
}
|
||||
None
|
||||
};
|
||||
|
||||
for blknum in startblk..(startblk + size) {
|
||||
let img = timeline.materialize_page(seg, blknum, lsn, src)?;
|
||||
let pv = PageVersion {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
};
|
||||
page_versions.insert((blknum, lsn), pv);
|
||||
}
|
||||
let new_historic = Some(Arc::new(snapfile));
|
||||
|
||||
Ok(InMemoryLayer {
|
||||
conf,
|
||||
timelineid,
|
||||
tenantid,
|
||||
seg: src.get_seg_tag(),
|
||||
start_lsn: lsn,
|
||||
inner: Mutex::new(InMemoryLayerInner {
|
||||
drop_lsn: None,
|
||||
page_versions: page_versions,
|
||||
segsizes: segsizes,
|
||||
}),
|
||||
})
|
||||
Ok((new_historic, new_open))
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
|
||||
@@ -10,19 +10,29 @@
|
||||
//!
|
||||
|
||||
use crate::layered_repository::storage_layer::{Layer, SegmentTag};
|
||||
use crate::layered_repository::{InMemoryLayer, SnapshotLayer};
|
||||
use crate::relish::*;
|
||||
use anyhow::Result;
|
||||
use log::*;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::ops::Bound::Included;
|
||||
use std::sync::Arc;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
/// LayerMap is a BTreeMap keyed by SegmentTag and the layer's start LSN.
|
||||
/// It provides a couple of convenience functions over a plain BTreeMap
|
||||
///
|
||||
/// LayerMap tracks what layers exist or a timeline. The last layer that is
|
||||
/// open for writes is always an InMemoryLayer, and is tracked separately
|
||||
/// because there can be only one for each segment. The older layers,
|
||||
/// stored on disk, are kept in a BTreeMap keyed by the layer's start LSN.
|
||||
///
|
||||
pub struct LayerMap {
|
||||
pub inner: BTreeMap<(SegmentTag, Lsn), Arc<dyn Layer>>,
|
||||
segs: HashMap<SegmentTag, SegEntry>,
|
||||
}
|
||||
|
||||
struct SegEntry {
|
||||
pub open: Option<Arc<InMemoryLayer>>,
|
||||
pub historic: BTreeMap<Lsn, Arc<SnapshotLayer>>,
|
||||
}
|
||||
|
||||
impl LayerMap {
|
||||
@@ -32,40 +42,101 @@ impl LayerMap {
|
||||
/// given LSN, or precedes the given LSN, it is returned. In other words,
|
||||
/// you don't need to know the exact start LSN of the layer.
|
||||
///
|
||||
pub fn get(&self, tag: SegmentTag, lsn: Lsn) -> Option<Arc<dyn Layer>> {
|
||||
let startkey = (tag, Lsn(0));
|
||||
let endkey = (tag, lsn);
|
||||
pub fn get(&self, tag: &SegmentTag, lsn: Lsn) -> Option<Arc<dyn Layer>> {
|
||||
let segentry = self.segs.get(tag)?;
|
||||
|
||||
if let Some((_k, v)) = self
|
||||
.inner
|
||||
.range((Included(startkey), Included(endkey)))
|
||||
if let Some(open) = &segentry.open {
|
||||
if open.get_start_lsn() <= lsn {
|
||||
let x: Arc<dyn Layer> = Arc::clone(&open) as _;
|
||||
return Some(x);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((_k, v)) = segentry
|
||||
.historic
|
||||
.range((Included(Lsn(0)), Included(lsn)))
|
||||
.next_back()
|
||||
{
|
||||
Some(Arc::clone(v))
|
||||
let x: Arc<dyn Layer> = Arc::clone(&v) as _;
|
||||
Some(x)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, layer: Arc<dyn Layer>) {
|
||||
let seg = layer.get_seg_tag();
|
||||
let start_lsn = layer.get_start_lsn();
|
||||
///
|
||||
/// Get the open layer for given segment for writing. Or None if no open
|
||||
/// layer exists.
|
||||
///
|
||||
pub fn get_open(&self, tag: &SegmentTag) -> Option<Arc<InMemoryLayer>> {
|
||||
let segentry = self.segs.get(tag)?;
|
||||
|
||||
self.inner.insert((seg, start_lsn), Arc::clone(&layer));
|
||||
if let Some(open) = &segentry.open {
|
||||
Some(Arc::clone(open))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, layer: &dyn Layer) {
|
||||
let seg = layer.get_seg_tag();
|
||||
///
|
||||
/// Insert an open in-memory layer
|
||||
///
|
||||
pub fn insert_open(&mut self, layer: Arc<InMemoryLayer>) {
|
||||
let tag = layer.get_seg_tag();
|
||||
|
||||
if let Some(segentry) = self.segs.get_mut(&tag) {
|
||||
if let Some(_old) = &segentry.open {
|
||||
// FIXME: shouldn't exist, but check
|
||||
}
|
||||
segentry.open = Some(layer);
|
||||
} else {
|
||||
let segentry = SegEntry {
|
||||
open: Some(layer),
|
||||
historic: BTreeMap::new(),
|
||||
};
|
||||
self.segs.insert(tag, segentry);
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Insert an on-disk layer
|
||||
///
|
||||
pub fn insert_historic(&mut self, layer: Arc<SnapshotLayer>) {
|
||||
let tag = layer.get_seg_tag();
|
||||
let start_lsn = layer.get_start_lsn();
|
||||
|
||||
self.inner.remove(&(seg, start_lsn));
|
||||
if let Some(segentry) = self.segs.get_mut(&tag) {
|
||||
segentry.historic.insert(start_lsn, layer);
|
||||
} else {
|
||||
let mut historic = BTreeMap::new();
|
||||
historic.insert(start_lsn, layer);
|
||||
|
||||
let segentry = SegEntry {
|
||||
open: None,
|
||||
historic,
|
||||
};
|
||||
self.segs.insert(tag, segentry);
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Remove an on-disk layer from the map.
|
||||
///
|
||||
/// This should be called when the corresponding file on disk has been deleted.
|
||||
///
|
||||
pub fn remove_historic(&mut self, layer: &SnapshotLayer) {
|
||||
let tag = layer.get_seg_tag();
|
||||
let start_lsn = layer.get_start_lsn();
|
||||
|
||||
if let Some(segentry) = self.segs.get_mut(&tag) {
|
||||
segentry.historic.remove(&start_lsn);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn list_rels(&self, spcnode: u32, dbnode: u32) -> Result<HashSet<RelTag>> {
|
||||
let mut rels: HashSet<RelTag> = HashSet::new();
|
||||
|
||||
// Scan the timeline directory to get all rels in this timeline.
|
||||
for ((seg, _lsn), _l) in self.inner.iter() {
|
||||
for (seg, _entry) in self.segs.iter() {
|
||||
if let RelishTag::Relation(reltag) = seg.rel {
|
||||
// FIXME: skip if it was dropped before the requested LSN. But there is no
|
||||
// LSN argument
|
||||
@@ -84,7 +155,7 @@ impl LayerMap {
|
||||
let mut rels: HashSet<RelishTag> = HashSet::new();
|
||||
|
||||
// Scan the timeline directory to get all rels in this timeline.
|
||||
for ((seg, _lsn), _l) in self.inner.iter() {
|
||||
for (seg, _entry) in self.segs.iter() {
|
||||
// FIXME: skip if it was dropped before the requested LSN.
|
||||
|
||||
if let RelishTag::Relation(_) = seg.rel {
|
||||
@@ -97,35 +168,114 @@ impl LayerMap {
|
||||
|
||||
/// Is there a newer layer for given segment?
|
||||
pub fn newer_layer_exists(&self, seg: SegmentTag, lsn: Lsn) -> bool {
|
||||
let startkey = (seg, lsn);
|
||||
let endkey = (seg, Lsn(u64::MAX));
|
||||
|
||||
for ((_newer_seg, newer_lsn), layer) in
|
||||
self.inner.range((Included(startkey), Included(endkey)))
|
||||
{
|
||||
if layer.get_end_lsn() > lsn {
|
||||
trace!(
|
||||
"found later layer for {}, {} {}-{}",
|
||||
seg,
|
||||
lsn,
|
||||
newer_lsn,
|
||||
layer.get_end_lsn()
|
||||
);
|
||||
if let Some(segentry) = self.segs.get(&seg) {
|
||||
if let Some(_open) = &segentry.open {
|
||||
return true;
|
||||
} else {
|
||||
trace!("found singleton layer for {}, {} {}", seg, lsn, newer_lsn);
|
||||
continue;
|
||||
}
|
||||
|
||||
for (newer_lsn, layer) in segentry
|
||||
.historic
|
||||
.range((Included(lsn), Included(Lsn(u64::MAX))))
|
||||
{
|
||||
if layer.get_end_lsn() > lsn {
|
||||
trace!(
|
||||
"found later layer for {}, {} {}-{}",
|
||||
seg,
|
||||
lsn,
|
||||
newer_lsn,
|
||||
layer.get_end_lsn()
|
||||
);
|
||||
return true;
|
||||
} else {
|
||||
trace!("found singleton layer for {}, {} {}", seg, lsn, newer_lsn);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
trace!("no later layer found for {}, {}", seg, lsn);
|
||||
false
|
||||
}
|
||||
|
||||
pub fn iter_open_layers(&mut self) -> OpenLayerIter {
|
||||
OpenLayerIter {
|
||||
last: None,
|
||||
segiter: self.segs.iter_mut(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iter_historic_layers(&self) -> HistoricLayerIter {
|
||||
HistoricLayerIter {
|
||||
segiter: self.segs.iter(),
|
||||
iter: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LayerMap {
|
||||
fn default() -> Self {
|
||||
LayerMap {
|
||||
inner: BTreeMap::new(),
|
||||
segs: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpenLayerIter<'a> {
|
||||
last: Option<&'a mut SegEntry>,
|
||||
|
||||
segiter: std::collections::hash_map::IterMut<'a, SegmentTag, SegEntry>,
|
||||
}
|
||||
|
||||
impl<'a> OpenLayerIter<'a> {
|
||||
pub fn replace(&mut self, replacement: Option<Arc<InMemoryLayer>>) {
|
||||
let segentry = self.last.as_mut().unwrap();
|
||||
segentry.open = replacement;
|
||||
}
|
||||
|
||||
pub fn insert_historic(&mut self, new_layer: Arc<SnapshotLayer>) {
|
||||
let start_lsn = new_layer.get_start_lsn();
|
||||
|
||||
let segentry = self.last.as_mut().unwrap();
|
||||
segentry.historic.insert(start_lsn, new_layer);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Iterator for OpenLayerIter<'a> {
|
||||
type Item = Arc<InMemoryLayer>;
|
||||
|
||||
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
|
||||
while let Some((_seg, entry)) = self.segiter.next() {
|
||||
if let Some(open) = &entry.open {
|
||||
let op = Arc::clone(&open);
|
||||
self.last = Some(entry);
|
||||
return Some(op);
|
||||
}
|
||||
}
|
||||
self.last = None;
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HistoricLayerIter<'a> {
|
||||
segiter: std::collections::hash_map::Iter<'a, SegmentTag, SegEntry>,
|
||||
iter: Option<std::collections::btree_map::Iter<'a, Lsn, Arc<SnapshotLayer>>>,
|
||||
}
|
||||
|
||||
impl<'a> Iterator for HistoricLayerIter<'a> {
|
||||
type Item = Arc<SnapshotLayer>;
|
||||
|
||||
fn next(&mut self) -> std::option::Option<<Self as std::iter::Iterator>::Item> {
|
||||
loop {
|
||||
if let Some(x) = &mut self.iter {
|
||||
if let Some(x) = x.next() {
|
||||
return Some(Arc::clone(&*x.1));
|
||||
}
|
||||
}
|
||||
if let Some(seg) = self.segiter.next() {
|
||||
self.iter = Some(seg.1.historic.iter());
|
||||
continue;
|
||||
} else {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,6 @@
|
||||
use crate::layered_repository::storage_layer::{
|
||||
Layer, PageReconstructData, PageVersion, SegmentTag,
|
||||
};
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::relish::*;
|
||||
use crate::PageServerConf;
|
||||
use crate::{ZTenantId, ZTimelineId};
|
||||
@@ -244,10 +243,6 @@ pub struct SnapshotLayerInner {
|
||||
}
|
||||
|
||||
impl Layer for SnapshotLayer {
|
||||
fn is_frozen(&self) -> bool {
|
||||
return true;
|
||||
}
|
||||
|
||||
fn get_timeline_id(&self) -> ZTimelineId {
|
||||
return self.timelineid;
|
||||
}
|
||||
@@ -344,43 +339,6 @@ impl Layer for SnapshotLayer {
|
||||
// Otherwise, it exists.
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
// Unsupported write operations
|
||||
fn put_page_version(&self, blknum: u32, lsn: Lsn, _pv: PageVersion) -> Result<()> {
|
||||
panic!(
|
||||
"cannot modify historical snapshot layer, {} blk {} at {}/{}, {}-{}",
|
||||
self.seg, blknum, self.timelineid, lsn, self.start_lsn, self.end_lsn
|
||||
);
|
||||
}
|
||||
fn put_truncation(&self, _lsn: Lsn, _relsize: u32) -> anyhow::Result<()> {
|
||||
bail!("cannot modify historical snapshot layer");
|
||||
}
|
||||
|
||||
fn put_unlink(&self, _lsn: Lsn) -> anyhow::Result<()> {
|
||||
bail!("cannot modify historical snapshot layer");
|
||||
}
|
||||
|
||||
fn freeze(&self, _end_lsn: Lsn, _timeline: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>> {
|
||||
bail!("cannot freeze historical snapshot layer");
|
||||
}
|
||||
|
||||
fn delete(&self) -> Result<()> {
|
||||
// delete underlying file
|
||||
fs::remove_file(self.path())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Release most of the memory used by this layer. If it's accessed again later,
|
||||
/// it will need to be loaded back.
|
||||
///
|
||||
fn unload(&self) -> Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.page_versions = BTreeMap::new();
|
||||
inner.relsizes = BTreeMap::new();
|
||||
inner.loaded = false;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SnapshotLayer {
|
||||
@@ -520,10 +478,10 @@ impl SnapshotLayer {
|
||||
conf: &'static PageServerConf,
|
||||
timelineid: ZTimelineId,
|
||||
tenantid: ZTenantId,
|
||||
) -> Result<Vec<Arc<dyn Layer>>> {
|
||||
) -> Result<Vec<Arc<SnapshotLayer>>> {
|
||||
let path = conf.timeline_path(&timelineid, &tenantid);
|
||||
|
||||
let mut snapfiles: Vec<Arc<dyn Layer>> = Vec::new();
|
||||
let mut snapfiles: Vec<Arc<SnapshotLayer>> = Vec::new();
|
||||
for direntry in fs::read_dir(path)? {
|
||||
let fname = direntry?.file_name();
|
||||
let fname = fname.to_str().unwrap();
|
||||
@@ -550,6 +508,24 @@ impl SnapshotLayer {
|
||||
return Ok(snapfiles);
|
||||
}
|
||||
|
||||
pub fn delete(&self) -> Result<()> {
|
||||
// delete underlying file
|
||||
fs::remove_file(self.path())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Release most of the memory used by this layer. If it's accessed again later,
|
||||
/// it will need to be loaded back.
|
||||
///
|
||||
pub fn unload(&self) -> Result<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.page_versions = BTreeMap::new();
|
||||
inner.relsizes = BTreeMap::new();
|
||||
inner.loaded = false;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// debugging function to print out the contents of the layer
|
||||
#[allow(unused)]
|
||||
pub fn dump(&self) -> String {
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
//! Common traits and structs for layers
|
||||
//!
|
||||
|
||||
use crate::layered_repository::LayeredTimeline;
|
||||
use crate::relish::RelishTag;
|
||||
use crate::repository::WALRecord;
|
||||
use crate::ZTimelineId;
|
||||
@@ -10,7 +9,6 @@ use anyhow::Result;
|
||||
use bytes::Bytes;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
@@ -104,12 +102,6 @@ pub trait Layer: Send + Sync {
|
||||
fn get_end_lsn(&self) -> Lsn;
|
||||
fn is_dropped(&self) -> bool;
|
||||
|
||||
/// Frozen layers are stored on disk, an cannot accept cannot accept new WAL
|
||||
/// records, whereas an unfrozen layer can still be modified, but is not
|
||||
/// durable in case of a crash. Snapshot layers are always frozen, and
|
||||
/// in-memory layers are always unfrozen.
|
||||
fn is_frozen(&self) -> bool;
|
||||
|
||||
///
|
||||
/// Return data needed to reconstruct given page at LSN.
|
||||
///
|
||||
@@ -133,50 +125,4 @@ pub trait Layer: Send + Sync {
|
||||
fn get_seg_size(&self, lsn: Lsn) -> Result<u32>;
|
||||
|
||||
fn get_seg_exists(&self, lsn: Lsn) -> Result<bool>;
|
||||
|
||||
fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<()>;
|
||||
|
||||
fn put_truncation(&self, lsn: Lsn, relsize: u32) -> anyhow::Result<()>;
|
||||
|
||||
fn put_unlink(&self, lsn: Lsn) -> anyhow::Result<()>;
|
||||
|
||||
/// Remember new page version, as a WAL record over previous version
|
||||
fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<()> {
|
||||
self.put_page_version(
|
||||
blknum,
|
||||
rec.lsn,
|
||||
PageVersion {
|
||||
page_image: None,
|
||||
record: Some(rec),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
/// Remember new page version, as a full page image
|
||||
fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<()> {
|
||||
self.put_page_version(
|
||||
blknum,
|
||||
lsn,
|
||||
PageVersion {
|
||||
page_image: Some(img),
|
||||
record: None,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
///
|
||||
/// Split off an immutable layer from existing layer.
|
||||
///
|
||||
/// Returns new layers that replace this one.
|
||||
///
|
||||
fn freeze(&self, end_lsn: Lsn, walredo_mgr: &LayeredTimeline) -> Result<Vec<Arc<dyn Layer>>>;
|
||||
|
||||
/// Permanently delete this layer
|
||||
fn delete(&self) -> Result<()>;
|
||||
|
||||
/// Try to release memory used by this layer. This is currently
|
||||
/// only used by snapshot layers, to free the copy of the file
|
||||
/// from memory. (TODO: a smarter, more granular caching scheme
|
||||
/// would be nice)
|
||||
fn unload(&self) -> Result<()>;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user