simplify page-caching of EphemeralFile (#4994)

(This PR is the successor of https://github.com/neondatabase/neon/pull/4984 )

## Summary

The current way in which `EphemeralFile` uses `PageCache` complicates
the Pageserver code base to a degree that isn't worth it.
This PR refactors how we cache `EphemeralFile` contents, by exploiting
the append-only nature of `EphemeralFile`.

The result is that `PageCache` only holds `ImmutableFilePage` and
`MaterializedPage`.
These types of pages are read-only and evictable without write-back.
This allows us to remove the writeback code from `PageCache`, also
eliminating an entire failure mode.

Futher, many great open-source libraries exist to solve the problem of a
read-only cache,
much better than our `page_cache.rs` (e.g., better replacement policy,
less global locking).
With this PR, we can now explore using them.

## Problem & Analysis

Before this PR, `PageCache` had three types of pages:

* `ImmutableFilePage`: caches Delta / Image layer file contents
* `MaterializedPage`: caches results of Timeline::get (page
materialization)
* `EphemeralPage`: caches `EphemeralFile` contents

`EphemeralPage` is quite different from `ImmutableFilePage` and
`MaterializedPage`:

* Immutable and materialized pages are for the acceleration of (future)
reads of the same data using `PAGE_CACHE_SIZE * PAGE_SIZE` bytes of
DRAM.
* Ephemeral pages are a write-back cache of `EphemeralFile` contents,
i.e., if there is pressure in the page cache, we spill `EphemeralFile`
contents to disk.

`EphemeralFile` is only used by `InMemoryLayer`, for the following
purposes:
* **write**: when filling up the `InMemoryLayer`, via `impl BlobWriter
for EphemeralFile`
* **read**: when doing **page reconstruction** for a page@lsn that isn't
written to disk
* **read**: when writing L0 layer files, we re-read the `InMemoryLayer`
and put the contents into the L0 delta writer
(**`create_delta_layer`**). This happens every 10min or when
InMemoryLayer reaches 256MB in size.

The access patterns of the `InMemoryLayer` use case are as follows:

* **write**: via `BlobWriter`, strictly append-only
* **read for page reconstruction**: via `BlobReader`, random
* **read for `create_delta_layer`**: via `BlobReader`, dependent on
data, but generally random. Why?
* in classical LSM terms, this function is what writes the
memory-resident `C0` tree into the disk-resident `C1` tree
* in our system, though, the values of InMemoryLayer are stored in an
EphemeralFile, and hence they are not guaranteed to be memory-resident
* the function reads `Value`s in `Key, LSN` order, which is `!=` insert
order

What do these `EphemeralFile`-level access patterns mean for the page
cache?

* **write**:
* the common case is that `Value` is a WAL record, and if it isn't a
full-page-image WAL record, then it's smaller than `PAGE_SIZE`
* So, the `EphemeralPage` pages act as a buffer for these `< PAGE_CACHE`
sized writes.
* If there's no page cache eviction between subsequent
`InMemoryLayer::put_value` calls, the `EphemeralPage` is still resident,
so the page cache avoids doing a `write` system call.
* In practice, a busy page server will have page cache evictions because
we only configure 64MB of page cache size.
* **reads for page reconstruction**: read acceleration, just as for the
other page types.
* **reads for `create_delta_layer`**:
* The `Value` reads happen through a `BlockCursor`, which optimizes the
case of repeated reads from the same page.
* So, the best case is that subsequent values are located on the same
page; hence `BlockCursor`s buffer is maximally effective.
* The worst case is that each `Value` is on a different page; hence the
`BlockCursor`'s 1-page-sized buffer is ineffective.
* The best case translates into `256MB/PAGE_SIZE` page cache accesses,
one per page.
    * the worst case translates into `#Values` page cache accesses
* again, the page cache accesses must be assumed to be random because
the `Value`s aren't accessed in insertion order but `Key, LSN` order.

## Summary of changes

Preliminaries for this PR were:

- #5003
- #5004 
- #5005 
  - uncommitted microbenchmark in #5011 

Based on the observations outlined above, this PR makes the following
changes:

* Rip out `EphemeralPage` from `page_cache.rs`
* Move the `block_io::FileId` to `page_cache::FileId`
* Add a `PAGE_SIZE`d buffer to the `EphemeralPage` struct.
  It's called `mutable_tail`.
* Change `write_blob` to use `mutable_tail` for the write buffering
instead of a page cache page.
* if `mutable_tail` is full, it writes it out to disk, zeroes it out,
and re-uses it.
* There is explicitly no double-buffering, so that memory allocation per
`EphemeralFile` instance is fixed.
* Change `read_blob` to return different `BlockLease` variants depending
on `blknum`
* for the `blknum` that corresponds to the `mutable_tail`, return a ref
to it
* Rust borrowing rules prevent `write_blob` calls while refs are
outstanding.
  * for all non-tail blocks, return a page-cached `ImmutablePage`
* It is safe to page-cache these as ImmutablePage because EphemeralFile
is append-only.

## Performance 

How doe the changes above affect performance?
M claim is: not significantly.

* **write path**:
* before this PR, the `EphemeralFile::write_blob` didn't issue its own
`write` system calls.
* If there were enough free pages, it didn't issue *any* `write` system
calls.
* If it had to evict other `EphemeralPage`s to get pages a page for its
writes (`get_buf_for_write`), the page cache code would implicitly issue
the writeback of victim pages as needed.
* With this PR, `EphemeralFile::write_blob` *always* issues *all* of its
*own* `write` system calls.
* Also, the writes are explicit instead of implicit through page cache
write back, which will help #4743
* The perf impact of always doing the writes is the CPU overhead and
syscall latency.
* Before this PR, we might have never issued them if there were enough
free pages.
* We don't issue `fsync` and can expect the writes to only hit the
kernel page cache.
* There is also an advantage in issuing the writes directly: the perf
impact is paid by the tenant that caused the writes, instead of whatever
tenant evicts the `EphemeralPage`.
* **reads for page reconstruction**: no impact.
* The `write_blob` function pre-warms the page cache when it writes the
`mutable_tail` to disk.
* So, the behavior is the same as with the EphemeralPages before this
PR.
* **reads for `create_delta_layer`**: no impact.
  * Same argument as for page reconstruction.
  * Note for the future:
* going through the page cache likely causes read amplification here.
Why?
* Due to the `Key,Lsn`-ordered access pattern, we don't read all the
values in the page before moving to the next page. In the worst case, we
might read the same page multiple times to read different `Values` from
it.
    * So, it might be better to bypass the page cache here.
    * Idea drafts:
      * bypass PS page cache + prefetch pipeline + iovec-based IO
* bypass PS page cache + use `copy_file_range` to copy from ephemeral
file into the L0 delta file, without going through user space
This commit is contained in:
Christian Schwarz
2023-08-18 19:31:03 +02:00
committed by GitHub
parent 0a082aee77
commit 7a63685cde
5 changed files with 174 additions and 332 deletions

View File

@@ -10,6 +10,42 @@
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
//! information about what's stored in the buffer.
//!
//! # Types Of Pages
//!
//! [`PageCache`] only supports immutable pages.
//! Hence there is no need to worry about coherency.
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only.
//! It uses the page cache only for the blocks that are already fully written and immutable.
//!
//! # Filling The Page Cache
//!
//! Page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
//!
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
//! * Get a [`FileId`] using [`next_file_id`].
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//! a read guard for the page. Just use it.
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//! a write guard for the page. Fill the page with the contents of the on-disk file.
//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//! Then try again to [`PageCache::read_immutable_buf`].
//! Unless there's high cache pressure, the page should now be cached.
//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
//!
//! # Locking
//!
//! There are two levels of locking involved: There's one lock for the "mapping"
@@ -40,20 +76,18 @@ use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU8, AtomicUsize, Ordering},
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
},
};
use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::error;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::tenant::{block_io, ephemeral_file, writeback_ephemeral_file};
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
@@ -87,6 +121,17 @@ pub fn get() -> &'static PageCache {
pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;
/// See module-level comment.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
/// See module-level comment.
pub fn next_file_id() -> FileId {
FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
@@ -97,12 +142,8 @@ enum CacheKey {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
EphemeralPage {
file_id: ephemeral_file::FileId,
blkno: u32,
},
ImmutableFilePage {
file_id: block_io::FileId,
file_id: FileId,
blkno: u32,
},
}
@@ -128,7 +169,6 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
buf: &'static mut [u8; PAGE_SZ],
dirty: bool,
}
impl Slot {
@@ -177,9 +217,7 @@ pub struct PageCache {
/// can have a separate mapping map, next to this field.
materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
ephemeral_page_map: RwLock<HashMap<(ephemeral_file::FileId, u32), usize>>,
immutable_page_map: RwLock<HashMap<(block_io::FileId, u32), usize>>,
immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
@@ -258,14 +296,6 @@ impl PageWriteGuard<'_> {
);
self.valid = true;
}
pub fn mark_dirty(&mut self) {
// only ephemeral pages can be dirty ATM.
assert!(matches!(
self.inner.key,
Some(CacheKey::EphemeralPage { .. })
));
self.inner.dirty = true;
}
}
impl Drop for PageWriteGuard<'_> {
@@ -280,7 +310,6 @@ impl Drop for PageWriteGuard<'_> {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
self.inner.dirty = false;
}
}
}
@@ -388,62 +417,16 @@ impl PageCache {
Ok(())
}
// Section 1.2: Public interface functions for working with Ephemeral pages.
// Section 1.2: Public interface functions for working with immutable file pages.
pub fn read_ephemeral_buf(
&self,
file_id: ephemeral_file::FileId,
blkno: u32,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::EphemeralPage { file_id, blkno };
self.lock_for_read(&mut cache_key)
}
pub fn write_ephemeral_buf(
&self,
file_id: ephemeral_file::FileId,
blkno: u32,
) -> anyhow::Result<WriteBufResult> {
let cache_key = CacheKey::EphemeralPage { file_id, blkno };
self.lock_for_write(&cache_key)
}
/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_ephemeral(&self, drop_file_id: ephemeral_file::FileId) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
let mut inner = slot.inner.write().unwrap();
if let Some(key) = &inner.key {
match key {
CacheKey::EphemeralPage { file_id, blkno: _ } if *file_id == drop_file_id => {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
inner.dirty = false;
}
_ => {}
}
}
}
}
// Section 1.3: Public interface functions for working with immutable file pages.
pub fn read_immutable_buf(
&self,
file_id: block_io::FileId,
blkno: u32,
) -> anyhow::Result<ReadBufResult> {
pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key)
}
/// Immediately drop all buffers belonging to given file, without writeback
pub fn drop_buffers_for_immutable(&self, drop_file_id: block_io::FileId) {
/// Immediately drop all buffers belonging to given file
pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
for slot_idx in 0..self.slots.len() {
let slot = &self.slots[slot_idx];
@@ -456,7 +439,6 @@ impl PageCache {
// remove mapping for old buffer
self.remove_mapping(key);
inner.key = None;
inner.dirty = false;
}
_ => {}
}
@@ -534,10 +516,6 @@ impl PageCache {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::EphemeralPage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_ephemeral,
&crate::metrics::PAGE_CACHE.read_hits_ephemeral,
),
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
&crate::metrics::PAGE_CACHE.read_hits_immutable,
@@ -578,7 +556,6 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
inner.dirty = false;
slot.usage_count.store(1, Ordering::Relaxed);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
@@ -640,7 +617,6 @@ impl PageCache {
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
inner.dirty = false;
slot.usage_count.store(1, Ordering::Relaxed);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
@@ -679,10 +655,6 @@ impl PageCache {
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -706,10 +678,6 @@ impl PageCache {
None
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let map = self.ephemeral_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -743,12 +711,6 @@ impl PageCache {
panic!("could not find old key in mapping")
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
.expect("could not find old key in mapping");
self.size_metrics.current_bytes_ephemeral.sub_page_sz(1);
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -788,17 +750,7 @@ impl PageCache {
}
}
}
CacheKey::EphemeralPage { file_id, blkno } => {
let mut map = self.ephemeral_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
Entry::Occupied(entry) => Some(*entry.get()),
Entry::Vacant(entry) => {
entry.insert(slot_idx);
self.size_metrics.current_bytes_ephemeral.add_page_sz(1);
None
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -849,25 +801,8 @@ impl PageCache {
}
};
if let Some(old_key) = &inner.key {
if inner.dirty {
if let Err(err) = Self::writeback(old_key, inner.buf) {
// Writing the page to disk failed.
//
// FIXME: What to do here, when? We could propagate the error to the
// caller, but victim buffer is generally unrelated to the original
// call. It can even belong to a different tenant. Currently, we
// report the error to the log and continue the clock sweep to find
// a different victim. But if the problem persists, the page cache
// could fill up with dirty pages that we cannot evict, and we will
// loop retrying the writebacks indefinitely.
error!("writeback of buffer {:?} failed: {}", old_key, err);
continue;
}
}
// remove mapping for old buffer
self.remove_mapping(old_key);
inner.dirty = false;
inner.key = None;
}
return Ok((slot_idx, inner));
@@ -875,28 +810,6 @@ impl PageCache {
}
}
fn writeback(cache_key: &CacheKey, buf: &[u8]) -> Result<(), std::io::Error> {
match cache_key {
CacheKey::MaterializedPage {
hash_key: _,
lsn: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty materialized page",
)),
CacheKey::EphemeralPage { file_id, blkno } => {
writeback_ephemeral_file(*file_id, *blkno, buf)
}
CacheKey::ImmutableFilePage {
file_id: _,
blkno: _,
} => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"unexpected dirty immutable page",
)),
}
}
/// Initialize a new page cache
///
/// This should be called only once at page server startup.
@@ -907,7 +820,6 @@ impl PageCache {
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_ephemeral.set_page_sz(0);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
@@ -917,11 +829,7 @@ impl PageCache {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: RwLock::new(SlotInner {
key: None,
buf,
dirty: false,
}),
inner: RwLock::new(SlotInner { key: None, buf }),
usage_count: AtomicU8::new(0),
}
})
@@ -929,7 +837,6 @@ impl PageCache {
Self {
materialized_page_map: Default::default(),
ephemeral_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),

View File

@@ -136,9 +136,6 @@ pub use timeline::{
LocalLayerInfoForDiskUsageEviction, LogicalSizeCalculationCause, PageReconstructError, Timeline,
};
// re-export this function so that page_cache.rs can use it.
pub use crate::tenant::ephemeral_file::writeback as writeback_ephemeral_file;
// re-export for use in remote_timeline_client.rs
pub use crate::tenant::metadata::save_metadata;

View File

@@ -6,7 +6,6 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::FileExt;
use std::sync::atomic::AtomicU64;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -43,37 +42,34 @@ where
}
}
/// A block accessible for reading
///
/// During builds with `#[cfg(test)]`, this is a proper enum
/// with two variants to support testing code. During normal
/// builds, it just has one variant and is thus a cheap newtype
/// wrapper of [`PageReadGuard`]
pub enum BlockLease {
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Rc(std::rc::Rc<[u8; PAGE_SZ]>),
}
impl From<PageReadGuard<'static>> for BlockLease {
fn from(value: PageReadGuard<'static>) -> Self {
impl From<PageReadGuard<'static>> for BlockLease<'static> {
fn from(value: PageReadGuard<'static>) -> BlockLease<'static> {
BlockLease::PageReadGuard(value)
}
}
#[cfg(test)]
impl From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease {
impl<'a> From<std::rc::Rc<[u8; PAGE_SZ]>> for BlockLease<'a> {
fn from(value: std::rc::Rc<[u8; PAGE_SZ]>) -> Self {
BlockLease::Rc(value)
}
}
impl Deref for BlockLease {
impl<'a> Deref for BlockLease<'a> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Rc(v) => v.deref(),
}
@@ -116,13 +112,6 @@ where
self.reader.read_blk(blknum)
}
}
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);
fn next_file_id() -> FileId {
FileId(NEXT_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed))
}
/// An adapter for reading a (virtual) file using the page cache.
///
@@ -132,7 +121,7 @@ pub struct FileBlockReader<F> {
pub file: F,
/// Unique ID of this file, used as key in the page cache.
file_id: FileId,
file_id: page_cache::FileId,
}
impl<F> FileBlockReader<F>
@@ -140,7 +129,7 @@ where
F: FileExt,
{
pub fn new(file: F) -> Self {
let file_id = next_file_id();
let file_id = page_cache::next_file_id();
FileBlockReader { file_id, file }
}
@@ -157,7 +146,6 @@ where
F: FileExt,
{
fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
// Look up the right page
let cache = page_cache::get();
loop {
match cache

View File

@@ -2,54 +2,31 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::page_cache::{self, ReadBufResult, WriteBufResult, PAGE_SZ};
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::blob_io::BlobWriter;
use crate::tenant::block_io::{BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use once_cell::sync::Lazy;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::os::unix::prelude::FileExt;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::sync::atomic::AtomicU64;
use tracing::*;
use utils::id::{TenantId, TimelineId};
///
/// This is the global cache of file descriptors (File objects).
///
static EPHEMERAL_FILES: Lazy<RwLock<EphemeralFiles>> = Lazy::new(|| {
RwLock::new(EphemeralFiles {
next_file_id: FileId(1),
files: HashMap::new(),
})
});
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct FileId(u64);
impl std::fmt::Display for FileId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
pub struct EphemeralFiles {
next_file_id: FileId,
files: HashMap<FileId, Arc<VirtualFile>>,
}
pub struct EphemeralFile {
file_id: FileId,
page_cache_file_id: page_cache::FileId,
_tenant_id: TenantId,
_timeline_id: TimelineId,
file: Arc<VirtualFile>,
pub size: u64,
file: VirtualFile,
size: u64,
/// An ephemeral file is append-only.
/// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
/// The other pages, which can no longer be modified, are accessed through the page cache.
mutable_tail: [u8; PAGE_SZ],
}
impl EphemeralFile {
@@ -58,74 +35,31 @@ impl EphemeralFile {
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<EphemeralFile, io::Error> {
let mut l = EPHEMERAL_FILES.write().unwrap();
let file_id = l.next_file_id;
l.next_file_id = FileId(l.next_file_id.0 + 1);
static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
let filename_disambiguator =
NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let filename = conf
.timeline_path(&tenant_id, &timeline_id)
.join(PathBuf::from(format!("ephemeral-{}", file_id)));
.join(PathBuf::from(format!("ephemeral-{filename_disambiguator}")));
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)?;
let file_rc = Arc::new(file);
l.files.insert(file_id, file_rc.clone());
Ok(EphemeralFile {
file_id,
page_cache_file_id: page_cache::next_file_id(),
_tenant_id: tenant_id,
_timeline_id: timeline_id,
file: file_rc,
file,
size: 0,
mutable_tail: [0u8; PAGE_SZ],
})
}
fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), io::Error> {
let mut off = 0;
while off < PAGE_SZ {
let n = self
.file
.read_at(&mut buf[off..], blkno as u64 * PAGE_SZ as u64 + off as u64)?;
if n == 0 {
// Reached EOF. Fill the rest of the buffer with zeros.
const ZERO_BUF: [u8; PAGE_SZ] = [0u8; PAGE_SZ];
buf[off..].copy_from_slice(&ZERO_BUF[off..]);
break;
}
off += n;
}
Ok(())
}
fn get_buf_for_write(
&self,
blkno: u32,
) -> Result<page_cache::PageWriteGuard<'static>, io::Error> {
// Look up the right page
let cache = page_cache::get();
let mut write_guard = match cache
.write_ephemeral_buf(self.file_id, blkno)
.map_err(|e| to_io_error(e, "Failed to write ephemeral buf"))?
{
WriteBufResult::Found(guard) => guard,
WriteBufResult::NotFound(mut guard) => {
// Read the page from disk into the buffer
// TODO: if we're overwriting the whole page, no need to read it in first
self.fill_buffer(guard.deref_mut(), blkno)?;
guard.mark_valid();
// And then fall through to modify it.
guard
}
};
write_guard.mark_dirty();
Ok(write_guard)
pub(crate) fn size(&self) -> u64 {
self.size
}
}
@@ -146,49 +80,74 @@ impl BlobWriter for EphemeralFile {
blknum: u32,
/// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
off: usize,
/// Used by [`push_bytes`] to memoize the page cache write guard across calls to it.
memo_page_guard: MemoizedPageWriteGuard,
}
struct MemoizedPageWriteGuard {
guard: page_cache::PageWriteGuard<'static>,
/// The block number of the page in `guard`.
blknum: u32,
}
impl<'a> Writer<'a> {
fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
let blknum = (ephemeral_file.size / PAGE_SZ as u64) as u32;
Ok(Writer {
blknum,
blknum: (ephemeral_file.size / PAGE_SZ as u64) as u32,
off: (ephemeral_file.size % PAGE_SZ as u64) as usize,
memo_page_guard: MemoizedPageWriteGuard {
guard: ephemeral_file.get_buf_for_write(blknum)?,
blknum,
},
ephemeral_file,
})
}
#[inline(always)]
fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
// `src_remaining` is the remaining bytes to be written
let mut src_remaining = src;
while !src_remaining.is_empty() {
let page = if self.memo_page_guard.blknum == self.blknum {
&mut self.memo_page_guard.guard
} else {
self.memo_page_guard.guard =
self.ephemeral_file.get_buf_for_write(self.blknum)?;
self.memo_page_guard.blknum = self.blknum;
&mut self.memo_page_guard.guard
};
let dst_remaining = &mut page[self.off..];
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
let n = min(dst_remaining.len(), src_remaining.len());
dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
self.off += n;
src_remaining = &src_remaining[n..];
if self.off == PAGE_SZ {
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
match self.ephemeral_file.file.write_all_at(
&self.ephemeral_file.mutable_tail,
self.blknum as u64 * PAGE_SZ as u64,
) {
Ok(_) => {
// Pre-warm the page cache with what we just wrote.
// This isn't necessary for coherency/correctness, but it's how we've always done it.
let cache = page_cache::get();
match cache.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
) {
Ok(page_cache::ReadBufResult::Found(_guard)) => {
// This function takes &mut self, so, it shouldn't be possible to reach this point.
unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
}
Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {
error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
// fail gracefully, it's not the end of the world if we can't pre-warm the cache here
}
}
// Zero the buffer for re-use.
// Zeroing is critical for correcntess because the write_blob code below
// and similarly read_blk expect zeroed pages.
self.ephemeral_file.mutable_tail.fill(0);
// This block is done, move to next one.
self.blknum += 1;
self.off = 0;
}
Err(e) => {
return Err(std::io::Error::new(
ErrorKind::Other,
// order error before path because path is long and error is short
format!(
"ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
self.blknum,
e,
self.ephemeral_file.file.path.display(),
),
));
}
}
}
}
Ok(())
@@ -227,10 +186,7 @@ impl Drop for EphemeralFile {
fn drop(&mut self) {
// drop all pages from page cache
let cache = page_cache::get();
cache.drop_buffers_for_ephemeral(self.file_id);
// remove entry from the hash map
EPHEMERAL_FILES.write().unwrap().files.remove(&self.file_id);
cache.drop_buffers_for_immutable(self.page_cache_file_id);
// unlink the file
let res = std::fs::remove_file(&self.file.path);
@@ -250,54 +206,48 @@ impl Drop for EphemeralFile {
}
}
pub fn writeback(file_id: FileId, blkno: u32, buf: &[u8]) -> Result<(), io::Error> {
if let Some(file) = EPHEMERAL_FILES.read().unwrap().files.get(&file_id) {
match file.write_all_at(buf, blkno as u64 * PAGE_SZ as u64) {
Ok(_) => Ok(()),
Err(e) => Err(io::Error::new(
ErrorKind::Other,
format!(
"failed to write back to ephemeral file at {} error: {}",
file.path.display(),
e
),
)),
}
} else {
Err(io::Error::new(
ErrorKind::Other,
"could not write back page, not found in ephemeral files hash",
))
}
}
impl BlockReader for EphemeralFile {
fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
// Look up the right page
let cache = page_cache::get();
loop {
match cache
.read_ephemeral_buf(self.file_id, blknum)
.map_err(|e| to_io_error(e, "Failed to read ephemeral buf"))?
{
ReadBufResult::Found(guard) => return Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum)?;
write_guard.mark_valid();
let flushed_blknums = 0..self.size / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum)
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
// Swap for read lock
continue;
}
};
}
} else {
debug_assert_eq!(blknum as u64, self.size / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
}
}
}
fn to_io_error(e: anyhow::Error, context: &str) -> io::Error {
io::Error::new(ErrorKind::Other, format!("{context}: {e:#}"))
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -230,11 +230,11 @@ impl std::fmt::Display for InMemoryLayer {
impl InMemoryLayer {
///
/// Get layer size on the disk
/// Get layer size.
///
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.size)
Ok(inner.file.size())
}
///