Compare commits

...

1 Commits

Author SHA1 Message Date
Christian Schwarz
7d79ace934 page_cache: eliminate PageWriteGuard 2023-08-28 11:02:10 +02:00
3 changed files with 72 additions and 143 deletions

View File

@@ -39,10 +39,9 @@
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//! a read guard for the page. Just use it.
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//! a write guard for the page. Fill the page with the contents of the on-disk file.
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//! Then try again to [`PageCache::read_immutable_buf`].
//! * If the page was not cached, we invoke the supplied miss handler to fill the page,
//! then return [`ReadBufResult::MissFilled`].
//! After that, try again to [`PageCache::read_immutable_buf`].
//! Unless there's high cache pressure, the page should now be cached.
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
//!
@@ -249,81 +248,16 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
}
}
///
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
/// until the guard is dropped.
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected to
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: RwLockWriteGuard<'i, SlotInner>,
// Are the page contents currently valid?
valid: bool,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
}
}
impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
}
}
impl PageWriteGuard<'_> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
}
}
impl Drop for PageWriteGuard<'_> {
///
/// If the buffer was allocated for a page that was not already in the
/// cache, but the lock_for_read/write() caller dropped the buffer without
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
}
}
}
/// lock_for_read() return value
pub enum ReadBufResult<'a> {
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
MissFilled,
}
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
NotFound(PageWriteGuard<'a>),
pub enum WriteBufResult {
Already,
Inserted,
}
impl PageCache {
@@ -401,17 +335,9 @@ impl PageCache {
lsn,
};
match self.lock_for_write(&cache_key)? {
WriteBufResult::Found(write_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
}
match self.lock_for_write(&cache_key, img)? {
WriteBufResult::Already => {}
WriteBufResult::Inserted => {}
}
Ok(())
@@ -419,10 +345,18 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
pub fn read_immutable_buf<M>(
&self,
file_id: FileId,
blkno: u32,
miss_handler: M,
) -> anyhow::Result<ReadBufResult>
where
M: FnOnce(&mut [u8; PAGE_SZ]) -> anyhow::Result<()>,
{
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key)
self.lock_for_read(&mut cache_key, miss_handler)
}
/// Immediately drop all buffers belonging to given file
@@ -511,7 +445,14 @@ impl PageCache {
/// }
/// ```
///
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
fn lock_for_read<M, P>(
&self,
cache_key: &mut CacheKey,
miss_handler: M,
) -> anyhow::Result<ReadBufResult>
where
M: FnOnce(&mut [u8; PAGE_SZ]) -> anyhow::Result<P>,
{
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
@@ -553,46 +494,41 @@ impl PageCache {
continue;
}
miss_handler(&mut inner.buf)?;
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
}));
return Ok(ReadBufResult::MissFilled);
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageWriteGuard { inner, valid: true });
}
}
None
}
/// Return a write-locked buffer for given block.
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
fn lock_for_write<P>(&self, cache_key: &CacheKey, content: P) -> anyhow::Result<WriteBufResult>
where
P: AsRef<[u8]>,
{
loop {
// First check if the key already exists in the cache.
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
return Ok(WriteBufResult::Found(write_guard));
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we don't released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().unwrap();
if inner.key.as_ref() == Some(cache_key) {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert_eq!(inner.buf, content.as_ref());
slot.inc_usage_count();
return Ok(WriteBufResult::Already);
}
}
// Not found. Find a victim buffer
@@ -619,10 +555,9 @@ impl PageCache {
inner.key = Some(cache_key.clone());
slot.usage_count.store(1, Ordering::Relaxed);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
inner,
valid: false,
}));
inner.buf.copy_from_slice(content.as_ref());
return Ok(WriteBufResult::Inserted);
}
}

View File

@@ -8,7 +8,7 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::fs::File;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
use std::os::unix::fs::FileExt;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
@@ -174,7 +174,11 @@ where
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum)
.read_immutable_buf(self.file_id, blknum, |buf| {
// Read the page from disk into the buffer
self.fill_buffer(buf, blknum)?;
Ok(())
})
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -182,14 +186,7 @@ where
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
ReadBufResult::MissFilled => continue,
};
}
}

View File

@@ -8,7 +8,6 @@ use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
@@ -67,7 +66,12 @@ impl EphemeralFile {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.read_immutable_buf(self.page_cache_file_id, blknum, |buf| {
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(buf, blknum as u64 * PAGE_SZ as u64)?;
Ok(())
})
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
@@ -83,13 +87,7 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();
page_cache::ReadBufResult::MissFilled => {
// Swap for read lock
continue;
}
@@ -138,18 +136,17 @@ impl EphemeralFile {
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
|buf| {
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
Ok(())
},
) {
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
}
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
// pre-warm successful
}
Ok(page_cache::ReadBufResult::MissFilled) => {}
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here