Mirror of https://github.com/neondatabase/neon.git (synced 2026-01-15 09:22:55 +00:00)

(This PR is the successor of https://github.com/neondatabase/neon/pull/4984.)

## Summary

The current way in which `EphemeralFile` uses `PageCache` complicates the Pageserver code base to a degree that isn't worth it.
This PR refactors how we cache `EphemeralFile` contents, by exploiting the append-only nature of `EphemeralFile`.

The result is that `PageCache` only holds `ImmutableFilePage` and `MaterializedPage`.
These types of pages are read-only and evictable without write-back.
This allows us to remove the writeback code from `PageCache`, also eliminating an entire failure mode.

Further, many great open-source libraries exist to solve the problem of a read-only cache, much better than our `page_cache.rs` (e.g., better replacement policy, less global locking).
With this PR, we can now explore using them.

## Problem & Analysis

Before this PR, `PageCache` had three types of pages:

* `ImmutableFilePage`: caches Delta / Image layer file contents
* `MaterializedPage`: caches results of `Timeline::get` (page materialization)
* `EphemeralPage`: caches `EphemeralFile` contents

`EphemeralPage` is quite different from `ImmutableFilePage` and `MaterializedPage`:

* Immutable and materialized pages are for the acceleration of (future) reads of the same data, using `PAGE_CACHE_SIZE * PAGE_SIZE` bytes of DRAM.
* Ephemeral pages are a write-back cache of `EphemeralFile` contents, i.e., if there is pressure in the page cache, we spill `EphemeralFile` contents to disk.

`EphemeralFile` is only used by `InMemoryLayer`, for the following purposes:

* **write**: when filling up the `InMemoryLayer`, via `impl BlobWriter for EphemeralFile`
* **read**: when doing **page reconstruction** for a page@lsn that isn't written to disk
* **read**: when writing L0 layer files, we re-read the `InMemoryLayer` and put the contents into the L0 delta writer (**`create_delta_layer`**). This happens every 10min or when the InMemoryLayer reaches 256MB in size.

The access patterns of the `InMemoryLayer` use case are as follows:

* **write**: via `BlobWriter`, strictly append-only
* **read for page reconstruction**: via `BlobReader`, random
* **read for `create_delta_layer`**: via `BlobReader`, dependent on data, but generally random. Why?
  * In classical LSM terms, this function is what writes the memory-resident `C0` tree into the disk-resident `C1` tree.
  * In our system, though, the values of the InMemoryLayer are stored in an EphemeralFile, and hence they are not guaranteed to be memory-resident.
  * The function reads `Value`s in `Key, LSN` order, which is `!=` insert order.

What do these `EphemeralFile`-level access patterns mean for the page cache?

* **write**:
  * The common case is that `Value` is a WAL record, and if it isn't a full-page-image WAL record, then it's smaller than `PAGE_SIZE`.
  * So, the `EphemeralPage` pages act as a buffer for these `< PAGE_SIZE` sized writes.
  * If there's no page cache eviction between subsequent `InMemoryLayer::put_value` calls, the `EphemeralPage` is still resident, so the page cache avoids doing a `write` system call.
  * In practice, a busy page server will have page cache evictions because we only configure 64MB of page cache size.
* **reads for page reconstruction**: read acceleration, just as for the other page types.
* **reads for `create_delta_layer`**:
  * The `Value` reads happen through a `BlockCursor`, which optimizes the case of repeated reads from the same page.
  * So, the best case is that subsequent values are located on the same page; hence the `BlockCursor`'s buffer is maximally effective.
  * The worst case is that each `Value` is on a different page; hence the `BlockCursor`'s 1-page-sized buffer is ineffective.
  * The best case translates into `256MB/PAGE_SIZE` page cache accesses, one per page.
  * The worst case translates into `#Values` page cache accesses.
  * Again, the page cache accesses must be assumed to be random because the `Value`s aren't accessed in insertion order but in `Key, LSN` order.

## Summary of changes

Preliminaries for this PR were:

- #5003
- #5004
- #5005
- uncommitted microbenchmark in #5011

Based on the observations outlined above, this PR makes the following changes:

* Rip out `EphemeralPage` from `page_cache.rs`.
* Move the `block_io::FileId` to `page_cache::FileId`.
* Add a `PAGE_SIZE`d buffer to the `EphemeralFile` struct. It's called `mutable_tail`.
* Change `write_blob` to use `mutable_tail` for the write buffering instead of a page cache page.
  * If `mutable_tail` is full, it writes it out to disk, zeroes it out, and re-uses it.
  * There is explicitly no double-buffering, so that memory allocation per `EphemeralFile` instance is fixed.
* Change `read_blob` to return different `BlockLease` variants depending on `blknum`:
  * For the `blknum` that corresponds to the `mutable_tail`, return a ref to it.
    * Rust borrowing rules prevent `write_blob` calls while refs are outstanding.
  * For all non-tail blocks, return a page-cached `ImmutablePage`.
    * It is safe to page-cache these as `ImmutablePage` because `EphemeralFile` is append-only.

(A minimal standalone sketch of this buffering scheme appears at the end of this description.)

## Performance

How do the changes above affect performance? My claim is: not significantly.

* **write path**:
  * Before this PR, `EphemeralFile::write_blob` didn't issue its own `write` system calls.
    * If there were enough free pages, it didn't issue *any* `write` system calls.
    * If it had to evict other `EphemeralPage`s to get a page for its writes (`get_buf_for_write`), the page cache code would implicitly issue the writeback of victim pages as needed.
  * With this PR, `EphemeralFile::write_blob` *always* issues *all* of its *own* `write` system calls.
    * Also, the writes are explicit instead of implicit through page cache write-back, which will help #4743.
  * The perf impact of always doing the writes is the CPU overhead and syscall latency.
    * Before this PR, we might have never issued them if there were enough free pages.
    * We don't issue `fsync` and can expect the writes to only hit the kernel page cache.
  * There is also an advantage in issuing the writes directly: the perf impact is paid by the tenant that caused the writes, instead of whatever tenant evicts the `EphemeralPage`.
* **reads for page reconstruction**: no impact.
  * The `write_blob` function pre-warms the page cache when it writes the `mutable_tail` to disk.
  * So, the behavior is the same as with the EphemeralPages before this PR.
* **reads for `create_delta_layer`**: no impact.
  * Same argument as for page reconstruction.
  * Note for the future:
    * Going through the page cache likely causes read amplification here. Why?
      * Due to the `Key,Lsn`-ordered access pattern, we don't read all the values in a page before moving to the next page. In the worst case, we might read the same page multiple times to read different `Value`s from it.
    * So, it might be better to bypass the page cache here.
    * Idea drafts:
      * bypass the PS page cache + prefetch pipeline + iovec-based IO
      * bypass the PS page cache + use `copy_file_range` to copy from the ephemeral file into the L0 delta file, without going through user space
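To make the buffering scheme described under *Summary of changes* concrete, here is a minimal, self-contained sketch of the idea. Everything below (`EphemeralFileSketch`, `BlockRef`, plain Unix positional file I/O) is a simplified stand-in, not the actual `EphemeralFile`, `BlockLease`, or page-cache integration in this PR:

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

const PAGE_SZ: usize = 8192;

/// Simplified stand-in for `EphemeralFile`: append-only, with exactly one
/// page of write buffering (`mutable_tail`) and no double-buffering.
struct EphemeralFileSketch {
    file: File,
    /// Number of full pages already flushed to disk.
    written_blocks: u32,
    /// Bytes used in `mutable_tail`.
    tail_used: usize,
    /// The not-yet-flushed tail page; reused after every flush.
    mutable_tail: [u8; PAGE_SZ],
}

/// Simplified stand-in for the `BlockLease` variants described above.
enum BlockRef<'a> {
    /// The still-mutable tail page; holding this borrow statically prevents
    /// concurrent `write_blob` calls (they need `&mut self`).
    MutableTail(&'a [u8; PAGE_SZ]),
    /// A fully written block; in the real code this is a page-cache read guard.
    Immutable(Box<[u8; PAGE_SZ]>),
}

impl EphemeralFileSketch {
    /// Append `data`, spilling the tail page to disk whenever it fills up.
    fn write_blob(&mut self, mut data: &[u8]) -> io::Result<()> {
        while !data.is_empty() {
            let free = PAGE_SZ - self.tail_used;
            let n = free.min(data.len());
            self.mutable_tail[self.tail_used..self.tail_used + n].copy_from_slice(&data[..n]);
            self.tail_used += n;
            data = &data[n..];
            if self.tail_used == PAGE_SZ {
                // Tail page is full: write it at its block offset, then zero it
                // out and reuse it, keeping per-file memory usage fixed.
                let off = u64::from(self.written_blocks) * PAGE_SZ as u64;
                self.file.write_all_at(&self.mutable_tail, off)?;
                self.written_blocks += 1;
                self.mutable_tail = [0; PAGE_SZ];
                self.tail_used = 0;
            }
        }
        Ok(())
    }

    /// Read block `blknum`: the tail block is served from memory, all fully
    /// written blocks come from disk (page-cached in the real implementation).
    fn read_blk(&self, blknum: u32) -> io::Result<BlockRef<'_>> {
        if blknum == self.written_blocks {
            Ok(BlockRef::MutableTail(&self.mutable_tail))
        } else {
            let mut buf = Box::new([0u8; PAGE_SZ]);
            self.file
                .read_exact_at(&mut buf[..], u64::from(blknum) * PAGE_SZ as u64)?;
            Ok(BlockRef::Immutable(buf))
        }
    }
}
```

The `MutableTail` variant is where the borrow-checker argument from the change list shows up: `write_blob` takes `&mut self`, so no write can happen while a reader still holds a reference to the unflushed tail page.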
//!
//! Global page cache
//!
//! The page cache uses up most of the memory in the page server. It is shared
//! by all tenants, and it is used to store different kinds of pages. Sharing
//! the cache allows memory to be dynamically allocated where it's needed the
//! most.
//!
//! The page cache consists of fixed-size buffers, 8 kB each to match the
//! PostgreSQL buffer size, and a Slot struct for each buffer to contain
//! information about what's stored in the buffer.
//!
//! # Types Of Pages
//!
//! [`PageCache`] only supports immutable pages.
//! Hence there is no need to worry about coherency.
//!
//! Two types of pages are supported:
//!
//! * **Materialized pages**, filled & used by page reconstruction
//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`].
//!
//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but it's append-only.
//! It uses the page cache only for the blocks that are already fully written and immutable.
//!
//! # Filling The Page Cache
//!
//! The page cache maps from a cache key to a buffer slot.
//! The cache key uniquely identifies the piece of data that is being cached.
//!
//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`].
//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access.
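//!
//! For example, a page-reconstruction caller might consult and then fill that mapping
//! roughly like this (`reconstruct_page` and `use_page_image` are hypothetical stand-ins
//! for the surrounding reconstruction code, not part of this module):
//!
//! ```ignore
//! let cache = page_cache::get();
//! match cache.lookup_materialized_page(tenant_id, timeline_id, &key, lsn) {
//!     Some((cached_lsn, read_guard)) => {
//!         // Hit: the guard derefs to the 8 kB image, materialized at `cached_lsn` <= `lsn`.
//!         use_page_image(cached_lsn, &*read_guard);
//!     }
//!     None => {
//!         // Miss: reconstruct the page, then memorize it for future lookups.
//!         let img: Vec<u8> = reconstruct_page(&key, lsn)?;
//!         cache.memorize_materialized_page(tenant_id, timeline_id, key, lsn, &img)?;
//!     }
//! }
//! ```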
//!
//! The cache key for **immutable file** pages is [`FileId`] and a block number.
//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following:
//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`].
//! * Get a [`FileId`] using [`next_file_id`].
//! * Use the mechanism to associate the on-disk file with the returned [`FileId`].
//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`].
//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains
//!   a read guard for the page. Just use it.
//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains
//!   a write guard for the page. Fill the page with the contents of the on-disk file.
//!   Then call [`PageWriteGuard::mark_valid`] to mark the page as valid.
//!   Then try again to [`PageCache::read_immutable_buf`].
//!   Unless there's high cache pressure, the page should now be cached.
//!   (TODO: allow downgrading the write guard to a read guard to ensure forward progress.)
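//!
//! As a sketch of that fill-and-retry protocol (`read_block_from_disk` is a
//! hypothetical helper standing in for the caller's own I/O, not part of this module):
//!
//! ```ignore
//! let cache = page_cache::get();
//! let read_guard = loop {
//!     match cache.read_immutable_buf(file_id, blkno)? {
//!         ReadBufResult::Found(guard) => break guard,
//!         ReadBufResult::NotFound(mut write_guard) => {
//!             // Fill the buffer from the on-disk file, mark it valid, then retry the lookup.
//!             read_block_from_disk(blkno, write_guard.as_mut())?;
//!             write_guard.mark_valid();
//!             continue;
//!         }
//!     }
//! };
//! ```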
//!
//! # Locking
//!
//! There are two levels of locking involved: There's one lock for the "mapping"
//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer
//! slot, and a separate lock on each slot. To read or write the contents of a
//! slot, you must hold the lock on the slot in read or write mode,
//! respectively. To change the mapping of a slot, i.e. to evict a page or to
//! assign a buffer for a page, you must hold the mapping lock and the lock on
//! the slot at the same time.
//!
//! Whenever you need to hold both locks simultaneously, the slot lock must be
//! acquired first. This consistent ordering avoids deadlocks. To look up a page
//! in the cache, you would first look up the mapping, while holding the mapping
//! lock, and then lock the slot. You must release the mapping lock in between,
//! to obey the lock ordering and avoid deadlock.
//!
//! A slot can momentarily have invalid contents, even if it's already been
//! inserted to the mapping, but you must hold the write-lock on the slot until
//! the contents are valid. If you need to release the lock without initializing
//! the contents, you must remove the mapping first. We make that easy for the
//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized
//! page, the caller must explicitly call guard.mark_valid() after it has
//! initialized it. If the guard is dropped without calling mark_valid(), the
//! mapping is automatically removed and the slot is marked free.
//!

use std::{
    collections::{hash_map::Entry, HashMap},
    convert::TryInto,
    sync::{
        atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
        RwLock, RwLockReadGuard, RwLockWriteGuard, TryLockError,
    },
};

use anyhow::Context;
use once_cell::sync::OnceCell;
use utils::{
    id::{TenantId, TimelineId},
    lsn::Lsn,
};

use crate::{metrics::PageCacheSizeMetrics, repository::Key};

static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;

///
/// Initialize the page cache. This must be called once at page server startup.
///
pub fn init(size: usize) {
    if PAGE_CACHE.set(PageCache::new(size)).is_err() {
        panic!("page cache already initialized");
    }
}

///
/// Get a handle to the page cache.
///
pub fn get() -> &'static PageCache {
    //
    // In unit tests, page server startup doesn't happen and no one calls
    // page_cache::init(). Initialize it here with a tiny cache, so that the
    // page cache is usable in unit tests.
    //
    if cfg!(test) {
        PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE))
    } else {
        PAGE_CACHE.get().expect("page cache not initialized")
    }
}

pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize;
const MAX_USAGE_COUNT: u8 = 5;

/// See module-level comment.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct FileId(u64);

static NEXT_ID: AtomicU64 = AtomicU64::new(1);

/// See module-level comment.
pub fn next_file_id() -> FileId {
    FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}

///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
#[derive(Debug, PartialEq, Eq, Clone)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
    MaterializedPage {
        hash_key: MaterializedPageHashKey,
        lsn: Lsn,
    },
    ImmutableFilePage {
        file_id: FileId,
        blkno: u32,
    },
}

#[derive(Debug, PartialEq, Eq, Hash, Clone)]
struct MaterializedPageHashKey {
    tenant_id: TenantId,
    timeline_id: TimelineId,
    key: Key,
}

#[derive(Clone)]
struct Version {
    lsn: Lsn,
    slot_idx: usize,
}

struct Slot {
    inner: RwLock<SlotInner>,
    usage_count: AtomicU8,
}

struct SlotInner {
    key: Option<CacheKey>,
    buf: &'static mut [u8; PAGE_SZ],
}

impl Slot {
    /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT.
    fn inc_usage_count(&self) {
        let _ = self
            .usage_count
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
                if val == MAX_USAGE_COUNT {
                    None
                } else {
                    Some(val + 1)
                }
            });
    }

    /// Decrement usage count on the buffer, unless it's already zero. Returns
    /// the old usage count.
    fn dec_usage_count(&self) -> u8 {
        let count_res =
            self.usage_count
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| {
                    if val == 0 {
                        None
                    } else {
                        Some(val - 1)
                    }
                });

        match count_res {
            Ok(usage_count) => usage_count,
            Err(usage_count) => usage_count,
        }
    }
}

pub struct PageCache {
    /// This contains the mapping from the cache key to buffer slot that currently
    /// contains the page, if any.
    ///
    /// TODO: This is protected by a single lock. If that becomes a bottleneck,
    /// this HashMap can be replaced with a more concurrent version, there are
    /// plenty of such crates around.
    ///
    /// If you add support for caching different kinds of objects, each object kind
    /// can have a separate mapping map, next to this field.
    materialized_page_map: RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,

    immutable_page_map: RwLock<HashMap<(FileId, u32), usize>>,

    /// The actual buffers with their metadata.
    slots: Box<[Slot]>,

    /// Index of the next candidate to evict, for the Clock replacement algorithm.
    /// This is interpreted modulo the page cache size.
    next_evict_slot: AtomicUsize,

    size_metrics: &'static PageCacheSizeMetrics,
}

///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>);

impl std::ops::Deref for PageReadGuard<'_> {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        self.0.buf
    }
}

impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
    fn as_ref(&self) -> &[u8; PAGE_SZ] {
        self.0.buf
    }
}
///
/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked
/// until the guard is dropped.
///
/// Counterintuitively, this is used even for a read, if the requested page is not
/// currently found in the page cache. In that case, the caller of lock_for_read()
/// is expected to fill in the page contents and call mark_valid(). Similarly
/// lock_for_write() can return an invalid buffer that the caller is expected to
/// initialize.
///
pub struct PageWriteGuard<'i> {
    inner: RwLockWriteGuard<'i, SlotInner>,

    // Are the page contents currently valid?
    valid: bool,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.inner.buf
    }
}

impl std::ops::Deref for PageWriteGuard<'_> {
    type Target = [u8; PAGE_SZ];

    fn deref(&self) -> &Self::Target {
        self.inner.buf
    }
}

impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
    fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
        self.inner.buf
    }
}

impl PageWriteGuard<'_> {
    /// Mark that the buffer contents are now valid.
    pub fn mark_valid(&mut self) {
        assert!(self.inner.key.is_some());
        assert!(
            !self.valid,
            "mark_valid called on a buffer that was already valid"
        );
        self.valid = true;
    }
}

impl Drop for PageWriteGuard<'_> {
    ///
    /// If the buffer was allocated for a page that was not already in the
    /// cache, but the lock_for_read/write() caller dropped the buffer without
    /// initializing it, remove the mapping from the page cache.
    ///
    fn drop(&mut self) {
        assert!(self.inner.key.is_some());
        if !self.valid {
            let self_key = self.inner.key.as_ref().unwrap();
            PAGE_CACHE.get().unwrap().remove_mapping(self_key);
            self.inner.key = None;
        }
    }
}

/// lock_for_read() return value
pub enum ReadBufResult<'a> {
    Found(PageReadGuard<'a>),
    NotFound(PageWriteGuard<'a>),
}

/// lock_for_write() return value
pub enum WriteBufResult<'a> {
    Found(PageWriteGuard<'a>),
    NotFound(PageWriteGuard<'a>),
}

impl PageCache {
    //
    // Section 1.1: Public interface functions for looking up and memorizing materialized page
    // versions in the page cache
    //

    /// Look up a materialized page version.
    ///
    /// The 'lsn' is an upper bound, this will return the latest version of
    /// the given block, but not newer than 'lsn'. Returns the actual LSN of the
    /// returned page.
    pub fn lookup_materialized_page(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        key: &Key,
        lsn: Lsn,
    ) -> Option<(Lsn, PageReadGuard)> {
        crate::metrics::PAGE_CACHE
            .read_accesses_materialized_page
            .inc();

        let mut cache_key = CacheKey::MaterializedPage {
            hash_key: MaterializedPageHashKey {
                tenant_id,
                timeline_id,
                key: *key,
            },
            lsn,
        };

        if let Some(guard) = self.try_lock_for_read(&mut cache_key) {
            if let CacheKey::MaterializedPage {
                hash_key: _,
                lsn: available_lsn,
            } = cache_key
            {
                if available_lsn == lsn {
                    crate::metrics::PAGE_CACHE
                        .read_hits_materialized_page_exact
                        .inc();
                } else {
                    crate::metrics::PAGE_CACHE
                        .read_hits_materialized_page_older_lsn
                        .inc();
                }
                Some((available_lsn, guard))
            } else {
                panic!("unexpected key type in slot");
            }
        } else {
            None
        }
    }

    ///
    /// Store an image of the given page in the cache.
    ///
    pub fn memorize_materialized_page(
        &self,
        tenant_id: TenantId,
        timeline_id: TimelineId,
        key: Key,
        lsn: Lsn,
        img: &[u8],
    ) -> anyhow::Result<()> {
        let cache_key = CacheKey::MaterializedPage {
            hash_key: MaterializedPageHashKey {
                tenant_id,
                timeline_id,
                key,
            },
            lsn,
        };

        match self.lock_for_write(&cache_key)? {
            WriteBufResult::Found(write_guard) => {
                // We already had it in cache. Another thread must've put it there
                // concurrently. Check that it had the same contents that we
                // replayed.
                assert!(*write_guard == img);
            }
            WriteBufResult::NotFound(mut write_guard) => {
                write_guard.copy_from_slice(img);
                write_guard.mark_valid();
            }
        }

        Ok(())
    }

    // Section 1.2: Public interface functions for working with immutable file pages.

    pub fn read_immutable_buf(&self, file_id: FileId, blkno: u32) -> anyhow::Result<ReadBufResult> {
        let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };

        self.lock_for_read(&mut cache_key)
    }

    /// Immediately drop all buffers belonging to given file
    pub fn drop_buffers_for_immutable(&self, drop_file_id: FileId) {
        for slot_idx in 0..self.slots.len() {
            let slot = &self.slots[slot_idx];

            let mut inner = slot.inner.write().unwrap();
            if let Some(key) = &inner.key {
                match key {
                    CacheKey::ImmutableFilePage { file_id, blkno: _ }
                        if *file_id == drop_file_id =>
                    {
                        // remove mapping for old buffer
                        self.remove_mapping(key);
                        inner.key = None;
                    }
                    _ => {}
                }
            }
        }
    }

    //
    // Section 2: Internal interface functions for lookup/update.
    //
    // To add support for a new kind of "thing" to cache, you will need
    // to add public interface routines above, and code to deal with the
    // "mappings" after this section. But the routines in this section should
    // not require changes.
    /// Look up a page in the cache.
    ///
    /// If the search criteria is not exact, *cache_key is updated with the
    /// exact key of the returned page. (For materialized pages, that means
    /// that the LSN in 'cache_key' is updated with the LSN of the returned page
    /// version.)
    ///
    /// If no page is found, returns None and *cache_key is left unmodified.
    ///
    fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
        let cache_key_orig = cache_key.clone();
        if let Some(slot_idx) = self.search_mapping(cache_key) {
            // The page was found in the mapping. Lock the slot, and re-check
            // that it's still what we expected (because we released the mapping
            // lock already, another thread could have evicted the page)
            let slot = &self.slots[slot_idx];
            let inner = slot.inner.read().unwrap();
            if inner.key.as_ref() == Some(cache_key) {
                slot.inc_usage_count();
                return Some(PageReadGuard(inner));
            } else {
                // search_mapping might have modified the search key; restore it.
                *cache_key = cache_key_orig;
            }
        }
        None
    }
    /// Return a locked buffer for given block.
    ///
    /// Like try_lock_for_read(), if the search criteria is not exact and the
    /// page is already found in the cache, *cache_key is updated.
    ///
    /// If the page is not found in the cache, this allocates a new buffer for
    /// it. The caller may then initialize the buffer with the contents, and
    /// call mark_valid().
    ///
    /// Example usage:
    ///
    /// ```ignore
    /// let cache = page_cache::get();
    ///
    /// match cache.lock_for_read(&key) {
    ///     ReadBufResult::Found(read_guard) => {
    ///         // The page was found in cache. Use it
    ///     },
    ///     ReadBufResult::NotFound(write_guard) => {
    ///         // The page was not found in cache. Read it from disk into the
    ///         // buffer.
    ///         //read_my_page_from_disk(write_guard);
    ///
    ///         // The buffer contents are now valid. Tell the page cache.
    ///         write_guard.mark_valid();
    ///     },
    /// }
    /// ```
    ///
    fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
        let (read_access, hit) = match cache_key {
            CacheKey::MaterializedPage { .. } => {
                unreachable!("Materialized pages use lookup_materialized_page")
            }
            CacheKey::ImmutableFilePage { .. } => (
                &crate::metrics::PAGE_CACHE.read_accesses_immutable,
                &crate::metrics::PAGE_CACHE.read_hits_immutable,
            ),
        };
        read_access.inc();

        let mut is_first_iteration = true;
        loop {
            // First check if the key already exists in the cache.
            if let Some(read_guard) = self.try_lock_for_read(cache_key) {
                if is_first_iteration {
                    hit.inc();
                }
                return Ok(ReadBufResult::Found(read_guard));
            }
            is_first_iteration = false;

            // Not found. Find a victim buffer
            let (slot_idx, mut inner) =
                self.find_victim().context("Failed to find evict victim")?;

            // Insert mapping for this. At this point, we may find that another
            // thread did the same thing concurrently. In that case, we evicted
            // our victim buffer unnecessarily. Put it into the free list and
            // continue with the slot that the other thread chose.
            if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
                // TODO: put to free list

                // We now just loop back to start from beginning. This is not
                // optimal, we'll perform the lookup in the mapping again, which
                // is not really necessary because we already got
                // 'existing_slot_idx'. But this shouldn't happen often enough
                // to matter much.
                continue;
            }

            // Make the slot ready
            let slot = &self.slots[slot_idx];
            inner.key = Some(cache_key.clone());
            slot.usage_count.store(1, Ordering::Relaxed);

            return Ok(ReadBufResult::NotFound(PageWriteGuard {
                inner,
                valid: false,
            }));
        }
    }
    /// Look up a page in the cache and lock it in write mode. If it's not
    /// found, returns None.
    ///
    /// When locking a page for writing, the search criteria is always "exact".
    fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option<PageWriteGuard> {
        if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
            // The page was found in the mapping. Lock the slot, and re-check
            // that it's still what we expected (because we released the mapping
            // lock already, another thread could have evicted the page)
            let slot = &self.slots[slot_idx];
            let inner = slot.inner.write().unwrap();
            if inner.key.as_ref() == Some(cache_key) {
                slot.inc_usage_count();
                return Some(PageWriteGuard { inner, valid: true });
            }
        }
        None
    }
    /// Return a write-locked buffer for given block.
    ///
    /// Similar to lock_for_read(), but the returned buffer is write-locked and
    /// may be modified by the caller even if it's already found in the cache.
    fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
        loop {
            // First check if the key already exists in the cache.
            if let Some(write_guard) = self.try_lock_for_write(cache_key) {
                return Ok(WriteBufResult::Found(write_guard));
            }

            // Not found. Find a victim buffer
            let (slot_idx, mut inner) =
                self.find_victim().context("Failed to find evict victim")?;

            // Insert mapping for this. At this point, we may find that another
            // thread did the same thing concurrently. In that case, we evicted
            // our victim buffer unnecessarily. Put it into the free list and
            // continue with the slot that the other thread chose.
            if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) {
                // TODO: put to free list

                // We now just loop back to start from beginning. This is not
                // optimal, we'll perform the lookup in the mapping again, which
                // is not really necessary because we already got
                // 'existing_slot_idx'. But this shouldn't happen often enough
                // to matter much.
                continue;
            }

            // Make the slot ready
            let slot = &self.slots[slot_idx];
            inner.key = Some(cache_key.clone());
            slot.usage_count.store(1, Ordering::Relaxed);

            return Ok(WriteBufResult::NotFound(PageWriteGuard {
                inner,
                valid: false,
            }));
        }
    }
    //
    // Section 3: Mapping functions
    //

    /// Search for a page in the cache using the given search key.
    ///
    /// Returns the slot index, if any. If the search criteria is not exact,
    /// *cache_key is updated with the actual key of the found page.
    ///
    /// NOTE: We don't hold any lock on the mapping on return, so the slot might
    /// get recycled for an unrelated page immediately after this function
    /// returns. The caller is responsible for re-checking that the slot still
    /// contains the page with the same key before using it.
    ///
    fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
        match cache_key {
            CacheKey::MaterializedPage { hash_key, lsn } => {
                let map = self.materialized_page_map.read().unwrap();
                let versions = map.get(hash_key)?;

                let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
                    Ok(version_idx) => version_idx,
                    Err(0) => return None,
                    Err(version_idx) => version_idx - 1,
                };
                let version = &versions[version_idx];
                *lsn = version.lsn;
                Some(version.slot_idx)
            }
            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let map = self.immutable_page_map.read().unwrap();
                Some(*map.get(&(*file_id, *blkno))?)
            }
        }
    }

    /// Search for a page in the cache using the given search key.
    ///
    /// Like 'search_mapping', but performs an "exact" search. Used for
    /// allocating a new buffer.
    fn search_mapping_for_write(&self, key: &CacheKey) -> Option<usize> {
        match key {
            CacheKey::MaterializedPage { hash_key, lsn } => {
                let map = self.materialized_page_map.read().unwrap();
                let versions = map.get(hash_key)?;

                if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
                    Some(versions[version_idx].slot_idx)
                } else {
                    None
                }
            }
            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let map = self.immutable_page_map.read().unwrap();
                Some(*map.get(&(*file_id, *blkno))?)
            }
        }
    }
    ///
    /// Remove mapping for given key.
    ///
    fn remove_mapping(&self, old_key: &CacheKey) {
        match old_key {
            CacheKey::MaterializedPage {
                hash_key: old_hash_key,
                lsn: old_lsn,
            } => {
                let mut map = self.materialized_page_map.write().unwrap();
                if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
                    let versions = old_entry.get_mut();

                    if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
                        versions.remove(version_idx);
                        self.size_metrics
                            .current_bytes_materialized_page
                            .sub_page_sz(1);
                        if versions.is_empty() {
                            old_entry.remove_entry();
                        }
                    }
                } else {
                    panic!("could not find old key in mapping")
                }
            }
            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let mut map = self.immutable_page_map.write().unwrap();
                map.remove(&(*file_id, *blkno))
                    .expect("could not find old key in mapping");
                self.size_metrics.current_bytes_immutable.sub_page_sz(1);
            }
        }
    }

    ///
    /// Insert mapping for given key.
    ///
    /// If a mapping already existed for the given key, returns the slot index
    /// of the existing mapping and leaves it untouched.
    fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
        match new_key {
            CacheKey::MaterializedPage {
                hash_key: new_key,
                lsn: new_lsn,
            } => {
                let mut map = self.materialized_page_map.write().unwrap();
                let versions = map.entry(new_key.clone()).or_default();
                match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
                    Ok(version_idx) => Some(versions[version_idx].slot_idx),
                    Err(version_idx) => {
                        versions.insert(
                            version_idx,
                            Version {
                                lsn: *new_lsn,
                                slot_idx,
                            },
                        );
                        self.size_metrics
                            .current_bytes_materialized_page
                            .add_page_sz(1);
                        None
                    }
                }
            }

            CacheKey::ImmutableFilePage { file_id, blkno } => {
                let mut map = self.immutable_page_map.write().unwrap();
                match map.entry((*file_id, *blkno)) {
                    Entry::Occupied(entry) => Some(*entry.get()),
                    Entry::Vacant(entry) => {
                        entry.insert(slot_idx);
                        self.size_metrics.current_bytes_immutable.add_page_sz(1);
                        None
                    }
                }
            }
        }
    }
    //
    // Section 4: Misc internal helpers
    //

    /// Find a slot to evict.
    ///
    /// On return, the slot is empty and write-locked.
    fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard<SlotInner>)> {
        let iter_limit = self.slots.len() * 10;
        let mut iters = 0;
        loop {
            iters += 1;
            let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len();

            let slot = &self.slots[slot_idx];

            if slot.dec_usage_count() == 0 {
                let mut inner = match slot.inner.try_write() {
                    Ok(inner) => inner,
                    Err(TryLockError::Poisoned(err)) => {
                        anyhow::bail!("buffer lock was poisoned: {err:?}")
                    }
                    Err(TryLockError::WouldBlock) => {
                        // If we have looped through the whole buffer pool 10 times
                        // and still haven't found a victim buffer, something's wrong.
                        // Maybe all the buffers were locked. That could happen in
                        // theory, if you have more threads holding buffers locked than
                        // there are buffers in the pool. In practice, with a reasonably
                        // large buffer pool it really shouldn't happen.
                        if iters > iter_limit {
                            anyhow::bail!("exceeded evict iter limit");
                        }
                        continue;
                    }
                };
                if let Some(old_key) = &inner.key {
                    // remove mapping for old buffer
                    self.remove_mapping(old_key);
                    inner.key = None;
                }
                return Ok((slot_idx, inner));
            }
        }
    }
    /// Initialize a new page cache
    ///
    /// This should be called only once at page server startup.
    fn new(num_pages: usize) -> Self {
        assert!(num_pages > 0, "page cache size must be > 0");

        let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());

        let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
        size_metrics.max_bytes.set_page_sz(num_pages);
        size_metrics.current_bytes_immutable.set_page_sz(0);
        size_metrics.current_bytes_materialized_page.set_page_sz(0);

        let slots = page_buffer
            .chunks_exact_mut(PAGE_SZ)
            .map(|chunk| {
                let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();

                Slot {
                    inner: RwLock::new(SlotInner { key: None, buf }),
                    usage_count: AtomicU8::new(0),
                }
            })
            .collect();

        Self {
            materialized_page_map: Default::default(),
            immutable_page_map: Default::default(),
            slots,
            next_evict_slot: AtomicUsize::new(0),
            size_metrics,
        }
    }
}

trait PageSzBytesMetric {
    fn set_page_sz(&self, count: usize);
    fn add_page_sz(&self, count: usize);
    fn sub_page_sz(&self, count: usize);
}

#[inline(always)]
fn count_times_page_sz(count: usize) -> u64 {
    u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap()
}

impl PageSzBytesMetric for metrics::UIntGauge {
    fn set_page_sz(&self, count: usize) {
        self.set(count_times_page_sz(count));
    }
    fn add_page_sz(&self, count: usize) {
        self.add(count_times_page_sz(count));
    }
    fn sub_page_sz(&self, count: usize) {
        self.sub(count_times_page_sz(count));
    }
}