mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
inline lock_for_write and try_lock_for_write into memorize_materialized_page
Motivation
==========
It's the only user, and the name of `_for_write` is wrong as of
commit 7a63685cde
Author: Christian Schwarz <christian@neon.tech>
Date: Fri Aug 18 19:31:03 2023 +0200
simplify page-caching of EphemeralFile (#4994)
Notes
=====
This also allows us to get rid of the WriteBufResult type.
Also rename `search_mapping_for_write` to `search_mapping_exact`.
It makes more sense that way because there is `_for_write`-locking
anymore.
This commit is contained in:
@@ -66,8 +66,7 @@
|
||||
//! inserted to the mapping, but you must hold the write-lock on the slot until
|
||||
//! the contents are valid. If you need to release the lock without initializing
|
||||
//! the contents, you must remove the mapping first. We make that easy for the
|
||||
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
|
||||
//! page, the caller must explicitly call guard.mark_valid() after it has
|
||||
//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has
|
||||
//! initialized it. If the guard is dropped without calling mark_valid(), the
|
||||
//! mapping is automatically removed and the slot is marked free.
|
||||
//!
|
||||
@@ -286,10 +285,7 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
///
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid(). Similarly
|
||||
/// lock_for_write() can return an invalid buffer that the caller is expected to
|
||||
/// to initialize.
|
||||
///
|
||||
/// is expected to fill in the page contents and call mark_valid().
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
@@ -354,12 +350,6 @@ pub enum ReadBufResult<'a> {
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
/// lock_for_write() return value
|
||||
pub enum WriteBufResult<'a> {
|
||||
Found(PageWriteGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
//
|
||||
// Section 1.1: Public interface functions for looking up and memorizing materialized page
|
||||
@@ -446,20 +436,76 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key).await? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(*write_guard == img);
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(&cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
debug_assert_eq!(inner.buf.len(), img.len());
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(inner.buf == img);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
WriteBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
}
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
Ok(())
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
// Create a write guard for the slot so we go through the expected motions.
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
let mut write_guard = PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
};
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
@@ -645,96 +691,6 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
async fn try_lock_for_write(
|
||||
&self,
|
||||
cache_key: &CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
return Some(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a write-locked buffer for given block.
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Section 3: Mapping functions
|
||||
//
|
||||
@@ -775,7 +731,7 @@ impl PageCache {
|
||||
///
|
||||
/// Like 'search_mapping, but performs an "exact" search. Used for
|
||||
/// allocating a new buffer.
|
||||
fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
|
||||
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
|
||||
Reference in New Issue
Block a user