From 03874009ecc9148371c901ee555605869c5082e5 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 29 Jan 2024 14:35:55 +0000 Subject: [PATCH] add back page cache but not for DeltaLayerValue and ImageLayerValue --- pageserver/ctl/src/layer_map_analyzer.rs | 4 +- pageserver/ctl/src/layers.rs | 4 +- pageserver/ctl/src/main.rs | 2 + pageserver/src/bin/pageserver.rs | 3 +- pageserver/src/context.rs | 4 + pageserver/src/lib.rs | 4 +- pageserver/src/metrics.rs | 211 ++++ pageserver/src/page_cache.rs | 1001 +++++++++++++++++ pageserver/src/tenant/blob_io.rs | 3 +- pageserver/src/tenant/block_io.rs | 90 +- pageserver/src/tenant/ephemeral_file.rs | 74 +- .../src/tenant/storage_layer/delta_layer.rs | 2 +- .../src/tenant/storage_layer/image_layer.rs | 2 +- pageserver/src/tenant/timeline.rs | 60 +- pageserver/src/virtual_file.rs | 47 + 15 files changed, 1447 insertions(+), 64 deletions(-) create mode 100644 pageserver/src/page_cache.rs diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index 771d3d8a30..eb5c3f15cf 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -12,9 +12,10 @@ use std::collections::BinaryHeap; use std::ops::Range; use std::{fs, str}; +use pageserver::page_cache::PAGE_SZ; use pageserver::repository::{Key, KEY_SIZE}; use pageserver::tenant::block_io::FileBlockReader; -use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection, PAGE_SZ}; +use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; use pageserver::tenant::storage_layer::range_overlaps; use pageserver::virtual_file::{self, VirtualFile}; @@ -142,6 +143,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. 
pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; let mut total_image_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index 2c43a922b4..dbbcfedac0 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -11,7 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; use pageserver::tenant::storage_layer::{delta_layer, image_layer}; use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer}; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; -use pageserver::virtual_file; +use pageserver::{page_cache, virtual_file}; use pageserver::{ repository::{Key, KEY_SIZE}, tenant::{ @@ -60,6 +60,7 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; @@ -187,6 +188,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_timeline_id, } => { pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index cd7e6bc728..3c90933fe9 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -15,6 +15,7 @@ use index_part::IndexPartCmd; use layers::LayerCmd; use pageserver::{ context::{DownloadBehavior, RequestContext}, + page_cache, task_mgr::TaskKind, tenant::{dump_layerfile_from_path, metadata::TimelineMetadata}, virtual_file, @@ -123,6 +124,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup virtual_file::init(10, virtual_file::IoEngineKind::StdFs); + page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 3727483cb8..84de76e55e 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -24,7 +24,7 @@ use pageserver::{ config::{defaults::*, PageServerConf}, context::{DownloadBehavior, RequestContext}, deletion_queue::DeletionQueue, - http, page_service, task_mgr, + http, page_cache, page_service, task_mgr, task_mgr::TaskKind, task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, tenant::mgr, @@ -131,6 +131,7 @@ fn main() -> anyhow::Result<()> { // Basic initialization of things that don't change after startup virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine); + page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index ba820bb62b..ee331ea154 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -282,4 +282,8 @@ impl RequestContext { pub(crate) fn access_stats_behavior(&self) -> AccessStatsBehavior { self.access_stats_behavior } + + 
pub(crate) fn page_content_kind(&self) -> PageContentKind { + self.page_content_kind + } } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 654b53e0a3..35a7e4738e 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -12,7 +12,9 @@ pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; pub use pageserver_api::keyspace; +pub(crate) mod buffer_pool; pub mod metrics; +pub mod page_cache; pub mod page_service; pub mod pgdatadir_mapping; pub mod repository; @@ -30,8 +32,6 @@ use camino::Utf8Path; use deletion_queue::DeletionQueue; use tracing::info; -pub mod buffer_pool; - /// Current storage format version /// /// This is embedded in the header of all the layer files. diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 84e66b132a..9b3679e3c2 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -187,6 +187,216 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy = Lazy::new(|| } }); +pub(crate) struct PageCacheMetricsForTaskKind { + pub read_accesses_materialized_page: IntCounter, + pub read_accesses_immutable: IntCounter, + + pub read_hits_immutable: IntCounter, + pub read_hits_materialized_page_exact: IntCounter, + pub read_hits_materialized_page_older_lsn: IntCounter, +} + +pub(crate) struct PageCacheMetrics { + map: EnumMap>, +} + +static PAGE_CACHE_READ_HITS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_page_cache_read_hits_total", + "Number of read accesses to the page cache that hit", + &["task_kind", "key_kind", "content_kind", "hit_kind"] + ) + .expect("failed to define a metric") +}); + +static PAGE_CACHE_READ_ACCESSES: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_page_cache_read_accesses_total", + "Number of read accesses to the page cache", + &["task_kind", "key_kind", "content_kind"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static PAGE_CACHE: Lazy = Lazy::new(|| PageCacheMetrics { + map: EnumMap::from_array(std::array::from_fn(|task_kind| { + let task_kind = ::from_usize(task_kind); + let task_kind: &'static str = task_kind.into(); + EnumMap::from_array(std::array::from_fn(|content_kind| { + let content_kind = ::from_usize(content_kind); + let content_kind: &'static str = content_kind.into(); + PageCacheMetricsForTaskKind { + read_accesses_materialized_page: { + PAGE_CACHE_READ_ACCESSES + .get_metric_with_label_values(&[ + task_kind, + "materialized_page", + content_kind, + ]) + .unwrap() + }, + + read_accesses_immutable: { + PAGE_CACHE_READ_ACCESSES + .get_metric_with_label_values(&[task_kind, "immutable", content_kind]) + .unwrap() + }, + + read_hits_immutable: { + PAGE_CACHE_READ_HITS + .get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"]) + .unwrap() + }, + + read_hits_materialized_page_exact: { + PAGE_CACHE_READ_HITS + .get_metric_with_label_values(&[ + task_kind, + "materialized_page", + content_kind, + "exact", + ]) + .unwrap() + }, + + read_hits_materialized_page_older_lsn: { + PAGE_CACHE_READ_HITS + .get_metric_with_label_values(&[ + task_kind, + "materialized_page", + content_kind, + "older_lsn", + ]) + .unwrap() + }, + } + })) + })), +}); + +impl PageCacheMetrics { + pub(crate) fn for_ctx(&self, ctx: &RequestContext) -> &PageCacheMetricsForTaskKind { + &self.map[ctx.task_kind()][ctx.page_content_kind()] + } +} + +pub(crate) struct PageCacheSizeMetrics { + pub max_bytes: UIntGauge, + + pub current_bytes_immutable: UIntGauge, + pub current_bytes_materialized_page: UIntGauge, +} + +static 
PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "pageserver_page_cache_size_current_bytes", + "Current size of the page cache in bytes, by key kind", + &["key_kind"] + ) + .expect("failed to define a metric") +}); + +pub(crate) static PAGE_CACHE_SIZE: Lazy = + Lazy::new(|| PageCacheSizeMetrics { + max_bytes: { + register_uint_gauge!( + "pageserver_page_cache_size_max_bytes", + "Maximum size of the page cache in bytes" + ) + .expect("failed to define a metric") + }, + current_bytes_immutable: { + PAGE_CACHE_SIZE_CURRENT_BYTES + .get_metric_with_label_values(&["immutable"]) + .unwrap() + }, + current_bytes_materialized_page: { + PAGE_CACHE_SIZE_CURRENT_BYTES + .get_metric_with_label_values(&["materialized_page"]) + .unwrap() + }, + }); + +pub(crate) mod page_cache_eviction_metrics { + use std::num::NonZeroUsize; + + use metrics::{register_int_counter_vec, IntCounter, IntCounterVec}; + use once_cell::sync::Lazy; + + #[derive(Clone, Copy)] + pub(crate) enum Outcome { + FoundSlotUnused { iters: NonZeroUsize }, + FoundSlotEvicted { iters: NonZeroUsize }, + ItersExceeded { iters: NonZeroUsize }, + } + + static ITERS_TOTAL_VEC: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_page_cache_find_victim_iters_total", + "Counter for the number of iterations in the find_victim loop", + &["outcome"], + ) + .expect("failed to define a metric") + }); + + static CALLS_VEC: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_page_cache_find_victim_calls", + "Incremented at the end of each find_victim() call.\ + Filter by outcome to get e.g., eviction rate.", + &["outcome"] + ) + .unwrap() + }); + + pub(crate) fn observe(outcome: Outcome) { + macro_rules! dry { + ($label:literal, $iters:expr) => {{ + static LABEL: &'static str = $label; + static ITERS_TOTAL: Lazy = + Lazy::new(|| ITERS_TOTAL_VEC.with_label_values(&[LABEL])); + static CALLS: Lazy = + Lazy::new(|| CALLS_VEC.with_label_values(&[LABEL])); + ITERS_TOTAL.inc_by(($iters.get()) as u64); + CALLS.inc(); + }}; + } + match outcome { + Outcome::FoundSlotUnused { iters } => dry!("found_empty", iters), + Outcome::FoundSlotEvicted { iters } => { + dry!("found_evicted", iters) + } + Outcome::ItersExceeded { iters } => { + dry!("err_iters_exceeded", iters); + super::page_cache_errors_inc(super::PageCacheErrorKind::EvictIterLimit); + } + } + } +} + +static PAGE_CACHE_ERRORS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "page_cache_errors_total", + "Number of timeouts while acquiring a pinned slot in the page cache", + &["error_kind"] + ) + .expect("failed to define a metric") +}); + +#[derive(IntoStaticStr)] +#[strum(serialize_all = "kebab_case")] +pub(crate) enum PageCacheErrorKind { + AcquirePinnedSlotTimeout, + EvictIterLimit, +} + +pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) { + PAGE_CACHE_ERRORS + .get_metric_with_label_values(&[error_kind.into()]) + .unwrap() + .inc(); +} + pub(crate) static WAIT_LSN_TIME: Lazy = Lazy::new(|| { register_histogram!( "pageserver_wait_lsn_seconds", @@ -1788,6 +1998,7 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use crate::context::{PageContentKind, RequestContext}; use crate::task_mgr::TaskKind; /// Maintain a per timeline gauge in addition to the global gauge. diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs new file mode 100644 index 0000000000..28d2584bf4 --- /dev/null +++ b/pageserver/src/page_cache.rs @@ -0,0 +1,1001 @@ +//! +//! 
Global page cache +//! +//! The page cache uses up most of the memory in the page server. It is shared +//! by all tenants, and it is used to store different kinds of pages. Sharing +//! the cache allows memory to be dynamically allocated where it's needed the +//! most. +//! +//! The page cache consists of fixed-size buffers, 8 kB each to match the +//! PostgreSQL buffer size, and a Slot struct for each buffer to contain +//! information about what's stored in the buffer. +//! +//! # Types Of Pages +//! +//! [`PageCache`] only supports immutable pages. +//! Hence there is no need to worry about coherency. +//! +//! Two types of pages are supported: +//! +//! * **Materialized pages**, filled & used by page reconstruction +//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`]. +//! +//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only. +//! It uses the page cache only for the blocks that are already fully written and immutable. +//! +//! # Filling The Page Cache +//! +//! Page cache maps from a cache key to a buffer slot. +//! The cache key uniquely identifies the piece of data that is being cached. +//! +//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`]. +//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access. +//! +//! The cache key for **immutable file** pages is [`FileId`] and a block number. +//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following: +//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`]. +//! * Get a [`FileId`] using [`next_file_id`]. +//! * Use the mechanism to associate the on-disk file with the returned [`FileId`]. +//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`]. +//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains +//! a read guard for the page. Just use it. +//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains +//! a write guard for the page. Fill the page with the contents of the on-disk file. +//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid. +//! Then try again to [`PageCache::read_immutable_buf`]. +//! Unless there's high cache pressure, the page should now be cached. +//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.) +//! +//! # Locking +//! +//! There are two levels of locking involved: There's one lock for the "mapping" +//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer +//! slot, and a separate lock on each slot. To read or write the contents of a +//! slot, you must hold the lock on the slot in read or write mode, +//! respectively. To change the mapping of a slot, i.e. to evict a page or to +//! assign a buffer for a page, you must hold the mapping lock and the lock on +//! the slot at the same time. +//! +//! Whenever you need to hold both locks simultaneously, the slot lock must be +//! acquired first. This consistent ordering avoids deadlocks. To look up a page +//! in the cache, you would first look up the mapping, while holding the mapping +//! lock, and then lock the slot. You must release the mapping lock in between, +//! to obey the lock ordering and avoid deadlock. +//! +//! 
A slot can momentarily have invalid contents, even if it's already been +//! inserted to the mapping, but you must hold the write-lock on the slot until +//! the contents are valid. If you need to release the lock without initializing +//! the contents, you must remove the mapping first. We make that easy for the +//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has +//! initialized it. If the guard is dropped without calling mark_valid(), the +//! mapping is automatically removed and the slot is marked free. +//! + +use std::{ + collections::{hash_map::Entry, HashMap}, + convert::TryInto, + sync::{ + atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, + Arc, Weak, + }, + time::Duration, +}; + +use anyhow::Context; +use once_cell::sync::OnceCell; +use pageserver_api::shard::TenantShardId; +use utils::{id::TimelineId, lsn::Lsn}; + +use crate::{ + context::RequestContext, + metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics}, + repository::Key, +}; + +static PAGE_CACHE: OnceCell = OnceCell::new(); +const TEST_PAGE_CACHE_SIZE: usize = 50; + +/// +/// Initialize the page cache. This must be called once at page server startup. +/// +pub fn init(size: usize) { + if PAGE_CACHE.set(PageCache::new(size)).is_err() { + panic!("page cache already initialized"); + } +} + +/// +/// Get a handle to the page cache. +/// +pub fn get() -> &'static PageCache { + // + // In unit tests, page server startup doesn't happen and no one calls + // page_cache::init(). Initialize it here with a tiny cache, so that the + // page cache is usable in unit tests. + // + if cfg!(test) { + PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE)) + } else { + PAGE_CACHE.get().expect("page cache not initialized") + } +} + +pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; +const MAX_USAGE_COUNT: u8 = 5; + +/// See module-level comment. +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +pub struct FileId(u64); + +static NEXT_ID: AtomicU64 = AtomicU64::new(1); + +/// See module-level comment. +pub fn next_file_id() -> FileId { + FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) +} + +/// +/// CacheKey uniquely identifies a "thing" to cache in the page cache. +/// +#[derive(Debug, PartialEq, Eq, Clone)] +#[allow(clippy::enum_variant_names)] +enum CacheKey { + MaterializedPage { + hash_key: MaterializedPageHashKey, + lsn: Lsn, + }, + ImmutableFilePage { + file_id: FileId, + blkno: u32, + }, +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +struct MaterializedPageHashKey { + /// Why is this TenantShardId rather than TenantId? + /// + /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this + /// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this + /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are + /// special-cased in some other way. + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + key: Key, +} + +#[derive(Clone)] +struct Version { + lsn: Lsn, + slot_idx: usize, +} + +struct Slot { + inner: tokio::sync::RwLock, + usage_count: AtomicU8, +} + +struct SlotInner { + key: Option, + // for `coalesce_readers_permit` + permit: std::sync::Mutex>, + buf: &'static mut [u8; PAGE_SZ], +} + +impl Slot { + /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT. 
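+    /// The usage count feeds the Clock/second-chance replacement in `find_victim()`,
+    /// which decrements it on each sweep and only evicts a slot once it has reached zero.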
+ fn inc_usage_count(&self) { + let _ = self + .usage_count + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { + if val == MAX_USAGE_COUNT { + None + } else { + Some(val + 1) + } + }); + } + + /// Decrement usage count on the buffer, unless it's already zero. Returns + /// the old usage count. + fn dec_usage_count(&self) -> u8 { + let count_res = + self.usage_count + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { + if val == 0 { + None + } else { + Some(val - 1) + } + }); + + match count_res { + Ok(usage_count) => usage_count, + Err(usage_count) => usage_count, + } + } + + /// Sets the usage count to a specific value. + fn set_usage_count(&self, count: u8) { + self.usage_count.store(count, Ordering::Relaxed); + } +} + +impl SlotInner { + /// If there is aready a reader, drop our permit and share its permit, just like we share read access. + fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc { + let mut guard = self.permit.lock().unwrap(); + if let Some(existing_permit) = guard.upgrade() { + drop(guard); + drop(permit); + existing_permit + } else { + let permit = Arc::new(permit); + *guard = Arc::downgrade(&permit); + permit + } + } +} + +pub struct PageCache { + /// This contains the mapping from the cache key to buffer slot that currently + /// contains the page, if any. + /// + /// TODO: This is protected by a single lock. If that becomes a bottleneck, + /// this HashMap can be replaced with a more concurrent version, there are + /// plenty of such crates around. + /// + /// If you add support for caching different kinds of objects, each object kind + /// can have a separate mapping map, next to this field. + materialized_page_map: std::sync::RwLock>>, + + immutable_page_map: std::sync::RwLock>, + + /// The actual buffers with their metadata. + slots: Box<[Slot]>, + + pinned_slots: Arc, + + /// Index of the next candidate to evict, for the Clock replacement algorithm. + /// This is interpreted modulo the page cache size. + next_evict_slot: AtomicUsize, + + size_metrics: &'static PageCacheSizeMetrics, +} + +struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit); + +/// +/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked +/// until the guard is dropped. +/// +pub struct PageReadGuard<'i> { + _permit: Arc, + slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>, +} + +impl std::ops::Deref for PageReadGuard<'_> { + type Target = [u8; PAGE_SZ]; + + fn deref(&self) -> &Self::Target { + self.slot_guard.buf + } +} + +impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { + fn as_ref(&self) -> &[u8; PAGE_SZ] { + self.slot_guard.buf + } +} + +/// +/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked +/// until the guard is dropped. +/// +/// Counterintuitively, this is used even for a read, if the requested page is not +/// currently found in the page cache. In that case, the caller of lock_for_read() +/// is expected to fill in the page contents and call mark_valid(). 
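+/// If the guard is dropped without calling mark_valid(), the Drop impl below removes
+/// the mapping again, so a partially initialized page never becomes visible to readers.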
+pub struct PageWriteGuard<'i> { + state: PageWriteGuardState<'i>, +} + +enum PageWriteGuardState<'i> { + Invalid { + inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, + _permit: PinnedSlotsPermit, + }, + Downgraded, +} + +impl std::ops::DerefMut for PageWriteGuard<'_> { + fn deref_mut(&mut self) -> &mut Self::Target { + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } + } +} + +impl std::ops::Deref for PageWriteGuard<'_> { + type Target = [u8; PAGE_SZ]; + + fn deref(&self) -> &Self::Target { + match &self.state { + PageWriteGuardState::Invalid { inner, _permit } => inner.buf, + PageWriteGuardState::Downgraded => unreachable!(), + } + } +} + +impl<'a> PageWriteGuard<'a> { + /// Mark that the buffer contents are now valid. + #[must_use] + pub fn mark_valid(mut self) -> PageReadGuard<'a> { + let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); + match prev { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + PageReadGuard { + _permit: Arc::new(_permit), + slot_guard: inner.downgrade(), + } + } + PageWriteGuardState::Downgraded => unreachable!(), + } + } +} + +impl Drop for PageWriteGuard<'_> { + /// + /// If the buffer was allocated for a page that was not already in the + /// cache, but the lock_for_read/write() caller dropped the buffer without + /// initializing it, remove the mapping from the page cache. + /// + fn drop(&mut self) { + match &mut self.state { + PageWriteGuardState::Invalid { inner, _permit } => { + assert!(inner.key.is_some()); + let self_key = inner.key.as_ref().unwrap(); + PAGE_CACHE.get().unwrap().remove_mapping(self_key); + inner.key = None; + } + PageWriteGuardState::Downgraded => {} + } + } +} + +/// lock_for_read() return value +pub enum ReadBufResult<'a> { + Found(PageReadGuard<'a>), + NotFound(PageWriteGuard<'a>), +} + +impl PageCache { + // + // Section 1.1: Public interface functions for looking up and memorizing materialized page + // versions in the page cache + // + + /// Look up a materialized page version. + /// + /// The 'lsn' is an upper bound, this will return the latest version of + /// the given block, but not newer than 'lsn'. Returns the actual LSN of the + /// returned page. + pub async fn lookup_materialized_page( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + key: &Key, + lsn: Lsn, + ctx: &RequestContext, + ) -> Option<(Lsn, PageReadGuard)> { + let Ok(permit) = self.try_get_pinned_slot_permit().await else { + return None; + }; + + crate::metrics::PAGE_CACHE + .for_ctx(ctx) + .read_accesses_materialized_page + .inc(); + + let mut cache_key = CacheKey::MaterializedPage { + hash_key: MaterializedPageHashKey { + tenant_shard_id, + timeline_id, + key: *key, + }, + lsn, + }; + + if let Some(guard) = self + .try_lock_for_read(&mut cache_key, &mut Some(permit)) + .await + { + if let CacheKey::MaterializedPage { + hash_key: _, + lsn: available_lsn, + } = cache_key + { + if available_lsn == lsn { + crate::metrics::PAGE_CACHE + .for_ctx(ctx) + .read_hits_materialized_page_exact + .inc(); + } else { + crate::metrics::PAGE_CACHE + .for_ctx(ctx) + .read_hits_materialized_page_older_lsn + .inc(); + } + Some((available_lsn, guard)) + } else { + panic!("unexpected key type in slot"); + } + } else { + None + } + } + + /// + /// Store an image of the given page in the cache. 
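+    ///
+    /// The image must be a full PAGE_SZ (8 kB) page: the slot buffers are fixed at
+    /// PAGE_SZ, and the caller in `timeline.rs` only memorizes images of exactly that
+    /// size. Later `lookup_materialized_page()` calls with an LSN at or above `lsn`
+    /// can then be answered from the cache.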
+ /// + pub async fn memorize_materialized_page( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + key: Key, + lsn: Lsn, + img: &[u8], + ) -> anyhow::Result<()> { + let cache_key = CacheKey::MaterializedPage { + hash_key: MaterializedPageHashKey { + tenant_shard_id, + timeline_id, + key, + }, + lsn, + }; + + let mut permit = Some(self.try_get_pinned_slot_permit().await?); + loop { + // First check if the key already exists in the cache. + if let Some(slot_idx) = self.search_mapping_exact(&cache_key) { + // The page was found in the mapping. Lock the slot, and re-check + // that it's still what we expected (because we don't released the mapping + // lock already, another thread could have evicted the page) + let slot = &self.slots[slot_idx]; + let inner = slot.inner.write().await; + if inner.key.as_ref() == Some(&cache_key) { + slot.inc_usage_count(); + debug_assert!( + { + let guard = inner.permit.lock().unwrap(); + guard.upgrade().is_none() + }, + "we hold a write lock, so, no one else should have a permit" + ); + debug_assert_eq!(inner.buf.len(), img.len()); + // We already had it in cache. Another thread must've put it there + // concurrently. Check that it had the same contents that we + // replayed. + assert!(inner.buf == img); + return Ok(()); + } + } + debug_assert!(permit.is_some()); + + // Not found. Find a victim buffer + let (slot_idx, mut inner) = self + .find_victim(permit.as_ref().unwrap()) + .await + .context("Failed to find evict victim")?; + + // Insert mapping for this. At this point, we may find that another + // thread did the same thing concurrently. In that case, we evicted + // our victim buffer unnecessarily. Put it into the free list and + // continue with the slot that the other thread chose. + if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) { + // TODO: put to free list + + // We now just loop back to start from beginning. This is not + // optimal, we'll perform the lookup in the mapping again, which + // is not really necessary because we already got + // 'existing_slot_idx'. But this shouldn't happen often enough + // to matter much. + continue; + } + + // Make the slot ready + let slot = &self.slots[slot_idx]; + inner.key = Some(cache_key.clone()); + slot.set_usage_count(1); + // Create a write guard for the slot so we go through the expected motions. + debug_assert!( + { + let guard = inner.permit.lock().unwrap(); + guard.upgrade().is_none() + }, + "we hold a write lock, so, no one else should have a permit" + ); + let mut write_guard = PageWriteGuard { + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, + }; + write_guard.copy_from_slice(img); + let _ = write_guard.mark_valid(); + return Ok(()); + } + } + + // Section 1.2: Public interface functions for working with immutable file pages. + + pub async fn read_immutable_buf( + &self, + file_id: FileId, + blkno: u32, + ctx: &RequestContext, + ) -> anyhow::Result { + let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; + + self.lock_for_read(&mut cache_key, ctx).await + } + + // + // Section 2: Internal interface functions for lookup/update. + // + // To add support for a new kind of "thing" to cache, you will need + // to add public interface routines above, and code to deal with the + // "mappings" after this section. But the routines in this section should + // not require changes. 
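+    //
+    // A note on pinning: `pinned_slots` is a semaphore with one permit per slot, and
+    // every outstanding PageReadGuard/PageWriteGuard holds a permit. That bounds the
+    // number of concurrently pinned slots to the number of slots, which is what lets
+    // the victim search in `find_victim()` (almost always) make progress; see the long
+    // comment there for the remaining starvation caveat.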
+ + async fn try_get_pinned_slot_permit(&self) -> anyhow::Result { + match tokio::time::timeout( + // Choose small timeout, neon_smgr does its own retries. + // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869 + Duration::from_secs(10), + Arc::clone(&self.pinned_slots).acquire_owned(), + ) + .await + { + Ok(res) => Ok(PinnedSlotsPermit( + res.expect("this semaphore is never closed"), + )), + Err(_timeout) => { + crate::metrics::page_cache_errors_inc( + crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout, + ); + anyhow::bail!("timeout: there were page guards alive for all page cache slots") + } + } + } + + /// Look up a page in the cache. + /// + /// If the search criteria is not exact, *cache_key is updated with the key + /// for exact key of the returned page. (For materialized pages, that means + /// that the LSN in 'cache_key' is updated with the LSN of the returned page + /// version.) + /// + /// If no page is found, returns None and *cache_key is left unmodified. + /// + async fn try_lock_for_read( + &self, + cache_key: &mut CacheKey, + permit: &mut Option, + ) -> Option { + let cache_key_orig = cache_key.clone(); + if let Some(slot_idx) = self.search_mapping(cache_key) { + // The page was found in the mapping. Lock the slot, and re-check + // that it's still what we expected (because we released the mapping + // lock already, another thread could have evicted the page) + let slot = &self.slots[slot_idx]; + let inner = slot.inner.read().await; + if inner.key.as_ref() == Some(cache_key) { + slot.inc_usage_count(); + return Some(PageReadGuard { + _permit: inner.coalesce_readers_permit(permit.take().unwrap()), + slot_guard: inner, + }); + } else { + // search_mapping might have modified the search key; restore it. + *cache_key = cache_key_orig; + } + } + None + } + + /// Return a locked buffer for given block. + /// + /// Like try_lock_for_read(), if the search criteria is not exact and the + /// page is already found in the cache, *cache_key is updated. + /// + /// If the page is not found in the cache, this allocates a new buffer for + /// it. The caller may then initialize the buffer with the contents, and + /// call mark_valid(). + /// + /// Example usage: + /// + /// ```ignore + /// let cache = page_cache::get(); + /// + /// match cache.lock_for_read(&key) { + /// ReadBufResult::Found(read_guard) => { + /// // The page was found in cache. Use it + /// }, + /// ReadBufResult::NotFound(write_guard) => { + /// // The page was not found in cache. Read it from disk into the + /// // buffer. + /// //read_my_page_from_disk(write_guard); + /// + /// // The buffer contents are now valid. Tell the page cache. + /// write_guard.mark_valid(); + /// }, + /// } + /// ``` + /// + async fn lock_for_read( + &self, + cache_key: &mut CacheKey, + ctx: &RequestContext, + ) -> anyhow::Result { + let mut permit = Some(self.try_get_pinned_slot_permit().await?); + + let (read_access, hit) = match cache_key { + CacheKey::MaterializedPage { .. } => { + unreachable!("Materialized pages use lookup_materialized_page") + } + CacheKey::ImmutableFilePage { .. } => ( + &crate::metrics::PAGE_CACHE + .for_ctx(ctx) + .read_accesses_immutable, + &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable, + ), + }; + read_access.inc(); + + let mut is_first_iteration = true; + loop { + // First check if the key already exists in the cache. 
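+            // Only a hit found on this first pass is counted in the hit metrics below;
+            // if we come back around after a victim search, another task may have
+            // installed the page concurrently, and we use it without counting a hit.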
+ if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await { + debug_assert!(permit.is_none()); + if is_first_iteration { + hit.inc(); + } + return Ok(ReadBufResult::Found(read_guard)); + } + debug_assert!(permit.is_some()); + is_first_iteration = false; + + // Not found. Find a victim buffer + let (slot_idx, mut inner) = self + .find_victim(permit.as_ref().unwrap()) + .await + .context("Failed to find evict victim")?; + + // Insert mapping for this. At this point, we may find that another + // thread did the same thing concurrently. In that case, we evicted + // our victim buffer unnecessarily. Put it into the free list and + // continue with the slot that the other thread chose. + if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) { + // TODO: put to free list + + // We now just loop back to start from beginning. This is not + // optimal, we'll perform the lookup in the mapping again, which + // is not really necessary because we already got + // 'existing_slot_idx'. But this shouldn't happen often enough + // to matter much. + continue; + } + + // Make the slot ready + let slot = &self.slots[slot_idx]; + inner.key = Some(cache_key.clone()); + slot.set_usage_count(1); + + debug_assert!( + { + let guard = inner.permit.lock().unwrap(); + guard.upgrade().is_none() + }, + "we hold a write lock, so, no one else should have a permit" + ); + + return Ok(ReadBufResult::NotFound(PageWriteGuard { + state: PageWriteGuardState::Invalid { + _permit: permit.take().unwrap(), + inner, + }, + })); + } + } + + // + // Section 3: Mapping functions + // + + /// Search for a page in the cache using the given search key. + /// + /// Returns the slot index, if any. If the search criteria is not exact, + /// *cache_key is updated with the actual key of the found page. + /// + /// NOTE: We don't hold any lock on the mapping on return, so the slot might + /// get recycled for an unrelated page immediately after this function + /// returns. The caller is responsible for re-checking that the slot still + /// contains the page with the same key before using it. + /// + fn search_mapping(&self, cache_key: &mut CacheKey) -> Option { + match cache_key { + CacheKey::MaterializedPage { hash_key, lsn } => { + let map = self.materialized_page_map.read().unwrap(); + let versions = map.get(hash_key)?; + + let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) { + Ok(version_idx) => version_idx, + Err(0) => return None, + Err(version_idx) => version_idx - 1, + }; + let version = &versions[version_idx]; + *lsn = version.lsn; + Some(version.slot_idx) + } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let map = self.immutable_page_map.read().unwrap(); + Some(*map.get(&(*file_id, *blkno))?) + } + } + } + + /// Search for a page in the cache using the given search key. + /// + /// Like 'search_mapping, but performs an "exact" search. Used for + /// allocating a new buffer. + fn search_mapping_exact(&self, key: &CacheKey) -> Option { + match key { + CacheKey::MaterializedPage { hash_key, lsn } => { + let map = self.materialized_page_map.read().unwrap(); + let versions = map.get(hash_key)?; + + if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) { + Some(versions[version_idx].slot_idx) + } else { + None + } + } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let map = self.immutable_page_map.read().unwrap(); + Some(*map.get(&(*file_id, *blkno))?) + } + } + } + + /// + /// Remove mapping for given key. 
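+    ///
+    /// In this patch it is only called while the caller already holds the slot's
+    /// write lock (from `PageWriteGuard::drop` and from `find_victim`), which keeps
+    /// the mapping and the slot contents consistent.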
+ /// + fn remove_mapping(&self, old_key: &CacheKey) { + match old_key { + CacheKey::MaterializedPage { + hash_key: old_hash_key, + lsn: old_lsn, + } => { + let mut map = self.materialized_page_map.write().unwrap(); + if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) { + let versions = old_entry.get_mut(); + + if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) { + versions.remove(version_idx); + self.size_metrics + .current_bytes_materialized_page + .sub_page_sz(1); + if versions.is_empty() { + old_entry.remove_entry(); + } + } + } else { + panic!("could not find old key in mapping") + } + } + CacheKey::ImmutableFilePage { file_id, blkno } => { + let mut map = self.immutable_page_map.write().unwrap(); + map.remove(&(*file_id, *blkno)) + .expect("could not find old key in mapping"); + self.size_metrics.current_bytes_immutable.sub_page_sz(1); + } + } + } + + /// + /// Insert mapping for given key. + /// + /// If a mapping already existed for the given key, returns the slot index + /// of the existing mapping and leaves it untouched. + fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option { + match new_key { + CacheKey::MaterializedPage { + hash_key: new_key, + lsn: new_lsn, + } => { + let mut map = self.materialized_page_map.write().unwrap(); + let versions = map.entry(new_key.clone()).or_default(); + match versions.binary_search_by_key(new_lsn, |v| v.lsn) { + Ok(version_idx) => Some(versions[version_idx].slot_idx), + Err(version_idx) => { + versions.insert( + version_idx, + Version { + lsn: *new_lsn, + slot_idx, + }, + ); + self.size_metrics + .current_bytes_materialized_page + .add_page_sz(1); + None + } + } + } + + CacheKey::ImmutableFilePage { file_id, blkno } => { + let mut map = self.immutable_page_map.write().unwrap(); + match map.entry((*file_id, *blkno)) { + Entry::Occupied(entry) => Some(*entry.get()), + Entry::Vacant(entry) => { + entry.insert(slot_idx); + self.size_metrics.current_bytes_immutable.add_page_sz(1); + None + } + } + } + } + } + + // + // Section 4: Misc internal helpers + // + + /// Find a slot to evict. + /// + /// On return, the slot is empty and write-locked. + async fn find_victim( + &self, + _permit_witness: &PinnedSlotsPermit, + ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { + let iter_limit = self.slots.len() * 10; + let mut iters = 0; + loop { + iters += 1; + let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len(); + + let slot = &self.slots[slot_idx]; + + if slot.dec_usage_count() == 0 { + let mut inner = match slot.inner.try_write() { + Ok(inner) => inner, + Err(_err) => { + if iters > iter_limit { + // NB: Even with the permits, there's no hard guarantee that we will find a slot with + // any particular number of iterations: other threads might race ahead and acquire and + // release pins just as we're scanning the array. + // + // Imagine that nslots is 2, and as starting point, usage_count==1 on all + // slots. There are two threads running concurrently, A and B. A has just + // acquired the permit from the semaphore. + // + // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search + // B: Acquire permit. + // B: Look at slot 2, decrement its usage_count to zero and continue the search + // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1. + // B: Release pin and permit again + // B: Acquire permit. + // B: Look at slot 2. 
Its usage_count is zero, so pin it and bump up its usage_count to 1. + // B: Release pin and permit again + // + // Now we're back in the starting situation that both slots have + // usage_count 1, but A has now been through one iteration of the + // find_victim() loop. This can repeat indefinitely and on each + // iteration, A's iteration count increases by one. + // + // So, even though the semaphore for the permits is fair, the victim search + // itself happens in parallel and is not fair. + // Hence even with a permit, a task can theoretically be starved. + // To avoid this, we'd need tokio to give priority to tasks that are holding + // permits for longer. + // Note that just yielding to tokio during iteration without such + // priority boosting is likely counter-productive. We'd just give more opportunities + // for B to bump usage count, further starving A. + page_cache_eviction_metrics::observe( + page_cache_eviction_metrics::Outcome::ItersExceeded { + iters: iters.try_into().unwrap(), + }, + ); + anyhow::bail!("exceeded evict iter limit"); + } + continue; + } + }; + if let Some(old_key) = &inner.key { + // remove mapping for old buffer + self.remove_mapping(old_key); + inner.key = None; + page_cache_eviction_metrics::observe( + page_cache_eviction_metrics::Outcome::FoundSlotEvicted { + iters: iters.try_into().unwrap(), + }, + ); + } else { + page_cache_eviction_metrics::observe( + page_cache_eviction_metrics::Outcome::FoundSlotUnused { + iters: iters.try_into().unwrap(), + }, + ); + } + return Ok((slot_idx, inner)); + } + } + } + + /// Initialize a new page cache + /// + /// This should be called only once at page server startup. + fn new(num_pages: usize) -> Self { + assert!(num_pages > 0, "page cache size must be > 0"); + + // We could use Vec::leak here, but that potentially also leaks + // uninitialized reserved capacity. With into_boxed_slice and Box::leak + // this is avoided. 
+ let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice()); + + let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; + size_metrics.max_bytes.set_page_sz(num_pages); + size_metrics.current_bytes_immutable.set_page_sz(0); + size_metrics.current_bytes_materialized_page.set_page_sz(0); + + let slots = page_buffer + .chunks_exact_mut(PAGE_SZ) + .map(|chunk| { + let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); + + Slot { + inner: tokio::sync::RwLock::new(SlotInner { + key: None, + buf, + permit: std::sync::Mutex::new(Weak::new()), + }), + usage_count: AtomicU8::new(0), + } + }) + .collect(); + + Self { + materialized_page_map: Default::default(), + immutable_page_map: Default::default(), + slots, + next_evict_slot: AtomicUsize::new(0), + size_metrics, + pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), + } + } +} + +trait PageSzBytesMetric { + fn set_page_sz(&self, count: usize); + fn add_page_sz(&self, count: usize); + fn sub_page_sz(&self, count: usize); +} + +#[inline(always)] +fn count_times_page_sz(count: usize) -> u64 { + u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap() +} + +impl PageSzBytesMetric for metrics::UIntGauge { + fn set_page_sz(&self, count: usize) { + self.set(count_times_page_sz(count)); + } + fn add_page_sz(&self, count: usize) { + self.add(count_times_page_sz(count)); + } + fn sub_page_sz(&self, count: usize) { + self.sub(count_times_page_sz(count)); + } +} diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 3277b2d0a5..6de2e95055 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -12,13 +12,12 @@ //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! use crate::context::RequestContext; +use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; -use super::disk_btree::PAGE_SZ; - impl<'a> BlockCursor<'a> { /// Read a blob into a new buffer. pub async fn read_blob( diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index da7c6cc2d8..b17bbbc6e5 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,7 +5,7 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::tenant::disk_btree::PAGE_SZ; +use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::Deref; @@ -35,7 +35,8 @@ where /// Reference to an in-memory copy of an immutable on-disk block. 
pub enum BlockLease<'a> { - PageReadGuard(crate::buffer_pool::Buffer), + PageReadGuard(PageReadGuard<'static>), + BufferPool(crate::buffer_pool::Buffer), EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), @@ -43,8 +44,8 @@ pub enum BlockLease<'a> { Vec(Vec), } -impl From for BlockLease<'static> { - fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> { +impl From> for BlockLease<'static> { + fn from(value: PageReadGuard<'static>) -> BlockLease<'static> { BlockLease::PageReadGuard(value) } } @@ -62,6 +63,7 @@ impl<'a> Deref for BlockLease<'a> { fn deref(&self) -> &Self::Target { match self { BlockLease::PageReadGuard(v) => v.deref(), + BlockLease::BufferPool(buf) => buf.deref(), BlockLease::EphemeralFileMutableTail(v) => v, #[cfg(test)] BlockLease::Arc(v) => v.deref(), @@ -162,27 +164,18 @@ impl<'a> BlockCursor<'a> { /// for modifying the file, nor for invalidating the cache if it is modified. pub struct FileBlockReader { pub file: VirtualFile, + + /// Unique ID of this file, used as key in the page cache. + file_id: page_cache::FileId, } impl FileBlockReader { pub fn new(file: VirtualFile) -> Self { - FileBlockReader { file } - } - /// Read a page from the underlying file into given buffer. - async fn fill_buffer( - &self, - buf: crate::buffer_pool::Buffer, - blkno: u32, - ) -> Result { - assert!(buf.len() == PAGE_SZ); - self.file - .read_exact_at( - crate::buffer_pool::PageWriteGuardBuf::new(buf), - blkno as u64 * PAGE_SZ as u64, - ) - .await - .map(|guard| guard.assume_init()) + let file_id = page_cache::next_file_id(); + + FileBlockReader { file_id, file } } + /// Read a block. /// /// Returns a "lease" object that can be used to @@ -191,12 +184,59 @@ impl FileBlockReader { pub async fn read_blk( &self, blknum: u32, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> Result { - let buf = crate::buffer_pool::get(); - // Read the page from disk into the buffer - let write_guard = self.fill_buffer(buf, blknum).await?; - Ok(BlockLease::PageReadGuard(write_guard)) + match ctx.page_content_kind() { + crate::context::PageContentKind::InMemoryLayer => { + unreachable!("this happens in inmemory_layer.rs") + } + crate::context::PageContentKind::Unknown + | crate::context::PageContentKind::DeltaLayerBtreeNode + | crate::context::PageContentKind::ImageLayerBtreeNode => { + let cache = page_cache::get(); + match cache + .read_immutable_buf(self.file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to read immutable buf: {e:#}"), + ) + })? { + ReadBufResult::Found(guard) => Ok(guard.into()), + ReadBufResult::NotFound(write_guard) => { + // Read the page from disk into the buffer + let write_guard = async move { + assert!(write_guard.len() == PAGE_SZ); + self.file + .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) + .await + } + .await?; + Ok(write_guard.mark_valid().into()) + } + } + } + crate::context::PageContentKind::ImageLayerValue + | crate::context::PageContentKind::DeltaLayerValue => { + let buf = crate::buffer_pool::get(); + // Read the page from disk into the buffer + let buf = async move { + assert_eq!(buf.len(), PAGE_SZ); + std::io::Result::Ok( + self.file + .read_exact_at( + crate::buffer_pool::PageWriteGuardBuf::new(buf), + blknum as u64 * PAGE_SZ as u64, + ) + .await? 
+ .assume_init(), + ) + } + .await?; + Ok(BlockLease::BufferPool(buf)) + } + } } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 229cb3e896..6b8cd77d78 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -3,8 +3,8 @@ use crate::config::PageServerConf; use crate::context::RequestContext; +use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; -use crate::tenant::disk_btree::PAGE_SZ; use crate::virtual_file::{self, VirtualFile}; use camino::Utf8PathBuf; use pageserver_api::shard::TenantShardId; @@ -17,6 +17,8 @@ use tracing::*; use utils::id::TimelineId; pub struct EphemeralFile { + page_cache_file_id: page_cache::FileId, + _tenant_shard_id: TenantShardId, _timeline_id: TimelineId, file: VirtualFile, @@ -53,6 +55,7 @@ impl EphemeralFile { .await?; Ok(EphemeralFile { + page_cache_file_id: page_cache::next_file_id(), _tenant_shard_id: tenant_shard_id, _timeline_id: timeline_id, file, @@ -68,22 +71,36 @@ impl EphemeralFile { pub(crate) async fn read_blk( &self, blknum: u32, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> Result { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { - let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get(); - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - let buf = self - .file - .read_exact_at( - crate::buffer_pool::PageWriteGuardBuf::new(write_guard), - blknum as u64 * PAGE_SZ as u64, - ) - .await? - .assume_init(); - Ok(BlockLease::PageReadGuard(buf)) + let cache = page_cache::get(); + match cache + .read_immutable_buf(self.page_cache_file_id, blknum, ctx) + .await + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + // order path before error because error is anyhow::Error => might have many contexts + format!( + "ephemeral file: read immutable page #{}: {}: {:#}", + blknum, self.file.path, e, + ), + ) + })? { + page_cache::ReadBufResult::Found(guard) => { + return Ok(BlockLease::PageReadGuard(guard)) + } + page_cache::ReadBufResult::NotFound(write_guard) => { + let write_guard = self + .file + .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) + .await?; + let read_guard = write_guard.mark_valid(); + return Ok(BlockLease::PageReadGuard(read_guard)); + } + }; } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) @@ -114,7 +131,7 @@ impl EphemeralFile { async fn push_bytes( &mut self, src: &[u8], - _ctx: &RequestContext, + ctx: &RequestContext, ) -> Result<(), io::Error> { let mut src_remaining = src; while !src_remaining.is_empty() { @@ -134,6 +151,33 @@ impl EphemeralFile { .await { Ok(_) => { + // Pre-warm the page cache with what we just wrote. + // This isn't necessary for coherency/correctness, but it's how we've always done it. + let cache = page_cache::get(); + match cache + .read_immutable_buf( + self.ephemeral_file.page_cache_file_id, + self.blknum, + ctx, + ) + .await + { + Ok(page_cache::ReadBufResult::Found(_guard)) => { + // This function takes &mut self, so, it shouldn't be possible to reach this point. 
+ unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum); + } + Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => { + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + buf.copy_from_slice(&self.ephemeral_file.mutable_tail); + let _ = write_guard.mark_valid(); + // pre-warm successful + } + Err(e) => { + error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); + // fail gracefully, it's not the end of the world if we can't pre-warm the cache here + } + } // Zero the buffer for re-use. // Zeroing is critical for correcntess because the write_blob code below // and similarly read_blk expect zeroed pages. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index a5f68ab1e4..3a445ef71e 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -29,10 +29,10 @@ //! use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; +use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; -use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::Timeline; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 15c18de9ea..c62e6aed51 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -25,10 +25,10 @@ //! actual page images are stored in the "values" part. 
use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; +use crate::page_cache::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; -use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 51b615c055..70c6ee2042 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -33,6 +33,7 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::sync::gate::Gate; +use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; @@ -40,24 +41,19 @@ use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use std::{ cmp::{max, min, Ordering}, - collections::{BTreeMap, BinaryHeap, HashMap, HashSet}, ops::ControlFlow, }; -use crate::tenant::disk_btree::PAGE_SZ; -use crate::tenant::storage_layer::delta_layer::DeltaEntry; -use crate::{ - context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder}, - disk_usage_eviction_task::EvictionCandidate, -}; - -use crate::disk_usage_eviction_task::DiskUsageEvictionInfo; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, metadata::{save_metadata, TimelineMetadata}, par_fsync, }; +use crate::{ + context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder}, + disk_usage_eviction_task::DiskUsageEvictionInfo, +}; use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError}; use crate::{ disk_usage_eviction_task::finite_f32, @@ -67,6 +63,9 @@ use crate::{ ValueReconstructState, }, }; +use crate::{ + disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry, +}; use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind}; use crate::config::PageServerConf; @@ -91,6 +90,7 @@ use utils::{ simple_rcu::{Rcu, RcuReadGuard}, }; +use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::task_mgr; @@ -2555,13 +2555,24 @@ impl Timeline { } } + /// # Cancel-safety + /// + /// This method is cancellation-safe. async fn lookup_cached_page( &self, - _key: &Key, - _lsn: Lsn, - _ctx: &RequestContext, + key: &Key, + lsn: Lsn, + ctx: &RequestContext, ) -> Option<(Lsn, Bytes)> { - None + let cache = page_cache::get(); + + // FIXME: It's pointless to check the cache for things that are not 8kB pages. + // We should look at the key to determine if it's a cacheable object + let (lsn, read_guard) = cache + .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx) + .await?; + let img = Bytes::from(read_guard.to_vec()); + Some((lsn, img)) } fn get_ancestor_timeline(&self) -> anyhow::Result> { @@ -3607,7 +3618,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. 
let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128; + let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? // min-heap (reserve space for one more element added before eviction) @@ -3859,7 +3870,7 @@ impl Timeline { // Add two pages for potential overhead. This should in theory be already // accounted for in the target calculation, but for very small targets, // we still might easily hit the limit otherwise. - let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2; + let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2; for layer in new_layers.iter() { if layer.layer_desc().file_size > warn_limit { warn!( @@ -4387,6 +4398,8 @@ impl Timeline { trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn); }; + let last_rec_lsn = data.records.last().unwrap().0; + let img = match self .walredo_mgr .request_redo(key, request_lsn, data.img, data.records, self.pg_version) @@ -4397,6 +4410,23 @@ impl Timeline { Err(e) => return Err(PageReconstructError::WalRedo(e)), }; + if img.len() == page_cache::PAGE_SZ { + let cache = page_cache::get(); + if let Err(e) = cache + .memorize_materialized_page( + self.tenant_shard_id, + self.timeline_id, + key, + last_rec_lsn, + &img, + ) + .await + .context("Materialized page memoization failed") + { + return Err(PageReconstructError::from(e)); + } + } + Ok(img) } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 53a9aaa09e..04435f8731 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,6 +12,7 @@ //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; +use crate::page_cache::PageWriteGuard; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; @@ -118,6 +119,37 @@ struct SlotInner { file: Option, } +/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. +struct PageWriteGuardBuf { + page: PageWriteGuard<'static>, + init_up_to: usize, +} +// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot, +// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved. +unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.page.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.page.len() + } +} +// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access, +// hence it's safe to hand out the `stable_mut_ptr()`. +unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.page.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.page.len()); + self.init_up_to = pos; + } +} + impl OpenFiles { /// Find a slot to use, evicting an existing file descriptor if needed. /// @@ -527,6 +559,21 @@ impl VirtualFile { res.map(|()| buf) } + /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. 
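+    ///
+    /// Illustrative usage, mirroring the callers added elsewhere in this patch
+    /// (`block_io.rs` and `ephemeral_file.rs`); `file`, `write_guard` and `blknum`
+    /// come from the surrounding code:
+    ///
+    /// ```ignore
+    /// let write_guard = file
+    ///     .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
+    ///     .await?;
+    /// // The page is now filled from disk; publish it to other readers.
+    /// let read_guard = write_guard.mark_valid();
+    /// ```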
+ pub async fn read_exact_at_page( + &self, + page: PageWriteGuard<'static>, + offset: u64, + ) -> Result, Error> { + let buf = PageWriteGuardBuf { + page, + init_up_to: 0, + }; + let res = self.read_exact_at(buf, offset).await; + res.map(|PageWriteGuardBuf { page, .. }| page) + .map_err(|e| Error::new(ErrorKind::Other, e)) + } + // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() {