From dc96a7604abf31599eedd19255c07c6a48fb11f4 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 2 Oct 2023 17:39:35 +0000 Subject: [PATCH] page_cache: ensure forward progress on cache miss --- pageserver/src/page_cache.rs | 80 ++++++++++++++++--------- pageserver/src/tenant/block_io.rs | 35 +++++------ pageserver/src/tenant/ephemeral_file.rs | 58 +++++++++--------- 3 files changed, 93 insertions(+), 80 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 2c858a8934..d7ac1ec336 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -287,18 +287,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { /// currently found in the page cache. In that case, the caller of lock_for_read() /// is expected to fill in the page contents and call mark_valid(). pub struct PageWriteGuard<'i> { - inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + state: PageWriteGuardState<'i>, +} - _permit: PinnedSlotsPermit, - - // Are the page contents currently valid? - // Used to mark pages as invalid that are assigned but not yet filled with data. - valid: bool, +enum PageWriteGuardState<'i> { + Invalid { + inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + _permit: PinnedSlotsPermit, + }, + Downgraded, } impl std::ops::DerefMut for PageWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.buf + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } @@ -306,25 +311,37 @@ impl std::ops::Deref for PageWriteGuard<'_> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.inner.buf + match &self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> { fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] { - self.inner.buf + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => todo!(), + } } } -impl PageWriteGuard<'_> { +impl<'a> PageWriteGuard<'a> { /// Mark that the buffer contents are now valid. - pub fn mark_valid(&mut self) { - assert!(self.inner.key.is_some()); - assert!( - !self.valid, - "mark_valid called on a buffer that was already valid" - ); - self.valid = true; + #[must_use] + pub fn mark_valid(mut self) -> PageReadGuard<'a> { + let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); + match prev { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + PageReadGuard { + _permit: Arc::new(_permit), + slot_guard: inner.downgrade(), + } + } + PageWriteGuardState::Downgraded => unreachable!(), + } } } @@ -335,11 +352,14 @@ impl Drop for PageWriteGuard<'_> { /// initializing it, remove the mapping from the page cache. /// fn drop(&mut self) { - assert!(self.inner.key.is_some()); - if !self.valid { - let self_key = self.inner.key.as_ref().unwrap(); - PAGE_CACHE.get().unwrap().remove_mapping(self_key); - self.inner.key = None; + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + let self_key = inner.key.as_ref().unwrap(); + PAGE_CACHE.get().unwrap().remove_mapping(self_key); + inner.key = None; + } + PageWriteGuardState::Downgraded => {} } } } @@ -498,12 +518,13 @@ impl PageCache { "we hold a write lock, so, no one else should have a permit" ); let mut write_guard = PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: false, + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, }; write_guard.copy_from_slice(img); - write_guard.mark_valid(); + let _ = write_guard.mark_valid(); return Ok(()); } } @@ -684,9 +705,10 @@ impl PageCache { ); return Ok(ReadBufResult::NotFound(PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: false, + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, })); } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index d81cf1b8a0..0617017528 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -186,26 +186,21 @@ impl FileBlockReader { ctx: &RequestContext, ) -> Result { let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? { - ReadBufResult::Found(guard) => break Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum).await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; + match cache + .read_immutable_buf(self.file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { + ReadBufResult::Found(guard) => Ok(guard.into()), + ReadBufResult::NotFound(mut write_guard) => { + // Read the page from disk into the buffer + self.fill_buffer(write_guard.deref_mut(), blknum).await?; + Ok(write_guard.mark_valid().into()) + } } } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index b99be0c075..5b99a1dd03 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -72,36 +72,32 @@ impl EphemeralFile { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.page_cache_file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - // order path before error because error is anyhow::Error => might have many contexts - format!( - "ephemeral file: read immutable page #{}: {}: {:#}", - blknum, self.file.path, e, - ), - ) - })? { - page_cache::ReadBufResult::Found(guard) => { - return Ok(BlockLease::PageReadGuard(guard)) - } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) - .await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; - } + match cache + .read_immutable_buf(self.page_cache_file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + // order path before error because error is anyhow::Error => might have many contexts + format!( + "ephemeral file: read immutable page #{}: {}: {:#}", + blknum, self.file.path, e, + ), + ) + })? { + page_cache::ReadBufResult::Found(guard) => { + return Ok(BlockLease::PageReadGuard(guard)) + } + page_cache::ReadBufResult::NotFound(mut write_guard) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + .await?; + let read_guard = write_guard.mark_valid(); + return Ok(BlockLease::PageReadGuard(read_guard)); + } + }; } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) @@ -171,7 +167,7 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); buf.copy_from_slice(&self.ephemeral_file.mutable_tail); - write_guard.mark_valid(); + let _ = write_guard.mark_valid(); // pre-warm successful } Err(e) => {