pageserver - use write guard for checkpointing

Author: Patrick Insinger
Date: 2021-10-08 17:27:08 -07:00
Committed by: Patrick Insinger
Parent: 6e5ca5dc5c
Commit: 160c4aff61
3 changed files with 142 additions and 310 deletions


@@ -33,7 +33,6 @@ use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use crate::layered_repository::inmemory_layer::FreezeLayers;
use crate::relish::*;
use crate::relish_storage::schedule_timeline_upload;
use crate::repository::{GcResult, Repository, Timeline, TimelineWriter, WALRecord};
@@ -72,8 +71,6 @@ use storage_layer::{
Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
};
use self::inmemory_layer::{NonWriteableError, WriteResult};
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
// Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -670,6 +667,13 @@ pub struct LayeredTimeline {
/// If `true`, will backup its timeline files to remote storage after freezing.
upload_relishes: bool,
/// Ensures layers aren't frozen by the checkpointer between
/// [`LayeredTimeline::get_layer_for_write`] and layer reads.
/// Locked automatically by [`LayeredTimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
}
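
The doc comment above fixes a lock-ordering rule. A minimal sketch, assuming simplified stand-in types (TimelineSketch and LayerMap are illustrative, not the real pageserver structs), of the order both the checkpointer and LayeredTimelineWriter are expected to follow:

use std::sync::{Mutex, MutexGuard};

// Illustrative stand-ins only.
struct LayerMap;
struct TimelineSketch {
    write_lock: Mutex<()>,
    layers: Mutex<LayerMap>,
}

impl TimelineSketch {
    // write_lock is always taken before the layer map lock; taking them in
    // the opposite order on another thread could deadlock.
    fn lock_both(&self) -> (MutexGuard<'_, ()>, MutexGuard<'_, LayerMap>) {
        let write_guard = self.write_lock.lock().unwrap();
        let layer_guard = self.layers.lock().unwrap();
        (write_guard, layer_guard)
    }
}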
/// Public interface functions
@@ -903,7 +907,10 @@ impl Timeline for LayeredTimeline {
}
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
Box::new(LayeredTimelineWriter(self))
Box::new(LayeredTimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
})
}
}
@@ -945,6 +952,8 @@ impl LayeredTimeline {
current_logical_size: AtomicUsize::new(current_logical_size),
current_logical_size_gauge,
upload_relishes,
write_lock: Mutex::new(()),
};
Ok(timeline)
}
@@ -1219,18 +1228,13 @@ impl LayeredTimeline {
///
/// NOTE: This has nothing to do with checkpoint in PostgreSQL.
fn checkpoint_internal(&self, checkpoint_distance: u64, forced: bool) -> Result<()> {
// Grab lock on the layer map.
//
// TODO: We hold it locked throughout the checkpoint operation. That's bad,
// the checkpointing could take many seconds, and any incoming get_page_at_lsn()
// requests will block.
let mut write_guard = self.write_lock.lock().unwrap();
let mut layers = self.layers.lock().unwrap();
// Bump the generation number in the layer map, so that we can distinguish
// entries inserted after the checkpoint started
let current_generation = layers.increment_generation();
// Read 'last_record_lsn'. That becomes the cutoff LSN for frozen layers.
let RecordLsn {
last: last_record_lsn,
prev: prev_record_lsn,
@@ -1280,32 +1284,24 @@ impl LayeredTimeline {
break;
}
// Freeze the layer.
//
// This is a two-step process. First, we "freeze" the in-memory
// layer, to close it for new writes, and replace the original
// layer with the new frozen in-memory layer (and possibly a new
// open layer to hold changes newer than the cutoff.) Then we write
// the frozen layer to disk, and replace the in-memory frozen layer
// with the new on-disk layers.
let FreezeLayers {
frozen,
open: maybe_new_open,
} = oldest_layer.freeze(last_record_lsn)?;
// Mark the layer as no longer accepting writes and record the end_lsn.
// This happens in-place, no new layers are created now.
// We call `get_last_record_lsn` again, which may be different from the
// original load, as we may have released the write lock since then.
oldest_layer.freeze(self.get_last_record_lsn());
// replace this layer with the new layers that 'freeze' returned
// The layer is no longer open, update the layer map to reflect this.
// We will replace it with on-disk historics below.
layers.pop_oldest_open();
if let Some(new_open) = maybe_new_open.clone() {
layers.insert_open(new_open);
}
// We temporarily insert InMemory layer into historic list here.
// TODO: check that all possible concurrent users of 'historic' treat it right
layers.insert_historic(frozen.clone());
layers.insert_historic(oldest_layer.clone());
// Write the now-frozen layer to disk. That could take a while, so release the lock while we do it
drop(layers);
let new_historics = frozen.write_to_disk(self)?;
drop(write_guard);
let new_historics = oldest_layer.write_to_disk(self)?;
write_guard = self.write_lock.lock().unwrap();
layers = self.layers.lock().unwrap();
if !new_historics.is_empty() {
@@ -1313,7 +1309,7 @@ impl LayeredTimeline {
}
// Finally, replace the frozen in-memory layer with the new on-disk layers
layers.remove_historic(frozen.clone());
layers.remove_historic(oldest_layer);
// Add the historics to the LayerMap
for delta_layer in new_historics.delta_layers {
@@ -1334,6 +1330,7 @@ impl LayeredTimeline {
}
drop(layers);
drop(write_guard);
if created_historics {
// We must fsync the timeline dir to ensure the directory entries for
@@ -1760,46 +1757,18 @@ impl LayeredTimeline {
self.current_logical_size_gauge
.set(val as i64 - diff as i64);
}
/// If a layer is in the process of being replaced in [`LayerMap`], write
/// operations will fail with [`NonWriteableError`]. This may happen due to
/// a race: the checkpointer thread freezes a layer just after
/// [`Self::get_layer_for_write`] returned it. To handle this error, we fetch
/// the layer again and retry the write.
fn perform_write_op<R>(
&self,
seg: SegmentTag,
lsn: Lsn,
write_op: impl Fn(&Arc<InMemoryLayer>) -> WriteResult<R>,
) -> anyhow::Result<R> {
let mut layer = self.get_layer_for_write(seg, lsn)?;
loop {
match write_op(&layer) {
Ok(r) => return Ok(r),
Err(NonWriteableError {}) => {}
}
info!(
"attempted to write to non-writeable layer, retrying {} {}",
seg, lsn
);
// layer was non-writeable, try again
let new_layer = self.get_layer_for_write(seg, lsn)?;
// the new layer does not have to be writeable, but it should at least be different
assert!(!Arc::ptr_eq(&layer, &new_layer));
layer = new_layer;
}
}
}
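
The retry loop removed above existed because the checkpointer could freeze a layer between get_layer_for_write and the subsequent put. A minimal sketch, assuming simplified stand-in types, of why the guard makes the retry unnecessary: the writer holds the timeline's write_lock for its whole lifetime, so no layer it obtains can become non-writeable before the put completes.

use std::sync::{Mutex, MutexGuard};

// Illustrative stand-ins only.
struct TimelineSketch {
    write_lock: Mutex<()>,
}

struct WriterSketch<'a> {
    tl: &'a TimelineSketch,
    // Held for the writer's entire lifetime; the checkpointer locks the same
    // mutex before freezing, so it is excluded while a writer exists.
    _write_guard: MutexGuard<'a, ()>,
}

impl TimelineSketch {
    fn writer(&self) -> WriterSketch<'_> {
        WriterSketch {
            tl: self,
            _write_guard: self.write_lock.lock().unwrap(),
        }
    }
}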
struct LayeredTimelineWriter<'a>(&'a LayeredTimeline);
struct LayeredTimelineWriter<'a> {
tl: &'a LayeredTimeline,
_write_guard: MutexGuard<'a, ()>,
}
impl Deref for LayeredTimelineWriter<'_> {
type Target = dyn Timeline;
fn deref(&self) -> &Self::Target {
self.0
self.tl
}
}
@@ -1815,10 +1784,9 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
ensure!(rec.lsn.is_aligned(), "unaligned record LSN");
let seg = SegmentTag::from_blknum(rel, blknum);
let delta_size = self.0.perform_write_op(seg, rec.lsn, |layer| {
layer.put_wal_record(blknum, rec.clone())
})?;
self.0
let layer = self.tl.get_layer_for_write(seg, rec.lsn)?;
let delta_size = layer.put_wal_record(blknum, rec);
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(())
}
@@ -1835,11 +1803,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
let seg = SegmentTag::from_blknum(rel, blknum);
let delta_size = self.0.perform_write_op(seg, lsn, |layer| {
layer.put_page_image(blknum, lsn, img.clone())
})?;
let layer = self.tl.get_layer_for_write(seg, lsn)?;
let delta_size = layer.put_page_image(blknum, lsn, img);
self.0
self.tl
.increase_current_logical_size(delta_size * BLCKSZ as u32);
Ok(())
}
@@ -1853,8 +1820,8 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
debug!("put_truncation: {} to {} blocks at {}", rel, relsize, lsn);
let oldsize = self
.0
.get_relish_size(rel, self.0.get_last_record_lsn())?
.tl
.get_relish_size(rel, self.tl.get_last_record_lsn())?
.ok_or_else(|| {
anyhow!(
"attempted to truncate non-existent relish {} at {}",
@@ -1880,8 +1847,9 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
rel,
segno: remove_segno,
};
self.0
.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
}
// Truncate the last remaining segment to the specified size
@@ -1890,11 +1858,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
rel,
segno: last_remain_seg,
};
self.0.perform_write_op(seg, lsn, |layer| {
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
})?;
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
}
self.0
self.tl
.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
Ok(())
}
@@ -1903,7 +1870,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
trace!("drop_segment: {} at {}", rel, lsn);
if rel.is_blocky() {
if let Some(oldsize) = self.0.get_relish_size(rel, self.0.get_last_record_lsn())? {
if let Some(oldsize) = self
.tl
.get_relish_size(rel, self.tl.get_last_record_lsn())?
{
let old_last_seg = if oldsize == 0 {
0
} else {
@@ -1916,10 +1886,10 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
rel,
segno: remove_segno,
};
self.0
.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
}
self.0
self.tl
.decrease_current_logical_size(oldsize * BLCKSZ as u32);
} else {
warn!(
@@ -1930,8 +1900,8 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
} else {
// TODO handle TwoPhase relishes
let seg = SegmentTag::from_blknum(rel, 0);
self.0
.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
let layer = self.tl.get_layer_for_write(seg, lsn)?;
layer.drop_segment(lsn);
}
Ok(())
@@ -1943,7 +1913,7 @@ impl<'a> TimelineWriter for LayeredTimelineWriter<'a> {
fn advance_last_record_lsn(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
self.0.last_record_lsn.advance(new_lsn);
self.tl.last_record_lsn.advance(new_lsn);
}
}
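
Before the in-memory layer hunks below, a hedged recap of the reshaped checkpoint path (TimelineSketch, Layer and their methods are simplified stand-ins, not the real pageserver code): the oldest open layer is frozen in place under the write guard, both locks are released for the slow disk write, then reacquired to swap in the on-disk layers.

use std::sync::{Arc, Mutex};

// Illustrative stand-ins only.
struct OnDiskLayers;
struct Layer {
    end_lsn: Mutex<Option<u64>>, // set when the layer is frozen
}
impl Layer {
    fn freeze(&self, end_lsn: u64) {
        *self.end_lsn.lock().unwrap() = Some(end_lsn);
    }
    fn write_to_disk(&self) -> OnDiskLayers {
        OnDiskLayers
    }
}

struct TimelineSketch {
    write_lock: Mutex<()>,
    layers: Mutex<Vec<Arc<Layer>>>,
    last_record_lsn: Mutex<u64>,
}

impl TimelineSketch {
    fn checkpoint_sketch(&self) {
        let mut write_guard = self.write_lock.lock().unwrap();
        let mut layers = self.layers.lock().unwrap();

        if let Some(oldest) = layers.first().cloned() {
            // Seal the layer in place; no split into frozen/open halves anymore.
            oldest.freeze(*self.last_record_lsn.lock().unwrap());
            // Release both locks while doing slow I/O, then reacquire them.
            drop(layers);
            drop(write_guard);
            let _on_disk = oldest.write_to_disk();
            write_guard = self.write_lock.lock().unwrap();
            layers = self.layers.lock().unwrap();
            // ...replace the frozen in-memory layer with the on-disk layers here...
        }
        drop(layers);
        drop(write_guard);
    }
}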


@@ -15,12 +15,10 @@ use crate::{ZTenantId, ZTimelineId};
use anyhow::{bail, ensure, Result};
use bytes::Bytes;
use log::*;
use std::cmp::min;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use zenith_utils::vec_map::VecMap;
use zenith_utils::accum::Accum;
use zenith_utils::lsn::Lsn;
use super::page_versions::PageVersions;
@@ -37,9 +35,6 @@ pub struct InMemoryLayer {
///
start_lsn: Lsn,
/// Frozen in-memory layers have an inclusive end LSN.
end_lsn: Option<Lsn>,
/// LSN of the oldest page version stored in this layer
oldest_pending_lsn: Lsn,
@@ -52,8 +47,13 @@ pub struct InMemoryLayer {
}
pub struct InMemoryLayerInner {
/// Frozen in-memory layers have an exclusive end LSN.
/// Writes are only allowed when this is None
end_lsn: Option<Lsn>,
/// If this relation was dropped, remember when that happened.
drop_lsn: Option<Lsn>,
/// The drop LSN is recorded in [`end_lsn`].
dropped: bool,
///
/// All versions of all pages in the layer are kept here.
@@ -69,19 +69,11 @@ pub struct InMemoryLayerInner {
/// a non-blocky rel, 'segsizes' is not used and is always empty.
///
segsizes: VecMap<Lsn, u32>,
/// Writes are only allowed when true.
/// Set to false when this layer is in the process of being replaced.
writeable: bool,
}
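
A hedged sketch of the state change in InMemoryLayerInner above (InnerSketch is an illustrative stand-in, not the real struct): the old pair drop_lsn: Option<Lsn> plus writeable: bool becomes end_lsn: Option<Lsn> plus dropped: bool. A layer is writeable exactly while end_lsn is None; once set, end_lsn is the exclusive end for frozen layers and the drop LSN for dropped ones.

// Illustrative stand-in only.
struct InnerSketch {
    end_lsn: Option<u64>, // None while the layer is still writeable
    dropped: bool,
}

impl InnerSketch {
    fn is_writeable(&self) -> bool {
        self.end_lsn.is_none()
    }
    fn is_dropped(&self) -> bool {
        self.dropped
    }
}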
impl InMemoryLayerInner {
fn check_writeable(&self) -> WriteResult<()> {
if self.writeable {
Ok(())
} else {
Err(NonWriteableError)
}
fn assert_writeable(&self) {
assert!(self.end_lsn.is_none());
}
fn get_seg_size(&self, lsn: Lsn) -> u32 {
@@ -104,20 +96,17 @@ impl Layer for InMemoryLayer {
let inner = self.inner.read().unwrap();
let end_lsn;
let dropped;
if let Some(drop_lsn) = inner.drop_lsn {
if let Some(drop_lsn) = inner.end_lsn {
end_lsn = drop_lsn;
dropped = true;
} else {
end_lsn = Lsn(u64::MAX);
dropped = false;
}
let delta_filename = DeltaFileName {
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn,
dropped,
dropped: inner.dropped,
}
.to_string();
@@ -137,14 +126,10 @@ impl Layer for InMemoryLayer {
}
fn get_end_lsn(&self) -> Lsn {
if let Some(end_lsn) = self.end_lsn {
return Lsn(end_lsn.0 + 1);
}
let inner = self.inner.read().unwrap();
if let Some(drop_lsn) = inner.drop_lsn {
drop_lsn
if let Some(end_lsn) = inner.end_lsn {
end_lsn
} else {
Lsn(u64::MAX)
}
@@ -152,7 +137,7 @@ impl Layer for InMemoryLayer {
fn is_dropped(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.drop_lsn.is_some()
inner.dropped
}
/// Look up given page in the cache.
@@ -230,8 +215,8 @@ impl Layer for InMemoryLayer {
assert!(lsn >= self.start_lsn);
// Is the requested LSN after the segment was dropped?
if let Some(drop_lsn) = inner.drop_lsn {
if lsn >= drop_lsn {
if let Some(end_lsn) = inner.end_lsn {
if lsn >= end_lsn {
return Ok(false);
}
}
@@ -262,14 +247,14 @@ impl Layer for InMemoryLayer {
let inner = self.inner.read().unwrap();
let end_str = inner
.drop_lsn
.end_lsn
.as_ref()
.map(|drop_lsn| drop_lsn.to_string())
.map(Lsn::to_string)
.unwrap_or_default();
println!(
"----- in-memory layer for tli {} seg {} {}-{} ----",
self.timelineid, self.seg, self.start_lsn, end_str
"----- in-memory layer for tli {} seg {} {}-{} {} ----",
self.timelineid, self.seg, self.start_lsn, end_str, inner.dropped,
);
for (k, v) in inner.segsizes.as_slice() {
@@ -290,21 +275,6 @@ impl Layer for InMemoryLayer {
}
}
/// Write failed because the layer is in process of being replaced.
/// See [`LayeredTimeline::perform_write_op`] for how to handle this error.
#[derive(Debug)]
pub struct NonWriteableError;
pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
/// Helper struct to cleanup `InMemoryLayer::freeze` return signature.
pub struct FreezeLayers {
/// Replacement layer for the layer which freeze was called on.
pub frozen: Arc<InMemoryLayer>,
/// New open layer containing leftover data.
pub open: Option<Arc<InMemoryLayer>>,
}
/// The result of an in-memory layer's data being written to disk.
pub struct LayersOnDisk {
pub delta_layers: Vec<DeltaLayer>,
@@ -318,10 +288,6 @@ impl LayersOnDisk {
}
impl InMemoryLayer {
fn assert_not_frozen(&self) {
assert!(self.end_lsn.is_none());
}
/// Return the oldest page version that's stored in this layer
pub fn get_oldest_pending_lsn(&self) -> Lsn {
self.oldest_pending_lsn
@@ -357,14 +323,13 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: false,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
end_lsn: None,
dropped: false,
page_versions: PageVersions::default(),
segsizes,
writeable: true,
}),
})
}
@@ -372,7 +337,7 @@ impl InMemoryLayer {
// Write operations
/// Remember new page version, as a WAL record over previous version
pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> WriteResult<u32> {
pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> u32 {
self.put_page_version(
blknum,
rec.lsn,
@@ -384,7 +349,7 @@ impl InMemoryLayer {
}
/// Remember new page version, as a full page image
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> WriteResult<u32> {
pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> u32 {
self.put_page_version(
blknum,
lsn,
@@ -397,8 +362,7 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
self.assert_not_frozen();
pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> u32 {
assert!(self.seg.blknum_in_seg(blknum));
trace!(
@@ -410,7 +374,7 @@ impl InMemoryLayer {
);
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
inner.assert_writeable();
let old = inner.page_versions.append_or_update_last(blknum, lsn, pv);
@@ -471,22 +435,22 @@ impl InMemoryLayer {
}
inner.segsizes.append_or_update_last(lsn, newsize).unwrap();
return Ok(newsize - oldsize);
return newsize - oldsize;
}
}
Ok(0)
0
}
/// Remember that the relation was truncated at given LSN
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
pub fn put_truncation(&self, lsn: Lsn, segsize: u32) {
assert!(
self.seg.rel.is_blocky(),
"put_truncation() called on a non-blocky rel"
);
self.assert_not_frozen();
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
inner.assert_writeable();
// check that we truncate to a smaller size than the segment had before the truncation
let oldsize = inner.get_seg_size(lsn);
@@ -498,25 +462,19 @@ impl InMemoryLayer {
// We already had an entry for this LSN. That's odd..
warn!("Inserting truncation, but had an entry for the LSN already");
}
Ok(())
}
/// Remember that the segment was dropped at given LSN
pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
self.assert_not_frozen();
pub fn drop_segment(&self, lsn: Lsn) {
let mut inner = self.inner.write().unwrap();
inner.check_writeable()?;
assert!(inner.drop_lsn.is_none());
inner.drop_lsn = Some(lsn);
inner.writeable = false;
assert!(inner.end_lsn.is_none());
assert!(!inner.dropped);
inner.dropped = true;
assert!(self.start_lsn < lsn);
inner.end_lsn = Some(lsn);
trace!("dropped segment {} at {}", self.seg, lsn);
Ok(())
}
///
@@ -556,116 +514,43 @@ impl InMemoryLayer {
tenantid,
seg,
start_lsn,
end_lsn: None,
oldest_pending_lsn,
incremental: true,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: None,
end_lsn: None,
dropped: false,
page_versions: PageVersions::default(),
segsizes,
writeable: true,
}),
})
}
pub fn is_writeable(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.writeable
inner.end_lsn.is_none()
}
/// Splits `self` into two InMemoryLayers: `frozen` and `open`.
/// All data up to and including `cutoff_lsn`
/// is copied to `frozen`, while the remaining data is copied to `open`.
/// After completion, self is non-writeable, but not frozen.
pub fn freeze(self: Arc<Self>, cutoff_lsn: Lsn) -> Result<FreezeLayers> {
info!(
"freezing in-memory layer {} at {} (oldest {})",
self.filename().display(),
cutoff_lsn,
self.oldest_pending_lsn
);
/// Make the layer non-writeable. Only call once.
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is inclusive
pub fn freeze(&self, end_lsn: Lsn) {
let mut inner = self.inner.write().unwrap();
self.assert_not_frozen();
let self_ref = self.clone();
let mut inner = self_ref.inner.write().unwrap();
// Dropped layers don't need any special freeze actions,
// they are marked as non-writeable at drop and just
// written out to disk by checkpointer.
if inner.drop_lsn.is_some() {
assert!(!inner.writeable);
info!(
"freezing in memory layer for {} on timeline {} is dropped at {}",
self.seg,
self.timelineid,
inner.drop_lsn.unwrap()
);
// There should be no newer layer that refers to this non-writeable layer,
// because a layer created after a dropped one represents a new rel.
return Ok(FreezeLayers {
frozen: self,
open: None,
});
}
assert!(inner.writeable);
inner.writeable = false;
// Divide all the page versions into old and new
// at the 'cutoff_lsn' point.
let mut after_oldest_lsn: Accum<Lsn> = Accum(None);
let cutoff_lsn_exclusive = Lsn(cutoff_lsn.0 + 1);
let (before_segsizes, mut after_segsizes) = inner.segsizes.split_at(&cutoff_lsn_exclusive);
if let Some((lsn, _size)) = after_segsizes.as_slice().first() {
after_oldest_lsn.accum(min, *lsn);
}
let (before_page_versions, after_page_versions) = inner
.page_versions
.split_at(cutoff_lsn_exclusive, &mut after_oldest_lsn);
let frozen = Arc::new(InMemoryLayer {
conf: self.conf,
tenantid: self.tenantid,
timelineid: self.timelineid,
seg: self.seg,
start_lsn: self.start_lsn,
end_lsn: Some(cutoff_lsn),
oldest_pending_lsn: self.start_lsn,
incremental: self.incremental,
inner: RwLock::new(InMemoryLayerInner {
drop_lsn: inner.drop_lsn,
page_versions: before_page_versions,
segsizes: before_segsizes,
writeable: false,
}),
});
let open = if !after_segsizes.is_empty() || !after_page_versions.is_empty() {
let mut new_open = Self::create_successor_layer(
self.conf,
frozen.clone(),
self.timelineid,
self.tenantid,
cutoff_lsn + 1,
after_oldest_lsn.0.unwrap(),
)?;
let new_inner = new_open.inner.get_mut().unwrap();
// Ensure page_versions doesn't contain anything
// so we can just replace it
assert!(new_inner.page_versions.is_empty());
new_inner.page_versions = after_page_versions;
new_inner.segsizes.extend(&mut after_segsizes).unwrap();
Some(Arc::new(new_open))
if inner.end_lsn.is_some() {
assert!(inner.dropped);
} else {
None
};
assert!(!inner.dropped);
assert!(self.start_lsn < end_lsn + 1);
inner.end_lsn = Some(Lsn(end_lsn.0 + 1));
Ok(FreezeLayers { frozen, open })
if let Some((lsn, _)) = inner.segsizes.as_slice().last() {
assert!(lsn <= &end_lsn, "{:?} {:?}", lsn, end_lsn);
}
for (_blk, lsn, _pv) in inner.page_versions.ordered_page_version_iter(None) {
assert!(lsn <= end_lsn);
}
}
}
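
A small worked example of the inclusive/exclusive bookkeeping used by freeze and, further below, write_to_disk (Lsn here is a simplified u64 newtype, not the real zenith_utils type): freeze receives an inclusive end LSN and stores it exclusively as end_lsn + 1; write_to_disk subtracts 1 again to get the last included LSN for the image layer.

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct Lsn(u64);

// freeze(): inclusive argument, stored exclusively.
fn stored_end_lsn(freeze_arg_inclusive: Lsn) -> Lsn {
    Lsn(freeze_arg_inclusive.0 + 1)
}

// write_to_disk(): convert the stored exclusive bound back to the last included LSN.
fn last_included_lsn(stored_exclusive: Lsn) -> Lsn {
    Lsn(stored_exclusive.0 - 1)
}

fn main() {
    let frozen_at = Lsn(100); // e.g. last_record_lsn when the checkpointer froze the layer
    let stored = stored_end_lsn(frozen_at);
    assert_eq!(stored, Lsn(101));
    assert_eq!(last_included_lsn(stored), frozen_at);
}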
/// Write this frozen in-memory layer to disk.
@@ -678,9 +563,8 @@ impl InMemoryLayer {
/// end LSN are the same.)
pub fn write_to_disk(&self, timeline: &LayeredTimeline) -> Result<LayersOnDisk> {
trace!(
"write_to_disk {} end_lsn is {} get_end_lsn is {}",
"write_to_disk {} get_end_lsn is {}",
self.filename().display(),
self.end_lsn.unwrap_or(Lsn(0)),
self.get_end_lsn()
);
@@ -694,16 +578,16 @@ impl InMemoryLayer {
// would have to wait until we release it. That race condition is very
// rare though, so we just accept the potential latency hit for now.
let inner = self.inner.read().unwrap();
assert!(!inner.writeable);
let end_lsn_exclusive = inner.end_lsn.unwrap();
if let Some(drop_lsn) = inner.drop_lsn {
if inner.dropped {
let delta_layer = DeltaLayer::create(
self.conf,
self.timelineid,
self.tenantid,
self.seg,
self.start_lsn,
drop_lsn,
end_lsn_exclusive,
true,
inner.page_versions.ordered_page_version_iter(None),
inner.segsizes.clone(),
@@ -712,7 +596,7 @@ impl InMemoryLayer {
"freeze: created delta layer for dropped segment {} {}-{}",
self.seg,
self.start_lsn,
drop_lsn
end_lsn_exclusive
);
return Ok(LayersOnDisk {
delta_layers: vec![delta_layer],
@@ -720,14 +604,19 @@ impl InMemoryLayer {
});
}
let end_lsn = self.end_lsn.unwrap();
// `end_lsn_exclusive` is exclusive, so subtract 1 to get the last included LSN.
// We want to make an ImageLayer for the last included LSN,
// so the DeltaLayer should exclude that LSN.
let end_lsn_inclusive = Lsn(end_lsn_exclusive.0 - 1);
let mut before_page_versions = inner.page_versions.ordered_page_version_iter(Some(end_lsn));
let mut page_versions = inner
.page_versions
.ordered_page_version_iter(Some(end_lsn_inclusive));
let mut delta_layers = Vec::new();
if self.start_lsn != end_lsn {
let (before_segsizes, _after_segsizes) = inner.segsizes.split_at(&Lsn(end_lsn.0 + 1));
if self.start_lsn != end_lsn_inclusive {
let (segsizes, _) = inner.segsizes.split_at(&end_lsn_exclusive);
// Write the page versions before the cutoff to disk.
let delta_layer = DeltaLayer::create(
self.conf,
@@ -735,27 +624,32 @@ impl InMemoryLayer {
self.tenantid,
self.seg,
self.start_lsn,
end_lsn,
end_lsn_inclusive,
false,
before_page_versions,
before_segsizes,
page_versions,
segsizes,
)?;
delta_layers.push(delta_layer);
trace!(
"freeze: created delta layer {} {}-{}",
self.seg,
self.start_lsn,
end_lsn
end_lsn_inclusive
);
} else {
assert!(before_page_versions.next().is_none());
assert!(page_versions.next().is_none());
}
drop(inner);
// Write a new base image layer at the cutoff point
let image_layer = ImageLayer::create_from_src(self.conf, timeline, self, end_lsn)?;
trace!("freeze: created image layer {} at {}", self.seg, end_lsn);
let image_layer =
ImageLayer::create_from_src(self.conf, timeline, self, end_lsn_inclusive)?;
trace!(
"freeze: created image layer {} at {}",
self.seg,
end_lsn_inclusive
);
Ok(LayersOnDisk {
delta_layers,


@@ -1,6 +1,6 @@
use std::{collections::HashMap, ops::RangeBounds, slice};
use zenith_utils::{accum::Accum, lsn::Lsn, vec_map::VecMap};
use zenith_utils::{lsn::Lsn, vec_map::VecMap};
use super::storage_layer::PageVersion;
@@ -10,10 +10,6 @@ const EMPTY_SLICE: &[(Lsn, PageVersion)] = &[];
pub struct PageVersions(HashMap<u32, VecMap<Lsn, PageVersion>>);
impl PageVersions {
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn append_or_update_last(
&mut self,
blknum: u32,
@@ -44,34 +40,6 @@ impl PageVersions {
.unwrap_or(EMPTY_SLICE)
}
/// Split the page version map into two.
///
/// Left contains everything up to and not including [`cutoff_lsn`].
/// Right contains [`cutoff_lsn`] and everything after.
pub fn split_at(&self, cutoff_lsn: Lsn, after_oldest_lsn: &mut Accum<Lsn>) -> (Self, Self) {
let mut before_blocks = HashMap::new();
let mut after_blocks = HashMap::new();
for (blknum, vec_map) in self.0.iter() {
let (before_versions, after_versions) = vec_map.split_at(&cutoff_lsn);
if !before_versions.is_empty() {
let old = before_blocks.insert(*blknum, before_versions);
assert!(old.is_none());
}
if !after_versions.is_empty() {
let (first_lsn, _first_pv) = &after_versions.as_slice()[0];
after_oldest_lsn.accum(std::cmp::min, *first_lsn);
let old = after_blocks.insert(*blknum, after_versions);
assert!(old.is_none());
}
}
(Self(before_blocks), Self(after_blocks))
}
/// Iterate through [`PageVersion`]s in (block, lsn) order.
/// If a [`cutoff_lsn`] is set, only show versions with `lsn < cutoff_lsn`
pub fn ordered_page_version_iter(&self, cutoff_lsn: Option<Lsn>) -> OrderedPageVersionIter<'_> {