diff --git a/pageserver/src/layered_repository/block_io.rs b/pageserver/src/layered_repository/block_io.rs index bc3bc082a0..5e32b8833a 100644 --- a/pageserver/src/layered_repository/block_io.rs +++ b/pageserver/src/layered_repository/block_io.rs @@ -157,7 +157,14 @@ where // Look up the right page let cache = page_cache::get(); loop { - match cache.read_immutable_buf(self.file_id, blknum) { + match cache + .read_immutable_buf(self.file_id, blknum) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { ReadBufResult::Found(guard) => break Ok(guard), ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer diff --git a/pageserver/src/layered_repository/ephemeral_file.rs b/pageserver/src/layered_repository/ephemeral_file.rs index 1776946e7a..a1b2d68cd5 100644 --- a/pageserver/src/layered_repository/ephemeral_file.rs +++ b/pageserver/src/layered_repository/ephemeral_file.rs @@ -12,7 +12,7 @@ use once_cell::sync::Lazy; use std::cmp::min; use std::collections::HashMap; use std::fs::OpenOptions; -use std::io::{Error, ErrorKind}; +use std::io::{self, ErrorKind}; use std::ops::DerefMut; use std::path::PathBuf; use std::sync::{Arc, RwLock}; @@ -51,7 +51,7 @@ impl EphemeralFile { conf: &PageServerConf, tenantid: ZTenantId, timelineid: ZTimelineId, - ) -> Result { + ) -> Result { let mut l = EPHEMERAL_FILES.write().unwrap(); let file_id = l.next_file_id; l.next_file_id += 1; @@ -76,7 +76,7 @@ impl EphemeralFile { }) } - fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> { + fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> { let mut off = 0; while off < PAGE_SZ { let n = self @@ -96,10 +96,13 @@ impl EphemeralFile { Ok(()) } - fn get_buf_for_write(&self, blkno: u32) -> Result { + fn get_buf_for_write(&self, blkno: u32) -> Result { // Look up the right page let cache = page_cache::get(); - let mut write_guard = match cache.write_ephemeral_buf(self.file_id, blkno) { + let mut write_guard = match cache + .write_ephemeral_buf(self.file_id, blkno) + .map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))? + { WriteBufResult::Found(guard) => guard, WriteBufResult::NotFound(mut guard) => { // Read the page from disk into the buffer @@ -127,7 +130,7 @@ pub fn is_ephemeral_file(filename: &str) -> bool { } impl FileExt for EphemeralFile { - fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result { + fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result { // Look up the right page let blkno = (offset / PAGE_SZ as u64) as u32; let off = offset as usize % PAGE_SZ; @@ -137,7 +140,10 @@ impl FileExt for EphemeralFile { let mut write_guard; let cache = page_cache::get(); - let buf = match cache.read_ephemeral_buf(self.file_id, blkno) { + let buf = match cache + .read_ephemeral_buf(self.file_id, blkno) + .map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))? + { ReadBufResult::Found(guard) => { read_guard = guard; read_guard.as_ref() @@ -158,7 +164,7 @@ impl FileExt for EphemeralFile { Ok(len) } - fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result { + fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result { // Look up the right page let blkno = (offset / PAGE_SZ as u64) as u32; let off = offset as usize % PAGE_SZ; @@ -166,7 +172,10 @@ impl FileExt for EphemeralFile { let mut write_guard; let cache = page_cache::get(); - let buf = match cache.write_ephemeral_buf(self.file_id, blkno) { + let buf = match cache + .write_ephemeral_buf(self.file_id, blkno) + .map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))? + { WriteBufResult::Found(guard) => { write_guard = guard; write_guard.deref_mut() @@ -190,7 +199,7 @@ impl FileExt for EphemeralFile { } impl BlobWriter for EphemeralFile { - fn write_blob(&mut self, srcbuf: &[u8]) -> Result { + fn write_blob(&mut self, srcbuf: &[u8]) -> Result { let pos = self.size; let mut blknum = (self.size / PAGE_SZ as u64) as u32; @@ -268,11 +277,11 @@ impl Drop for EphemeralFile { } } -pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Error> { +pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), io::Error> { if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) { match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) { Ok(_) => Ok(()), - Err(e) => Err(std::io::Error::new( + Err(e) => Err(io::Error::new( ErrorKind::Other, format!( "failed to write back to ephemeral file at {} error: {}", @@ -282,7 +291,7 @@ pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Er )), } } else { - Err(std::io::Error::new( + Err(io::Error::new( ErrorKind::Other, "could not write back page, not found in ephemeral files hash", )) @@ -292,11 +301,14 @@ pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Er impl BlockReader for EphemeralFile { type BlockLease = page_cache::PageReadGuard<'static>; - fn read_blk(&self, blknum: u32) -> Result { + fn read_blk(&self, blknum: u32) -> Result { // Look up the right page let cache = page_cache::get(); loop { - match cache.read_ephemeral_buf(self.file_id, blknum) { + match cache + .read_ephemeral_buf(self.file_id, blknum) + .map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))? + { ReadBufResult::Found(guard) => return Ok(guard), ReadBufResult::NotFound(mut write_guard) => { // Read the page from disk into the buffer @@ -311,6 +323,10 @@ impl BlockReader for EphemeralFile { } } +fn to_io_error(e: anyhow::Error, context: &str) -> io::Error { + io::Error::new(ErrorKind::Other, format!("{context}: {e:#}")) +} + #[cfg(test)] mod tests { use super::*; @@ -322,7 +338,7 @@ mod tests { fn repo_harness( test_name: &str, - ) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), Error> { + ) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), io::Error> { let repo_dir = PageServerConf::test_repo_dir(test_name); let _ = fs::remove_dir_all(&repo_dir); let conf = PageServerConf::dummy_conf(repo_dir); @@ -339,7 +355,7 @@ mod tests { // Helper function to slurp contents of a file, starting at the current position, // into a string - fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result { + fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result { let mut buf = Vec::new(); buf.resize(len, 0u8); @@ -351,7 +367,7 @@ mod tests { } #[test] - fn test_ephemeral_files() -> Result<(), Error> { + fn test_ephemeral_files() -> Result<(), io::Error> { let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?; let file_a = EphemeralFile::create(conf, tenantid, timelineid)?; @@ -382,7 +398,7 @@ mod tests { } #[test] - fn test_ephemeral_blobs() -> Result<(), Error> { + fn test_ephemeral_blobs() -> Result<(), io::Error> { let (conf, tenantid, timelineid) = repo_harness("ephemeral_blobs")?; let mut file = EphemeralFile::create(conf, tenantid, timelineid)?; diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 6ef4915bdb..910fc9e9fc 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -2117,7 +2117,7 @@ impl LayeredTimeline { key: Key, request_lsn: Lsn, mut data: ValueReconstructState, - ) -> Result { + ) -> anyhow::Result { // Perform WAL redo if needed data.records.reverse(); @@ -2167,13 +2167,15 @@ impl LayeredTimeline { if img.len() == page_cache::PAGE_SZ { let cache = page_cache::get(); - cache.memorize_materialized_page( - self.tenant_id, - self.timeline_id, - key, - last_rec_lsn, - &img, - ); + cache + .memorize_materialized_page( + self.tenant_id, + self.timeline_id, + key, + last_rec_lsn, + &img, + ) + .context("Materialized page memoization failed")?; } Ok(img) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 818eaf1b8f..27b1400243 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -45,6 +45,7 @@ use std::{ }, }; +use anyhow::Context; use once_cell::sync::OnceCell; use tracing::error; use utils::{ @@ -342,7 +343,7 @@ impl PageCache { key: Key, lsn: Lsn, img: &[u8], - ) { + ) -> anyhow::Result<()> { let cache_key = CacheKey::MaterializedPage { hash_key: MaterializedPageHashKey { tenant_id, @@ -352,7 +353,7 @@ impl PageCache { lsn, }; - match self.lock_for_write(&cache_key) { + match self.lock_for_write(&cache_key)? { WriteBufResult::Found(write_guard) => { // We already had it in cache. Another thread must've put it there // concurrently. Check that it had the same contents that we @@ -364,17 +365,19 @@ impl PageCache { write_guard.mark_valid(); } } + + Ok(()) } // Section 1.2: Public interface functions for working with Ephemeral pages. - pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult { + pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result { let mut cache_key = CacheKey::EphemeralPage { file_id, blkno }; self.lock_for_read(&mut cache_key) } - pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> WriteBufResult { + pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result { let cache_key = CacheKey::EphemeralPage { file_id, blkno }; self.lock_for_write(&cache_key) @@ -402,7 +405,7 @@ impl PageCache { // Section 1.3: Public interface functions for working with immutable file pages. - pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult { + pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result { let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; self.lock_for_read(&mut cache_key) @@ -495,15 +498,16 @@ impl PageCache { /// } /// ``` /// - fn lock_for_read(&self, cache_key: &mut CacheKey) -> ReadBufResult { + fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result { loop { // First check if the key already exists in the cache. if let Some(read_guard) = self.try_lock_for_read(cache_key) { - return ReadBufResult::Found(read_guard); + return Ok(ReadBufResult::Found(read_guard)); } // Not found. Find a victim buffer - let (slot_idx, mut inner) = self.find_victim(); + let (slot_idx, mut inner) = + self.find_victim().context("Failed to find evict victim")?; // Insert mapping for this. At this point, we may find that another // thread did the same thing concurrently. In that case, we evicted @@ -526,10 +530,10 @@ impl PageCache { inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); - return ReadBufResult::NotFound(PageWriteGuard { + return Ok(ReadBufResult::NotFound(PageWriteGuard { inner, valid: false, - }); + })); } } @@ -556,15 +560,16 @@ impl PageCache { /// /// Similar to lock_for_read(), but the returned buffer is write-locked and /// may be modified by the caller even if it's already found in the cache. - fn lock_for_write(&self, cache_key: &CacheKey) -> WriteBufResult { + fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result { loop { // First check if the key already exists in the cache. if let Some(write_guard) = self.try_lock_for_write(cache_key) { - return WriteBufResult::Found(write_guard); + return Ok(WriteBufResult::Found(write_guard)); } // Not found. Find a victim buffer - let (slot_idx, mut inner) = self.find_victim(); + let (slot_idx, mut inner) = + self.find_victim().context("Failed to find evict victim")?; // Insert mapping for this. At this point, we may find that another // thread did the same thing concurrently. In that case, we evicted @@ -587,10 +592,10 @@ impl PageCache { inner.dirty = false; slot.usage_count.store(1, Ordering::Relaxed); - return WriteBufResult::NotFound(PageWriteGuard { + return Ok(WriteBufResult::NotFound(PageWriteGuard { inner, valid: false, - }); + })); } } @@ -754,7 +759,7 @@ impl PageCache { /// Find a slot to evict. /// /// On return, the slot is empty and write-locked. - fn find_victim(&self) -> (usize, RwLockWriteGuard) { + fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard)> { let iter_limit = self.slots.len() * 10; let mut iters = 0; loop { @@ -767,7 +772,7 @@ impl PageCache { let mut inner = match slot.inner.try_write() { Ok(inner) => inner, Err(TryLockError::Poisoned(err)) => { - panic!("buffer lock was poisoned: {:?}", err) + anyhow::bail!("buffer lock was poisoned: {err:?}") } Err(TryLockError::WouldBlock) => { // If we have looped through the whole buffer pool 10 times @@ -777,7 +782,7 @@ impl PageCache { // there are buffers in the pool. In practice, with a reasonably // large buffer pool it really shouldn't happen. if iters > iter_limit { - panic!("could not find a victim buffer to evict"); + anyhow::bail!("exceeded evict iter limit"); } continue; } @@ -804,7 +809,7 @@ impl PageCache { inner.dirty = false; inner.key = None; } - return (slot_idx, inner); + return Ok((slot_idx, inner)); } } }