diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 97ca2bfea7..2c858a8934 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -66,8 +66,7 @@ //! inserted to the mapping, but you must hold the write-lock on the slot until //! the contents are valid. If you need to release the lock without initializing //! the contents, you must remove the mapping first. We make that easy for the -//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized -//! page, the caller must explicitly call guard.mark_valid() after it has +//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has //! initialized it. If the guard is dropped without calling mark_valid(), the //! mapping is automatically removed and the slot is marked free. //! @@ -286,10 +285,7 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { /// /// Counterintuitively, this is used even for a read, if the requested page is not /// currently found in the page cache. In that case, the caller of lock_for_read() -/// is expected to fill in the page contents and call mark_valid(). Similarly -/// lock_for_write() can return an invalid buffer that the caller is expected to -/// to initialize. -/// +/// is expected to fill in the page contents and call mark_valid(). pub struct PageWriteGuard<'i> { inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, @@ -354,12 +350,6 @@ pub enum ReadBufResult<'a> { NotFound(PageWriteGuard<'a>), } -/// lock_for_write() return value -pub enum WriteBufResult<'a> { - Found(PageWriteGuard<'a>), - NotFound(PageWriteGuard<'a>), -} - impl PageCache { // // Section 1.1: Public interface functions for looking up and memorizing materialized page @@ -446,20 +436,76 @@ impl PageCache { lsn, }; - match self.lock_for_write(&cache_key).await? { - WriteBufResult::Found(write_guard) => { - // We already had it in cache. Another thread must've put it there - // concurrently. 
Check that it had the same contents that we - // replayed. - assert!(*write_guard == img); + let mut permit = Some(self.try_get_pinned_slot_permit().await?); + loop { + // First check if the key already exists in the cache. + if let Some(slot_idx) = self.search_mapping_exact(&cache_key) { + // The page was found in the mapping. Lock the slot, and re-check + // that it's still what we expected (because we don't released the mapping + // lock already, another thread could have evicted the page) + let slot = &self.slots[slot_idx]; + let inner = slot.inner.write().await; + if inner.key.as_ref() == Some(&cache_key) { + slot.inc_usage_count(); + debug_assert!( + { + let guard = inner.permit.lock().unwrap(); + guard.upgrade().is_none() + }, + "we hold a write lock, so, no one else should have a permit" + ); + debug_assert_eq!(inner.buf.len(), img.len()); + // We already had it in cache. Another thread must've put it there + // concurrently. Check that it had the same contents that we + // replayed. + assert!(inner.buf == img); + return Ok(()); + } } - WriteBufResult::NotFound(mut write_guard) => { - write_guard.copy_from_slice(img); - write_guard.mark_valid(); - } - } + debug_assert!(permit.is_some()); - Ok(()) + // Not found. Find a victim buffer + let (slot_idx, mut inner) = self + .find_victim(permit.as_ref().unwrap()) + .await + .context("Failed to find evict victim")?; + + // Insert mapping for this. At this point, we may find that another + // thread did the same thing concurrently. In that case, we evicted + // our victim buffer unnecessarily. Put it into the free list and + // continue with the slot that the other thread chose. + if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) { + // TODO: put to free list + + // We now just loop back to start from beginning. This is not + // optimal, we'll perform the lookup in the mapping again, which + // is not really necessary because we already got + // 'existing_slot_idx'. 
But this shouldn't happen often enough
+                // to matter much.
+                continue;
+            }
+
+            // Make the slot ready
+            let slot = &self.slots[slot_idx];
+            inner.key = Some(cache_key.clone());
+            slot.set_usage_count(1);
+            // Create a write guard for the slot so we go through the expected motions.
+            debug_assert!(
+                {
+                    let guard = inner.permit.lock().unwrap();
+                    guard.upgrade().is_none()
+                },
+                "we hold a write lock, so, no one else should have a permit"
+            );
+            let mut write_guard = PageWriteGuard {
+                _permit: permit.take().unwrap(),
+                inner,
+                valid: false,
+            };
+            write_guard.copy_from_slice(img);
+            write_guard.mark_valid();
+            return Ok(());
+        }
     }
 
     // Section 1.2: Public interface functions for working with immutable file pages.
@@ -645,96 +691,6 @@
         }
     }
 
-    /// Look up a page in the cache and lock it in write mode. If it's not
-    /// found, returns None.
-    ///
-    /// When locking a page for writing, the search criteria is always "exact".
-    async fn try_lock_for_write(
-        &self,
-        cache_key: &CacheKey,
-        permit: &mut Option<PinnedSlotsPermit>,
-    ) -> Option<PageWriteGuard<'_>> {
-        if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
-            // The page was found in the mapping. Lock the slot, and re-check
-            // that it's still what we expected (because we don't released the mapping
-            // lock already, another thread could have evicted the page)
-            let slot = &self.slots[slot_idx];
-            let inner = slot.inner.write().await;
-            if inner.key.as_ref() == Some(cache_key) {
-                slot.inc_usage_count();
-                debug_assert!(
-                    {
-                        let guard = inner.permit.lock().unwrap();
-                        guard.upgrade().is_none()
-                    },
-                    "we hold a write lock, so, no one else should have a permit"
-                );
-                return Some(PageWriteGuard {
-                    _permit: permit.take().unwrap(),
-                    inner,
-                    valid: true,
-                });
-            }
-        }
-        None
-    }
-
-    /// Return a write-locked buffer for given block.
-    ///
-    /// Similar to lock_for_read(), but the returned buffer is write-locked and
-    /// may be modified by the caller even if it's already found in the cache.
-    async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult<'_>> {
-        let mut permit = Some(self.try_get_pinned_slot_permit().await?);
-        loop {
-            // First check if the key already exists in the cache.
-            if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
-                debug_assert!(permit.is_none());
-                return Ok(WriteBufResult::Found(write_guard));
-            }
-            debug_assert!(permit.is_some());
-
-            // Not found. Find a victim buffer
-            let (slot_idx, mut inner) = self
-                .find_victim(permit.as_ref().unwrap())
-                .await
-                .context("Failed to find evict victim")?;
-
-            // Insert mapping for this. At this point, we may find that another
-            // thread did the same thing concurrently. In that case, we evicted
-            // our victim buffer unnecessarily. Put it into the free list and
-            // continue with the slot that the other thread chose.
-            if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
-                // TODO: put to free list
-
-                // We now just loop back to start from beginning. This is not
-                // optimal, we'll perform the lookup in the mapping again, which
-                // is not really necessary because we already got
-                // 'existing_slot_idx'. But this shouldn't happen often enough
-                // to matter much.
-                continue;
-            }
-
-            // Make the slot ready
-            let slot = &self.slots[slot_idx];
-            inner.key = Some(cache_key.clone());
-            slot.set_usage_count(1);
-
-            debug_assert!(
-                {
-                    let guard = inner.permit.lock().unwrap();
-                    guard.upgrade().is_none()
-                },
-                "we hold a write lock, so, no one else should have a permit"
-            );
-
-            return Ok(WriteBufResult::NotFound(PageWriteGuard {
-                _permit: permit.take().unwrap(),
-                inner,
-                valid: false,
-            }));
-        }
-    }
-
     //
     // Section 3: Mapping functions
     //
@@ -775,7 +731,7 @@
     ///
     /// Like 'search_mapping, but performs an "exact" search. Used for
     /// allocating a new buffer.
-    fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
+    fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
         match key {
             CacheKey::MaterializedPage { hash_key, lsn } => {
                 let map = self.materialized_page_map.read().unwrap();