From 28a4247c973bb4e45922a7fbc63e1055952158a9 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 29 Jan 2024 19:37:11 +0000 Subject: [PATCH] rip out slot pinning, has about 5% speedup --- pageserver/src/metrics.rs | 1 - pageserver/src/page_cache.rs | 94 ++++-------------------------------- 2 files changed, 10 insertions(+), 85 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 03ac6aa5cd..9261019932 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -334,7 +334,6 @@ static PAGE_CACHE_ERRORS: Lazy = Lazy::new(|| { #[derive(IntoStaticStr)] #[strum(serialize_all = "kebab_case")] pub(crate) enum PageCacheErrorKind { - AcquirePinnedSlotTimeout, EvictIterLimit, } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 708f2b02df..6feec2a798 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -74,11 +74,7 @@ use std::{ collections::{hash_map::Entry, HashMap}, convert::TryInto, - sync::{ - atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, - Arc, Weak, - }, - time::Duration, + sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, }; use anyhow::Context; @@ -164,7 +160,6 @@ struct Slot { struct SlotInner { key: Option, // for `coalesce_readers_permit` - permit: std::sync::Mutex>, buf: &'static mut SlotContents, } @@ -217,30 +212,12 @@ impl Slot { } } -impl SlotInner { - /// If there is aready a reader, drop our permit and share its permit, just like we share read access. - fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc { - let mut guard = self.permit.lock().unwrap(); - if let Some(existing_permit) = guard.upgrade() { - drop(guard); - drop(permit); - existing_permit - } else { - let permit = Arc::new(permit); - *guard = Arc::downgrade(&permit); - permit - } - } -} - pub struct PageCache { immutable_page_map: std::sync::RwLock>, /// The actual buffers with their metadata. slots: Box<[Slot]>, - pinned_slots: Arc, - /// Index of the next candidate to evict, for the Clock replacement algorithm. /// This is interpreted modulo the page cache size. next_evict_slot: AtomicUsize, @@ -248,14 +225,11 @@ pub struct PageCache { size_metrics: &'static PageCacheSizeMetrics, } -struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit); - /// /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked /// until the guard is dropped. /// pub struct PageReadGuard<'i> { - _permit: Arc, slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>, } @@ -287,7 +261,6 @@ pub struct PageWriteGuard<'i> { enum PageWriteGuardState<'i> { Invalid { inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, - _permit: PinnedSlotsPermit, }, Downgraded, } @@ -295,7 +268,7 @@ enum PageWriteGuardState<'i> { impl std::ops::DerefMut for PageWriteGuard<'_> { fn deref_mut(&mut self) -> &mut Self::Target { match &mut self.state { - PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf.0, + PageWriteGuardState::Invalid { inner } => &mut inner.buf.0, PageWriteGuardState::Downgraded => unreachable!(), } } @@ -306,7 +279,7 @@ impl std::ops::Deref for PageWriteGuard<'_> { fn deref(&self) -> &Self::Target { match &self.state { - PageWriteGuardState::Invalid { inner, _permit } => &inner.buf.0, + PageWriteGuardState::Invalid { inner } => &inner.buf.0, PageWriteGuardState::Downgraded => unreachable!(), } } @@ -318,10 +291,9 @@ impl<'a> PageWriteGuard<'a> { pub fn mark_valid(mut self) -> PageReadGuard<'a> { let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); match prev { - PageWriteGuardState::Invalid { inner, _permit } => { + PageWriteGuardState::Invalid { inner } => { assert!(inner.key.is_some()); PageReadGuard { - _permit: Arc::new(_permit), slot_guard: inner.downgrade(), } } @@ -338,7 +310,7 @@ impl Drop for PageWriteGuard<'_> { /// fn drop(&mut self) { match &mut self.state { - PageWriteGuardState::Invalid { inner, _permit } => { + PageWriteGuardState::Invalid { inner } => { assert!(inner.key.is_some()); let self_key = inner.key.as_ref().unwrap(); PAGE_CACHE.get().unwrap().remove_mapping(self_key); @@ -375,27 +347,6 @@ impl PageCache { // "mappings" after this section. But the routines in this section should // not require changes. - async fn try_get_pinned_slot_permit(&self) -> anyhow::Result { - match tokio::time::timeout( - // Choose small timeout, neon_smgr does its own retries. - // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869 - Duration::from_secs(10), - Arc::clone(&self.pinned_slots).acquire_owned(), - ) - .await - { - Ok(res) => Ok(PinnedSlotsPermit( - res.expect("this semaphore is never closed"), - )), - Err(_timeout) => { - crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout, - ); - anyhow::bail!("timeout: there were page guards alive for all page cache slots") - } - } - } - /// Look up a page in the cache. /// /// If the search criteria is not exact, *cache_key is updated with the key @@ -405,11 +356,7 @@ impl PageCache { /// /// If no page is found, returns None and *cache_key is left unmodified. /// - async fn try_lock_for_read( - &self, - cache_key: &mut CacheKey, - permit: &mut Option, - ) -> Option { + async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option { let cache_key_orig = cache_key.clone(); if let Some(slot_idx) = self.search_mapping(cache_key) { // The page was found in the mapping. Lock the slot, and re-check @@ -419,10 +366,7 @@ impl PageCache { let inner = slot.inner.read().await; if inner.key.as_ref() == Some(cache_key) { slot.inc_usage_count(); - return Some(PageReadGuard { - _permit: inner.coalesce_readers_permit(permit.take().unwrap()), - slot_guard: inner, - }); + return Some(PageReadGuard { slot_guard: inner }); } else { // search_mapping might have modified the search key; restore it. *cache_key = cache_key_orig; @@ -465,8 +409,6 @@ impl PageCache { cache_key: &mut CacheKey, ctx: &RequestContext, ) -> anyhow::Result { - let mut permit = Some(self.try_get_pinned_slot_permit().await?); - let (read_access, hit) = match cache_key { CacheKey::ImmutableFilePage { .. } => ( &crate::metrics::PAGE_CACHE @@ -480,19 +422,17 @@ impl PageCache { let mut is_first_iteration = true; loop { // First check if the key already exists in the cache. - if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await { - debug_assert!(permit.is_none()); + if let Some(read_guard) = self.try_lock_for_read(cache_key).await { if is_first_iteration { hit.inc(); } return Ok(ReadBufResult::Found(read_guard)); } - debug_assert!(permit.is_some()); is_first_iteration = false; // Not found. Find a victim buffer let (slot_idx, mut inner) = self - .find_victim(permit.as_ref().unwrap()) + .find_victim() .await .context("Failed to find evict victim")?; @@ -516,19 +456,8 @@ impl PageCache { inner.key = Some(cache_key.clone()); slot.set_usage_count(1); - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - return Ok(ReadBufResult::NotFound(PageWriteGuard { - state: PageWriteGuardState::Invalid { - _permit: permit.take().unwrap(), - inner, - }, + state: PageWriteGuardState::Invalid { inner }, })); } } @@ -600,7 +529,6 @@ impl PageCache { /// On return, the slot is empty and write-locked. async fn find_victim( &self, - _permit_witness: &PinnedSlotsPermit, ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { let iter_limit = self.slots.len() * 10; let mut iters = 0; @@ -695,7 +623,6 @@ impl PageCache { inner: tokio::sync::RwLock::new(SlotInner { key: None, buf: slot_contents, - permit: std::sync::Mutex::new(Weak::new()), }), usage_count: AtomicU8::new(0), }) @@ -706,7 +633,6 @@ impl PageCache { slots, next_evict_slot: AtomicUsize::new(0), size_metrics, - pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), } } }