From bfba5e3acacb6ce1b93c5df34cc9d558d7a42667 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 6 Oct 2023 14:41:13 +0200 Subject: [PATCH] page_cache: ensure forward progress on miss (#5482) Problem ======= Prior to this PR, when we had a cache miss, we'd get back a write guard, fill it, then drop it and retry the read from cache. If there's severe contention for the cache, it could happen that the just-filled data gets evicted before our retry, resulting in lost work and no forward progress. Solution ======== This PR leverages the now-available `tokio::sync::RwLockWriteGuard`'s `downgrade()` functionality to turn the filled slot write guard into a read guard. We don't drop the guard at any point, so, forward progress is ensured. Refs ==== Stacked atop https://github.com/neondatabase/neon/pull/5480 part of https://github.com/neondatabase/neon/issues/4743 specifically part of https://github.com/neondatabase/neon/issues/5479 --- pageserver/src/page_cache.rs | 80 ++++++++++++++++--------- pageserver/src/tenant/block_io.rs | 35 +++++------ pageserver/src/tenant/ephemeral_file.rs | 58 +++++++++--------- 3 files changed, 93 insertions(+), 80 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 2c858a8934..f6acf64f22 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -287,18 +287,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { /// currently found in the page cache. In that case, the caller of lock_for_read() /// is expected to fill in the page contents and call mark_valid(). pub struct PageWriteGuard<'i> { - inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + state: PageWriteGuardState<'i>, +} - _permit: PinnedSlotsPermit, - - // Are the page contents currently valid? - // Used to mark pages as invalid that are assigned but not yet filled with data. 
- valid: bool, +enum PageWriteGuardState<'i> { + Invalid { + inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + _permit: PinnedSlotsPermit, + }, + Downgraded, } impl std::ops::DerefMut for PageWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.buf + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } @@ -306,25 +311,37 @@ impl std::ops::Deref for PageWriteGuard<'_> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.inner.buf + match &self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> { fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] { - self.inner.buf + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } -impl PageWriteGuard<'_> { +impl<'a> PageWriteGuard<'a> { /// Mark that the buffer contents are now valid. - pub fn mark_valid(&mut self) { - assert!(self.inner.key.is_some()); - assert!( - !self.valid, - "mark_valid called on a buffer that was already valid" - ); - self.valid = true; + #[must_use] + pub fn mark_valid(mut self) -> PageReadGuard<'a> { + let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); + match prev { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + PageReadGuard { + _permit: Arc::new(_permit), + slot_guard: inner.downgrade(), + } + } + PageWriteGuardState::Downgraded => unreachable!(), + } } } @@ -335,11 +352,14 @@ impl Drop for PageWriteGuard<'_> { /// initializing it, remove the mapping from the page cache. 
/// fn drop(&mut self) { - assert!(self.inner.key.is_some()); - if !self.valid { - let self_key = self.inner.key.as_ref().unwrap(); - PAGE_CACHE.get().unwrap().remove_mapping(self_key); - self.inner.key = None; + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + let self_key = inner.key.as_ref().unwrap(); + PAGE_CACHE.get().unwrap().remove_mapping(self_key); + inner.key = None; + } + PageWriteGuardState::Downgraded => {} } } } @@ -498,12 +518,13 @@ impl PageCache { "we hold a write lock, so, no one else should have a permit" ); let mut write_guard = PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: false, + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, }; write_guard.copy_from_slice(img); - write_guard.mark_valid(); + let _ = write_guard.mark_valid(); return Ok(()); } } @@ -684,9 +705,10 @@ impl PageCache { ); return Ok(ReadBufResult::NotFound(PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: false, + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, })); } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index d81cf1b8a0..0617017528 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -186,26 +186,21 @@ impl FileBlockReader { ctx: &RequestContext, ) -> Result { let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? 
{ - ReadBufResult::Found(guard) => break Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum).await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; + match cache + .read_immutable_buf(self.file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { + ReadBufResult::Found(guard) => Ok(guard.into()), + ReadBufResult::NotFound(mut write_guard) => { + // Read the page from disk into the buffer + self.fill_buffer(write_guard.deref_mut(), blknum).await?; + Ok(write_guard.mark_valid().into()) + } } } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index b99be0c075..5b99a1dd03 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -72,36 +72,32 @@ impl EphemeralFile { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.page_cache_file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - // order path before error because error is anyhow::Error => might have many contexts - format!( - "ephemeral file: read immutable page #{}: {}: {:#}", - blknum, self.file.path, e, - ), - ) - })? 
{ - page_cache::ReadBufResult::Found(guard) => { - return Ok(BlockLease::PageReadGuard(guard)) - } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) - .await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; - } + match cache + .read_immutable_buf(self.page_cache_file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + // order path before error because error is anyhow::Error => might have many contexts + format!( + "ephemeral file: read immutable page #{}: {}: {:#}", + blknum, self.file.path, e, + ), + ) + })? { + page_cache::ReadBufResult::Found(guard) => { + return Ok(BlockLease::PageReadGuard(guard)) + } + page_cache::ReadBufResult::NotFound(mut write_guard) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + .await?; + let read_guard = write_guard.mark_valid(); + return Ok(BlockLease::PageReadGuard(read_guard)); + } + }; } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) @@ -171,7 +167,7 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); buf.copy_from_slice(&self.ephemeral_file.mutable_tail); - write_guard.mark_valid(); + let _ = write_guard.mark_valid(); // pre-warm successful } Err(e) => {