mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
WIP: provide permits in requestcontext
This commit is contained in:
@@ -86,15 +86,18 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone)]
|
||||
pub struct RequestContext {
|
||||
task_kind: TaskKind,
|
||||
download_behavior: DownloadBehavior,
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
page_cache_permit: Option<Arc<crate::page_cache::PinnedSlotsPermit>>,
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -150,6 +153,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: DownloadBehavior::Download,
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
page_cache_permit: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -163,6 +167,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
page_cache_permit: original.page_cache_permit.clone(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -186,6 +191,11 @@ impl RequestContextBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_permit(mut self, p: Arc<crate::page_cache::PinnedSlotsPermit>) -> Self {
|
||||
self.inner.page_cache_permit = Some(p);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> RequestContext {
|
||||
self.inner
|
||||
}
|
||||
@@ -286,4 +296,8 @@ impl RequestContext {
|
||||
pub(crate) fn page_content_kind(&self) -> PageContentKind {
|
||||
self.page_content_kind
|
||||
}
|
||||
|
||||
pub(crate) fn permit(&self) -> Option<&crate::page_cache::PinnedSlotsPermit> {
|
||||
self.page_cache_permit.as_ref().map(|p| &**p)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,16 +215,21 @@ impl Slot {
|
||||
|
||||
impl SlotInner {
|
||||
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
|
||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
fn coalesce_readers_permit<'c>(&self, permit: PermitKind<'c>) -> PermitKindReadGuard<'c> {
|
||||
match permit {
|
||||
PermitKind::CtxProvided(permit) => PermitKindReadGuard::CtxProvided(permit),
|
||||
PermitKind::Acquired(permit) => {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -260,18 +265,28 @@ pub struct PageCache {
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
pub(crate) struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
enum PermitKind<'c> {
|
||||
CtxProvided(&'c PinnedSlotsPermit),
|
||||
Acquired(PinnedSlotsPermit),
|
||||
}
|
||||
|
||||
enum PermitKindReadGuard<'c> {
|
||||
CtxProvided(&'c PinnedSlotsPermit),
|
||||
Coalesced(Arc<PinnedSlotsPermit>),
|
||||
}
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i> {
|
||||
_permit: Arc<PinnedSlotsPermit>,
|
||||
pub struct PageReadGuard<'c, 'i> {
|
||||
_permit: PermitKindReadGuard<'c>,
|
||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
impl std::ops::Deref for PageReadGuard<'_, '_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -279,7 +294,7 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_, '_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.slot_guard.buf
|
||||
}
|
||||
@@ -292,19 +307,19 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid().
|
||||
pub struct PageWriteGuard<'i> {
|
||||
state: PageWriteGuardState<'i>,
|
||||
pub struct PageWriteGuard<'c, 'i> {
|
||||
state: PageWriteGuardState<'c, 'i>,
|
||||
}
|
||||
|
||||
enum PageWriteGuardState<'i> {
|
||||
enum PageWriteGuardState<'c, 'i> {
|
||||
Invalid {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
_permit: PinnedSlotsPermit,
|
||||
_permit: PermitKindReadGuard<'c>,
|
||||
},
|
||||
Downgraded,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_, '_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
@@ -313,7 +328,7 @@ impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
impl std::ops::Deref for PageWriteGuard<'_, '_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
@@ -324,7 +339,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_, '_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
@@ -333,16 +348,16 @@ impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> PageWriteGuard<'a> {
|
||||
impl<'c, 'a> PageWriteGuard<'c, 'a> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
#[must_use]
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'c, 'a> {
|
||||
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||
match prev {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
assert!(inner.key.is_some());
|
||||
PageReadGuard {
|
||||
_permit: Arc::new(_permit),
|
||||
_permit,
|
||||
slot_guard: inner.downgrade(),
|
||||
}
|
||||
}
|
||||
@@ -351,7 +366,7 @@ impl<'a> PageWriteGuard<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
impl Drop for PageWriteGuard<'_, '_> {
|
||||
///
|
||||
/// If the buffer was allocated for a page that was not already in the
|
||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||
@@ -371,9 +386,9 @@ impl Drop for PageWriteGuard<'_> {
|
||||
}
|
||||
|
||||
/// lock_for_read() return value
|
||||
pub enum ReadBufResult<'a> {
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
pub enum ReadBufResult<'c, 'a> {
|
||||
Found(PageReadGuard<'c, 'a>),
|
||||
NotFound(PageWriteGuard<'c, 'a>),
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
@@ -395,10 +410,9 @@ impl PageCache {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit(ctx).await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_materialized_page
|
||||
@@ -452,6 +466,7 @@ impl PageCache {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
@@ -462,7 +477,7 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||
@@ -556,7 +571,22 @@ impl PageCache {
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
pub(crate) async fn get_permit(&self) -> Arc<PinnedSlotsPermit> {
|
||||
Arc::new(PinnedSlotsPermit(
|
||||
Arc::clone(&self.pinned_slots)
|
||||
.acquire_owned()
|
||||
.await
|
||||
.expect("the semaphore is never closed"),
|
||||
))
|
||||
}
|
||||
|
||||
async fn try_get_pinned_slot_permit<'c>(
|
||||
&self,
|
||||
ctx: &'c RequestContext,
|
||||
) -> anyhow::Result<PermitKind<'c>> {
|
||||
if let Some(permit) = ctx.permit() {
|
||||
return Ok(PermitKind::CtxProvided(permit));
|
||||
};
|
||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
@@ -566,9 +596,9 @@ impl PageCache {
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
Ok(res) => Ok(PermitKind::Acquired(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
))),
|
||||
Err(_timeout) => {
|
||||
timer.stop_and_discard();
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
@@ -588,10 +618,10 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(
|
||||
async fn try_lock_for_read<'c>(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
permit: &mut Option<PermitKind<'c>>,
|
||||
) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
@@ -648,7 +678,7 @@ impl PageCache {
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit(ctx).await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ops::ControlFlow,
|
||||
sync::Arc,
|
||||
sync::{Arc, Mutex},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
@@ -25,7 +25,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
|
||||
|
||||
use crate::{
|
||||
context::{DownloadBehavior, RequestContext},
|
||||
context::{DownloadBehavior, RequestContext, RequestContextBuilder},
|
||||
task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
|
||||
tenant::{
|
||||
config::{EvictionPolicy, EvictionPolicyLayerAccessThreshold},
|
||||
@@ -397,9 +397,14 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let permit = crate::page_cache::get().get_permit().await;
|
||||
let ctx = RequestContextBuilder::extend(ctx)
|
||||
.page_cache_permit(permit)
|
||||
.build();
|
||||
|
||||
// imitiate repartiting on first compactation
|
||||
if let Err(e) = self
|
||||
.collect_keyspace(lsn, ctx)
|
||||
.collect_keyspace(lsn, &ctx)
|
||||
.instrument(info_span!("collect_keyspace"))
|
||||
.await
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user