mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 10:22:56 +00:00
Compare commits
6 Commits
release-pr
...
problame/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2e9cdd3130 | ||
|
|
74212ed96e | ||
|
|
4afc4c03a4 | ||
|
|
a0b8c2f00c | ||
|
|
69e8f80472 | ||
|
|
e800a7ce92 |
@@ -47,13 +47,11 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tracing::error;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
};
|
||||
|
||||
use crate::tenant::writeback_ephemeral_file;
|
||||
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
|
||||
|
||||
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
|
||||
@@ -97,10 +95,6 @@ enum CacheKey {
|
||||
hash_key: MaterializedPageHashKey,
|
||||
lsn: Lsn,
|
||||
},
|
||||
EphemeralPage {
|
||||
file_id: u64,
|
||||
blkno: u32,
|
||||
},
|
||||
ImmutableFilePage {
|
||||
file_id: u64,
|
||||
blkno: u32,
|
||||
@@ -128,7 +122,6 @@ struct Slot {
|
||||
struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
dirty: bool,
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
@@ -177,8 +170,6 @@ pub struct PageCache {
|
||||
/// can have a separate mapping map, next to this field.
|
||||
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
|
||||
ephemeral_page_map: RwLock<HashMap<(u64, u32), usize>>,
|
||||
|
||||
immutable_page_map: RwLock<HashMap<(u64, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
@@ -258,14 +249,6 @@ impl PageWriteGuard<'_> {
|
||||
);
|
||||
self.valid = true;
|
||||
}
|
||||
pub fn mark_dirty(&mut self) {
|
||||
// only ephemeral pages can be dirty ATM.
|
||||
assert!(matches!(
|
||||
self.inner.key,
|
||||
Some(CacheKey::EphemeralPage { .. })
|
||||
));
|
||||
self.inner.dirty = true;
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PageWriteGuard<'_> {
|
||||
@@ -280,7 +263,6 @@ impl Drop for PageWriteGuard<'_> {
|
||||
let self_key = self.inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
self.inner.key = None;
|
||||
self.inner.dirty = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -388,41 +370,7 @@ impl PageCache {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with Ephemeral pages.
|
||||
|
||||
pub fn read_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::EphemeralPage { file_id, blkno };
|
||||
|
||||
self.lock_for_read(&mut cache_key)
|
||||
}
|
||||
|
||||
pub fn write_ephemeral_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<WriteBufResult> {
|
||||
let cache_key = CacheKey::EphemeralPage { file_id, blkno };
|
||||
|
||||
self.lock_for_write(&cache_key)
|
||||
}
|
||||
|
||||
/// Immediately drop all buffers belonging to given file, without writeback
|
||||
pub fn drop_buffers_for_ephemeral(&self, drop_file_id: u64) {
|
||||
for slot_idx in 0..self.slots.len() {
|
||||
let slot = &self.slots[slot_idx];
|
||||
|
||||
let mut inner = slot.inner.write().unwrap();
|
||||
if let Some(key) = &inner.key {
|
||||
match key {
|
||||
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(key);
|
||||
inner.key = None;
|
||||
inner.dirty = false;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Section 1.3: Public interface functions for working with immutable file pages.
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub fn read_immutable_buf(&self, file_id: u64, blkno: u32) -> anyhow::Result<ReadBufResult> {
|
||||
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
|
||||
@@ -444,7 +392,6 @@ impl PageCache {
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(key);
|
||||
inner.key = None;
|
||||
inner.dirty = false;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -522,10 +469,6 @@ impl PageCache {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::EphemeralPage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE.read_accesses_ephemeral,
|
||||
&crate::metrics::PAGE_CACHE.read_hits_ephemeral,
|
||||
),
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
|
||||
&crate::metrics::PAGE_CACHE.read_hits_immutable,
|
||||
@@ -566,7 +509,6 @@ impl PageCache {
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
inner.dirty = false;
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
@@ -628,7 +570,6 @@ impl PageCache {
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
inner.dirty = false;
|
||||
slot.usage_count.store(1, Ordering::Relaxed);
|
||||
|
||||
return Ok(WriteBufResult::NotFound(PageWriteGuard {
|
||||
@@ -667,10 +608,6 @@ impl PageCache {
|
||||
*lsn = version.lsn;
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
@@ -694,10 +631,6 @@ impl PageCache {
|
||||
None
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let map = self.ephemeral_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
@@ -731,12 +664,6 @@ impl PageCache {
|
||||
panic!("could not find old key in mapping")
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
.expect("could not find old key in mapping");
|
||||
self.size_metrics.current_bytes_ephemeral.sub_page_sz(1);
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
@@ -776,17 +703,7 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
}
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
let mut map = self.ephemeral_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
Entry::Occupied(entry) => Some(*entry.get()),
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(slot_idx);
|
||||
self.size_metrics.current_bytes_ephemeral.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
@@ -837,25 +754,8 @@ impl PageCache {
|
||||
}
|
||||
};
|
||||
if let Some(old_key) = &inner.key {
|
||||
if inner.dirty {
|
||||
if let Err(err) = Self::writeback(old_key, inner.buf) {
|
||||
// Writing the page to disk failed.
|
||||
//
|
||||
// FIXME: What to do here, when? We could propagate the error to the
|
||||
// caller, but victim buffer is generally unrelated to the original
|
||||
// call. It can even belong to a different tenant. Currently, we
|
||||
// report the error to the log and continue the clock sweep to find
|
||||
// a different victim. But if the problem persists, the page cache
|
||||
// could fill up with dirty pages that we cannot evict, and we will
|
||||
// loop retrying the writebacks indefinitely.
|
||||
error!("writeback of buffer {:?} failed: {}", old_key, err);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// remove mapping for old buffer
|
||||
self.remove_mapping(old_key);
|
||||
inner.dirty = false;
|
||||
inner.key = None;
|
||||
}
|
||||
return Ok((slot_idx, inner));
|
||||
@@ -863,28 +763,6 @@ impl PageCache {
|
||||
}
|
||||
}
|
||||
|
||||
fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: _,
|
||||
} => Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"unexpected dirty materialized page",
|
||||
)),
|
||||
CacheKey::EphemeralPage { file_id, blkno } => {
|
||||
writeback_ephemeral_file(*file_id, *blkno, buf)
|
||||
}
|
||||
CacheKey::ImmutableFilePage {
|
||||
file_id: _,
|
||||
blkno: _,
|
||||
} => Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"unexpected dirty immutable page",
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Initialize a new page cache
|
||||
///
|
||||
/// This should be called only once at page server startup.
|
||||
@@ -895,7 +773,6 @@ impl PageCache {
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_ephemeral.set_page_sz(0);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
@@ -905,11 +782,7 @@ impl PageCache {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf,
|
||||
dirty: false,
|
||||
}),
|
||||
inner: RwLock::new(SlotInner { key: None, buf }),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
})
|
||||
@@ -917,7 +790,6 @@ impl PageCache {
|
||||
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
ephemeral_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
|
||||
@@ -136,9 +136,6 @@ pub use timeline::{
|
||||
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
|
||||
};
|
||||
|
||||
// re-export this function so that page_cache.rs can use it.
|
||||
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
|
||||
|
||||
// re-export for use in remote_timeline_client.rs
|
||||
pub use crate::tenant::metadata::save_metadata;
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
//! used to keep in-memory layers spilled on disk.
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::page_cache::{self, ReadBufResult, WriteBufResult, PAGE_SZ};
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::BlockReader;
|
||||
use crate::virtual_file::VirtualFile;
|
||||
@@ -10,7 +10,7 @@ use once_cell::sync::Lazy;
|
||||
use std::cmp::min;
|
||||
use std::collections::HashMap;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, ErrorKind};
|
||||
use std::io::{self};
|
||||
use std::ops::DerefMut;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
@@ -19,6 +19,9 @@ use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use std::os::unix::fs::FileExt;
|
||||
|
||||
mod buffer_pool;
|
||||
mod dirty_buffer;
|
||||
|
||||
///
|
||||
/// This is the global cache of file descriptors (File objects).
|
||||
///
|
||||
@@ -94,27 +97,13 @@ impl EphemeralFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_buf_for_write(&self, blkno: u32) -> Result<page_cache::PageWriteGuard, io::Error> {
|
||||
// Look up the right page
|
||||
let cache = page_cache::get();
|
||||
let mut write_guard = match cache
|
||||
.write_ephemeral_buf(self.file_id, blkno)
|
||||
.map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))?
|
||||
{
|
||||
WriteBufResult::Found(guard) => guard,
|
||||
WriteBufResult::NotFound(mut guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
// TODO: if we're overwriting the whole page, no need to read it in first
|
||||
self.fill_buffer(guard.deref_mut(), blkno)?;
|
||||
guard.mark_valid();
|
||||
|
||||
// And then fall through to modify it.
|
||||
guard
|
||||
}
|
||||
};
|
||||
write_guard.mark_dirty();
|
||||
|
||||
Ok(write_guard)
|
||||
fn get_buf_for_write(&self, blkno: u32) -> Result<dirty_buffer::Buffer, io::Error> {
|
||||
let pool = buffer_pool::get();
|
||||
let mut buf = pool.get_buffer();
|
||||
// Read the page from disk into the buffer
|
||||
// TODO: if we're overwriting the whole page, no need to read it in first
|
||||
self.fill_buffer(buf.deref_mut(), blkno)?;
|
||||
Ok(dirty_buffer::Buffer::new(self, buf, blkno))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -127,75 +116,6 @@ pub fn is_ephemeral_file(filename: &str) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
impl FileExt for EphemeralFile {
|
||||
fn read_at(&self, dstbuf: &mut [u8], offset: u64) -> Result<usize, io::Error> {
|
||||
// Look up the right page
|
||||
let blkno = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = offset as usize % PAGE_SZ;
|
||||
let len = min(PAGE_SZ - off, dstbuf.len());
|
||||
|
||||
let read_guard;
|
||||
let mut write_guard;
|
||||
|
||||
let cache = page_cache::get();
|
||||
let buf = match cache
|
||||
.read_ephemeral_buf(self.file_id, blkno)
|
||||
.map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))?
|
||||
{
|
||||
ReadBufResult::Found(guard) => {
|
||||
read_guard = guard;
|
||||
read_guard.as_ref()
|
||||
}
|
||||
ReadBufResult::NotFound(guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
write_guard = guard;
|
||||
self.fill_buffer(write_guard.deref_mut(), blkno)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// And then fall through to read the requested slice from the
|
||||
// buffer.
|
||||
write_guard.as_ref()
|
||||
}
|
||||
};
|
||||
|
||||
dstbuf[0..len].copy_from_slice(&buf[off..(off + len)]);
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn write_at(&self, srcbuf: &[u8], offset: u64) -> Result<usize, io::Error> {
|
||||
// Look up the right page
|
||||
let blkno = (offset / PAGE_SZ as u64) as u32;
|
||||
let off = offset as usize % PAGE_SZ;
|
||||
let len = min(PAGE_SZ - off, srcbuf.len());
|
||||
|
||||
let mut write_guard;
|
||||
let cache = page_cache::get();
|
||||
let buf = match cache
|
||||
.write_ephemeral_buf(self.file_id, blkno)
|
||||
.map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))?
|
||||
{
|
||||
WriteBufResult::Found(guard) => {
|
||||
write_guard = guard;
|
||||
write_guard.deref_mut()
|
||||
}
|
||||
WriteBufResult::NotFound(guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
// TODO: if we're overwriting the whole page, no need to read it in first
|
||||
write_guard = guard;
|
||||
self.fill_buffer(write_guard.deref_mut(), blkno)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// And then fall through to modify it.
|
||||
write_guard.deref_mut()
|
||||
}
|
||||
};
|
||||
|
||||
buf[off..(off + len)].copy_from_slice(&srcbuf[0..len]);
|
||||
write_guard.mark_dirty();
|
||||
Ok(len)
|
||||
}
|
||||
}
|
||||
|
||||
impl BlobWriter for EphemeralFile {
|
||||
fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
|
||||
let pos = self.size;
|
||||
@@ -217,6 +137,7 @@ impl BlobWriter for EphemeralFile {
|
||||
// it needs to be split across pages
|
||||
buf[off..(off + thislen)].copy_from_slice(&len_buf[..thislen]);
|
||||
blknum += 1;
|
||||
buf.writeback()?;
|
||||
buf = self.get_buf_for_write(blknum)?;
|
||||
buf[0..4 - thislen].copy_from_slice(&len_buf[thislen..]);
|
||||
off = 4 - thislen;
|
||||
@@ -232,6 +153,7 @@ impl BlobWriter for EphemeralFile {
|
||||
let mut page_remain = PAGE_SZ - off;
|
||||
if page_remain == 0 {
|
||||
blknum += 1;
|
||||
buf.writeback()?;
|
||||
buf = self.get_buf_for_write(blknum)?;
|
||||
off = 0;
|
||||
page_remain = PAGE_SZ;
|
||||
@@ -241,7 +163,8 @@ impl BlobWriter for EphemeralFile {
|
||||
off += this_blk_len;
|
||||
buf_remain = &buf_remain[this_blk_len..];
|
||||
}
|
||||
drop(buf);
|
||||
|
||||
buf.writeback()?;
|
||||
|
||||
if srcbuf.len() < 0x80 {
|
||||
self.size += 1;
|
||||
@@ -256,10 +179,6 @@ impl BlobWriter for EphemeralFile {
|
||||
|
||||
impl Drop for EphemeralFile {
|
||||
fn drop(&mut self) {
|
||||
// drop all pages from page cache
|
||||
let cache = page_cache::get();
|
||||
cache.drop_buffers_for_ephemeral(self.file_id);
|
||||
|
||||
// remove entry from the hash map
|
||||
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
|
||||
|
||||
@@ -281,62 +200,24 @@ impl Drop for EphemeralFile {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn writeback(file_id: u64, blkno: u32, buf: &[u8]) -> Result<(), io::Error> {
|
||||
if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) {
|
||||
match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(io::Error::new(
|
||||
ErrorKind::Other,
|
||||
format!(
|
||||
"failed to write back to ephemeral file at {} error: {}",
|
||||
file.path.display(),
|
||||
e
|
||||
),
|
||||
)),
|
||||
}
|
||||
} else {
|
||||
Err(io::Error::new(
|
||||
ErrorKind::Other,
|
||||
"could not write back page, not found in ephemeral files hash",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl BlockReader for EphemeralFile {
|
||||
type BlockLease = page_cache::PageReadGuard<'static>;
|
||||
type BlockLease = buffer_pool::Handle;
|
||||
|
||||
fn read_blk(&self, blknum: u32) -> Result<Self::BlockLease, io::Error> {
|
||||
// Look up the right page
|
||||
let cache = page_cache::get();
|
||||
loop {
|
||||
match cache
|
||||
.read_ephemeral_buf(self.file_id, blknum)
|
||||
.map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))?
|
||||
{
|
||||
ReadBufResult::Found(guard) => return Ok(guard),
|
||||
ReadBufResult::NotFound(mut write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
self.fill_buffer(write_guard.deref_mut(), blknum)?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
// Read the page from disk into the buffer
|
||||
let pool = buffer_pool::get();
|
||||
let mut buf = pool.get_buffer();
|
||||
self.fill_buffer(buf.deref_mut(), blknum)?;
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
|
||||
io::Error::new(ErrorKind::Other, format!("{context}: {e:#}"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::tenant::blob_io::BlobWriter;
|
||||
use crate::tenant::block_io::BlockCursor;
|
||||
use rand::{seq::SliceRandom, thread_rng, RngCore};
|
||||
use rand::{thread_rng, RngCore};
|
||||
use std::fs;
|
||||
use std::str::FromStr;
|
||||
|
||||
@@ -357,50 +238,6 @@ mod tests {
|
||||
Ok((conf, tenant_id, timeline_id))
|
||||
}
|
||||
|
||||
// Helper function to slurp contents of a file, starting at the current position,
|
||||
// into a string
|
||||
fn read_string(efile: &EphemeralFile, offset: u64, len: usize) -> Result<String, io::Error> {
|
||||
let mut buf = Vec::new();
|
||||
buf.resize(len, 0u8);
|
||||
|
||||
efile.read_exact_at(&mut buf, offset)?;
|
||||
|
||||
Ok(String::from_utf8_lossy(&buf)
|
||||
.trim_end_matches('\0')
|
||||
.to_string())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ephemeral_files() -> Result<(), io::Error> {
|
||||
let (conf, tenant_id, timeline_id) = harness("ephemeral_files")?;
|
||||
|
||||
let file_a = EphemeralFile::create(conf, tenant_id, timeline_id)?;
|
||||
|
||||
file_a.write_all_at(b"foo", 0)?;
|
||||
assert_eq!("foo", read_string(&file_a, 0, 20)?);
|
||||
|
||||
file_a.write_all_at(b"bar", 3)?;
|
||||
assert_eq!("foobar", read_string(&file_a, 0, 20)?);
|
||||
|
||||
// Open a lot of files, enough to cause some page evictions.
|
||||
let mut efiles = Vec::new();
|
||||
for fileno in 0..100 {
|
||||
let efile = EphemeralFile::create(conf, tenant_id, timeline_id)?;
|
||||
efile.write_all_at(format!("file {}", fileno).as_bytes(), 0)?;
|
||||
assert_eq!(format!("file {}", fileno), read_string(&efile, 0, 10)?);
|
||||
efiles.push((fileno, efile));
|
||||
}
|
||||
|
||||
// Check that all the files can still be read from. Use them in random order for
|
||||
// good measure.
|
||||
efiles.as_mut_slice().shuffle(&mut thread_rng());
|
||||
for (fileno, efile) in efiles.iter_mut() {
|
||||
assert_eq!(format!("file {}", fileno), read_string(efile, 0, 10)?);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ephemeral_blobs() -> Result<(), io::Error> {
|
||||
let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;
|
||||
|
||||
66
pageserver/src/tenant/ephemeral_file/buffer_pool.rs
Normal file
66
pageserver/src/tenant/ephemeral_file/buffer_pool.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
//! Buffer pool for ephemeral file buffers.
|
||||
//!
|
||||
//! Currently this is a very simple implementation that just uses `malloc`.
|
||||
//! But the interface is such that we can switch to a more sophisticated
|
||||
//! implementation later, e.g., one that caps that amount of memory used.
|
||||
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
|
||||
pub struct BufferPool;
|
||||
|
||||
const POOL: BufferPool = BufferPool;
|
||||
|
||||
pub(super) fn get() -> &'static BufferPool {
|
||||
&POOL
|
||||
}
|
||||
|
||||
impl BufferPool {
|
||||
/// Get a [`Handle`] to a buffer in the pool.
|
||||
///
|
||||
/// The buffer is guaranteed to be zeroed out.
|
||||
///
|
||||
/// The implementation may block to wait for buffers to become available,
|
||||
/// and a future async version of this method may `.await` internally to
|
||||
/// wait for buffers to become available.
|
||||
///
|
||||
/// To avoid deadlocks, a thread/task must get all the buffers it needs
|
||||
/// with a single call to `get_buffer`. Without this rule, a deadlock
|
||||
/// can happen. Take for example a buffer pool with 2 buffers X, Y
|
||||
/// and a program with two threads A and B, each requiring 2 buffers.
|
||||
/// If A gets X and B gets Y, then both threads will block forever trying
|
||||
/// to get their second buffer.
|
||||
pub fn get_buffer(&self) -> Handle {
|
||||
Handle {
|
||||
data: vec![0; PAGE_SZ],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Handle {
|
||||
data: Vec<u8>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Handle {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Handle")
|
||||
.field("data", &self.data.as_ptr())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for Handle {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
fn deref(&self) -> &Self::Target {
|
||||
let slice: &[u8] = &self.data[..];
|
||||
slice.try_into().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl DerefMut for Handle {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
let slice: &mut [u8] = &mut self.data[..];
|
||||
slice.try_into().unwrap()
|
||||
}
|
||||
}
|
||||
111
pageserver/src/tenant/ephemeral_file/dirty_buffer.rs
Normal file
111
pageserver/src/tenant/ephemeral_file/dirty_buffer.rs
Normal file
@@ -0,0 +1,111 @@
|
||||
//! Newtypes to ensure that dirty buffers are written back to the filesystem before they are dropped.
|
||||
|
||||
use std::io::ErrorKind;
|
||||
use std::ops::Deref;
|
||||
use std::ops::DerefMut;
|
||||
use std::os::unix::prelude::FileExt;
|
||||
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
|
||||
use super::buffer_pool;
|
||||
use super::EphemeralFile;
|
||||
|
||||
pub(super) struct Buffer<'f> {
|
||||
inner: Inner<'f>,
|
||||
}
|
||||
|
||||
enum Inner<'f> {
|
||||
Dirty {
|
||||
ephemeral_file: &'f EphemeralFile,
|
||||
buf: buffer_pool::Handle,
|
||||
blkno: u32,
|
||||
},
|
||||
WritebackOngoing,
|
||||
WrittenBack,
|
||||
WritebackError,
|
||||
Dropped,
|
||||
}
|
||||
|
||||
impl<'f> Buffer<'f> {
|
||||
pub(super) fn new(
|
||||
ephemeral_file: &'f EphemeralFile,
|
||||
buf: buffer_pool::Handle,
|
||||
blkno: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
inner: Inner::Dirty {
|
||||
ephemeral_file,
|
||||
buf,
|
||||
blkno,
|
||||
},
|
||||
}
|
||||
}
|
||||
pub(super) fn writeback(mut self) -> Result<(), std::io::Error> {
|
||||
let Inner::Dirty {
|
||||
ephemeral_file,
|
||||
buf,
|
||||
blkno,
|
||||
} = std::mem::replace(&mut self.inner, Inner::WritebackOngoing) else {
|
||||
unreachable!("writeback consumes");
|
||||
};
|
||||
match ephemeral_file
|
||||
.file
|
||||
.write_all_at(buf.deref(), blkno as u64 * PAGE_SZ as u64)
|
||||
{
|
||||
Ok(_) => {
|
||||
self.inner = Inner::WrittenBack;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
self.inner = Inner::WritebackError;
|
||||
Err(std::io::Error::new(
|
||||
ErrorKind::Other,
|
||||
format!(
|
||||
"failed to write back to ephemeral file at {} error: {}",
|
||||
ephemeral_file.file.path.display(),
|
||||
e
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'f> Deref for Buffer<'f> {
|
||||
type Target = [u8];
|
||||
|
||||
fn deref(&self) -> &[u8] {
|
||||
match &self.inner {
|
||||
Inner::Dirty { buf, .. } => &**buf,
|
||||
Inner::WritebackOngoing => unreachable!("writeback consumes"),
|
||||
Inner::WrittenBack => unreachable!("writeback consumes"),
|
||||
Inner::WritebackError => unreachable!("writeback consumes"),
|
||||
Inner::Dropped => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'f> DerefMut for Buffer<'f> {
|
||||
fn deref_mut(&mut self) -> &mut [u8] {
|
||||
match &mut self.inner {
|
||||
Inner::Dirty { buf, .. } => &mut **buf,
|
||||
Inner::WritebackOngoing => unreachable!("writeback consumes"),
|
||||
Inner::WrittenBack => unreachable!("writeback consumes"),
|
||||
Inner::WritebackError => unreachable!("writeback consumes"),
|
||||
Inner::Dropped => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer<'_> {
|
||||
fn drop(&mut self) {
|
||||
let prev = std::mem::replace(&mut self.inner, Inner::Dropped);
|
||||
match prev {
|
||||
// TODO: check this at compile time
|
||||
Inner::Dirty { .. } => panic!("dropped dirty buffer, need to writeback() first"),
|
||||
Inner::WritebackOngoing => unreachable!("transitory state"),
|
||||
Inner::WrittenBack | Inner::WritebackError => {}
|
||||
Inner::Dropped => unreachable!("drop only happens once"),
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user