Prevent frozen InMemoryLayer races

Instead of panicking when a race happens, retry the operation after
getting a new layer.
Author:    Patrick Insinger
Date:      2021-09-15 14:09:21 -07:00
Committer: Patrick Insinger
Parent:    a5bd306db9
Commit:    25b7d424ab
2 changed files with 72 additions and 28 deletions
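
The heart of the change is easiest to see outside the diff. Below is a minimal, self-contained Rust sketch of the retry pattern this commit introduces; `Layer`, `put`, and `get_layer_for_write` here are simplified stand-ins rather than the real pageserver API, which follows in the diff.

use std::sync::Arc;

/// Returned when a write hits a frozen (non-writeable) layer.
#[derive(Debug)]
pub struct NonWriteableError;

pub struct Layer {
    writeable: bool,
}

impl Layer {
    /// Stand-in for put_wal_record / put_page_image: fails once frozen.
    fn put(&self) -> Result<(), NonWriteableError> {
        if self.writeable {
            Ok(())
        } else {
            Err(NonWriteableError)
        }
    }
}

/// Stand-in for the real lookup, which consults the layer map and opens a
/// fresh layer if the previous one was frozen by the checkpointer.
fn get_layer_for_write() -> Arc<Layer> {
    Arc::new(Layer { writeable: true })
}

/// The retry loop: on NonWriteableError, fetch the layer again instead of
/// panicking, asserting that the retry at least observes a new layer.
fn perform_write_op() -> Result<(), NonWriteableError> {
    let mut layer = get_layer_for_write();
    loop {
        match layer.put() {
            Ok(()) => return Ok(()),
            Err(NonWriteableError) => {
                let new_layer = get_layer_for_write();
                // The new layer need not be writeable, but it must differ
                // from the frozen one, or we would spin forever.
                assert!(!Arc::ptr_eq(&layer, &new_layer));
                layer = new_layer;
            }
        }
    }
}

The `Arc::ptr_eq` assertion encodes the progress guarantee, presumably because freezing replaces the layer in the layer map before it becomes non-writeable, so a retry always sees a different layer object.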

File 1 of 2

@@ -63,6 +63,8 @@ use storage_layer::{
     Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE,
 };
+use self::inmemory_layer::{NonWriteableError, WriteResult};
+
 static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]);
 // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -741,8 +743,10 @@ impl Timeline for LayeredTimeline {
         }
         let seg = SegmentTag::from_blknum(rel, blknum);
-        let layer = self.get_layer_for_write(seg, rec.lsn)?;
-        self.increase_current_logical_size(layer.put_wal_record(blknum, rec)? * BLCKSZ as u32);
+        let delta_size = self.perform_write_op(seg, rec.lsn, |layer| {
+            layer.put_wal_record(blknum, rec.clone())
+        })?;
+        self.increase_current_logical_size(delta_size * BLCKSZ as u32);
         Ok(())
     }
@@ -780,8 +784,7 @@ impl Timeline for LayeredTimeline {
                 rel,
                 segno: remove_segno,
             };
-            let layer = self.get_layer_for_write(seg, lsn)?;
-            layer.drop_segment(lsn)?;
+            self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
         }
         // Truncate the last remaining segment to the specified size
@@ -790,8 +793,9 @@ impl Timeline for LayeredTimeline {
                 rel,
                 segno: last_remain_seg,
             };
-            let layer = self.get_layer_for_write(seg, lsn)?;
-            layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)?;
+            self.perform_write_op(seg, lsn, |layer| {
+                layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)
+            })?;
         }
         self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32);
         Ok(())
@@ -814,8 +818,7 @@ impl Timeline for LayeredTimeline {
                     rel,
                     segno: remove_segno,
                 };
-                let layer = self.get_layer_for_write(seg, lsn)?;
-                layer.drop_segment(lsn)?;
+                self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
             }
             self.decrease_current_logical_size(oldsize * BLCKSZ as u32);
         } else {
@@ -827,8 +830,7 @@ impl Timeline for LayeredTimeline {
         } else {
             // TODO handle TwoPhase relishes
             let seg = SegmentTag::from_blknum(rel, 0);
-            let layer = self.get_layer_for_write(seg, lsn)?;
-            layer.drop_segment(lsn)?;
+            self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?;
         }
         Ok(())
@@ -845,8 +847,11 @@ impl Timeline for LayeredTimeline {
         let seg = SegmentTag::from_blknum(rel, blknum);
-        let layer = self.get_layer_for_write(seg, lsn)?;
-        self.increase_current_logical_size(layer.put_page_image(blknum, lsn, img)? * BLCKSZ as u32);
+        let delta_size = self.perform_write_op(seg, lsn, |layer| {
+            layer.put_page_image(blknum, lsn, img.clone())
+        })?;
+        self.increase_current_logical_size(delta_size * BLCKSZ as u32);
         Ok(())
     }
@@ -1642,6 +1647,37 @@ impl LayeredTimeline {
             .with_label_values(&[&self.tenantid.to_string(), &self.timelineid.to_string()])
             .set(val as i64 - diff as i64)
     }
+
+    /// If a layer is in the process of being replaced in [`LayerMap`], write
+    /// operations will fail with [`NonWriteableError`]. This may happen due to
+    /// a race: the checkpointer thread freezes a layer just after
+    /// [`Self::get_layer_for_write`] returned it. To handle this error, we
+    /// fetch the layer again and retry the write.
+    fn perform_write_op<R>(
+        &self,
+        seg: SegmentTag,
+        lsn: Lsn,
+        write_op: impl Fn(&Arc<InMemoryLayer>) -> WriteResult<R>,
+    ) -> anyhow::Result<R> {
+        let mut layer = self.get_layer_for_write(seg, lsn)?;
+        loop {
+            match write_op(&layer) {
+                Ok(r) => return Ok(r),
+                Err(NonWriteableError {}) => {}
+            }
+            info!(
+                "attempted to write to non-writeable layer, retrying {} {}",
+                seg, lsn
+            );
+            // The layer was non-writeable; fetch it again and retry.
+            let new_layer = self.get_layer_for_write(seg, lsn)?;
+            // The new layer need not be writeable, but it must at least be
+            // different, otherwise we would retry forever.
+            assert!(!Arc::ptr_eq(&layer, &new_layer));
+            layer = new_layer;
+        }
+    }
 }
 /// Dump contents of a layer file to stdout.

File 2 of 2

@@ -69,13 +69,12 @@ pub struct InMemoryLayerInner {
 }
 impl InMemoryLayerInner {
-    /// Assert that the layer is not frozen
-    fn assert_writeable(&self) {
-        // TODO currently this can happen when a walreceiver thread's
-        // `get_layer_for_write` and InMemoryLayer write calls interleave with
-        // a checkpoint operation, however timing should make this rare.
-        // Assert only so we can identify when the bug triggers more easily.
-        assert!(self.writeable);
+    /// Check that the layer is not frozen
+    fn check_writeable(&self) -> WriteResult<()> {
+        if self.writeable {
+            Ok(())
+        } else {
+            Err(NonWriteableError)
+        }
     }
     fn get_seg_size(&self, lsn: Lsn) -> u32 {
@@ -284,6 +283,13 @@ impl Layer for InMemoryLayer {
     }
 }
+/// Write failed because the layer is in the process of being replaced.
+/// See [`LayeredTimeline::perform_write_op`] for how to handle this error.
+#[derive(Debug)]
+pub struct NonWriteableError;
+
+pub type WriteResult<T> = std::result::Result<T, NonWriteableError>;
+
 /// Helper struct to clean up the `InMemoryLayer::freeze` return signature.
 pub struct FreezeLayers {
     /// Replacement layer for the layer which freeze was called on.
@@ -341,7 +347,7 @@
     // Write operations
     /// Remember new page version, as a WAL record over previous version
-    pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<u32> {
+    pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> WriteResult<u32> {
         self.put_page_version(
             blknum,
             rec.lsn,
@@ -353,7 +359,7 @@
     }
     /// Remember new page version, as a full page image
-    pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<u32> {
+    pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> WriteResult<u32> {
         self.put_page_version(
             blknum,
             lsn,
@@ -366,7 +372,7 @@
     /// Common subroutine of the public put_wal_record() and put_page_image() functions.
     /// Adds the page version to the in-memory tree
-    pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<u32> {
+    pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> {
         self.assert_not_frozen();
         assert!(self.seg.blknum_in_seg(blknum));
@@ -379,7 +385,7 @@
         );
         let mut inner = self.inner.lock().unwrap();
-        inner.assert_writeable();
+        inner.check_writeable()?;
         let old = inner.page_versions.insert((blknum, lsn), pv);
@@ -445,11 +451,11 @@
     }
     /// Remember that the relation was truncated at given LSN
-    pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> {
+    pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> {
         self.assert_not_frozen();
         let mut inner = self.inner.lock().unwrap();
-        inner.assert_writeable();
+        inner.check_writeable()?;
         // check that we truncate to a smaller size than the segment had before the truncation
         let oldsize = inner.get_seg_size(lsn);
@@ -466,12 +472,12 @@
     }
     /// Remember that the segment was dropped at given LSN
-    pub fn drop_segment(&self, lsn: Lsn) -> anyhow::Result<()> {
+    pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> {
         self.assert_not_frozen();
         let mut inner = self.inner.lock().unwrap();
-        inner.assert_writeable();
+        inner.check_writeable()?;
         assert!(inner.drop_lsn.is_none());
         inner.drop_lsn = Some(lsn);
@@ -537,8 +543,10 @@
             self.seg, self.timelineid, cutoff_lsn
         );
         self.assert_not_frozen();
         let mut inner = self.inner.lock().unwrap();
-        inner.assert_writeable();
+        assert!(inner.writeable);
+        inner.writeable = false;
         // Normally, use the cutoff LSN as the end of the frozen layer.