diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 01e9eb1944..8010ceab76 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -63,6 +63,8 @@ use storage_layer::{ Layer, PageReconstructData, PageReconstructResult, SegmentTag, RELISH_SEG_SIZE, }; +use self::inmemory_layer::{NonWriteableError, WriteResult}; + static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; 8192]); // Timeout when waiting for WAL receiver to catch up to an LSN given in a GetPage@LSN call. @@ -741,8 +743,10 @@ impl Timeline for LayeredTimeline { } let seg = SegmentTag::from_blknum(rel, blknum); - let layer = self.get_layer_for_write(seg, rec.lsn)?; - self.increase_current_logical_size(layer.put_wal_record(blknum, rec)? * BLCKSZ as u32); + let delta_size = self.perform_write_op(seg, rec.lsn, |layer| { + layer.put_wal_record(blknum, rec.clone()) + })?; + self.increase_current_logical_size(delta_size * BLCKSZ as u32); Ok(()) } @@ -780,8 +784,7 @@ impl Timeline for LayeredTimeline { rel, segno: remove_segno, }; - let layer = self.get_layer_for_write(seg, lsn)?; - layer.drop_segment(lsn)?; + self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?; } // Truncate the last remaining segment to the specified size @@ -790,8 +793,9 @@ impl Timeline for LayeredTimeline { rel, segno: last_remain_seg, }; - let layer = self.get_layer_for_write(seg, lsn)?; - layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE)?; + self.perform_write_op(seg, lsn, |layer| { + layer.put_truncation(lsn, relsize % RELISH_SEG_SIZE) + })?; } self.decrease_current_logical_size((oldsize - relsize) * BLCKSZ as u32); Ok(()) @@ -814,8 +818,7 @@ impl Timeline for LayeredTimeline { rel, segno: remove_segno, }; - let layer = self.get_layer_for_write(seg, lsn)?; - layer.drop_segment(lsn)?; + self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?; } self.decrease_current_logical_size(oldsize * BLCKSZ as u32); } else { @@ -827,8 +830,7 @@ 
impl Timeline for LayeredTimeline { } else { // TODO handle TwoPhase relishes let seg = SegmentTag::from_blknum(rel, 0); - let layer = self.get_layer_for_write(seg, lsn)?; - layer.drop_segment(lsn)?; + self.perform_write_op(seg, lsn, |layer| layer.drop_segment(lsn))?; } Ok(()) @@ -845,8 +847,11 @@ impl Timeline for LayeredTimeline { let seg = SegmentTag::from_blknum(rel, blknum); - let layer = self.get_layer_for_write(seg, lsn)?; - self.increase_current_logical_size(layer.put_page_image(blknum, lsn, img)? * BLCKSZ as u32); + let delta_size = self.perform_write_op(seg, lsn, |layer| { + layer.put_page_image(blknum, lsn, img.clone()) + })?; + + self.increase_current_logical_size(delta_size * BLCKSZ as u32); Ok(()) } @@ -1642,6 +1647,37 @@ impl LayeredTimeline { .with_label_values(&[&self.tenantid.to_string(), &self.timelineid.to_string()]) .set(val as i64 - diff as i64) } + + /// If a layer is in the process of being replaced in [`LayerMap`], write + /// operations will fail with [`NonWriteableError`]. This may happen due to + /// a race: the checkpointer thread freezes a layer just after + /// [`Self::get_layer_for_write`] returned it. To handle this error, we try + /// again getting the layer and attempt the write. + fn perform_write_op<R>( + &self, + seg: SegmentTag, + lsn: Lsn, + write_op: impl Fn(&Arc<InMemoryLayer>) -> WriteResult<R>, + ) -> anyhow::Result<R> { + let mut layer = self.get_layer_for_write(seg, lsn)?; + loop { + match write_op(&layer) { + Ok(r) => return Ok(r), + Err(NonWriteableError {}) => {} + } + + info!( + "attempted to write to non-writeable layer, retrying {} {}", + seg, lsn + ); + + // layer was non-writeable, try again + let new_layer = self.get_layer_for_write(seg, lsn)?; + // the new layer does not have to be writeable, but it should at least be different + assert!(!Arc::ptr_eq(&layer, &new_layer)); + layer = new_layer; + } + } } /// Dump contents of a layer file to stdout. 
diff --git a/pageserver/src/layered_repository/inmemory_layer.rs b/pageserver/src/layered_repository/inmemory_layer.rs index 6ab9daed0a..2980a70c76 100644 --- a/pageserver/src/layered_repository/inmemory_layer.rs +++ b/pageserver/src/layered_repository/inmemory_layer.rs @@ -69,13 +69,12 @@ pub struct InMemoryLayerInner { } impl InMemoryLayerInner { - /// Assert that the layer is not frozen - fn assert_writeable(&self) { - // TODO current this can happen when a walreceiver thread's - // `get_layer_for_write` and InMemoryLayer write calls interleave with - // a checkpoint operation, however timing should make this rare. - // Assert only so we can identify when the bug triggers more easily. - assert!(self.writeable); + fn check_writeable(&self) -> WriteResult<()> { + if self.writeable { + Ok(()) + } else { + Err(NonWriteableError) + } } fn get_seg_size(&self, lsn: Lsn) -> u32 { @@ -284,6 +283,13 @@ impl Layer for InMemoryLayer { } } +/// Write failed because the layer is in process of being replaced. +/// See [`LayeredTimeline::perform_write_op`] for how to handle this error. +#[derive(Debug)] +pub struct NonWriteableError; + +pub type WriteResult<T> = std::result::Result<T, NonWriteableError>; + /// Helper struct to cleanup `InMemoryLayer::freeze` return signature. pub struct FreezeLayers { /// Replacement layer for the layer which freeze was called on. 
@@ -341,7 +347,7 @@ impl InMemoryLayer { // Write operations /// Remember new page version, as a WAL record over previous version - pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> Result<u32> { + pub fn put_wal_record(&self, blknum: u32, rec: WALRecord) -> WriteResult<u32> { self.put_page_version( blknum, rec.lsn, @@ -353,7 +359,7 @@ impl InMemoryLayer { } /// Remember new page version, as a full page image - pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> Result<u32> { + pub fn put_page_image(&self, blknum: u32, lsn: Lsn, img: Bytes) -> WriteResult<u32> { self.put_page_version( blknum, lsn, @@ -366,7 +372,7 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree - pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> Result<u32> { + pub fn put_page_version(&self, blknum: u32, lsn: Lsn, pv: PageVersion) -> WriteResult<u32> { self.assert_not_frozen(); assert!(self.seg.blknum_in_seg(blknum)); @@ -379,7 +385,7 @@ impl InMemoryLayer { ); let mut inner = self.inner.lock().unwrap(); - inner.assert_writeable(); + inner.check_writeable()?; let old = inner.page_versions.insert((blknum, lsn), pv); @@ -445,11 +451,11 @@ impl InMemoryLayer { } /// Remember that the relation was truncated at given LSN - pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> anyhow::Result<()> { + pub fn put_truncation(&self, lsn: Lsn, segsize: u32) -> WriteResult<()> { self.assert_not_frozen(); let mut inner = self.inner.lock().unwrap(); - inner.assert_writeable(); + inner.check_writeable()?; // check that this we truncate to a smaller size than segment was before the truncation let oldsize = inner.get_seg_size(lsn); @@ -466,12 +472,12 @@ impl InMemoryLayer { } /// Remember that the segment was dropped at given LSN - pub fn drop_segment(&self, lsn: Lsn) -> anyhow::Result<()> { + pub fn drop_segment(&self, lsn: Lsn) -> WriteResult<()> { self.assert_not_frozen(); let mut 
inner = self.inner.lock().unwrap(); - inner.assert_writeable(); + inner.check_writeable()?; assert!(inner.drop_lsn.is_none()); inner.drop_lsn = Some(lsn); @@ -537,8 +543,10 @@ impl InMemoryLayer { self.seg, self.timelineid, cutoff_lsn ); + self.assert_not_frozen(); + let mut inner = self.inner.lock().unwrap(); - inner.assert_writeable(); + assert!(inner.writeable); inner.writeable = false; // Normally, use the cutoff LSN as the end of the frozen layer.