mirror of
https://github.com/neondatabase/neon.git
synced 2026-04-30 12:50:37 +00:00
Compare commits
1 Commits
zerocopy-p
...
problame/p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d79ace934 |
@@ -39,10 +39,9 @@
|
||||
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
|
||||
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
|
||||
//! a read guard for the page. Just use it.
|
||||
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
|
||||
//! a write guard for the page. Fill the page with the contents of the on-disk file.
|
||||
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
|
||||
//! Then try again to [`PageCache::read_immutable_buf`].
|
||||
//! * If the page was not cached, we invoke the supplied miss handler to fill the page,
|
||||
//! then return [`ReadBufResult::MissFilled`].
|
||||
//! After that, try again to [`PageCache::read_immutable_buf`].
|
||||
//! Unless there's high cache pressure, the page should now be cached.
|
||||
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
|
||||
//!
|
||||
@@ -249,81 +248,16 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
/// Counterintuitively, this is used even for a read, if the requested page is not
|
||||
/// currently found in the page cache. In that case, the caller of lock_for_read()
|
||||
/// is expected to fill in the page contents and call mark_valid(). Similarly
|
||||
/// lock_for_write() can return an invalid buffer that the caller is expected to
|
||||
/// to initialize.
|
||||
///
|
||||
pub struct PageWriteGuard<'i> {
|
||||
inner: RwLockWriteGuard<'i, SlotInner>,
|
||||
|
||||
// Are the page contents currently valid?
|
||||
valid: bool,
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
|
||||
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
|
||||
self.inner.buf
|
||||
}
|
||||
}
|
||||
|
||||
impl PageWriteGuard<'_> {
|
||||
/// Mark that the buffer contents are now valid.
|
||||
pub fn mark_valid(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
assert!(
|
||||
!self.valid,
|
||||
"mark_valid called on a buffer that was already valid"
|
||||
);
|
||||
self.valid = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
///
|
||||
/// If the buffer was allocated for a page that was not already in the
|
||||
/// cache, but the lock_for_read/write() caller dropped the buffer without
|
||||
/// initializing it, remove the mapping from the page cache.
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
assert!(self.inner.key.is_some());
|
||||
if !self.valid {
|
||||
let self_key = self.inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
self.inner.key = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// lock_for_read() return value
|
||||
pub enum ReadBufResult<'a> {
|
||||
Found(PageReadGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
MissFilled,
|
||||
}
|
||||
|
||||
/// lock_for_write() return value
|
||||
pub enum WriteBufResult<'a> {
|
||||
Found(PageWriteGuard<'a>),
|
||||
NotFound(PageWriteGuard<'a>),
|
||||
pub enum WriteBufResult {
|
||||
Already,
|
||||
Inserted,
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
@@ -401,17 +335,9 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key)? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(*write_guard == img);
|
||||
}
|
||||
WriteBufResult::NotFound(mut write_guard) => {
|
||||
write_guard.copy_from_slice(img);
|
||||
write_guard.mark_valid();
|
||||
}
|
||||
match self.lock_for_write(&cache_key, img)? {
|
||||
WriteBufResult::Already => {}
|
||||
WriteBufResult::Inserted => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -419,10 +345,18 @@ impl PageCache {
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
pub fn read_immutable_buf<M>(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
miss_handler: M,
|
||||
) -> anyhow::Result<ReadBufResult>
|
||||
where
|
||||
M: FnOnce(&mut [u8; PAGE_SZ]) -> anyhow::Result<()>,
|
||||
{
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key)
|
||||
self.lock_for_read(&mut cache_key, miss_handler)
|
||||
}
|
||||
|
||||
/// Immediately drop all buffers belonging to given file
|
||||
@@ -511,7 +445,14 @@ impl PageCache {
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
fn lock_for_read<M, P>(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
miss_handler: M,
|
||||
) -> anyhow::Result<ReadBufResult>
|
||||
where
|
||||
M: FnOnce(&mut [u8; PAGE_SZ]) -> anyhow::Result<P>,
|
||||
{
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
@@ -553,46 +494,41 @@ impl PageCache {
|
||||
continue;
|
||||
}
|
||||
|
||||
miss_handler(&mut inner.buf)?;
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
return Ok(ReadBufResult::MissFilled);
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache and lock it in write mode. If it's not
|
||||
/// found, returns None.
|
||||
///
|
||||
/// When locking a page for writing, the search criteria is always "exact".
|
||||
fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageWriteGuard { inner, valid: true });
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Return a write-locked buffer for given block.
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
fn lock_for_write<P>(&self, cache_key: &CacheKey, content: P) -> anyhow::Result<WriteBufResult>
|
||||
where
|
||||
P: AsRef<[u8]>,
|
||||
{
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().unwrap();
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert_eq!(inner.buf, content.as_ref());
|
||||
slot.inc_usage_count();
|
||||
return Ok(WriteBufResult::Already);
|
||||
}
|
||||
}
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
@@ -619,10 +555,9 @@ impl PageCache {
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
}));
|
||||
inner.buf.copy_from_slice(content.as_ref());
|
||||
|
||||
return Ok(WriteBufResult::Inserted);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::fs::File;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::ops::Deref;
|
||||
use std::os::unix::fs::FileExt;
|
||||
|
||||
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
|
||||
@@ -174,7 +174,11 @@ where
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum)
|
||||
.read_immutable_buf(self.file_id, blknum, |buf| {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(buf, blknum)?;
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
@@ -182,14 +186,7 @@ where
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => break Ok(guard.into()),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
ReadBufResult::MissFilled => continue,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ use crate::virtual_file::VirtualFile;
|
||||
use std::cmp::min;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
@@ -67,7 +66,12 @@ impl EphemeralFile {
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum)
|
||||
.read_immutable_buf(self.page_cache_file_id, blknum, |buf| {
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(buf, blknum as u64 * PAGE_SZ as u64)?;
|
||||
Ok(())
|
||||
})
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
@@ -83,13 +87,7 @@ impl EphemeralFile {
|
||||
page_cache::ReadBufResult::Found(guard) => {
|
||||
return Ok(BlockLease::PageReadGuard(guard))
|
||||
}
|
||||
page_cache::ReadBufResult::NotFound(mut write_guard) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
page_cache::ReadBufResult::MissFilled => {
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
@@ -138,18 +136,17 @@ impl EphemeralFile {
|
||||
match cache.read_immutable_buf(
|
||||
self.ephemeral_file.page_cache_file_id,
|
||||
self.blknum,
|
||||
|buf| {
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
Ok(())
|
||||
},
|
||||
) {
|
||||
Ok(page_cache::ReadBufResult::Found(_guard)) => {
|
||||
// This function takes &mut self, so, it shouldn't be possible to reach this point.
|
||||
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
|
||||
}
|
||||
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
|
||||
write_guard.mark_valid();
|
||||
// pre-warm successful
|
||||
}
|
||||
Ok(page_cache::ReadBufResult::MissFilled) => {}
|
||||
Err(e) => {
|
||||
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
|
||||
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
|
||||
|
||||
Reference in New Issue
Block a user