mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 09:30:37 +00:00
Explicitly error on cache issues during I/O (#2303)
This commit is contained in:
@@ -157,7 +157,14 @@ where
|
||||
// Look up the right page
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache.read_immutable_buf(self.file_id, blknum) {
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum)
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => break Ok(guard),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
|
||||
@@ -12,7 +12,7 @@ use once_cell::sync::Lazy;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::ops::DerefMut;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
@@ -51,7 +51,7 @@ impl EphemeralFile {
|
||||
conf: &PageServerConf,
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
) -> Result<EphemeralFile, std::io::Error> {
|
||||
) -> Result<EphemeralFile, io::Error> {
|
||||
let mut l = EPHEMERAL_FILES.write().unwrap();
|
||||
let file_id = l.next_file_id;
|
||||
l.next_file_id += 1;
|
||||
@@ -76,7 +76,7 @@ impl EphemeralFile {
|
||||
})
|
||||
}
|
||||
|
||||
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), Error> {
|
||||
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> {
|
||||
let mut off = 0;
|
||||
while off < PAGE_SZ {
|
||||
let n = self
|
||||
@@ -96,10 +96,13 @@ impl EphemeralFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_buf_for_write(&self, blkno: u32) -> Result<page_cache::PageWriteGuard, Error> {
|
||||
fn get_buf_for_write(&self, blkno: u32) -> Result<page_cache::PageWriteGuard, io::Error> {
|
||||
// Look up the right page
|
||||
let cache = page_cache::get();
|
||||
let mut write_guard = match cache.write_ephemeral_buf(self.file_id, blkno) {
|
||||
let mut write_guard = match cache
|
||||
.write_ephemeral_buf(self.file_id, blkno)
|
||||
.map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))?
|
||||
{
|
||||
WriteBufResult::Found(guard) => guard,
|
||||
WriteBufResult::NotFound(mut guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
@@ -127,7 +130,7 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
|
||||
}
|
||||
|
||||
impl FileExt for EphemeralFile {
|
||||
fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result<usize, io::Error> {
|
||||
// Look up the right page
|
||||
let blkno = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = offset as usize % PAGE_SZ;
|
||||
@@ -137,7 +140,10 @@ impl FileExt for EphemeralFile {
|
||||
let mut write_guard;
|
||||
|
||||
let cache = page_cache::get();
|
||||
let buf = match cache.read_ephemeral_buf(self.file_id, blkno) {
|
||||
let buf = match cache
|
||||
.read_ephemeral_buf(self.file_id, blkno)
|
||||
.map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))?
|
||||
{
|
||||
ReadBufResult::Found(guard) => {
|
||||
read_guard = guard;
|
||||
read_guard.as_ref()
|
||||
@@ -158,7 +164,7 @@ impl FileExt for EphemeralFile {
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result<usize, io::Error> {
|
||||
// Look up the right page
|
||||
let blkno = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = offset as usize % PAGE_SZ;
|
||||
@@ -166,7 +172,10 @@ impl FileExt for EphemeralFile {
|
||||
|
||||
let mut write_guard;
|
||||
let cache = page_cache::get();
|
||||
let buf = match cache.write_ephemeral_buf(self.file_id, blkno) {
|
||||
let buf = match cache
|
||||
.write_ephemeral_buf(self.file_id, blkno)
|
||||
.map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))?
|
||||
{
|
||||
WriteBufResult::Found(guard) => {
|
||||
write_guard = guard;
|
||||
write_guard.deref_mut()
|
||||
@@ -190,7 +199,7 @@ impl FileExt for EphemeralFile {
|
||||
}
|
||||
|
||||
impl BlobWriter for EphemeralFile {
|
||||
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, Error> {
|
||||
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
|
||||
let pos = self.size;
|
||||
|
||||
let mut blknum = (self.size / PAGE_SZ as u64) as u32;
|
||||
@@ -268,11 +277,11 @@ impl Drop for EphemeralFile {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Error> {
|
||||
pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) {
|
||||
match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(std::io::Error::new(
|
||||
Err(e) => Err(io::Error::new(
|
||||
ErrorKind::Other,
|
||||
format!(
|
||||
"failed to write back to ephemeral file at {} error: {}",
|
||||
@@ -282,7 +291,7 @@ pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Er
|
||||
)),
|
||||
}
|
||||
} else {
|
||||
Err(std::io::Error::new(
|
||||
Err(io::Error::new(
|
||||
ErrorKind::Other,
|
||||
"could not write back page, not found in ephemeral files hash",
|
||||
))
|
||||
@@ -292,11 +301,14 @@ pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), std::io::Er
|
||||
impl BlockReader for EphemeralFile {
|
||||
type BlockLease = page_cache::PageReadGuard<'static>;
|
||||
|
||||
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, std::io::Error> {
|
||||
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, io::Error> {
|
||||
// Look up the right page
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache.read_ephemeral_buf(self.file_id, blknum) {
|
||||
match cache
|
||||
.read_ephemeral_buf(self.file_id, blknum)
|
||||
.map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))?
|
||||
{
|
||||
ReadBufResult::Found(guard) => return Ok(guard),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
@@ -311,6 +323,10 @@ impl BlockReader for EphemeralFile {
|
||||
}
|
||||
}
|
||||
|
||||
fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
|
||||
io::Error::new(ErrorKind::Other, format!("{context}: {e:#}"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
@@ -322,7 +338,7 @@ mod tests {
|
||||
|
||||
fn repo_harness(
|
||||
test_name: &str,
|
||||
) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), Error> {
|
||||
) -> Result<(&'static PageServerConf, ZTenantId, ZTimelineId), io::Error> {
|
||||
let repo_dir = PageServerConf::test_repo_dir(test_name);
|
||||
let _ = fs::remove_dir_all(&repo_dir);
|
||||
let conf = PageServerConf::dummy_conf(repo_dir);
|
||||
@@ -339,7 +355,7 @@ mod tests {
|
||||
|
||||
// Helper function to slurp contents of a file, starting at the current position,
|
||||
// into a string
|
||||
fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result<String, Error> {
|
||||
fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result<String, io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
buf.resize(len, 0u8);
|
||||
|
||||
@@ -351,7 +367,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ephemeral_files() -> Result<(), Error> {
|
||||
fn test_ephemeral_files() -> Result<(), io::Error> {
|
||||
let (conf, tenantid, timelineid) = repo_harness("ephemeral_files")?;
|
||||
|
||||
let file_a = EphemeralFile::create(conf, tenantid, timelineid)?;
|
||||
@@ -382,7 +398,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ephemeral_blobs() -> Result<(), Error> {
|
||||
fn test_ephemeral_blobs() -> Result<(), io::Error> {
|
||||
let (conf, tenantid, timelineid) = repo_harness("ephemeral_blobs")?;
|
||||
|
||||
let mut file = EphemeralFile::create(conf, tenantid, timelineid)?;
|
||||
|
||||
@@ -2117,7 +2117,7 @@ impl LayeredTimeline {
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
mut data: ValueReconstructState,
|
||||
) -> Result<Bytes> {
|
||||
) -> anyhow::Result<Bytes> {
|
||||
// Perform WAL redo if needed
|
||||
data.records.reverse();
|
||||
|
||||
@@ -2167,13 +2167,15 @@ impl LayeredTimeline {
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
let cache = page_cache::get();
|
||||
cache.memorize_materialized_page(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
);
|
||||
cache
|
||||
.memorize_materialized_page(
|
||||
self.tenant_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.context("Materialized page memoization failed")?;
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
|
||||
@@ -45,6 +45,7 @@ use std::{
|
||||
},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tracing::error;
|
||||
use utils::{
|
||||
@@ -342,7 +343,7 @@ impl PageCache {
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
) {
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_id,
|
||||
@@ -352,7 +353,7 @@ impl PageCache {
|
||||
lsn,
|
||||
};
|
||||
|
||||
match self.lock_for_write(&cache_key) {
|
||||
match self.lock_for_write(&cache_key)? {
|
||||
WriteBufResult::Found(write_guard) => {
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
@@ -364,17 +365,19 @@ impl PageCache {
|
||||
write_guard.mark_valid();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with Ephemeral pages.
|
||||
|
||||
pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult {
|
||||
pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::EphemeralPage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key)
|
||||
}
|
||||
|
||||
pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> WriteBufResult {
|
||||
pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<WriteBufResult> {
|
||||
let cache_key = CacheKey::EphemeralPage { file_id, blkno };
|
||||
|
||||
self.lock_for_write(&cache_key)
|
||||
@@ -402,7 +405,7 @@ impl PageCache {
|
||||
|
||||
// Section 1.3: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> ReadBufResult {
|
||||
pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key)
|
||||
@@ -495,15 +498,16 @@ impl PageCache {
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> ReadBufResult {
|
||||
fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key) {
|
||||
return ReadBufResult::Found(read_guard);
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self.find_victim();
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
@@ -526,10 +530,10 @@ impl PageCache {
|
||||
inner.dirty = false;
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return ReadBufResult::NotFound(PageWriteGuard {
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -556,15 +560,16 @@ impl PageCache {
|
||||
///
|
||||
/// Similar to lock_for_read(), but the returned buffer is write-locked and
|
||||
/// may be modified by the caller even if it's already found in the cache.
|
||||
fn lock_for_write(&self, cache_key: &CacheKey) -> WriteBufResult {
|
||||
fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(write_guard) = self.try_lock_for_write(cache_key) {
|
||||
return WriteBufResult::Found(write_guard);
|
||||
return Ok(WriteBufResult::Found(write_guard));
|
||||
}
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self.find_victim();
|
||||
let (slot_idx, mut inner) =
|
||||
self.find_victim().context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
@@ -587,10 +592,10 @@ impl PageCache {
|
||||
inner.dirty = false;
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return WriteBufResult::NotFound(PageWriteGuard {
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
inner,
|
||||
valid: false,
|
||||
});
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -754,7 +759,7 @@ impl PageCache {
|
||||
/// Find a slot to evict.
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
fn find_victim(&self) -> (usize, RwLockWriteGuard<SlotInner>) {
|
||||
fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
@@ -767,7 +772,7 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(TryLockError::Poisoned(err)) => {
|
||||
panic!("buffer lock was poisoned: {:?}", err)
|
||||
anyhow::bail!("buffer lock was poisoned: {err:?}")
|
||||
}
|
||||
Err(TryLockError::WouldBlock) => {
|
||||
// If we have looped through the whole buffer pool 10 times
|
||||
@@ -777,7 +782,7 @@ impl PageCache {
|
||||
// there are buffers in the pool. In practice, with a reasonably
|
||||
// large buffer pool it really shouldn't happen.
|
||||
if iters > iter_limit {
|
||||
panic!("could not find a victim buffer to evict");
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -804,7 +809,7 @@ impl PageCache {
|
||||
inner.dirty = false;
|
||||
inner.key = None;
|
||||
}
|
||||
return (slot_idx, inner);
|
||||
return Ok((slot_idx, inner));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user