mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
Revert "Revert "page_cache: find_victim: don't spin while there's no chance for a slot (#5319)""
This reverts commit b3d889475839a5f651209dd7c0de203967d905ef.
This commit is contained in:
@@ -264,6 +264,46 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
|
||||
},
|
||||
});
|
||||
|
||||
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_page_cache_acquire_pinned_slot_seconds",
|
||||
"Time spent acquiring a pinned slot in the page cache",
|
||||
CRITICAL_OP_BUCKETS.into(),
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_page_cache_find_victim_iters_total",
|
||||
"Counter for the number of iterations in the find_victim loop",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
register_int_counter_vec!(
|
||||
"page_cache_errors_total",
|
||||
"Number of timeouts while acquiring a pinned slot in the page cache",
|
||||
&["error_kind"]
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
#[derive(IntoStaticStr)]
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||
PAGE_CACHE_ERRORS
|
||||
.get_metric_with_label_values(&[error_kind.into()])
|
||||
.unwrap()
|
||||
.inc();
|
||||
}
|
||||
|
||||
pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_wait_lsn_seconds",
|
||||
|
||||
@@ -75,7 +75,11 @@
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -165,6 +169,8 @@ struct Slot {
|
||||
|
||||
struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
// for `coalesce_readers_permit`
|
||||
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
}
|
||||
|
||||
@@ -207,6 +213,22 @@ impl Slot {
|
||||
}
|
||||
}
|
||||
|
||||
impl SlotInner {
|
||||
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
|
||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
/// This contains the mapping from the cache key to buffer slot that currently
|
||||
/// contains the page, if any.
|
||||
@@ -224,6 +246,8 @@ pub struct PageCache {
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
|
||||
pinned_slots: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
@@ -231,23 +255,28 @@ pub struct PageCache {
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);
|
||||
pub struct PageReadGuard<'i> {
|
||||
_permit: Arc<PinnedSlotsPermit>,
|
||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.buf
|
||||
self.slot_guard.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.0.buf
|
||||
self.slot_guard.buf
|
||||
}
|
||||
}
|
||||
|
||||
@@ -264,6 +293,8 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
_permit: PinnedSlotsPermit,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
// Used to mark pages as invalid that are assigned but not yet filled with data.
|
||||
valid: bool,
|
||||
@@ -348,6 +379,10 @@ impl PageCache {
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_materialized_page
|
||||
@@ -362,7 +397,10 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
|
||||
if let Some(guard) = self
|
||||
.try_lock_for_read(&mut cache_key, &mut Some(permit))
|
||||
.await
|
||||
{
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
@@ -445,6 +483,29 @@ impl PageCache {
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
|
||||
Duration::from_secs(10),
|
||||
Arc::clone(&self.pinned_slots).acquire_owned(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
Err(_timeout) => {
|
||||
timer.stop_and_discard();
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
|
||||
);
|
||||
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache.
|
||||
///
|
||||
/// If the search criteria is not exact, *cache_key is updated with the key
|
||||
@@ -454,7 +515,11 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
async fn try_lock_for_read(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
@@ -464,7 +529,10 @@ impl PageCache {
|
||||
let inner = slot.inner.read().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard(inner));
|
||||
return Some(PageReadGuard {
|
||||
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
|
||||
slot_guard: inner,
|
||||
});
|
||||
} else {
|
||||
// search_mapping might have modified the search key; restore it.
|
||||
*cache_key = cache_key_orig;
|
||||
@@ -507,6 +575,8 @@ impl PageCache {
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
@@ -523,17 +593,21 @@ impl PageCache {
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
@@ -555,7 +629,16 @@ impl PageCache {
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
@@ -566,7 +649,11 @@ impl PageCache {
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
|
||||
async fn try_lock_for_write(
|
||||
&self,
|
||||
cache_key: &CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
@@ -575,7 +662,18 @@ impl PageCache {
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
return Some(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
None
|
||||
@@ -586,15 +684,20 @@ impl PageCache {
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
@@ -616,7 +719,16 @@ impl PageCache {
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
@@ -769,7 +881,10 @@ impl PageCache {
|
||||
/// Find a slot to evict.
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
async fn find_victim(
|
||||
&self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
@@ -782,13 +897,40 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
// and still haven't found a victim buffer, something's wrong.
|
||||
// Maybe all the buffers were in locked. That could happen in
|
||||
// theory, if you have more threads holding buffers locked than
|
||||
// there are buffers in the pool. In practice, with a reasonably
|
||||
// large buffer pool it really shouldn't happen.
|
||||
if iters > iter_limit {
|
||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
||||
// any particular number of iterations: other threads might race ahead and acquire and
|
||||
// release pins just as we're scanning the array.
|
||||
//
|
||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
||||
// slots. There are two threads running concurrently, A and B. A has just
|
||||
// acquired the permit from the semaphore.
|
||||
//
|
||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
//
|
||||
// Now we're back in the starting situation that both slots have
|
||||
// usage_count 1, but A has now been through one iteration of the
|
||||
// find_victim() loop. This can repeat indefinitely and on each
|
||||
// iteration, A's iteration count increases by one.
|
||||
//
|
||||
// So, even though the semaphore for the permits is fair, the victim search
|
||||
// itself happens in parallel and is not fair.
|
||||
// Hence even with a permit, a task can theoretically be starved.
|
||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
||||
// permits for longer.
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
||||
);
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
}
|
||||
continue;
|
||||
@@ -799,6 +941,7 @@ impl PageCache {
|
||||
self.remove_mapping(old_key);
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
return Ok((slot_idx, inner));
|
||||
}
|
||||
}
|
||||
@@ -826,7 +969,11 @@ impl PageCache {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
@@ -838,6 +985,7 @@ impl PageCache {
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user