diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 8306ce4636..e1e696ddad 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -10,6 +10,42 @@ //! PostgreSQL buffer size, and a Slot struct for each buffer to contain //! information about what's stored in the buffer. //! +//! # Types Of Pages +//! +//! [`PageCache`] only supports immutable pages. +//! Hence there is no need to worry about coherency. +//! +//! Two types of pages are supported: +//! +//! * **Materialized pages**, filled & used by page reconstruction +//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`]. +//! +//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only. +//! It uses the page cache only for the blocks that are already fully written and immutable. +//! +//! # Filling The Page Cache +//! +//! Page cache maps from a cache key to a buffer slot. +//! The cache key uniquely identifies the piece of data that is being cached. +//! +//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`]. +//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access. +//! +//! The cache key for **immutable file** pages is [`FileId`] and a block number. +//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following: +//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`]. +//! * Get a [`FileId`] using [`next_file_id`]. +//! * Use the mechanism to associate the on-disk file with the returned [`FileId`]. +//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`]. +//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains +//! a read guard for the page. Just use it. +//! 
* If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains +//! a write guard for the page. Fill the page with the contents of the on-disk file. +//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid. +//! Then try again to [`PageCache::read_immutable_buf`]. +//! Unless there's high cache pressure, the page should now be cached. +//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.) +//! //! # Locking //! //! There are two levels of locking involved: There's one lock for the "mapping" @@ -40,20 +76,18 @@ use std::{ collections::{hash_map::Entry, HashMap}, convert::TryInto, sync::{ - atomic::{AtomicU8, AtomicUsize, Ordering}, + atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError, }, }; use anyhow::Context; use once_cell::sync::OnceCell; -use tracing::error; use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, }; -use crate::tenant::{block_io, ephemeral_file, writeback_ephemeral_file}; use crate::{metrics::PageCacheSizeMetrics, repository::Key}; static PAGE_CACHE: OnceCell = OnceCell::new(); @@ -87,6 +121,17 @@ pub fn get() -> &'static PageCache { pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; const MAX_USAGE_COUNT: u8 = 5; +/// See module-level comment. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct FileId(u64); + +static NEXT_ID: AtomicU64 = AtomicU64::new(1); + +/// See module-level comment. +pub fn next_file_id() -> FileId { + FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) +} + /// /// CacheKey uniquely identifies a "thing" to cache in the page cache. 
/// @@ -97,12 +142,8 @@ enum CacheKey { hash_key: MaterializedPageHashKey, lsn: Lsn, }, - EphemeralPage { - file_id: ephemeral_file::FileId, - blkno: u32, - }, ImmutableFilePage { - file_id: block_io::FileId, + file_id: FileId, blkno: u32, }, } @@ -128,7 +169,6 @@ struct Slot { struct SlotInner { key: Option, buf: &'static mut [u8; PAGE_SZ], - dirty: bool, } impl Slot { @@ -177,9 +217,7 @@ pub struct PageCache { /// can have a separate mapping map, next to this field. materialized_page_map: RwLock>>, - ephemeral_page_map: RwLock>, - - immutable_page_map: RwLock>, + immutable_page_map: RwLock>, /// The actual buffers with their metadata. slots: Box<[Slot]>, @@ -258,14 +296,6 @@ impl PageWriteGuard<'_> { ); self.valid = true; } - pub fn mark_dirty(&mut self) { - // only ephemeral pages can be dirty ATM. - assert!(matches!( - self.inner.key, - Some(CacheKey::EphemeralPage { .. }) - )); - self.inner.dirty = true; - } } impl Drop for PageWriteGuard<'_> { @@ -280,7 +310,6 @@ impl Drop for PageWriteGuard<'_> { let self_key = self.inner.key.as_ref().unwrap(); PAGE_CACHE.get().unwrap().remove_mapping(self_key); self.inner.key = None; - self.inner.dirty = false; } } } @@ -388,62 +417,16 @@ impl PageCache { Ok(()) } - // Section 1.2: Public interface functions for working with Ephemeral pages. + // Section 1.2: Public interface functions for working with immutable file pages. 
- pub fn read_ephemeral_buf( - &self, - file_id: ephemeral_file::FileId, - blkno: u32, - ) -> anyhow::Result { - let mut cache_key = CacheKey::EphemeralPage { file_id, blkno }; - - self.lock_for_read(&mut cache_key) - } - - pub fn write_ephemeral_buf( - &self, - file_id: ephemeral_file::FileId, - blkno: u32, - ) -> anyhow::Result { - let cache_key = CacheKey::EphemeralPage { file_id, blkno }; - - self.lock_for_write(&cache_key) - } - - /// Immediately drop all buffers belonging to given file, without writeback - pub fn drop_buffers_for_ephemeral(&self, drop_file_id: ephemeral_file::FileId) { - for slot_idx in 0..self.slots.len() { - let slot = &self.slots[slot_idx]; - - let mut inner = slot.inner.write().unwrap(); - if let Some(key) = &inner.key { - match key { - CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => { - // remove mapping for old buffer - self.remove_mapping(key); - inner.key = None; - inner.dirty = false; - } - _ => {} - } - } - } - } - - // Section 1.3: Public interface functions for working with immutable file pages. - - pub fn read_immutable_buf( - &self, - file_id: block_io::FileId, - blkno: u32, - ) -> anyhow::Result { + pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result { let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; self.lock_for_read(&mut cache_key) } - /// Immediately drop all buffers belonging to given file, without writeback - pub fn drop_buffers_for_immutable(&self, drop_file_id: block_io::FileId) { + /// Immediately drop all buffers belonging to given file + pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) { for slot_idx in 0..self.slots.len() { let slot = &self.slots[slot_idx]; @@ -456,7 +439,6 @@ impl PageCache { // remove mapping for old buffer self.remove_mapping(key); inner.key = None; - inner.dirty = false; } _ => {} } @@ -534,10 +516,6 @@ impl PageCache { CacheKey::MaterializedPage { .. 
} => { unreachable!("Materialized pages use lookup_materialized_page") } - CacheKey::EphemeralPage { .. } => ( - &crate::metrics::PAGE_CACHE.read_accesses_ephemeral, - &crate::metrics::PAGE_CACHE.read_hits_ephemeral, - ), CacheKey::ImmutableFilePage { .. } => ( &crate::metrics::PAGE_CACHE.read_accesses_immutable, &crate::metrics::PAGE_CACHE.read_hits_immutable, @@ -578,7 +556,6 @@ impl PageCache { // Make the slot ready let slot = &self.slots[slot_idx]; inner.key = Some(cache_key.clone()); - inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); return Ok(ReadBufResult::NotFound(PageWriteGuard { @@ -640,7 +617,6 @@ impl PageCache { // Make the slot ready let slot = &self.slots[slot_idx]; inner.key = Some(cache_key.clone()); - inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); return Ok(WriteBufResult::NotFound(PageWriteGuard { @@ -679,10 +655,6 @@ impl PageCache { *lsn = version.lsn; Some(version.slot_idx) } - CacheKey::EphemeralPage { file_id, blkno } => { - let map = self.ephemeral_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } CacheKey::ImmutableFilePage { file_id, blkno } => { let map = self.immutable_page_map.read().unwrap(); Some(*map.get(&(*file_id, *blkno))?) @@ -706,10 +678,6 @@ impl PageCache { None } } - CacheKey::EphemeralPage { file_id, blkno } => { - let map = self.ephemeral_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } CacheKey::ImmutableFilePage { file_id, blkno } => { let map = self.immutable_page_map.read().unwrap(); Some(*map.get(&(*file_id, *blkno))?) 
@@ -743,12 +711,6 @@ impl PageCache { panic!("could not find old key in mapping") } } - CacheKey::EphemeralPage { file_id, blkno } => { - let mut map = self.ephemeral_page_map.write().unwrap(); - map.remove(&(*file_id, *blkno)) - .expect("could not find old key in mapping"); - self.size_metrics.current_bytes_ephemeral.sub_page_sz(1); - } CacheKey::ImmutableFilePage { file_id, blkno } => { let mut map = self.immutable_page_map.write().unwrap(); map.remove(&(*file_id, *blkno)) @@ -788,17 +750,7 @@ impl PageCache { } } } - CacheKey::EphemeralPage { file_id, blkno } => { - let mut map = self.ephemeral_page_map.write().unwrap(); - match map.entry((*file_id, *blkno)) { - Entry::Occupied(entry) => Some(*entry.get()), - Entry::Vacant(entry) => { - entry.insert(slot_idx); - self.size_metrics.current_bytes_ephemeral.add_page_sz(1); - None - } - } - } + CacheKey::ImmutableFilePage { file_id, blkno } => { let mut map = self.immutable_page_map.write().unwrap(); match map.entry((*file_id, *blkno)) { @@ -849,25 +801,8 @@ impl PageCache { } }; if let Some(old_key) = &inner.key { - if inner.dirty { - if let Err(err) = Self::writeback(old_key, inner.buf) { - // Writing the page to disk failed. - // - // FIXME: What to do here, when? We could propagate the error to the - // caller, but victim buffer is generally unrelated to the original - // call. It can even belong to a different tenant. Currently, we - // report the error to the log and continue the clock sweep to find - // a different victim. But if the problem persists, the page cache - // could fill up with dirty pages that we cannot evict, and we will - // loop retrying the writebacks indefinitely. 
- error!("writeback of buffer {:?} failed: {}", old_key, err); - continue; - } - } - // remove mapping for old buffer self.remove_mapping(old_key); - inner.dirty = false; inner.key = None; } return Ok((slot_idx, inner)); @@ -875,28 +810,6 @@ impl PageCache { } } - fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> { - match cache_key { - CacheKey::MaterializedPage { - hash_key: _, - lsn: _, - } => Err(std::io::Error::new( - std::io::ErrorKind::Other, - "unexpected dirty materialized page", - )), - CacheKey::EphemeralPage { file_id, blkno } => { - writeback_ephemeral_file(*file_id, *blkno, buf) - } - CacheKey::ImmutableFilePage { - file_id: _, - blkno: _, - } => Err(std::io::Error::new( - std::io::ErrorKind::Other, - "unexpected dirty immutable page", - )), - } - } - /// Initialize a new page cache /// /// This should be called only once at page server startup. @@ -907,7 +820,6 @@ impl PageCache { let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; size_metrics.max_bytes.set_page_sz(num_pages); - size_metrics.current_bytes_ephemeral.set_page_sz(0); size_metrics.current_bytes_immutable.set_page_sz(0); size_metrics.current_bytes_materialized_page.set_page_sz(0); @@ -917,11 +829,7 @@ impl PageCache { let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); Slot { - inner: RwLock::new(SlotInner { - key: None, - buf, - dirty: false, - }), + inner: RwLock::new(SlotInner { key: None, buf }), usage_count: AtomicU8::new(0), } }) @@ -929,7 +837,6 @@ impl PageCache { Self { materialized_page_map: Default::default(), - ephemeral_page_map: Default::default(), immutable_page_map: Default::default(), slots, next_evict_slot: AtomicUsize::new(0), diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index cedb381ccc..309020391f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -136,9 +136,6 @@ pub use timeline::{ LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline, }; -// 
re-export this function so that page_cache.rs can use it. -pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file; - // re-export for use in remote_timeline_client.rs pub use crate::tenant::metadata::save_metadata; diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 3cc4e61a95..503e5bd4e6 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -6,7 +6,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use bytes::Bytes; use std::ops::{Deref, DerefMut}; use std::os::unix::fs::FileExt; -use std::sync::atomic::AtomicU64; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -43,37 +42,34 @@ where } } -/// A block accessible for reading -/// -/// During builds with `#[cfg(test)]`, this is a proper enum -/// with two variants to support testing code. During normal -/// builds, it just has one variant and is thus a cheap newtype -/// wrapper of [`PageReadGuard`] -pub enum BlockLease { +/// Reference to an in-memory copy of an immutable on-disk block. 
+pub enum BlockLease<'a> { PageReadGuard(PageReadGuard<'static>), + EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Rc(std::rc::Rc<[u8; PAGE_SZ]>), } -impl From> for BlockLease { - fn from(value: PageReadGuard<'static>) -> Self { +impl From> for BlockLease<'static> { + fn from(value: PageReadGuard<'static>) -> BlockLease<'static> { BlockLease::PageReadGuard(value) } } #[cfg(test)] -impl From> for BlockLease { +impl<'a> From> for BlockLease<'a> { fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self { BlockLease::Rc(value) } } -impl Deref for BlockLease { +impl<'a> Deref for BlockLease<'a> { type Target = [u8; PAGE_SZ]; fn deref(&self) -> &Self::Target { match self { BlockLease::PageReadGuard(v) => v.deref(), + BlockLease::EphemeralFileMutableTail(v) => v, #[cfg(test)] BlockLease::Rc(v) => v.deref(), } @@ -116,13 +112,6 @@ where self.reader.read_blk(blknum) } } -static NEXT_ID: AtomicU64 = AtomicU64::new(1); -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct FileId(u64); - -fn next_file_id() -> FileId { - FileId(NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)) -} /// An adapter for reading a (virtual) file using the page cache. /// @@ -132,7 +121,7 @@ pub struct FileBlockReader { pub file: F, /// Unique ID of this file, used as key in the page cache. - file_id: FileId, + file_id: page_cache::FileId, } impl FileBlockReader @@ -140,7 +129,7 @@ where F: FileExt, { pub fn new(file: F) -> Self { - let file_id = next_file_id(); + let file_id = page_cache::next_file_id(); FileBlockReader { file_id, file } } @@ -157,7 +146,6 @@ where F: FileExt, { fn read_blk(&self, blknum: u32) -> Result { - // Look up the right page let cache = page_cache::get(); loop { match cache diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 237c17d852..5de9c24d90 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -2,54 +2,31 @@ //! 
used to keep in-memory layers spilled on disk. use crate::config::PageServerConf; -use crate::page_cache::{self, ReadBufResult, WriteBufResult, PAGE_SZ}; +use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; -use once_cell::sync::Lazy; use std::cmp::min; -use std::collections::HashMap; use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; use std::os::unix::prelude::FileExt; use std::path::PathBuf; -use std::sync::{Arc, RwLock}; +use std::sync::atomic::AtomicU64; use tracing::*; use utils::id::{TenantId, TimelineId}; -/// -/// This is the global cache of file descriptors (File objects). -/// -static EPHEMERAL_FILES: Lazy> = Lazy::new(|| { - RwLock::new(EphemeralFiles { - next_file_id: FileId(1), - files: HashMap::new(), - }) -}); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct FileId(u64); - -impl std::fmt::Display for FileId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -pub struct EphemeralFiles { - next_file_id: FileId, - - files: HashMap>, -} - pub struct EphemeralFile { - file_id: FileId, + page_cache_file_id: page_cache::FileId, + _tenant_id: TenantId, _timeline_id: TimelineId, - file: Arc, - - pub size: u64, + file: VirtualFile, + size: u64, + /// An ephemeral file is append-only. + /// We keep the last page, which can still be modified, in [`Self::mutable_tail`]. + /// The other pages, which can no longer be modified, are accessed through the page cache. 
+ mutable_tail: [u8; PAGE_SZ], } impl EphemeralFile { @@ -58,74 +35,31 @@ impl EphemeralFile { tenant_id: TenantId, timeline_id: TimelineId, ) -> Result { - let mut l = EPHEMERAL_FILES.write().unwrap(); - let file_id = l.next_file_id; - l.next_file_id = FileId(l.next_file_id.0 + 1); + static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1); + let filename_disambiguator = + NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let filename = conf .timeline_path(&tenant_id, &timeline_id) - .join(PathBuf::from(format!("ephemeral-{}", file_id))); + .join(PathBuf::from(format!("ephemeral-{filename_disambiguator}"))); let file = VirtualFile::open_with_options( &filename, OpenOptions::new().read(true).write(true).create(true), )?; - let file_rc = Arc::new(file); - l.files.insert(file_id, file_rc.clone()); Ok(EphemeralFile { - file_id, + page_cache_file_id: page_cache::next_file_id(), _tenant_id: tenant_id, _timeline_id: timeline_id, - file: file_rc, + file, size: 0, + mutable_tail: [0u8; PAGE_SZ], }) } - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> { - let mut off = 0; - while off < PAGE_SZ { - let n = self - .file - .read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?; - - if n == 0 { - // Reached EOF. Fill the rest of the buffer with zeros. - const ZERO_BUF: [u8; PAGE_SZ] = [0u8; PAGE_SZ]; - - buf[off..].copy_from_slice(&ZERO_BUF[off..]); - break; - } - - off += n; - } - Ok(()) - } - - fn get_buf_for_write( - &self, - blkno: u32, - ) -> Result, io::Error> { - // Look up the right page - let cache = page_cache::get(); - let mut write_guard = match cache - .write_ephemeral_buf(self.file_id, blkno) - .map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))? 
- { - WriteBufResult::Found(guard) => guard, - WriteBufResult::NotFound(mut guard) => { - // Read the page from disk into the buffer - // TODO: if we're overwriting the whole page, no need to read it in first - self.fill_buffer(guard.deref_mut(), blkno)?; - guard.mark_valid(); - - // And then fall through to modify it. - guard - } - }; - write_guard.mark_dirty(); - - Ok(write_guard) + pub(crate) fn size(&self) -> u64 { + self.size } } @@ -146,49 +80,74 @@ impl BlobWriter for EphemeralFile { blknum: u32, /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write. off: usize, - /// Used by [`push_bytes`] to memoize the page cache write guard across calls to it. - memo_page_guard: MemoizedPageWriteGuard, - } - struct MemoizedPageWriteGuard { - guard: page_cache::PageWriteGuard<'static>, - /// The block number of the page in `guard`. - blknum: u32, } impl<'a> Writer<'a> { fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result> { - let blknum = (ephemeral_file.size / PAGE_SZ as u64) as u32; Ok(Writer { - blknum, + blknum: (ephemeral_file.size / PAGE_SZ as u64) as u32, off: (ephemeral_file.size % PAGE_SZ as u64) as usize, - memo_page_guard: MemoizedPageWriteGuard { - guard: ephemeral_file.get_buf_for_write(blknum)?, - blknum, - }, ephemeral_file, }) } #[inline(always)] fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> { - // `src_remaining` is the remaining bytes to be written let mut src_remaining = src; while !src_remaining.is_empty() { - let page = if self.memo_page_guard.blknum == self.blknum { - &mut self.memo_page_guard.guard - } else { - self.memo_page_guard.guard = - self.ephemeral_file.get_buf_for_write(self.blknum)?; - self.memo_page_guard.blknum = self.blknum; - &mut self.memo_page_guard.guard - }; - let dst_remaining = &mut page[self.off..]; + let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..]; let n = min(dst_remaining.len(), src_remaining.len()); 
dst_remaining[..n].copy_from_slice(&src_remaining[..n]); self.off += n; src_remaining = &src_remaining[n..]; if self.off == PAGE_SZ { - // This block is done, move to next one. - self.blknum += 1; - self.off = 0; + match self.ephemeral_file.file.write_all_at( + &self.ephemeral_file.mutable_tail, + self.blknum as u64 * PAGE_SZ as u64, + ) { + Ok(_) => { + // Pre-warm the page cache with what we just wrote. + // This isn't necessary for coherency/correctness, but it's how we've always done it. + let cache = page_cache::get(); + match cache.read_immutable_buf( + self.ephemeral_file.page_cache_file_id, + self.blknum, + ) { + Ok(page_cache::ReadBufResult::Found(_guard)) => { + // This function takes &mut self, so, it shouldn't be possible to reach this point. + unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum); + } + Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + buf.copy_from_slice(&self.ephemeral_file.mutable_tail); + write_guard.mark_valid(); + // pre-warm successful + } + Err(e) => { + error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); + // fail gracefully, it's not the end of the world if we can't pre-warm the cache here + } + } + // Zero the buffer for re-use. + // Zeroing is critical for correctness because the write_blob code below + // and similarly read_blk expect zeroed pages. + self.ephemeral_file.mutable_tail.fill(0); + // This block is done, move to next one.
+ self.blknum += 1; + self.off = 0; + } + Err(e) => { + return Err(std::io::Error::new( + ErrorKind::Other, + // order error before path because path is long and error is short + format!( + "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}", + self.blknum, + e, + self.ephemeral_file.file.path.display(), + ), + )); + } + } } } Ok(()) @@ -227,10 +186,7 @@ impl Drop for EphemeralFile { fn drop(&mut self) { // drop all pages from page cache let cache = page_cache::get(); - cache.drop_buffers_for_ephemeral(self.file_id); - - // remove entry from the hash map - EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id); + cache.drop_buffers_for_immutable(self.page_cache_file_id); // unlink the file let res = std::fs::remove_file(&self.file.path); @@ -250,54 +206,48 @@ impl Drop for EphemeralFile { } } -pub fn writeback(file_id: FileId, blkno: u32, buf: &[u8]) -> Result<(), io::Error> { - if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) { - match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) { - Ok(_) => Ok(()), - Err(e) => Err(io::Error::new( - ErrorKind::Other, - format!( - "failed to write back to ephemeral file at {} error: {}", - file.path.display(), - e - ), - )), - } - } else { - Err(io::Error::new( - ErrorKind::Other, - "could not write back page, not found in ephemeral files hash", - )) - } -} - impl BlockReader for EphemeralFile { fn read_blk(&self, blknum: u32) -> Result { - // Look up the right page - let cache = page_cache::get(); - loop { - match cache - .read_ephemeral_buf(self.file_id, blknum) - .map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))? 
- { - ReadBufResult::Found(guard) => return Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum)?; - write_guard.mark_valid(); + let flushed_blknums = 0..self.size / PAGE_SZ as u64; + if flushed_blknums.contains(&(blknum as u64)) { + let cache = page_cache::get(); + loop { + match cache + .read_immutable_buf(self.page_cache_file_id, blknum) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + // order path before error because error is anyhow::Error => might have many contexts + format!( + "ephemeral file: read immutable page #{}: {}: {:#}", + blknum, + self.file.path.display(), + e, + ), + ) + })? { + page_cache::ReadBufResult::Found(guard) => { + return Ok(BlockLease::PageReadGuard(guard)) + } + page_cache::ReadBufResult::NotFound(mut write_guard) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + self.file + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; + write_guard.mark_valid(); - // Swap for read lock - continue; - } - }; + // Swap for read lock + continue; + } + }; + } + } else { + debug_assert_eq!(blknum as u64, self.size / PAGE_SZ as u64); + Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) } } } -fn to_io_error(e: anyhow::Error, context: &str) -> io::Error { - io::Error::new(ErrorKind::Other, format!("{context}: {e:#}")) -} - #[cfg(test)] mod tests { use super::*; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index aa9d0884e0..d3ec78887d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -230,11 +230,11 @@ impl std::fmt::Display for InMemoryLayer { impl InMemoryLayer { /// - /// Get layer size on the disk + /// Get layer size. 
/// pub async fn size(&self) -> Result { let inner = self.inner.read().await; - Ok(inner.file.size) + Ok(inner.file.size()) } ///