diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs
index ee331ea154..f073fad8e4 100644
--- a/pageserver/src/context.rs
+++ b/pageserver/src/context.rs
@@ -86,15 +86,18 @@
 //! [`RequestContext`] argument. Functions in the middle of the call chain
 //! only need to pass it on.
 
+use std::sync::Arc;
+
 use crate::task_mgr::TaskKind;
 
 // The main structure of this module, see module-level comment.
-#[derive(Clone, Debug)]
+#[derive(Clone)]
 pub struct RequestContext {
     task_kind: TaskKind,
     download_behavior: DownloadBehavior,
     access_stats_behavior: AccessStatsBehavior,
     page_content_kind: PageContentKind,
+    page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
 }
 
 /// The kind of access to the page cache.
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
                 download_behavior: DownloadBehavior::Download,
                 access_stats_behavior: AccessStatsBehavior::Update,
                 page_content_kind: PageContentKind::Unknown,
+                page_cache_permit: None,
             },
         }
     }
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
                 download_behavior: original.download_behavior,
                 access_stats_behavior: original.access_stats_behavior,
                 page_content_kind: original.page_content_kind,
+                page_cache_permit: original.page_cache_permit.clone(),
             },
         }
     }
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
         self
     }
 
+    pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
+        self.inner.page_cache_permit = Some(p);
+        self
+    }
+
     pub fn build(self) -> RequestContext {
         self.inner
     }
@@ -286,4 +296,8 @@ impl RequestContext {
     pub(crate) fn page_content_kind(&self) -> PageContentKind {
         self.page_content_kind
    }
+
+    pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
+        self.page_cache_permit.as_ref().map(|p| &**p)
+    }
 }
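Note: taken together, the context.rs changes let a caller acquire one page cache permit up front and carry it through an entire call chain via the builder. A minimal sketch of the intended usage, mirroring what the eviction task below does; the `prewarm_pass` function is hypothetical, while `page_cache::get`, `get_permit`, and `page_cache_permit` are the APIs this patch adds:

    use crate::context::{RequestContext, RequestContextBuilder};

    // Hypothetical caller: pre-acquire a single pinned-slot permit and
    // attach it to the context so that every page cache access further
    // down the call chain reuses it instead of competing for the semaphore.
    async fn prewarm_pass(ctx: &RequestContext) {
        let permit = crate::page_cache::get().get_permit().await;
        let ctx = RequestContextBuilder::extend(ctx)
            .page_cache_permit(permit)
            .build();
        // ... pass `&ctx` to callees; the page cache then takes the
        // PermitKind::CtxProvided fast path for each access ...
    }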
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 95e26cb0cb..7f64cc987c 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -215,16 +215,21 @@ impl Slot {
 
 impl SlotInner {
     /// If there is aready a reader, drop our permit and share its permit, just like we share read access.
-    fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
-        let mut guard = self.permit.lock().unwrap();
-        if let Some(existing_permit) = guard.upgrade() {
-            drop(guard);
-            drop(permit);
-            existing_permit
-        } else {
-            let permit = Arc::new(permit);
-            *guard = Arc::downgrade(&permit);
-            permit
+    fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
+        match permit {
+            PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
+            PermitKind::Acquired(permit) => {
+                let mut guard = self.permit.lock().unwrap();
+                if let Some(existing_permit) = guard.upgrade() {
+                    drop(guard);
+                    drop(permit);
+                    PermitKindReadGuard::Coalesced(existing_permit)
+                } else {
+                    let permit = Arc::new(permit);
+                    *guard = Arc::downgrade(&permit);
+                    PermitKindReadGuard::Coalesced(permit)
+                }
+            }
         }
     }
 }
@@ -260,18 +265,28 @@ pub struct PageCache {
 
     size_metrics: &'static PageCacheSizeMetrics,
 }
 
-struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
+pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
+
+enum PermitKind<'c> {
+    CtxProvided(&'c PinnedSlotsPermit),
+    Acquired(PinnedSlotsPermit),
+}
+
+enum PermitKindReadGuard<'c> {
+    CtxProvided(&'c PinnedSlotsPermit),
+    Coalesced(Arc<PinnedSlotsPermit>),
+}
 
 ///
 /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
 /// until the guard is dropped.
 ///
-pub struct PageReadGuard<'i> {
-    _permit: Arc<PinnedSlotsPermit>,
+pub struct PageReadGuard<'c, 'i> {
+    _permit: PermitKindReadGuard<'c>,
     slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
 }
 
-impl std::ops::Deref for PageReadGuard<'_> {
+impl std::ops::Deref for PageReadGuard<'_, '_> {
     type Target = [u8; PAGE_SZ];
 
     fn deref(&self) -> &Self::Target {
@@ -279,7 +294,7 @@
     }
 }
 
-impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
+impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
     fn as_ref(&self) -> &[u8; PAGE_SZ] {
         self.slot_guard.buf
     }
 }
@@ -292,19 +307,19 @@
 /// Counterintuitively, this is used even for a read, if the requested page is not
 /// currently found in the page cache. In that case, the caller of lock_for_read()
 /// is expected to fill in the page contents and call mark_valid().
-pub struct PageWriteGuard<'i> {
-    state: PageWriteGuardState<'i>,
+pub struct PageWriteGuard<'c, 'i> {
+    state: PageWriteGuardState<'c, 'i>,
 }
 
-enum PageWriteGuardState<'i> {
+enum PageWriteGuardState<'c, 'i> {
     Invalid {
         inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
-        _permit: PinnedSlotsPermit,
+        _permit: PermitKindReadGuard<'c>,
     },
     Downgraded,
 }
 
-impl std::ops::DerefMut for PageWriteGuard<'_> {
+impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
     fn deref_mut(&mut self) -> &mut Self::Target {
         match &mut self.state {
             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
@@ -313,7 +328,7 @@
         }
     }
 }
 
-impl std::ops::Deref for PageWriteGuard<'_> {
+impl std::ops::Deref for PageWriteGuard<'_, '_> {
     type Target = [u8; PAGE_SZ];
 
     fn deref(&self) -> &Self::Target {
@@ -324,7 +339,7 @@
         }
     }
 }
 
-impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
+impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
     fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
         match &mut self.state {
             PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
@@ -333,16 +348,16 @@
         }
     }
 }
 
-impl<'a> PageWriteGuard<'a> {
+impl<'c, 'a> PageWriteGuard<'c, 'a> {
     /// Mark that the buffer contents are now valid.
     #[must_use]
-    pub fn mark_valid(mut self) -> PageReadGuard<'a> {
+    pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
         let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
         match prev {
             PageWriteGuardState::Invalid { inner, _permit } => {
                 assert!(inner.key.is_some());
                 PageReadGuard {
-                    _permit: Arc::new(_permit),
+                    _permit,
                     slot_guard: inner.downgrade(),
                 }
             }
@@ -351,7 +366,7 @@
     }
 }
 
-impl Drop for PageWriteGuard<'_> {
+impl Drop for PageWriteGuard<'_, '_> {
     ///
     /// If the buffer was allocated for a page that was not already in the
     /// cache, but the lock_for_read/write() caller dropped the buffer without
@@ -371,9 +386,9 @@
 }
 
 /// lock_for_read() return value
-pub enum ReadBufResult<'a> {
-    Found(PageReadGuard<'a>),
-    NotFound(PageWriteGuard<'a>),
+pub enum ReadBufResult<'c, 'a> {
+    Found(PageReadGuard<'c, 'a>),
+    NotFound(PageWriteGuard<'c, 'a>),
 }
 
 impl PageCache {
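Note: the `PermitKind` split keeps the two ownership modes apart. A context-provided permit is only borrowed, while a freshly acquired one may still be coalesced with a concurrent reader's permit through the slot's `Weak` handle, so the shared permit is released as soon as the last reader drops its guard. A standalone sketch of that Arc/Weak coalescing pattern (the type names here are illustrative stand-ins, not the patch's types):

    use std::sync::{Arc, Mutex, Weak};

    struct Permit; // stand-in for PinnedSlotsPermit

    struct Slot {
        shared: Mutex<Weak<Permit>>,
    }

    impl Slot {
        // Same shape as SlotInner::coalesce_readers_permit: if some reader
        // already holds a shared permit, drop ours and piggyback on theirs;
        // otherwise promote ours to be the shared one.
        fn coalesce(&self, permit: Permit) -> Arc<Permit> {
            let mut guard = self.shared.lock().unwrap();
            if let Some(existing) = guard.upgrade() {
                drop(guard);
                drop(permit); // return our semaphore unit immediately
                existing
            } else {
                let permit = Arc::new(permit);
                *guard = Arc::downgrade(&permit);
                permit
            }
        }
    }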
@@ -395,10 +410,9 @@
         lsn: Lsn,
         ctx: &RequestContext,
     ) -> Option<(Lsn, PageReadGuard)> {
-        let Ok(permit) = self.try_get_pinned_slot_permit().await else {
+        let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
             return None;
         };
-
         crate::metrics::PAGE_CACHE
             .for_ctx(ctx)
             .read_accesses_materialized_page
@@ -452,6 +466,7 @@
         key: Key,
         lsn: Lsn,
         img: &[u8],
+        ctx: &RequestContext,
     ) -> anyhow::Result<()> {
         let cache_key = CacheKey::MaterializedPage {
             hash_key: MaterializedPageHashKey {
@@ -462,7 +477,7 @@
             lsn,
         };
 
-        let mut permit = Some(self.try_get_pinned_slot_permit().await?);
+        let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
         loop {
             // First check if the key already exists in the cache.
             if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
@@ -556,7 +571,22 @@
     // "mappings" after this section. But the routines in this section should
     // not require changes.
 
-    async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
+    pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
+        Arc::new(PinnedSlotsPermit(
+            Arc::clone(&self.pinned_slots)
+                .acquire_owned()
+                .await
+                .expect("the semaphore is never closed"),
+        ))
+    }
+
+    async fn try_get_pinned_slot_permit<'c>(
+        &self,
+        ctx: &'c RequestContext,
+    ) -> anyhow::Result<PermitKind<'c>> {
+        if let Some(permit) = ctx.permit() {
+            return Ok(PermitKind::CtxProvided(permit));
+        }
         let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
         match tokio::time::timeout(
             // Choose small timeout, neon_smgr does its own retries.
@@ -566,9 +596,9 @@
         )
         .await
         {
-            Ok(res) => Ok(PinnedSlotsPermit(
+            Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
                 res.expect("this semaphore is never closed"),
-            )),
+            ))),
             Err(_timeout) => {
                 timer.stop_and_discard();
                 crate::metrics::page_cache_errors_inc(
@@ -588,10 +618,10 @@
     ///
     /// If no page is found, returns None and *cache_key is left unmodified.
     ///
-    async fn try_lock_for_read(
+    async fn try_lock_for_read<'c>(
         &self,
         cache_key: &mut CacheKey,
-        permit: &mut Option<PinnedSlotsPermit>,
+        permit: &mut Option<PermitKind<'c>>,
     ) -> Option<PageReadGuard> {
         let cache_key_orig = cache_key.clone();
         if let Some(slot_idx) = self.search_mapping(cache_key) {
@@ -648,7 +678,7 @@
         cache_key: &mut CacheKey,
         ctx: &RequestContext,
     ) -> anyhow::Result<ReadBufResult> {
-        let mut permit = Some(self.try_get_pinned_slot_permit().await?);
+        let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
         let (read_access, hit) = match cache_key {
             CacheKey::MaterializedPage {
                 ..
             } => {
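Note: `try_get_pinned_slot_permit` now has a fast path. If the context already carries a permit, it is borrowed with no semaphore traffic, no timer, and no timeout; only the fallback path waits, bounded by the existing short timeout. A self-contained sketch of that bounded-wait fallback (the one-second bound and the error message are illustrative; the patch keeps the original timeout value and metrics):

    use std::sync::Arc;
    use std::time::Duration;

    use tokio::sync::{OwnedSemaphorePermit, Semaphore};

    // Illustrative fallback path: bound the wait for a semaphore unit so a
    // saturated page cache surfaces as an error instead of queueing forever.
    async fn acquire_with_timeout(
        sem: &Arc<Semaphore>,
        max_wait: Duration,
    ) -> anyhow::Result<OwnedSemaphorePermit> {
        match tokio::time::timeout(max_wait, Arc::clone(sem).acquire_owned()).await {
            Ok(res) => Ok(res.expect("semaphore is never closed")),
            Err(_elapsed) => anyhow::bail!("timed out waiting for a pinned slot permit"),
        }
    }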
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index 39f0d03a01..b326fbbf4e 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -16,7 +16,7 @@
 use std::{
     collections::HashMap,
     ops::ControlFlow,
-    sync::Arc,
+    sync::{Arc, Mutex},
     time::{Duration, SystemTime},
 };
 
@@ -25,7 +25,7 @@
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
 
 use crate::{
-    context::{DownloadBehavior, RequestContext},
+    context::{DownloadBehavior, RequestContext, RequestContextBuilder},
     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
     tenant::{
         config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
@@ -397,9 +397,14 @@ impl Timeline {
             }
         }
 
+        let permit = crate::page_cache::get().get_permit().await;
+        let ctx = RequestContextBuilder::extend(ctx)
+            .page_cache_permit(permit)
+            .build();
+
         // imitiate repartiting on first compactation
         if let Err(e) = self
-            .collect_keyspace(lsn, ctx)
+            .collect_keyspace(lsn, &ctx)
             .instrument(info_span!("collect_keyspace"))
             .await
         {
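Note: the eviction task's imitate-access pass touches many pages in a tight loop, so pre-acquiring a single permit here presumably keeps this background loop from repeatedly contending with foreground traffic for pinned-slot permits and from tripping the acquisition timeout. Because the builder clones the `Arc`, every context derived from the extended one shares that same permit. A hedged sketch of that property (assumed to run in a crate-internal test; `base_ctx` is hypothetical):

    // `base_ctx` is an assumed, pre-existing RequestContext.
    let permit = crate::page_cache::get().get_permit().await;
    let ctx = RequestContextBuilder::extend(&base_ctx)
        .page_cache_permit(permit)
        .build();
    // Derived contexts inherit the Arc'd permit via Clone:
    let child = RequestContextBuilder::extend(&ctx).build();
    assert!(child.permit().is_some());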