diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs
index 883762a273..29bd6ce598 100644
--- a/pageserver/ctl/src/layer_map_analyzer.rs
+++ b/pageserver/ctl/src/layer_map_analyzer.rs
@@ -97,7 +97,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
 // Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH
 async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
     let file = FileBlockReader::new(VirtualFile::open(path)?);
-    let summary_blk = file.read_blk(0)?;
+    let summary_blk = file.read_blk(0).await?;
     let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
     let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
         actual_summary.index_start_blk,
diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs
index 51fcd2bb22..2af54902f7 100644
--- a/pageserver/ctl/src/layers.rs
+++ b/pageserver/ctl/src/layers.rs
@@ -48,7 +48,7 @@ async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
     virtual_file::init(10);
     page_cache::init(100);
     let file = FileBlockReader::new(VirtualFile::open(path)?);
-    let summary_blk = file.read_blk(0)?;
+    let summary_blk = file.read_blk(0).await?;
     let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
     let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
         actual_summary.index_start_blk,
diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index e83206b4a8..fb1c5fc485 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -75,10 +75,7 @@
 use std::{
     collections::{hash_map::Entry, HashMap},
     convert::TryInto,
-    sync::{
-        atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
-        RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
-    },
+    sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
 };

 use anyhow::Context;
@@ -162,7 +159,7 @@ struct Version {
 }

 struct Slot {
-    inner: RwLock<SlotInner>,
+    inner: tokio::sync::RwLock<SlotInner>,
     usage_count: AtomicU8,
 }

@@ -220,9 +217,9 @@ pub struct PageCache {
     ///
     /// If you add support for caching different kinds of objects, each object kind
     /// can have a separate mapping map, next to this field.
-    materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
+    materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,

-    immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
+    immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,

     /// The actual buffers with their metadata.
     slots: Box<[Slot]>,
@@ -238,7 +235,7 @@ pub struct PageCache {
 /// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
 /// until the guard is dropped.
 ///
-pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);
+pub struct PageReadGuard<'i>(tokio::sync::RwLockReadGuard<'i, SlotInner>);

 impl std::ops::Deref for PageReadGuard<'_> {
     type Target = [u8; PAGE_SZ];
@@ -265,7 +262,7 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
 /// to initialize.
 ///
 pub struct PageWriteGuard<'i> {
-    inner: RwLockWriteGuard<'i, SlotInner>,
+    inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,

     // Are the page contents currently valid?
     // Used to mark pages as invalid that are assigned but not yet filled with data.
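Aside on the locking split above: the slot contents move to `tokio::sync::RwLock` because a slot guard is now held across disk-I/O `.await` points, while the two mapping maps stay on `std::sync::RwLock`, which remains adequate (and cheaper) for short critical sections that never cross an await. A minimal sketch of this hybrid pattern; `Cache`, the `u64` key, and the field names are illustrative stand-ins, not the pageserver API:

```rust
use std::collections::HashMap;

struct SlotInner {
    key: Option<u64>,
    buf: [u8; 8192], // stand-in for the PAGE_SZ buffer
}

struct Cache {
    // Sync lock: only ever held for short, non-async critical sections.
    map: std::sync::RwLock<HashMap<u64, usize>>,
    // Async lock: the guard may live across .await points (disk reads).
    slots: Vec<tokio::sync::RwLock<SlotInner>>,
}

impl Cache {
    async fn read(&self, key: u64) -> Option<tokio::sync::RwLockReadGuard<'_, SlotInner>> {
        // Copy the slot index out and drop the map guard before any await.
        let slot_idx = *self.map.read().unwrap().get(&key)?;
        // Holding a tokio guard across an await is fine; a std guard would
        // make the future !Send and risk blocking the executor.
        let inner = self.slots[slot_idx].read().await;
        // Re-check the key: the slot may have been evicted and reused
        // between releasing the map lock and acquiring the slot lock.
        if inner.key == Some(key) {
            Some(inner)
        } else {
            None
        }
    }
}
```

This mirrors the re-check that `try_lock_for_read`/`try_lock_for_write` below perform after acquiring the slot lock.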
@@ -343,7 +340,7 @@ impl PageCache {
     /// The 'lsn' is an upper bound, this will return the latest version of
     /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
     /// returned page.
-    pub fn lookup_materialized_page(
+    pub async fn lookup_materialized_page(
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
@@ -363,7 +360,7 @@ impl PageCache {
             lsn,
         };

-        if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
+        if let Some(guard) = self.try_lock_for_read(&mut cache_key).await {
             if let CacheKey::MaterializedPage {
                 hash_key: _,
                 lsn: available_lsn,
@@ -390,7 +387,7 @@ impl PageCache {
     ///
     /// Store an image of the given page in the cache.
     ///
-    pub fn memorize_materialized_page(
+    pub async fn memorize_materialized_page(
         &self,
         tenant_id: TenantId,
         timeline_id: TimelineId,
@@ -407,7 +404,7 @@ impl PageCache {
             lsn,
         };

-        match self.lock_for_write(&cache_key)? {
+        match self.lock_for_write(&cache_key).await? {
             WriteBufResult::Found(write_guard) => {
                 // We already had it in cache. Another thread must've put it there
                 // concurrently. Check that it had the same contents that we
@@ -425,10 +422,14 @@ impl PageCache {

     // Section 1.2: Public interface functions for working with immutable file pages.

-    pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
+    pub async fn read_immutable_buf(
+        &self,
+        file_id: FileId,
+        blkno: u32,
+    ) -> anyhow::Result<ReadBufResult> {
         let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };

-        self.lock_for_read(&mut cache_key)
+        self.lock_for_read(&mut cache_key).await
     }

     //
@@ -448,14 +449,14 @@ impl PageCache {
     ///
     /// If no page is found, returns None and *cache_key is left unmodified.
     ///
-    fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
+    async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
         let cache_key_orig = cache_key.clone();
         if let Some(slot_idx) = self.search_mapping(cache_key) {
             // The page was found in the mapping. Lock the slot, and re-check
             // that it's still what we expected (because we released the mapping
             // lock already, another thread could have evicted the page)
             let slot = &self.slots[slot_idx];
-            let inner = slot.inner.read().unwrap();
+            let inner = slot.inner.read().await;
             if inner.key.as_ref() == Some(cache_key) {
                 slot.inc_usage_count();
                 return Some(PageReadGuard(inner));
@@ -496,7 +497,7 @@
     /// }
     /// ```
     ///
-    fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
+    async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
         let (read_access, hit) = match cache_key {
             CacheKey::MaterializedPage { .. } => {
                 unreachable!("Materialized pages use lookup_materialized_page")
@@ -511,7 +512,7 @@
         let mut is_first_iteration = true;
         loop {
             // First check if the key already exists in the cache.
-            if let Some(read_guard) = self.try_lock_for_read(cache_key) {
+            if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
                 if is_first_iteration {
                     hit.inc();
                 }
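With `read_immutable_buf` now async, its callers keep the same shape but gain `.await`s: `Found` yields a read guard, `NotFound` yields a write-locked empty buffer that the caller fills and publishes before retrying. A sketch of that caller loop, mirroring the call sites later in this patch (block_io.rs, ephemeral_file.rs); `fill_from_disk` is a hypothetical stand-in for the real read path, and `mark_valid` is assumed from the write-guard API:

```rust
async fn read_page(
    cache: &page_cache::PageCache,
    file_id: page_cache::FileId,
    blkno: u32,
) -> anyhow::Result<page_cache::PageReadGuard<'_>> {
    loop {
        match cache.read_immutable_buf(file_id, blkno).await? {
            page_cache::ReadBufResult::Found(guard) => return Ok(guard),
            page_cache::ReadBufResult::NotFound(mut write_guard) => {
                // Cache miss: fill the write-locked slot, mark it valid,
                // then loop around to take it as a read guard.
                fill_from_disk(file_id, blkno, &mut write_guard).await?;
                write_guard.mark_valid();
            }
        }
    }
}

// Hypothetical stand-in for the real disk read (VirtualFile I/O in the pageserver).
async fn fill_from_disk(
    _file_id: page_cache::FileId,
    _blkno: u32,
    _buf: &mut [u8; page_cache::PAGE_SZ],
) -> anyhow::Result<()> {
    Ok(())
}
```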
@@ -554,13 +555,13 @@
     /// found, returns None.
     ///
     /// When locking a page for writing, the search criteria is always "exact".
-    fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
+    async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
         if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
             // The page was found in the mapping. Lock the slot, and re-check
             // that it's still what we expected (because we released the mapping
             // lock already, another thread could have evicted the page)
             let slot = &self.slots[slot_idx];
-            let inner = slot.inner.write().unwrap();
+            let inner = slot.inner.write().await;
             if inner.key.as_ref() == Some(cache_key) {
                 slot.inc_usage_count();
                 return Some(PageWriteGuard { inner, valid: true });
@@ -573,10 +574,10 @@
     ///
     /// Similar to lock_for_read(), but the returned buffer is write-locked and
     /// may be modified by the caller even if it's already found in the cache.
-    fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
+    async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
         loop {
             // First check if the key already exists in the cache.
-            if let Some(write_guard) = self.try_lock_for_write(cache_key) {
+            if let Some(write_guard) = self.try_lock_for_write(cache_key).await {
                 return Ok(WriteBufResult::Found(write_guard));
             }
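The `find_victim` hunk just below collapses two error arms into one: `std::sync::RwLock::try_write` can fail as either `Poisoned` or `WouldBlock`, whereas tokio locks do not poison, so `tokio::sync::RwLock::try_write` has a single failure mode (the lock is simply held). A self-contained illustration of the two APIs; this is not pageserver code:

```rust
fn std_vs_tokio(std_lock: &std::sync::RwLock<u32>, tokio_lock: &tokio::sync::RwLock<u32>) {
    // std: two distinct failure modes must be handled.
    match std_lock.try_write() {
        Ok(_guard) => {}
        Err(std::sync::TryLockError::Poisoned(err)) => {
            panic!("a writer panicked while holding the lock: {err:?}")
        }
        Err(std::sync::TryLockError::WouldBlock) => { /* contended */ }
    }
    // tokio: no poisoning; tokio::sync::TryLockError is opaque, so
    // contention is the only thing an Err can mean.
    match tokio_lock.try_write() {
        Ok(_guard) => {}
        Err(_err) => { /* contended */ }
    }
}
```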
@@ -757,7 +758,7 @@ impl PageCache {
     /// Find a slot to evict.
     ///
     /// On return, the slot is empty and write-locked.
-    fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
+    fn find_victim(&self) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
         let iter_limit = self.slots.len() * 10;
         let mut iters = 0;
         loop {
@@ -769,10 +770,7 @@
             if slot.dec_usage_count() == 0 {
                 let mut inner = match slot.inner.try_write() {
                     Ok(inner) => inner,
-                    Err(TryLockError::Poisoned(err)) => {
-                        anyhow::bail!("buffer lock was poisoned: {err:?}")
-                    }
-                    Err(TryLockError::WouldBlock) => {
+                    Err(_err) => {
                         // If we have looped through the whole buffer pool 10 times
                         // and still haven't found a victim buffer, something's wrong.
                         // Maybe all the buffers were locked. That could happen in
@@ -816,7 +814,7 @@ impl PageCache {
                 let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();

                 Slot {
-                    inner: RwLock::new(SlotInner { key: None, buf }),
+                    inner: tokio::sync::RwLock::new(SlotInner { key: None, buf }),
                     usage_count: AtomicU8::new(0),
                 }
             })
diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs
index 2a680bd890..f5ff15b50c 100644
--- a/pageserver/src/tenant/blob_io.rs
+++ b/pageserver/src/tenant/blob_io.rs
@@ -33,7 +33,7 @@ impl<'a> BlockCursor<'a> {
         let mut blknum = (offset / PAGE_SZ as u64) as u32;
         let mut off = (offset % PAGE_SZ as u64) as usize;

-        let mut buf = self.read_blk(blknum)?;
+        let mut buf = self.read_blk(blknum).await?;

         // peek at the first byte, to determine if it's a 1- or 4-byte length
         let first_len_byte = buf[off];
@@ -49,7 +49,7 @@ impl<'a> BlockCursor<'a> {
             // it is split across two pages
             len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
             blknum += 1;
-            buf = self.read_blk(blknum)?;
+            buf = self.read_blk(blknum).await?;
             len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
             off = 4 - thislen;
         } else {
@@ -70,7 +70,7 @@ impl<'a> BlockCursor<'a> {
             if page_remain == 0 {
                 // continue on next page
                 blknum += 1;
-                buf = self.read_blk(blknum)?;
+                buf = self.read_blk(blknum).await?;
                 off = 0;
                 page_remain = PAGE_SZ;
             }
diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs
index 3a6806357b..69d5b49c6d 100644
--- a/pageserver/src/tenant/block_io.rs
+++ b/pageserver/src/tenant/block_io.rs
@@ -39,7 +39,7 @@ pub enum BlockLease<'a> {
     PageReadGuard(PageReadGuard<'static>),
     EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
     #[cfg(test)]
-    Rc(std::rc::Rc<[u8; PAGE_SZ]>),
+    Arc(std::sync::Arc<[u8; PAGE_SZ]>),
 }

 impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -49,9 +49,9 @@ impl From<PageReadGuard<'static>> for BlockLease<'static> {
 }

 #[cfg(test)]
-impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
-    fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
-        BlockLease::Rc(value)
+impl<'a> From<std::sync::Arc<[u8; PAGE_SZ]>> for BlockLease<'a> {
+    fn from(value: std::sync::Arc<[u8; PAGE_SZ]>) -> Self {
+        BlockLease::Arc(value)
     }
 }

@@ -63,7 +63,7 @@ impl<'a> Deref for BlockLease<'a> {
             BlockLease::PageReadGuard(v) => v.deref(),
             BlockLease::EphemeralFileMutableTail(v) => v,
             #[cfg(test)]
-            BlockLease::Rc(v) => v.deref(),
+            BlockLease::Arc(v) => v.deref(),
         }
     }
 }
@@ -83,13 +83,13 @@ pub(crate) enum BlockReaderRef<'a> {

 impl<'a> BlockReaderRef<'a> {
     #[inline(always)]
-    fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
+    async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
         use BlockReaderRef::*;
         match self {
-            FileBlockReaderVirtual(r) => r.read_blk(blknum),
-            FileBlockReaderFile(r) => r.read_blk(blknum),
-            EphemeralFile(r) => r.read_blk(blknum),
-            Adapter(r) => r.read_blk(blknum),
+            FileBlockReaderVirtual(r) => r.read_blk(blknum).await,
+            FileBlockReaderFile(r) => r.read_blk(blknum).await,
+            EphemeralFile(r) => r.read_blk(blknum).await,
+            Adapter(r) => r.read_blk(blknum).await,
             #[cfg(test)]
             TestDisk(r) => r.read_blk(blknum),
         }
@@ -134,8 +134,8 @@ impl<'a> BlockCursor<'a> {
     /// access to the contents of the page. (For the page cache, the
     /// lease object represents a lock on the buffer.)
     #[inline(always)]
-    pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
-        self.reader.read_blk(blknum)
+    pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
+        self.reader.read_blk(blknum).await
     }
 }
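For readers following the blob_io.rs hunk above: the header being decoded is one length byte when the high bit is clear, otherwise four big-endian bytes with the high bit serving as the marker. A standalone sketch of that decoding, under the simplifying assumption that the header starts at offset 0 and does not span a page boundary (the real code above handles the split case):

```rust
/// Decode a blob length header: returns (header size in bytes, blob length).
fn decode_len(buf: &[u8]) -> (usize, usize) {
    let first_len_byte = buf[0];
    if first_len_byte < 0x80 {
        // 1-byte header: lengths up to 0x7f.
        (1, first_len_byte as usize)
    } else {
        // 4-byte header: big-endian u32 with the marker bit masked off.
        let mut len_buf = [0u8; 4];
        len_buf.copy_from_slice(&buf[0..4]);
        len_buf[0] &= 0x7f;
        (4, u32::from_be_bytes(len_buf) as usize)
    }
}
```

For example, `decode_len(&[0x05])` yields `(1, 5)`, while a 300-byte blob carries the 4-byte header `0x80, 0x00, 0x01, 0x2c`.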
@@ -170,11 +170,12 @@ where
     /// Returns a "lease" object that can be used to
     /// access the contents of the page. (For the page cache, the
     /// lease object represents a lock on the buffer.)
-    pub fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
+    pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
         let cache = page_cache::get();
         loop {
             match cache
                 .read_immutable_buf(self.file_id, blknum)
+                .await
                 .map_err(|e| {
                     std::io::Error::new(
                         std::io::ErrorKind::Other,
diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs
index 02da01063c..44d6b4f87e 100644
--- a/pageserver/src/tenant/disk_btree.rs
+++ b/pageserver/src/tenant/disk_btree.rs
@@ -262,7 +262,7 @@ where
         let block_cursor = self.reader.block_cursor();
         while let Some((node_blknum, opt_iter)) = stack.pop() {
             // Locate the node.
-            let node_buf = block_cursor.read_blk(self.start_blk + node_blknum)?;
+            let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;

             let node = OnDiskNode::deparse(node_buf.as_ref())?;
             let prefix_len = node.prefix_len as usize;
@@ -357,7 +357,7 @@ where
         let block_cursor = self.reader.block_cursor();

         while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
-            let blk = block_cursor.read_blk(self.start_blk + blknum)?;
+            let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
             let buf: &[u8] = blk.as_ref();
             let node = OnDiskNode::<L>::deparse(buf)?;

@@ -704,7 +704,7 @@ pub(crate) mod tests {
         pub(crate) fn read_blk(&self, blknum: u32) -> io::Result<BlockLease> {
             let mut buf = [0u8; PAGE_SZ];
             buf.copy_from_slice(&self.blocks[blknum as usize]);
-            Ok(std::rc::Rc::new(buf).into())
+            Ok(std::sync::Arc::new(buf).into())
         }
     }
     impl BlockReader for TestDisk {
diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs
index ce8fd9ca3e..31db3869d9 100644
--- a/pageserver/src/tenant/ephemeral_file.rs
+++ b/pageserver/src/tenant/ephemeral_file.rs
@@ -61,13 +61,14 @@ impl EphemeralFile {
         self.len
     }

-    pub(crate) fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
+    pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
         let flushed_blknums = 0..self.len / PAGE_SZ as u64;
         if flushed_blknums.contains(&(blknum as u64)) {
             let cache = page_cache::get();
             loop {
                 match cache
                     .read_immutable_buf(self.page_cache_file_id, blknum)
+                    .await
                     .map_err(|e| {
                         std::io::Error::new(
                             std::io::ErrorKind::Other,
@@ -135,10 +136,13 @@ impl EphemeralFile {
             // Pre-warm the page cache with what we just wrote.
             // This isn't necessary for coherency/correctness, but it's how we've always done it.
             let cache = page_cache::get();
-            match cache.read_immutable_buf(
-                self.ephemeral_file.page_cache_file_id,
-                self.blknum,
-            ) {
+            match cache
+                .read_immutable_buf(
+                    self.ephemeral_file.page_cache_file_id,
+                    self.blknum,
+                )
+                .await
+            {
                 Ok(page_cache::ReadBufResult::Found(_guard)) => {
                     // This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 7a482706e5..d9df346a14 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -467,7 +467,7 @@ impl DeltaLayer { PathOrConf::Path(_) => None, }; - let loaded = DeltaLayerInner::load(&path, summary)?; + let loaded = DeltaLayerInner::load(&path, summary).await?; if let PathOrConf::Path(ref path) = self.path_or_conf { // not production code @@ -841,12 +841,15 @@ impl Drop for DeltaLayerWriter { } impl DeltaLayerInner { - pub(super) fn load(path: &std::path::Path, summary: Option) -> anyhow::Result { + pub(super) async fn load( + path: &std::path::Path, + summary: Option, + ) -> anyhow::Result { let file = VirtualFile::open(path) .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); - let summary_blk = file.read_blk(0)?; + let summary_blk = file.read_blk(0).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; if let Some(mut expected_summary) = summary { @@ -1028,7 +1031,7 @@ impl<'a> ValueRef<'a> { pub(crate) struct Adapter(T); impl> Adapter { - pub(crate) fn read_blk(&self, blknum: u32) -> Result { - self.0.as_ref().file.read_blk(blknum) + pub(crate) async fn read_blk(&self, blknum: u32) -> Result { + self.0.as_ref().file.read_blk(blknum).await } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index cf4f40008a..b1fc257092 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -349,7 +349,8 @@ impl ImageLayer { PathOrConf::Path(_) => None, }; - let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary)?; + let loaded = + ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?; if let PathOrConf::Path(ref path) = self.path_or_conf { // not production code @@ -432,7 +433,7 @@ impl ImageLayer { } impl ImageLayerInner { - pub(super) fn load( + pub(super) async fn load( path: &std::path::Path, lsn: Lsn, summary: Option, @@ -440,7 +441,7 @@ impl ImageLayerInner { let file = VirtualFile::open(path) .with_context(|| format!("Failed to open file '{}'", path.display()))?; let file = FileBlockReader::new(file); - let summary_blk = file.read_blk(0)?; + let summary_blk = file.read_blk(0).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; if let Some(mut expected_summary) = summary { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ddf6f0bc0a..dfeef059cd 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -465,7 +465,7 @@ impl Timeline { // The cached image can be returned directly if there is no WAL between the cached image // and requested LSN. The cached image can also be used to reduce the amount of WAL needed // for redo. 
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index ddf6f0bc0a..dfeef059cd 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -465,7 +465,7 @@ impl Timeline {
         // The cached image can be returned directly if there is no WAL between the cached image
         // and requested LSN. The cached image can also be used to reduce the amount of WAL needed
         // for redo.
-        let cached_page_img = match self.lookup_cached_page(&key, lsn) {
+        let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
             Some((cached_lsn, cached_img)) => {
                 match cached_lsn.cmp(&lsn) {
                     Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
@@ -494,6 +494,7 @@ impl Timeline {

         RECONSTRUCT_TIME
             .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
+            .await
     }

     /// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
@@ -2443,13 +2444,14 @@ impl Timeline {
         }
     }

-    fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
+    async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
         let cache = page_cache::get();

         // FIXME: It's pointless to check the cache for things that are not 8kB pages.
         // We should look at the key to determine if it's a cacheable object
-        let (lsn, read_guard) =
-            cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?;
+        let (lsn, read_guard) = cache
+            .lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
+            .await?;
         let img = Bytes::from(read_guard.to_vec());
         Some((lsn, img))
     }
@@ -4131,7 +4133,7 @@ impl Timeline {
     ///
     /// Reconstruct a value, using the given base image and WAL records in 'data'.
     ///
-    fn reconstruct_value(
+    async fn reconstruct_value(
         &self,
         key: Key,
         request_lsn: Lsn,
@@ -4200,6 +4202,7 @@ impl Timeline {
                 last_rec_lsn,
                 &img,
             )
+            .await
             .context("Materialized page memoization failed")
         {
             return Err(PageReconstructError::from(e));
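Taken together, the timeline changes make the materialized-page path async end to end: lookup, memoization, and `reconstruct_value` itself. A usage sketch assembled only from calls visible in this patch (direct field access on `Timeline` is a simplification, and the elided parameters of `memorize_materialized_page` are assumed to match its call site above):

```rust
async fn memoize_then_lookup(
    tl: &Timeline,
    key: Key,
    lsn: Lsn,
    img: &Bytes,
) -> anyhow::Result<Option<Bytes>> {
    let cache = page_cache::get();
    // Store the image, then read back the newest cached version at or below `lsn`.
    cache
        .memorize_materialized_page(tl.tenant_id, tl.timeline_id, key, lsn, img)
        .await?;
    Ok(cache
        .lookup_materialized_page(tl.tenant_id, tl.timeline_id, &key, lsn)
        .await
        .map(|(_cached_lsn, guard)| Bytes::from(guard.to_vec())))
}
```

As the comment in `Timeline::get` notes, a cached image older than the requested LSN is still useful: it bounds how much WAL has to be replayed to reconstruct the page.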