mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
rip out slot pinning, has about 5% speedup
This commit is contained in:
@@ -334,7 +334,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
#[derive(IntoStaticStr)]
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
|
||||
@@ -74,11 +74,7 @@
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
time::Duration,
|
||||
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
@@ -164,7 +160,6 @@ struct Slot {
|
||||
struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
// for `coalesce_readers_permit`
|
||||
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
|
||||
buf: &'static mut SlotContents,
|
||||
}
|
||||
|
||||
@@ -217,30 +212,12 @@ impl Slot {
|
||||
}
|
||||
}
|
||||
|
||||
impl SlotInner {
|
||||
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
|
||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
|
||||
pinned_slots: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
@@ -248,14 +225,11 @@ pub struct PageCache {
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i> {
|
||||
_permit: Arc<PinnedSlotsPermit>,
|
||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||
}
|
||||
|
||||
@@ -287,7 +261,6 @@ pub struct PageWriteGuard<'i> {
|
||||
enum PageWriteGuardState<'i> {
|
||||
Invalid {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
_permit: PinnedSlotsPermit,
|
||||
},
|
||||
Downgraded,
|
||||
}
|
||||
@@ -295,7 +268,7 @@ enum PageWriteGuardState<'i> {
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf.0,
|
||||
PageWriteGuardState::Invalid { inner } => &mut inner.buf.0,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -306,7 +279,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match &self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf.0,
|
||||
PageWriteGuardState::Invalid { inner } => &inner.buf.0,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -318,10 +291,9 @@ impl<'a> PageWriteGuard<'a> {
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
|
||||
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||
match prev {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
PageWriteGuardState::Invalid { inner } => {
|
||||
assert!(inner.key.is_some());
|
||||
PageReadGuard {
|
||||
_permit: Arc::new(_permit),
|
||||
slot_guard: inner.downgrade(),
|
||||
}
|
||||
}
|
||||
@@ -338,7 +310,7 @@ impl Drop for PageWriteGuard<'_> {
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
PageWriteGuardState::Invalid { inner } => {
|
||||
assert!(inner.key.is_some());
|
||||
let self_key = inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
@@ -375,27 +347,6 @@ impl PageCache {
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
|
||||
Duration::from_secs(10),
|
||||
Arc::clone(&self.pinned_slots).acquire_owned(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
Err(_timeout) => {
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
|
||||
);
|
||||
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache.
|
||||
///
|
||||
/// If the search criteria is not exact, *cache_key is updated with the key
|
||||
@@ -405,11 +356,7 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageReadGuard> {
|
||||
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
@@ -419,10 +366,7 @@ impl PageCache {
|
||||
let inner = slot.inner.read().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard {
|
||||
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
|
||||
slot_guard: inner,
|
||||
});
|
||||
return Some(PageReadGuard { slot_guard: inner });
|
||||
} else {
|
||||
// search_mapping might have modified the search key; restore it.
|
||||
*cache_key = cache_key_orig;
|
||||
@@ -465,8 +409,6 @@ impl PageCache {
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE
|
||||
@@ -480,19 +422,17 @@ impl PageCache {
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.find_victim()
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
@@ -516,19 +456,8 @@ impl PageCache {
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
state: PageWriteGuardState::Invalid { inner },
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -600,7 +529,6 @@ impl PageCache {
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
@@ -695,7 +623,6 @@ impl PageCache {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf: slot_contents,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
})
|
||||
@@ -706,7 +633,6 @@ impl PageCache {
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user