From aa8363f0332f45b3b7186c17810562c4cb460fbc Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 4 Oct 2023 17:55:21 +0000 Subject: [PATCH] Revert "page_cache: find_victim: don't spin while there's no chance for a slot (#5319)" To repro the original condition. This reverts commit c07eef8ea5fd2aceb93e3aba1150e979c3810ea8. --- pageserver/src/metrics.rs | 40 -------- pageserver/src/page_cache.rs | 194 +++++------------------------------ 2 files changed, 23 insertions(+), 211 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index f85f525630..de94eb8152 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -264,46 +264,6 @@ pub static PAGE_CACHE_SIZE: Lazy = Lazy::new(|| PageCacheS }, }); -pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy = Lazy::new(|| { - register_histogram!( - "pageserver_page_cache_acquire_pinned_slot_seconds", - "Time spent acquiring a pinned slot in the page cache", - CRITICAL_OP_BUCKETS.into(), - ) - .expect("failed to define a metric") -}); - -pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy = Lazy::new(|| { - register_int_counter!( - "pageserver_page_cache_find_victim_iters_total", - "Counter for the number of iterations in the find_victim loop", - ) - .expect("failed to define a metric") -}); - -static PAGE_CACHE_ERRORS: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "page_cache_errors_total", - "Number of timeouts while acquiring a pinned slot in the page cache", - &["error_kind"] - ) - .expect("failed to define a metric") -}); - -#[derive(IntoStaticStr)] -#[strum(serialize_all = "kebab_case")] -pub(crate) enum PageCacheErrorKind { - AcquirePinnedSlotTimeout, - EvictIterLimit, -} - -pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) { - PAGE_CACHE_ERRORS - .get_metric_with_label_values(&[error_kind.into()]) - .unwrap() - .inc(); -} - pub(crate) static WAIT_LSN_TIME: Lazy = Lazy::new(|| { register_histogram!( "pageserver_wait_lsn_seconds", 
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 97ca2bfea7..38b169ea85 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -75,11 +75,7 @@ use std::{ collections::{hash_map::Entry, HashMap}, convert::TryInto, - sync::{ - atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, - Arc, Weak, - }, - time::Duration, + sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, }; use anyhow::Context; @@ -169,8 +165,6 @@ struct Slot { struct SlotInner { key: Option, - // for `coalesce_readers_permit` - permit: std::sync::Mutex>, buf: &'static mut [u8; PAGE_SZ], } @@ -213,22 +207,6 @@ impl Slot { } } -impl SlotInner { - /// If there is aready a reader, drop our permit and share its permit, just like we share read access. - fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc { - let mut guard = self.permit.lock().unwrap(); - if let Some(existing_permit) = guard.upgrade() { - drop(guard); - drop(permit); - existing_permit - } else { - let permit = Arc::new(permit); - *guard = Arc::downgrade(&permit); - permit - } - } -} - pub struct PageCache { /// This contains the mapping from the cache key to buffer slot that currently /// contains the page, if any. @@ -246,8 +224,6 @@ pub struct PageCache { /// The actual buffers with their metadata. slots: Box<[Slot]>, - pinned_slots: Arc, - /// Index of the next candidate to evict, for the Clock replacement algorithm. /// This is interpreted modulo the page cache size. next_evict_slot: AtomicUsize, @@ -255,28 +231,23 @@ pub struct PageCache { size_metrics: &'static PageCacheSizeMetrics, } -struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit); - /// /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked /// until the guard is dropped. 
/// -pub struct PageReadGuard<'i> { - _permit: Arc, - slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>, -} +pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>); impl std::ops::Deref for PageReadGuard<'_> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { - self.slot_guard.buf + self.0.buf } } impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { fn as_ref(&self) -> &[u8; PAGE_SZ] { - self.slot_guard.buf + self.0.buf } } @@ -293,8 +264,6 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { pub struct PageWriteGuard<'i> { inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, - _permit: PinnedSlotsPermit, - // Are the page contents currently valid? // Used to mark pages as invalid that are assigned but not yet filled with data. valid: bool, @@ -379,10 +348,6 @@ impl PageCache { lsn: Lsn, ctx: &RequestContext, ) -> Option<(Lsn, PageReadGuard)> { - let Ok(permit) = self.try_get_pinned_slot_permit().await else { - return None; - }; - crate::metrics::PAGE_CACHE .for_ctx(ctx) .read_accesses_materialized_page @@ -397,10 +362,7 @@ impl PageCache { lsn, }; - if let Some(guard) = self - .try_lock_for_read(&mut cache_key, &mut Some(permit)) - .await - { + if let Some(guard) = self.try_lock_for_read(&mut cache_key).await { if let CacheKey::MaterializedPage { hash_key: _, lsn: available_lsn, @@ -483,29 +445,6 @@ impl PageCache { // "mappings" after this section. But the routines in this section should // not require changes. - async fn try_get_pinned_slot_permit(&self) -> anyhow::Result { - let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer(); - match tokio::time::timeout( - // Choose small timeout, neon_smgr does its own retries. 
- // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869 - Duration::from_secs(10), - Arc::clone(&self.pinned_slots).acquire_owned(), - ) - .await - { - Ok(res) => Ok(PinnedSlotsPermit( - res.expect("this semaphore is never closed"), - )), - Err(_timeout) => { - timer.stop_and_discard(); - crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout, - ); - anyhow::bail!("timeout: there were page guards alive for all page cache slots") - } - } - } - /// Look up a page in the cache. /// /// If the search criteria is not exact, *cache_key is updated with the key @@ -515,11 +454,7 @@ impl PageCache { /// /// If no page is found, returns None and *cache_key is left unmodified. /// - async fn try_lock_for_read( - &self, - cache_key: &mut CacheKey, - permit: &mut Option, - ) -> Option { + async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option { let cache_key_orig = cache_key.clone(); if let Some(slot_idx) = self.search_mapping(cache_key) { // The page was found in the mapping. Lock the slot, and re-check @@ -529,10 +464,7 @@ impl PageCache { let inner = slot.inner.read().await; if inner.key.as_ref() == Some(cache_key) { slot.inc_usage_count(); - return Some(PageReadGuard { - _permit: inner.coalesce_readers_permit(permit.take().unwrap()), - slot_guard: inner, - }); + return Some(PageReadGuard(inner)); } else { // search_mapping might have modified the search key; restore it. *cache_key = cache_key_orig; @@ -575,8 +507,6 @@ impl PageCache { cache_key: &mut CacheKey, ctx: &RequestContext, ) -> anyhow::Result { - let mut permit = Some(self.try_get_pinned_slot_permit().await?); - let (read_access, hit) = match cache_key { CacheKey::MaterializedPage { .. } => { unreachable!("Materialized pages use lookup_materialized_page") @@ -593,21 +523,17 @@ impl PageCache { let mut is_first_iteration = true; loop { // First check if the key already exists in the cache. 
- if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await { - debug_assert!(permit.is_none()); + if let Some(read_guard) = self.try_lock_for_read(cache_key).await { if is_first_iteration { hit.inc(); } return Ok(ReadBufResult::Found(read_guard)); } - debug_assert!(permit.is_some()); is_first_iteration = false; // Not found. Find a victim buffer - let (slot_idx, mut inner) = self - .find_victim(permit.as_ref().unwrap()) - .await - .context("Failed to find evict victim")?; + let (slot_idx, mut inner) = + self.find_victim().context("Failed to find evict victim")?; // Insert mapping for this. At this point, we may find that another // thread did the same thing concurrently. In that case, we evicted @@ -629,16 +555,7 @@ impl PageCache { inner.key = Some(cache_key.clone()); slot.set_usage_count(1); - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - return Ok(ReadBufResult::NotFound(PageWriteGuard { - _permit: permit.take().unwrap(), inner, valid: false, })); @@ -649,11 +566,7 @@ impl PageCache { /// found, returns None. /// /// When locking a page for writing, the search criteria is always "exact". - async fn try_lock_for_write( - &self, - cache_key: &CacheKey, - permit: &mut Option, - ) -> Option { + async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option { if let Some(slot_idx) = self.search_mapping_for_write(cache_key) { // The page was found in the mapping. 
Lock the slot, and re-check // that it's still what we expected (because we don't released the mapping @@ -662,18 +575,7 @@ impl PageCache { let inner = slot.inner.write().await; if inner.key.as_ref() == Some(cache_key) { slot.inc_usage_count(); - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - return Some(PageWriteGuard { - _permit: permit.take().unwrap(), - inner, - valid: true, - }); + return Some(PageWriteGuard { inner, valid: true }); } } None @@ -684,20 +586,15 @@ impl PageCache { /// Similar to lock_for_read(), but the returned buffer is write-locked and /// may be modified by the caller even if it's already found in the cache. async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result { - let mut permit = Some(self.try_get_pinned_slot_permit().await?); loop { // First check if the key already exists in the cache. - if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await { - debug_assert!(permit.is_none()); + if let Some(write_guard) = self.try_lock_for_write(cache_key).await { return Ok(WriteBufResult::Found(write_guard)); } - debug_assert!(permit.is_some()); // Not found. Find a victim buffer - let (slot_idx, mut inner) = self - .find_victim(permit.as_ref().unwrap()) - .await - .context("Failed to find evict victim")?; + let (slot_idx, mut inner) = + self.find_victim().context("Failed to find evict victim")?; // Insert mapping for this. At this point, we may find that another // thread did the same thing concurrently. 
In that case, we evicted @@ -719,16 +616,7 @@ impl PageCache { inner.key = Some(cache_key.clone()); slot.set_usage_count(1); - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - return Ok(WriteBufResult::NotFound(PageWriteGuard { - _permit: permit.take().unwrap(), inner, valid: false, })); @@ -881,10 +769,7 @@ impl PageCache { /// Find a slot to evict. /// /// On return, the slot is empty and write-locked. - async fn find_victim( - &self, - _permit_witness: &PinnedSlotsPermit, - ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { + fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { let iter_limit = self.slots.len() * 10; let mut iters = 0; loop { @@ -897,40 +782,13 @@ impl PageCache { let mut inner = match slot.inner.try_write() { Ok(inner) => inner, Err(_err) => { + // If we have looped through the whole buffer pool 10 times + // and still haven't found a victim buffer, something's wrong. + // Maybe all the buffers were locked. That could happen in + // theory, if you have more threads holding buffers locked than + // there are buffers in the pool. In practice, with a reasonably + // large buffer pool it really shouldn't happen. if iters > iter_limit { - // NB: Even with the permits, there's no hard guarantee that we will find a slot with - // any particular number of iterations: other threads might race ahead and acquire and - // release pins just as we're scanning the array. - // - // Imagine that nslots is 2, and as starting point, usage_count==1 on all - // slots. There are two threads running concurrently, A and B. A has just - // acquired the permit from the semaphore. - // - // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search - // B: Acquire permit. - // B: Look at slot 2, decrement its usage_count to zero and continue the search - // B: Look at slot 1. 
Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // B: Acquire permit. - // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // - // Now we're back in the starting situation that both slots have - // usage_count 1, but A has now been through one iteration of the - // find_victim() loop. This can repeat indefinitely and on each - // iteration, A's iteration count increases by one. - // - // So, even though the semaphore for the permits is fair, the victim search - // itself happens in parallel and is not fair. - // Hence even with a permit, a task can theoretically be starved. - // To avoid this, we'd need tokio to give priority to tasks that are holding - // permits for longer. - // Note that just yielding to tokio during iteration without such - // priority boosting is likely counter-productive. We'd just give more opportunities - // for B to bump usage count, further starving A. - crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::EvictIterLimit, - ); anyhow::bail!("exceeded evict iter limit"); } continue; @@ -941,7 +799,6 @@ impl PageCache { self.remove_mapping(old_key); inner.key = None; } - crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64); return Ok((slot_idx, inner)); } } @@ -969,11 +826,7 @@ impl PageCache { let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); Slot { - inner: tokio::sync::RwLock::new(SlotInner { - key: None, - buf, - permit: std::sync::Mutex::new(Weak::new()), - }), + inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }), usage_count: AtomicU8::new(0), } }) @@ -985,7 +838,6 @@ impl PageCache { slots, next_evict_slot: AtomicUsize::new(0), size_metrics, - pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), } } }