diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 97ca2bfea7..0f97dc7443 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -291,18 +291,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { /// to initialize. /// pub struct PageWriteGuard<'i> { - inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + state: PageWriteGuardState<'i>, +} - _permit: PinnedSlotsPermit, - - // Are the page contents currently valid? - // Used to mark pages as invalid that are assigned but not yet filled with data. - valid: bool, +enum PageWriteGuardState<'i> { + Invalid { + inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + _permit: PinnedSlotsPermit, + }, + Downgraded, } impl std::ops::DerefMut for PageWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.buf + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } @@ -310,25 +315,37 @@ impl std::ops::Deref for PageWriteGuard<'_> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.inner.buf + match &self.state { + PageWriteGuardState::Invalid { inner, _permit } => &inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } } } impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> { fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] { - self.inner.buf + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf, + PageWriteGuardState::Downgraded => todo!(), + } } } -impl PageWriteGuard<'_> { +impl<'a> PageWriteGuard<'a> { /// Mark that the buffer contents are now valid. - pub fn mark_valid(&mut self) { - assert!(self.inner.key.is_some()); - assert!( - !self.valid, - "mark_valid called on a buffer that was already valid" - ); - self.valid = true; + #[must_use] + pub fn mark_valid(mut self) -> PageReadGuard<'a> { + let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); + match prev { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + PageReadGuard { + _permit: Arc::new(_permit), + slot_guard: inner.downgrade(), + } + } + PageWriteGuardState::Downgraded => unreachable!(), + } } } @@ -339,11 +356,13 @@ impl Drop for PageWriteGuard<'_> { /// initializing it, remove the mapping from the page cache. /// fn drop(&mut self) { - assert!(self.inner.key.is_some()); - if !self.valid { - let self_key = self.inner.key.as_ref().unwrap(); - PAGE_CACHE.get().unwrap().remove_mapping(self_key); - self.inner.key = None; + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => { + let self_key = inner.key.as_ref().unwrap(); + PAGE_CACHE.get().unwrap().remove_mapping(self_key); + inner.key = None; + } + PageWriteGuardState::Downgraded => {} } } } @@ -356,7 +375,7 @@ pub enum ReadBufResult<'a> { /// lock_for_write() return value pub enum WriteBufResult<'a> { - Found(PageWriteGuard<'a>), + Found(PageReadGuard<'a>), NotFound(PageWriteGuard<'a>), } @@ -447,15 +466,15 @@ impl PageCache { }; match self.lock_for_write(&cache_key).await? { - WriteBufResult::Found(write_guard) => { + WriteBufResult::Found(read_guard) => { // We already had it in cache. Another thread must've put it there // concurrently. Check that it had the same contents that we // replayed. - assert!(*write_guard == img); + assert!(*read_guard == img); } WriteBufResult::NotFound(mut write_guard) => { write_guard.copy_from_slice(img); - write_guard.mark_valid(); + let _ = write_guard.mark_valid(); } } @@ -638,41 +657,31 @@ impl PageCache { ); return Ok(ReadBufResult::NotFound(PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: false, + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, })); } } - /// Look up a page in the cache and lock it in write mode. If it's not - /// found, returns None. - /// - /// When locking a page for writing, the search criteria is always "exact". + // FIXME: the name is wrong. async fn try_lock_for_write( &self, cache_key: &CacheKey, permit: &mut Option, - ) -> Option { + ) -> Option { if let Some(slot_idx) = self.search_mapping_for_write(cache_key) { // The page was found in the mapping. Lock the slot, and re-check // that it's still what we expected (because we don't released the mapping // lock already, another thread could have evicted the page) let slot = &self.slots[slot_idx]; - let inner = slot.inner.write().await; + let inner = slot.inner.read().await; if inner.key.as_ref() == Some(cache_key) { slot.inc_usage_count(); - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - return Some(PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: true, + return Some(PageReadGuard { + _permit: inner.coalesce_readers_permit(permit.take().unwrap()), + slot_guard: inner, }); } } @@ -728,9 +737,10 @@ impl PageCache { ); return Ok(WriteBufResult::NotFound(PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: false, + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, })); } } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index d81cf1b8a0..f9fb11562b 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -186,27 +186,22 @@ impl FileBlockReader { ctx: &RequestContext, ) -> Result { let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? { - ReadBufResult::Found(guard) => break Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum).await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; - } + match cache + .read_immutable_buf(self.file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { + ReadBufResult::Found(guard) => return Ok(guard.into()), + ReadBufResult::NotFound(mut write_guard) => { + // Read the page from disk into the buffer + self.fill_buffer(write_guard.deref_mut(), blknum).await?; + return Ok(write_guard.mark_valid().into()); + } + }; } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 8785f51c06..16f936f0c7 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -70,38 +70,34 @@ impl EphemeralFile { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.page_cache_file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - // order path before error because error is anyhow::Error => might have many contexts - format!( - "ephemeral file: read immutable page #{}: {}: {:#}", - blknum, - self.file.path.display(), - e, - ), - ) - })? { - page_cache::ReadBufResult::Found(guard) => { - return Ok(BlockLease::PageReadGuard(guard)) - } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) - .await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; - } + match cache + .read_immutable_buf(self.page_cache_file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + // order path before error because error is anyhow::Error => might have many contexts + format!( + "ephemeral file: read immutable page #{}: {}: {:#}", + blknum, + self.file.path.display(), + e, + ), + ) + })? { + page_cache::ReadBufResult::Found(guard) => { + return Ok(BlockLease::PageReadGuard(guard)) + } + page_cache::ReadBufResult::NotFound(mut write_guard) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + .await?; + let read_guard = write_guard.mark_valid(); + return Ok(BlockLease::PageReadGuard(read_guard)); + } + }; } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) @@ -171,7 +167,7 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); buf.copy_from_slice(&self.ephemeral_file.mutable_tail); - write_guard.mark_valid(); + let _ = write_guard.mark_valid(); // pre-warm successful } Err(e) => {