diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index eb5c3f15cf..771d3d8a30 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -12,10 +12,9 @@ use std::collections::BinaryHeap; use std::ops::Range; use std::{fs, str}; -use pageserver::page_cache::PAGE_SZ; use pageserver::repository::{Key, KEY_SIZE}; use pageserver::tenant::block_io::FileBlockReader; -use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; +use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection, PAGE_SZ}; use pageserver::tenant::storage_layer::delta_layer::{Summary, DELTA_KEY_SIZE}; use pageserver::tenant::storage_layer::range_overlaps; use pageserver::virtual_file::{self, VirtualFile}; @@ -143,7 +142,6 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> { // Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree. pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); - pageserver::page_cache::init(100); let mut total_delta_layers = 0usize; let mut total_image_layers = 0usize; diff --git a/pageserver/ctl/src/layers.rs b/pageserver/ctl/src/layers.rs index dbbcfedac0..2c43a922b4 100644 --- a/pageserver/ctl/src/layers.rs +++ b/pageserver/ctl/src/layers.rs @@ -11,7 +11,7 @@ use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary}; use pageserver::tenant::storage_layer::{delta_layer, image_layer}; use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer}; use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; -use pageserver::{page_cache, virtual_file}; +use pageserver::virtual_file; use pageserver::{ repository::{Key, KEY_SIZE}, tenant::{ @@ -60,7 +60,6 @@ pub(crate) enum LayerCmd { async fn read_delta_file(path: impl AsRef, ctx: &RequestContext) -> Result<()> { let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path"); 
virtual_file::init(10, virtual_file::IoEngineKind::StdFs); - page_cache::init(100); let file = FileBlockReader::new(VirtualFile::open(path).await?); let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref())?; @@ -188,7 +187,6 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> { new_timeline_id, } => { pageserver::virtual_file::init(10, virtual_file::IoEngineKind::StdFs); - pageserver::page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); diff --git a/pageserver/ctl/src/main.rs b/pageserver/ctl/src/main.rs index 3c90933fe9..cd7e6bc728 100644 --- a/pageserver/ctl/src/main.rs +++ b/pageserver/ctl/src/main.rs @@ -15,7 +15,6 @@ use index_part::IndexPartCmd; use layers::LayerCmd; use pageserver::{ context::{DownloadBehavior, RequestContext}, - page_cache, task_mgr::TaskKind, tenant::{dump_layerfile_from_path, metadata::TimelineMetadata}, virtual_file, @@ -124,7 +123,6 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> { async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> { // Basic initialization of things that don't change after startup virtual_file::init(10, virtual_file::IoEngineKind::StdFs); - page_cache::init(100); let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); dump_layerfile_from_path(path, true, &ctx).await } diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 84de76e55e..3727483cb8 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -24,7 +24,7 @@ use pageserver::{ config::{defaults::*, PageServerConf}, context::{DownloadBehavior, RequestContext}, deletion_queue::DeletionQueue, - http, page_cache, page_service, task_mgr, + http, page_service, task_mgr, task_mgr::TaskKind, task_mgr::{BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME}, tenant::mgr, @@ -131,7 +131,6 @@ fn main() -> anyhow::Result<()> { // 
Basic initialization of things that don't change after startup virtual_file::init(conf.max_file_descriptors, conf.virtual_file_io_engine); - page_cache::init(conf.page_cache_size); start_pageserver(launch_ts, conf).context("Failed to start pageserver")?; diff --git a/pageserver/src/buffer_pool.rs b/pageserver/src/buffer_pool.rs new file mode 100644 index 0000000000..1078ee50e9 --- /dev/null +++ b/pageserver/src/buffer_pool.rs @@ -0,0 +1,82 @@ +use std::cell::RefCell; + +use crate::tenant::disk_btree::PAGE_SZ; + +pub struct Buffer(Option>); + +// Thread-local list of re-usable buffers. +thread_local! { + static POOL: RefCell>> = RefCell::new(Vec::new()); +} + +pub(crate) fn get() -> Buffer { + let maybe = POOL.with(|rc| rc.borrow_mut().pop()); + match maybe { + Some(buf) => Buffer(Some(buf)), + None => Buffer(Some(Box::new([0; PAGE_SZ]))), + } +} + +impl Drop for Buffer { + fn drop(&mut self) { + let buf = self.0.take().unwrap(); + POOL.with(|rc| rc.borrow_mut().push(buf)) + } +} + +impl std::ops::Deref for Buffer { + type Target = [u8; PAGE_SZ]; + + fn deref(&self) -> &Self::Target { + self.0.as_ref().unwrap().as_ref() + } +} + +impl std::ops::DerefMut for Buffer { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0.as_mut().unwrap().as_mut() + } +} + +pub(crate) struct PageWriteGuardBuf { + page: Buffer, + init_up_to: usize, +} +impl PageWriteGuardBuf { + pub fn new(buf: Buffer) -> Self { + PageWriteGuardBuf { + page: buf, + init_up_to: 0, + } + } + pub fn assume_init(self) -> Buffer { + assert_eq!(self.init_up_to, PAGE_SZ); + self.page + } +} + +// Safety: the owned [`Buffer`] gives us exclusive ownership of its heap-allocated (boxed) page, +// and the location remains stable even if [`Self`] or the [`Buffer`] is moved. 
+unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.page.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.page.len() + } +} +// Safety: see above, plus: the ownership of the [`Buffer`] held in `page` means exclusive access, +// hence it's safe to hand out the `stable_mut_ptr()`. +unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.page.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.page.len()); + self.init_up_to = pos; + } +} diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index ee331ea154..ba820bb62b 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -282,8 +282,4 @@ impl RequestContext { pub(crate) fn access_stats_behavior(&self) -> AccessStatsBehavior { self.access_stats_behavior } - - pub(crate) fn page_content_kind(&self) -> PageContentKind { - self.page_content_kind - } } diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index bcde1166b7..654b53e0a3 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -13,7 +13,6 @@ pub mod http; pub mod import_datadir; pub use pageserver_api::keyspace; pub mod metrics; -pub mod page_cache; pub mod page_service; pub mod pgdatadir_mapping; pub mod repository; @@ -31,6 +30,8 @@ use camino::Utf8Path; use deletion_queue::DeletionQueue; use tracing::info; +pub mod buffer_pool; + /// Current storage format version /// /// This is embedded in the header of all the layer files. 
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9b3679e3c2..84e66b132a 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -187,216 +187,6 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy = Lazy::new(|| } }); -pub(crate) struct PageCacheMetricsForTaskKind { - pub read_accesses_materialized_page: IntCounter, - pub read_accesses_immutable: IntCounter, - - pub read_hits_immutable: IntCounter, - pub read_hits_materialized_page_exact: IntCounter, - pub read_hits_materialized_page_older_lsn: IntCounter, -} - -pub(crate) struct PageCacheMetrics { - map: EnumMap>, -} - -static PAGE_CACHE_READ_HITS: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "pageserver_page_cache_read_hits_total", - "Number of read accesses to the page cache that hit", - &["task_kind", "key_kind", "content_kind", "hit_kind"] - ) - .expect("failed to define a metric") -}); - -static PAGE_CACHE_READ_ACCESSES: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "pageserver_page_cache_read_accesses_total", - "Number of read accesses to the page cache", - &["task_kind", "key_kind", "content_kind"] - ) - .expect("failed to define a metric") -}); - -pub(crate) static PAGE_CACHE: Lazy = Lazy::new(|| PageCacheMetrics { - map: EnumMap::from_array(std::array::from_fn(|task_kind| { - let task_kind = ::from_usize(task_kind); - let task_kind: &'static str = task_kind.into(); - EnumMap::from_array(std::array::from_fn(|content_kind| { - let content_kind = ::from_usize(content_kind); - let content_kind: &'static str = content_kind.into(); - PageCacheMetricsForTaskKind { - read_accesses_materialized_page: { - PAGE_CACHE_READ_ACCESSES - .get_metric_with_label_values(&[ - task_kind, - "materialized_page", - content_kind, - ]) - .unwrap() - }, - - read_accesses_immutable: { - PAGE_CACHE_READ_ACCESSES - .get_metric_with_label_values(&[task_kind, "immutable", content_kind]) - .unwrap() - }, - - read_hits_immutable: { - PAGE_CACHE_READ_HITS - 
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"]) - .unwrap() - }, - - read_hits_materialized_page_exact: { - PAGE_CACHE_READ_HITS - .get_metric_with_label_values(&[ - task_kind, - "materialized_page", - content_kind, - "exact", - ]) - .unwrap() - }, - - read_hits_materialized_page_older_lsn: { - PAGE_CACHE_READ_HITS - .get_metric_with_label_values(&[ - task_kind, - "materialized_page", - content_kind, - "older_lsn", - ]) - .unwrap() - }, - } - })) - })), -}); - -impl PageCacheMetrics { - pub(crate) fn for_ctx(&self, ctx: &RequestContext) -> &PageCacheMetricsForTaskKind { - &self.map[ctx.task_kind()][ctx.page_content_kind()] - } -} - -pub(crate) struct PageCacheSizeMetrics { - pub max_bytes: UIntGauge, - - pub current_bytes_immutable: UIntGauge, - pub current_bytes_materialized_page: UIntGauge, -} - -static PAGE_CACHE_SIZE_CURRENT_BYTES: Lazy = Lazy::new(|| { - register_uint_gauge_vec!( - "pageserver_page_cache_size_current_bytes", - "Current size of the page cache in bytes, by key kind", - &["key_kind"] - ) - .expect("failed to define a metric") -}); - -pub(crate) static PAGE_CACHE_SIZE: Lazy = - Lazy::new(|| PageCacheSizeMetrics { - max_bytes: { - register_uint_gauge!( - "pageserver_page_cache_size_max_bytes", - "Maximum size of the page cache in bytes" - ) - .expect("failed to define a metric") - }, - current_bytes_immutable: { - PAGE_CACHE_SIZE_CURRENT_BYTES - .get_metric_with_label_values(&["immutable"]) - .unwrap() - }, - current_bytes_materialized_page: { - PAGE_CACHE_SIZE_CURRENT_BYTES - .get_metric_with_label_values(&["materialized_page"]) - .unwrap() - }, - }); - -pub(crate) mod page_cache_eviction_metrics { - use std::num::NonZeroUsize; - - use metrics::{register_int_counter_vec, IntCounter, IntCounterVec}; - use once_cell::sync::Lazy; - - #[derive(Clone, Copy)] - pub(crate) enum Outcome { - FoundSlotUnused { iters: NonZeroUsize }, - FoundSlotEvicted { iters: NonZeroUsize }, - ItersExceeded { iters: NonZeroUsize }, - } - - 
static ITERS_TOTAL_VEC: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "pageserver_page_cache_find_victim_iters_total", - "Counter for the number of iterations in the find_victim loop", - &["outcome"], - ) - .expect("failed to define a metric") - }); - - static CALLS_VEC: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "pageserver_page_cache_find_victim_calls", - "Incremented at the end of each find_victim() call.\ - Filter by outcome to get e.g., eviction rate.", - &["outcome"] - ) - .unwrap() - }); - - pub(crate) fn observe(outcome: Outcome) { - macro_rules! dry { - ($label:literal, $iters:expr) => {{ - static LABEL: &'static str = $label; - static ITERS_TOTAL: Lazy = - Lazy::new(|| ITERS_TOTAL_VEC.with_label_values(&[LABEL])); - static CALLS: Lazy = - Lazy::new(|| CALLS_VEC.with_label_values(&[LABEL])); - ITERS_TOTAL.inc_by(($iters.get()) as u64); - CALLS.inc(); - }}; - } - match outcome { - Outcome::FoundSlotUnused { iters } => dry!("found_empty", iters), - Outcome::FoundSlotEvicted { iters } => { - dry!("found_evicted", iters) - } - Outcome::ItersExceeded { iters } => { - dry!("err_iters_exceeded", iters); - super::page_cache_errors_inc(super::PageCacheErrorKind::EvictIterLimit); - } - } - } -} - -static PAGE_CACHE_ERRORS: Lazy = Lazy::new(|| { - register_int_counter_vec!( - "page_cache_errors_total", - "Number of timeouts while acquiring a pinned slot in the page cache", - &["error_kind"] - ) - .expect("failed to define a metric") -}); - -#[derive(IntoStaticStr)] -#[strum(serialize_all = "kebab_case")] -pub(crate) enum PageCacheErrorKind { - AcquirePinnedSlotTimeout, - EvictIterLimit, -} - -pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) { - PAGE_CACHE_ERRORS - .get_metric_with_label_values(&[error_kind.into()]) - .unwrap() - .inc(); -} - pub(crate) static WAIT_LSN_TIME: Lazy = Lazy::new(|| { register_histogram!( "pageserver_wait_lsn_seconds", @@ -1998,7 +1788,6 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use 
std::time::{Duration, Instant}; -use crate::context::{PageContentKind, RequestContext}; use crate::task_mgr::TaskKind; /// Maintain a per timeline gauge in addition to the global gauge. diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs deleted file mode 100644 index 28d2584bf4..0000000000 --- a/pageserver/src/page_cache.rs +++ /dev/null @@ -1,1001 +0,0 @@ -//! -//! Global page cache -//! -//! The page cache uses up most of the memory in the page server. It is shared -//! by all tenants, and it is used to store different kinds of pages. Sharing -//! the cache allows memory to be dynamically allocated where it's needed the -//! most. -//! -//! The page cache consists of fixed-size buffers, 8 kB each to match the -//! PostgreSQL buffer size, and a Slot struct for each buffer to contain -//! information about what's stored in the buffer. -//! -//! # Types Of Pages -//! -//! [`PageCache`] only supports immutable pages. -//! Hence there is no need to worry about coherency. -//! -//! Two types of pages are supported: -//! -//! * **Materialized pages**, filled & used by page reconstruction -//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`]. -//! -//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only. -//! It uses the page cache only for the blocks that are already fully written and immutable. -//! -//! # Filling The Page Cache -//! -//! Page cache maps from a cache key to a buffer slot. -//! The cache key uniquely identifies the piece of data that is being cached. -//! -//! The cache key for **materialized pages** is [`TenantShardId`], [`TimelineId`], [`Key`], and [`Lsn`]. -//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access. -//! -//! The cache key for **immutable file** pages is [`FileId`] and a block number. -//! 
Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following: -//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`]. -//! * Get a [`FileId`] using [`next_file_id`]. -//! * Use the mechanism to associate the on-disk file with the returned [`FileId`]. -//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`]. -//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains -//! a read guard for the page. Just use it. -//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains -//! a write guard for the page. Fill the page with the contents of the on-disk file. -//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid. -//! Then try again to [`PageCache::read_immutable_buf`]. -//! Unless there's high cache pressure, the page should now be cached. -//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.) -//! -//! # Locking -//! -//! There are two levels of locking involved: There's one lock for the "mapping" -//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer -//! slot, and a separate lock on each slot. To read or write the contents of a -//! slot, you must hold the lock on the slot in read or write mode, -//! respectively. To change the mapping of a slot, i.e. to evict a page or to -//! assign a buffer for a page, you must hold the mapping lock and the lock on -//! the slot at the same time. -//! -//! Whenever you need to hold both locks simultaneously, the slot lock must be -//! acquired first. This consistent ordering avoids deadlocks. To look up a page -//! in the cache, you would first look up the mapping, while holding the mapping -//! lock, and then lock the slot. You must release the mapping lock in between, -//! to obey the lock ordering and avoid deadlock. -//! -//! 
A slot can momentarily have invalid contents, even if it's already been -//! inserted to the mapping, but you must hold the write-lock on the slot until -//! the contents are valid. If you need to release the lock without initializing -//! the contents, you must remove the mapping first. We make that easy for the -//! callers with PageWriteGuard: the caller must explicitly call guard.mark_valid() after it has -//! initialized it. If the guard is dropped without calling mark_valid(), the -//! mapping is automatically removed and the slot is marked free. -//! - -use std::{ - collections::{hash_map::Entry, HashMap}, - convert::TryInto, - sync::{ - atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, - Arc, Weak, - }, - time::Duration, -}; - -use anyhow::Context; -use once_cell::sync::OnceCell; -use pageserver_api::shard::TenantShardId; -use utils::{id::TimelineId, lsn::Lsn}; - -use crate::{ - context::RequestContext, - metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics}, - repository::Key, -}; - -static PAGE_CACHE: OnceCell = OnceCell::new(); -const TEST_PAGE_CACHE_SIZE: usize = 50; - -/// -/// Initialize the page cache. This must be called once at page server startup. -/// -pub fn init(size: usize) { - if PAGE_CACHE.set(PageCache::new(size)).is_err() { - panic!("page cache already initialized"); - } -} - -/// -/// Get a handle to the page cache. -/// -pub fn get() -> &'static PageCache { - // - // In unit tests, page server startup doesn't happen and no one calls - // page_cache::init(). Initialize it here with a tiny cache, so that the - // page cache is usable in unit tests. - // - if cfg!(test) { - PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE)) - } else { - PAGE_CACHE.get().expect("page cache not initialized") - } -} - -pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; -const MAX_USAGE_COUNT: u8 = 5; - -/// See module-level comment. 
-#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct FileId(u64); - -static NEXT_ID: AtomicU64 = AtomicU64::new(1); - -/// See module-level comment. -pub fn next_file_id() -> FileId { - FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) -} - -/// -/// CacheKey uniquely identifies a "thing" to cache in the page cache. -/// -#[derive(Debug, PartialEq, Eq, Clone)] -#[allow(clippy::enum_variant_names)] -enum CacheKey { - MaterializedPage { - hash_key: MaterializedPageHashKey, - lsn: Lsn, - }, - ImmutableFilePage { - file_id: FileId, - blkno: u32, - }, -} - -#[derive(Debug, PartialEq, Eq, Hash, Clone)] -struct MaterializedPageHashKey { - /// Why is this TenantShardId rather than TenantId? - /// - /// Usually, the materialized value of a page@lsn is identical on any shard in the same tenant. However, this - /// this not the case for certain internally-generated pages (e.g. relation sizes). In future, we may make this - /// key smaller by omitting the shard, if we ensure that reads to such pages always skip the cache, or are - /// special-cased in some other way. - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, - key: Key, -} - -#[derive(Clone)] -struct Version { - lsn: Lsn, - slot_idx: usize, -} - -struct Slot { - inner: tokio::sync::RwLock, - usage_count: AtomicU8, -} - -struct SlotInner { - key: Option, - // for `coalesce_readers_permit` - permit: std::sync::Mutex>, - buf: &'static mut [u8; PAGE_SZ], -} - -impl Slot { - /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT. - fn inc_usage_count(&self) { - let _ = self - .usage_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { - if val == MAX_USAGE_COUNT { - None - } else { - Some(val + 1) - } - }); - } - - /// Decrement usage count on the buffer, unless it's already zero. Returns - /// the old usage count. 
- fn dec_usage_count(&self) -> u8 { - let count_res = - self.usage_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { - if val == 0 { - None - } else { - Some(val - 1) - } - }); - - match count_res { - Ok(usage_count) => usage_count, - Err(usage_count) => usage_count, - } - } - - /// Sets the usage count to a specific value. - fn set_usage_count(&self, count: u8) { - self.usage_count.store(count, Ordering::Relaxed); - } -} - -impl SlotInner { - /// If there is aready a reader, drop our permit and share its permit, just like we share read access. - fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc { - let mut guard = self.permit.lock().unwrap(); - if let Some(existing_permit) = guard.upgrade() { - drop(guard); - drop(permit); - existing_permit - } else { - let permit = Arc::new(permit); - *guard = Arc::downgrade(&permit); - permit - } - } -} - -pub struct PageCache { - /// This contains the mapping from the cache key to buffer slot that currently - /// contains the page, if any. - /// - /// TODO: This is protected by a single lock. If that becomes a bottleneck, - /// this HashMap can be replaced with a more concurrent version, there are - /// plenty of such crates around. - /// - /// If you add support for caching different kinds of objects, each object kind - /// can have a separate mapping map, next to this field. - materialized_page_map: std::sync::RwLock>>, - - immutable_page_map: std::sync::RwLock>, - - /// The actual buffers with their metadata. - slots: Box<[Slot]>, - - pinned_slots: Arc, - - /// Index of the next candidate to evict, for the Clock replacement algorithm. - /// This is interpreted modulo the page cache size. - next_evict_slot: AtomicUsize, - - size_metrics: &'static PageCacheSizeMetrics, -} - -struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit); - -/// -/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked -/// until the guard is dropped. 
-/// -pub struct PageReadGuard<'i> { - _permit: Arc, - slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>, -} - -impl std::ops::Deref for PageReadGuard<'_> { - type Target = [u8; PAGE_SZ]; - - fn deref(&self) -> &Self::Target { - self.slot_guard.buf - } -} - -impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { - fn as_ref(&self) -> &[u8; PAGE_SZ] { - self.slot_guard.buf - } -} - -/// -/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked -/// until the guard is dropped. -/// -/// Counterintuitively, this is used even for a read, if the requested page is not -/// currently found in the page cache. In that case, the caller of lock_for_read() -/// is expected to fill in the page contents and call mark_valid(). -pub struct PageWriteGuard<'i> { - state: PageWriteGuardState<'i>, -} - -enum PageWriteGuardState<'i> { - Invalid { - inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>, - _permit: PinnedSlotsPermit, - }, - Downgraded, -} - -impl std::ops::DerefMut for PageWriteGuard<'_> { - fn deref_mut(&mut self) -> &mut Self::Target { - match &mut self.state { - PageWriteGuardState::Invalid { inner, _permit } => inner.buf, - PageWriteGuardState::Downgraded => unreachable!(), - } - } -} - -impl std::ops::Deref for PageWriteGuard<'_> { - type Target = [u8; PAGE_SZ]; - - fn deref(&self) -> &Self::Target { - match &self.state { - PageWriteGuardState::Invalid { inner, _permit } => inner.buf, - PageWriteGuardState::Downgraded => unreachable!(), - } - } -} - -impl<'a> PageWriteGuard<'a> { - /// Mark that the buffer contents are now valid. 
- #[must_use] - pub fn mark_valid(mut self) -> PageReadGuard<'a> { - let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded); - match prev { - PageWriteGuardState::Invalid { inner, _permit } => { - assert!(inner.key.is_some()); - PageReadGuard { - _permit: Arc::new(_permit), - slot_guard: inner.downgrade(), - } - } - PageWriteGuardState::Downgraded => unreachable!(), - } - } -} - -impl Drop for PageWriteGuard<'_> { - /// - /// If the buffer was allocated for a page that was not already in the - /// cache, but the lock_for_read/write() caller dropped the buffer without - /// initializing it, remove the mapping from the page cache. - /// - fn drop(&mut self) { - match &mut self.state { - PageWriteGuardState::Invalid { inner, _permit } => { - assert!(inner.key.is_some()); - let self_key = inner.key.as_ref().unwrap(); - PAGE_CACHE.get().unwrap().remove_mapping(self_key); - inner.key = None; - } - PageWriteGuardState::Downgraded => {} - } - } -} - -/// lock_for_read() return value -pub enum ReadBufResult<'a> { - Found(PageReadGuard<'a>), - NotFound(PageWriteGuard<'a>), -} - -impl PageCache { - // - // Section 1.1: Public interface functions for looking up and memorizing materialized page - // versions in the page cache - // - - /// Look up a materialized page version. - /// - /// The 'lsn' is an upper bound, this will return the latest version of - /// the given block, but not newer than 'lsn'. Returns the actual LSN of the - /// returned page. 
- pub async fn lookup_materialized_page( - &self, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, - key: &Key, - lsn: Lsn, - ctx: &RequestContext, - ) -> Option<(Lsn, PageReadGuard)> { - let Ok(permit) = self.try_get_pinned_slot_permit().await else { - return None; - }; - - crate::metrics::PAGE_CACHE - .for_ctx(ctx) - .read_accesses_materialized_page - .inc(); - - let mut cache_key = CacheKey::MaterializedPage { - hash_key: MaterializedPageHashKey { - tenant_shard_id, - timeline_id, - key: *key, - }, - lsn, - }; - - if let Some(guard) = self - .try_lock_for_read(&mut cache_key, &mut Some(permit)) - .await - { - if let CacheKey::MaterializedPage { - hash_key: _, - lsn: available_lsn, - } = cache_key - { - if available_lsn == lsn { - crate::metrics::PAGE_CACHE - .for_ctx(ctx) - .read_hits_materialized_page_exact - .inc(); - } else { - crate::metrics::PAGE_CACHE - .for_ctx(ctx) - .read_hits_materialized_page_older_lsn - .inc(); - } - Some((available_lsn, guard)) - } else { - panic!("unexpected key type in slot"); - } - } else { - None - } - } - - /// - /// Store an image of the given page in the cache. - /// - pub async fn memorize_materialized_page( - &self, - tenant_shard_id: TenantShardId, - timeline_id: TimelineId, - key: Key, - lsn: Lsn, - img: &[u8], - ) -> anyhow::Result<()> { - let cache_key = CacheKey::MaterializedPage { - hash_key: MaterializedPageHashKey { - tenant_shard_id, - timeline_id, - key, - }, - lsn, - }; - - let mut permit = Some(self.try_get_pinned_slot_permit().await?); - loop { - // First check if the key already exists in the cache. - if let Some(slot_idx) = self.search_mapping_exact(&cache_key) { - // The page was found in the mapping. 
Lock the slot, and re-check - // that it's still what we expected (because we don't released the mapping - // lock already, another thread could have evicted the page) - let slot = &self.slots[slot_idx]; - let inner = slot.inner.write().await; - if inner.key.as_ref() == Some(&cache_key) { - slot.inc_usage_count(); - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - debug_assert_eq!(inner.buf.len(), img.len()); - // We already had it in cache. Another thread must've put it there - // concurrently. Check that it had the same contents that we - // replayed. - assert!(inner.buf == img); - return Ok(()); - } - } - debug_assert!(permit.is_some()); - - // Not found. Find a victim buffer - let (slot_idx, mut inner) = self - .find_victim(permit.as_ref().unwrap()) - .await - .context("Failed to find evict victim")?; - - // Insert mapping for this. At this point, we may find that another - // thread did the same thing concurrently. In that case, we evicted - // our victim buffer unnecessarily. Put it into the free list and - // continue with the slot that the other thread chose. - if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) { - // TODO: put to free list - - // We now just loop back to start from beginning. This is not - // optimal, we'll perform the lookup in the mapping again, which - // is not really necessary because we already got - // 'existing_slot_idx'. But this shouldn't happen often enough - // to matter much. - continue; - } - - // Make the slot ready - let slot = &self.slots[slot_idx]; - inner.key = Some(cache_key.clone()); - slot.set_usage_count(1); - // Create a write guard for the slot so we go through the expected motions. 
- debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - let mut write_guard = PageWriteGuard { - state: PageWriteGuardState::Invalid { - _permit: permit.take().unwrap(), - inner, - }, - }; - write_guard.copy_from_slice(img); - let _ = write_guard.mark_valid(); - return Ok(()); - } - } - - // Section 1.2: Public interface functions for working with immutable file pages. - - pub async fn read_immutable_buf( - &self, - file_id: FileId, - blkno: u32, - ctx: &RequestContext, - ) -> anyhow::Result { - let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; - - self.lock_for_read(&mut cache_key, ctx).await - } - - // - // Section 2: Internal interface functions for lookup/update. - // - // To add support for a new kind of "thing" to cache, you will need - // to add public interface routines above, and code to deal with the - // "mappings" after this section. But the routines in this section should - // not require changes. - - async fn try_get_pinned_slot_permit(&self) -> anyhow::Result { - match tokio::time::timeout( - // Choose small timeout, neon_smgr does its own retries. - // https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869 - Duration::from_secs(10), - Arc::clone(&self.pinned_slots).acquire_owned(), - ) - .await - { - Ok(res) => Ok(PinnedSlotsPermit( - res.expect("this semaphore is never closed"), - )), - Err(_timeout) => { - crate::metrics::page_cache_errors_inc( - crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout, - ); - anyhow::bail!("timeout: there were page guards alive for all page cache slots") - } - } - } - - /// Look up a page in the cache. - /// - /// If the search criteria is not exact, *cache_key is updated with the key - /// for exact key of the returned page. (For materialized pages, that means - /// that the LSN in 'cache_key' is updated with the LSN of the returned page - /// version.) 
- /// - /// If no page is found, returns None and *cache_key is left unmodified. - /// - async fn try_lock_for_read( - &self, - cache_key: &mut CacheKey, - permit: &mut Option, - ) -> Option { - let cache_key_orig = cache_key.clone(); - if let Some(slot_idx) = self.search_mapping(cache_key) { - // The page was found in the mapping. Lock the slot, and re-check - // that it's still what we expected (because we released the mapping - // lock already, another thread could have evicted the page) - let slot = &self.slots[slot_idx]; - let inner = slot.inner.read().await; - if inner.key.as_ref() == Some(cache_key) { - slot.inc_usage_count(); - return Some(PageReadGuard { - _permit: inner.coalesce_readers_permit(permit.take().unwrap()), - slot_guard: inner, - }); - } else { - // search_mapping might have modified the search key; restore it. - *cache_key = cache_key_orig; - } - } - None - } - - /// Return a locked buffer for given block. - /// - /// Like try_lock_for_read(), if the search criteria is not exact and the - /// page is already found in the cache, *cache_key is updated. - /// - /// If the page is not found in the cache, this allocates a new buffer for - /// it. The caller may then initialize the buffer with the contents, and - /// call mark_valid(). - /// - /// Example usage: - /// - /// ```ignore - /// let cache = page_cache::get(); - /// - /// match cache.lock_for_read(&key) { - /// ReadBufResult::Found(read_guard) => { - /// // The page was found in cache. Use it - /// }, - /// ReadBufResult::NotFound(write_guard) => { - /// // The page was not found in cache. Read it from disk into the - /// // buffer. - /// //read_my_page_from_disk(write_guard); - /// - /// // The buffer contents are now valid. Tell the page cache. 
- /// write_guard.mark_valid(); - /// }, - /// } - /// ``` - /// - async fn lock_for_read( - &self, - cache_key: &mut CacheKey, - ctx: &RequestContext, - ) -> anyhow::Result { - let mut permit = Some(self.try_get_pinned_slot_permit().await?); - - let (read_access, hit) = match cache_key { - CacheKey::MaterializedPage { .. } => { - unreachable!("Materialized pages use lookup_materialized_page") - } - CacheKey::ImmutableFilePage { .. } => ( - &crate::metrics::PAGE_CACHE - .for_ctx(ctx) - .read_accesses_immutable, - &crate::metrics::PAGE_CACHE.for_ctx(ctx).read_hits_immutable, - ), - }; - read_access.inc(); - - let mut is_first_iteration = true; - loop { - // First check if the key already exists in the cache. - if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await { - debug_assert!(permit.is_none()); - if is_first_iteration { - hit.inc(); - } - return Ok(ReadBufResult::Found(read_guard)); - } - debug_assert!(permit.is_some()); - is_first_iteration = false; - - // Not found. Find a victim buffer - let (slot_idx, mut inner) = self - .find_victim(permit.as_ref().unwrap()) - .await - .context("Failed to find evict victim")?; - - // Insert mapping for this. At this point, we may find that another - // thread did the same thing concurrently. In that case, we evicted - // our victim buffer unnecessarily. Put it into the free list and - // continue with the slot that the other thread chose. - if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) { - // TODO: put to free list - - // We now just loop back to start from beginning. This is not - // optimal, we'll perform the lookup in the mapping again, which - // is not really necessary because we already got - // 'existing_slot_idx'. But this shouldn't happen often enough - // to matter much. 
- continue; - } - - // Make the slot ready - let slot = &self.slots[slot_idx]; - inner.key = Some(cache_key.clone()); - slot.set_usage_count(1); - - debug_assert!( - { - let guard = inner.permit.lock().unwrap(); - guard.upgrade().is_none() - }, - "we hold a write lock, so, no one else should have a permit" - ); - - return Ok(ReadBufResult::NotFound(PageWriteGuard { - state: PageWriteGuardState::Invalid { - _permit: permit.take().unwrap(), - inner, - }, - })); - } - } - - // - // Section 3: Mapping functions - // - - /// Search for a page in the cache using the given search key. - /// - /// Returns the slot index, if any. If the search criteria is not exact, - /// *cache_key is updated with the actual key of the found page. - /// - /// NOTE: We don't hold any lock on the mapping on return, so the slot might - /// get recycled for an unrelated page immediately after this function - /// returns. The caller is responsible for re-checking that the slot still - /// contains the page with the same key before using it. - /// - fn search_mapping(&self, cache_key: &mut CacheKey) -> Option { - match cache_key { - CacheKey::MaterializedPage { hash_key, lsn } => { - let map = self.materialized_page_map.read().unwrap(); - let versions = map.get(hash_key)?; - - let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) { - Ok(version_idx) => version_idx, - Err(0) => return None, - Err(version_idx) => version_idx - 1, - }; - let version = &versions[version_idx]; - *lsn = version.lsn; - Some(version.slot_idx) - } - CacheKey::ImmutableFilePage { file_id, blkno } => { - let map = self.immutable_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } - } - } - - /// Search for a page in the cache using the given search key. - /// - /// Like 'search_mapping, but performs an "exact" search. Used for - /// allocating a new buffer. 
- fn search_mapping_exact(&self, key: &CacheKey) -> Option { - match key { - CacheKey::MaterializedPage { hash_key, lsn } => { - let map = self.materialized_page_map.read().unwrap(); - let versions = map.get(hash_key)?; - - if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) { - Some(versions[version_idx].slot_idx) - } else { - None - } - } - CacheKey::ImmutableFilePage { file_id, blkno } => { - let map = self.immutable_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } - } - } - - /// - /// Remove mapping for given key. - /// - fn remove_mapping(&self, old_key: &CacheKey) { - match old_key { - CacheKey::MaterializedPage { - hash_key: old_hash_key, - lsn: old_lsn, - } => { - let mut map = self.materialized_page_map.write().unwrap(); - if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) { - let versions = old_entry.get_mut(); - - if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) { - versions.remove(version_idx); - self.size_metrics - .current_bytes_materialized_page - .sub_page_sz(1); - if versions.is_empty() { - old_entry.remove_entry(); - } - } - } else { - panic!("could not find old key in mapping") - } - } - CacheKey::ImmutableFilePage { file_id, blkno } => { - let mut map = self.immutable_page_map.write().unwrap(); - map.remove(&(*file_id, *blkno)) - .expect("could not find old key in mapping"); - self.size_metrics.current_bytes_immutable.sub_page_sz(1); - } - } - } - - /// - /// Insert mapping for given key. - /// - /// If a mapping already existed for the given key, returns the slot index - /// of the existing mapping and leaves it untouched. 
- fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option { - match new_key { - CacheKey::MaterializedPage { - hash_key: new_key, - lsn: new_lsn, - } => { - let mut map = self.materialized_page_map.write().unwrap(); - let versions = map.entry(new_key.clone()).or_default(); - match versions.binary_search_by_key(new_lsn, |v| v.lsn) { - Ok(version_idx) => Some(versions[version_idx].slot_idx), - Err(version_idx) => { - versions.insert( - version_idx, - Version { - lsn: *new_lsn, - slot_idx, - }, - ); - self.size_metrics - .current_bytes_materialized_page - .add_page_sz(1); - None - } - } - } - - CacheKey::ImmutableFilePage { file_id, blkno } => { - let mut map = self.immutable_page_map.write().unwrap(); - match map.entry((*file_id, *blkno)) { - Entry::Occupied(entry) => Some(*entry.get()), - Entry::Vacant(entry) => { - entry.insert(slot_idx); - self.size_metrics.current_bytes_immutable.add_page_sz(1); - None - } - } - } - } - } - - // - // Section 4: Misc internal helpers - // - - /// Find a slot to evict. - /// - /// On return, the slot is empty and write-locked. - async fn find_victim( - &self, - _permit_witness: &PinnedSlotsPermit, - ) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard)> { - let iter_limit = self.slots.len() * 10; - let mut iters = 0; - loop { - iters += 1; - let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len(); - - let slot = &self.slots[slot_idx]; - - if slot.dec_usage_count() == 0 { - let mut inner = match slot.inner.try_write() { - Ok(inner) => inner, - Err(_err) => { - if iters > iter_limit { - // NB: Even with the permits, there's no hard guarantee that we will find a slot with - // any particular number of iterations: other threads might race ahead and acquire and - // release pins just as we're scanning the array. - // - // Imagine that nslots is 2, and as starting point, usage_count==1 on all - // slots. There are two threads running concurrently, A and B. 
A has just - // acquired the permit from the semaphore. - // - // A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search - // B: Acquire permit. - // B: Look at slot 2, decrement its usage_count to zero and continue the search - // B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // B: Acquire permit. - // B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1. - // B: Release pin and permit again - // - // Now we're back in the starting situation that both slots have - // usage_count 1, but A has now been through one iteration of the - // find_victim() loop. This can repeat indefinitely and on each - // iteration, A's iteration count increases by one. - // - // So, even though the semaphore for the permits is fair, the victim search - // itself happens in parallel and is not fair. - // Hence even with a permit, a task can theoretically be starved. - // To avoid this, we'd need tokio to give priority to tasks that are holding - // permits for longer. - // Note that just yielding to tokio during iteration without such - // priority boosting is likely counter-productive. We'd just give more opportunities - // for B to bump usage count, further starving A. 
- page_cache_eviction_metrics::observe( - page_cache_eviction_metrics::Outcome::ItersExceeded { - iters: iters.try_into().unwrap(), - }, - ); - anyhow::bail!("exceeded evict iter limit"); - } - continue; - } - }; - if let Some(old_key) = &inner.key { - // remove mapping for old buffer - self.remove_mapping(old_key); - inner.key = None; - page_cache_eviction_metrics::observe( - page_cache_eviction_metrics::Outcome::FoundSlotEvicted { - iters: iters.try_into().unwrap(), - }, - ); - } else { - page_cache_eviction_metrics::observe( - page_cache_eviction_metrics::Outcome::FoundSlotUnused { - iters: iters.try_into().unwrap(), - }, - ); - } - return Ok((slot_idx, inner)); - } - } - } - - /// Initialize a new page cache - /// - /// This should be called only once at page server startup. - fn new(num_pages: usize) -> Self { - assert!(num_pages > 0, "page cache size must be > 0"); - - // We could use Vec::leak here, but that potentially also leaks - // uninitialized reserved capacity. With into_boxed_slice and Box::leak - // this is avoided. 
- let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice()); - - let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; - size_metrics.max_bytes.set_page_sz(num_pages); - size_metrics.current_bytes_immutable.set_page_sz(0); - size_metrics.current_bytes_materialized_page.set_page_sz(0); - - let slots = page_buffer - .chunks_exact_mut(PAGE_SZ) - .map(|chunk| { - let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); - - Slot { - inner: tokio::sync::RwLock::new(SlotInner { - key: None, - buf, - permit: std::sync::Mutex::new(Weak::new()), - }), - usage_count: AtomicU8::new(0), - } - }) - .collect(); - - Self { - materialized_page_map: Default::default(), - immutable_page_map: Default::default(), - slots, - next_evict_slot: AtomicUsize::new(0), - size_metrics, - pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)), - } - } -} - -trait PageSzBytesMetric { - fn set_page_sz(&self, count: usize); - fn add_page_sz(&self, count: usize); - fn sub_page_sz(&self, count: usize); -} - -#[inline(always)] -fn count_times_page_sz(count: usize) -> u64 { - u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap() -} - -impl PageSzBytesMetric for metrics::UIntGauge { - fn set_page_sz(&self, count: usize) { - self.set(count_times_page_sz(count)); - } - fn add_page_sz(&self, count: usize) { - self.add(count_times_page_sz(count)); - } - fn sub_page_sz(&self, count: usize) { - self.sub(count_times_page_sz(count)); - } -} diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index 6de2e95055..3277b2d0a5 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -12,12 +12,13 @@ //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! 
use crate::context::RequestContext; -use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::io::{Error, ErrorKind}; +use super::disk_btree::PAGE_SZ; + impl<'a> BlockCursor<'a> { /// Read a blob into a new buffer. pub async fn read_blob( diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 1b6bccc120..da7c6cc2d8 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,7 +5,7 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; +use crate::tenant::disk_btree::PAGE_SZ; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::Deref; @@ -35,7 +35,7 @@ where /// Reference to an in-memory copy of an immutable on-disk block. pub enum BlockLease<'a> { - PageReadGuard(PageReadGuard<'static>), + PageReadGuard(crate::buffer_pool::Buffer), EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), @@ -43,8 +43,8 @@ pub enum BlockLease<'a> { Vec(Vec), } -impl From> for BlockLease<'static> { - fn from(value: PageReadGuard<'static>) -> BlockLease<'static> { +impl From for BlockLease<'static> { + fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> { BlockLease::PageReadGuard(value) } } @@ -162,28 +162,26 @@ impl<'a> BlockCursor<'a> { /// for modifying the file, nor for invalidating the cache if it is modified. pub struct FileBlockReader { pub file: VirtualFile, - - /// Unique ID of this file, used as key in the page cache. - file_id: page_cache::FileId, } impl FileBlockReader { pub fn new(file: VirtualFile) -> Self { - let file_id = page_cache::next_file_id(); - - FileBlockReader { file_id, file } + FileBlockReader { file } } - /// Read a page from the underlying file into given buffer. 
async fn fill_buffer( &self, - buf: PageWriteGuard<'static>, + buf: crate::buffer_pool::Buffer, blkno: u32, - ) -> Result, std::io::Error> { + ) -> Result { assert!(buf.len() == PAGE_SZ); self.file - .read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64) + .read_exact_at( + crate::buffer_pool::PageWriteGuardBuf::new(buf), + blkno as u64 * PAGE_SZ as u64, + ) .await + .map(|guard| guard.assume_init()) } /// Read a block. /// @@ -193,25 +191,12 @@ impl FileBlockReader { pub async fn read_blk( &self, blknum: u32, - ctx: &RequestContext, + _ctx: &RequestContext, ) -> Result { - let cache = page_cache::get(); - match cache - .read_immutable_buf(self.file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? { - ReadBufResult::Found(guard) => Ok(guard.into()), - ReadBufResult::NotFound(write_guard) => { - // Read the page from disk into the buffer - let write_guard = self.fill_buffer(write_guard, blknum).await?; - Ok(write_guard.mark_valid().into()) - } - } + let buf = crate::buffer_pool::get(); + // Read the page from disk into the buffer + let write_guard = self.fill_buffer(buf, blknum).await?; + Ok(BlockLease::PageReadGuard(write_guard)) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 6b8cd77d78..229cb3e896 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -3,8 +3,8 @@ use crate::config::PageServerConf; use crate::context::RequestContext; -use crate::page_cache::{self, PAGE_SZ}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; +use crate::tenant::disk_btree::PAGE_SZ; use crate::virtual_file::{self, VirtualFile}; use camino::Utf8PathBuf; use pageserver_api::shard::TenantShardId; @@ -17,8 +17,6 @@ use tracing::*; use utils::id::TimelineId; pub struct EphemeralFile { - page_cache_file_id: page_cache::FileId, - _tenant_shard_id: TenantShardId, 
_timeline_id: TimelineId, file: VirtualFile, @@ -55,7 +53,6 @@ impl EphemeralFile { .await?; Ok(EphemeralFile { - page_cache_file_id: page_cache::next_file_id(), _tenant_shard_id: tenant_shard_id, _timeline_id: timeline_id, file, @@ -71,36 +68,22 @@ impl EphemeralFile { pub(crate) async fn read_blk( &self, blknum: u32, - ctx: &RequestContext, + _ctx: &RequestContext, ) -> Result { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { - let cache = page_cache::get(); - match cache - .read_immutable_buf(self.page_cache_file_id, blknum, ctx) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - // order path before error because error is anyhow::Error => might have many contexts - format!( - "ephemeral file: read immutable page #{}: {}: {:#}", - blknum, self.file.path, e, - ), - ) - })? { - page_cache::ReadBufResult::Found(guard) => { - return Ok(BlockLease::PageReadGuard(guard)) - } - page_cache::ReadBufResult::NotFound(write_guard) => { - let write_guard = self - .file - .read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64) - .await?; - let read_guard = write_guard.mark_valid(); - return Ok(BlockLease::PageReadGuard(read_guard)); - } - }; + let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get(); + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + let buf = self + .file + .read_exact_at( + crate::buffer_pool::PageWriteGuardBuf::new(write_guard), + blknum as u64 * PAGE_SZ as u64, + ) + .await? 
+ .assume_init(); + Ok(BlockLease::PageReadGuard(buf)) } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) @@ -131,7 +114,7 @@ impl EphemeralFile { async fn push_bytes( &mut self, src: &[u8], - ctx: &RequestContext, + _ctx: &RequestContext, ) -> Result<(), io::Error> { let mut src_remaining = src; while !src_remaining.is_empty() { @@ -151,33 +134,6 @@ impl EphemeralFile { .await { Ok(_) => { - // Pre-warm the page cache with what we just wrote. - // This isn't necessary for coherency/correctness, but it's how we've always done it. - let cache = page_cache::get(); - match cache - .read_immutable_buf( - self.ephemeral_file.page_cache_file_id, - self.blknum, - ctx, - ) - .await - { - Ok(page_cache::ReadBufResult::Found(_guard)) => { - // This function takes &mut self, so, it shouldn't be possible to reach this point. - unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum); - } - Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - buf.copy_from_slice(&self.ephemeral_file.mutable_tail); - let _ = write_guard.mark_valid(); - // pre-warm successful - } - Err(e) => { - error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); - // fail gracefully, it's not the end of the world if we can't pre-warm the cache here - } - } // Zero the buffer for re-use. // Zeroing is critical for correcntess because the write_blob code below // and similarly read_blk expect zeroed pages. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 3a445ef71e..a5f68ab1e4 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -29,10 +29,10 @@ //! 
use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; -use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; +use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; use crate::tenant::Timeline; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index c62e6aed51..15c18de9ea 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -25,10 +25,10 @@ //! actual page images are stored in the "values" part. use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; -use crate::page_cache::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; use crate::tenant::blob_io::BlobWriter; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; +use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::disk_btree::{DiskBtreeBuilder, DiskBtreeReader, VisitDirection}; use crate::tenant::storage_layer::{ LayerAccessStats, ValueReconstructResult, ValueReconstructState, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 70c6ee2042..51b615c055 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -33,7 +33,6 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::sync::gate::Gate; -use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; @@ -41,19 +40,24 @@ use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, 
Instant, SystemTime}; use std::{ cmp::{max, min, Ordering}, + collections::{BTreeMap, BinaryHeap, HashMap, HashSet}, ops::ControlFlow, }; +use crate::tenant::disk_btree::PAGE_SZ; +use crate::tenant::storage_layer::delta_layer::DeltaEntry; +use crate::{ + context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder}, + disk_usage_eviction_task::EvictionCandidate, +}; + +use crate::disk_usage_eviction_task::DiskUsageEvictionInfo; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, metadata::{save_metadata, TimelineMetadata}, par_fsync, }; -use crate::{ - context::{AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder}, - disk_usage_eviction_task::DiskUsageEvictionInfo, -}; use crate::{deletion_queue::DeletionQueueClient, tenant::remote_timeline_client::StopError}; use crate::{ disk_usage_eviction_task::finite_f32, @@ -63,9 +67,6 @@ use crate::{ ValueReconstructState, }, }; -use crate::{ - disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry, -}; use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKind}; use crate::config::PageServerConf; @@ -90,7 +91,6 @@ use utils::{ simple_rcu::{Rcu, RcuReadGuard}, }; -use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::task_mgr; @@ -2555,24 +2555,13 @@ impl Timeline { } } - /// # Cancel-safety - /// - /// This method is cancellation-safe. async fn lookup_cached_page( &self, - key: &Key, - lsn: Lsn, - ctx: &RequestContext, + _key: &Key, + _lsn: Lsn, + _ctx: &RequestContext, ) -> Option<(Lsn, Bytes)> { - let cache = page_cache::get(); - - // FIXME: It's pointless to check the cache for things that are not 8kB pages. 
- // We should look at the key to determine if it's a cacheable object - let (lsn, read_guard) = cache - .lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx) - .await?; - let img = Bytes::from(read_guard.to_vec()); - Some((lsn, img)) + None } fn get_ancestor_timeline(&self) -> anyhow::Result> { @@ -3618,7 +3607,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; + let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? // min-heap (reserve space for one more element added before eviction) @@ -3870,7 +3859,7 @@ impl Timeline { // Add two pages for potential overhead. This should in theory be already // accounted for in the target calculation, but for very small targets, // we still might easily hit the limit otherwise. 
- let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2; + let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2; for layer in new_layers.iter() { if layer.layer_desc().file_size > warn_limit { warn!( @@ -4398,8 +4387,6 @@ impl Timeline { trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn); }; - let last_rec_lsn = data.records.last().unwrap().0; - let img = match self .walredo_mgr .request_redo(key, request_lsn, data.img, data.records, self.pg_version) @@ -4410,23 +4397,6 @@ impl Timeline { Err(e) => return Err(PageReconstructError::WalRedo(e)), }; - if img.len() == page_cache::PAGE_SZ { - let cache = page_cache::get(); - if let Err(e) = cache - .memorize_materialized_page( - self.tenant_shard_id, - self.timeline_id, - key, - last_rec_lsn, - &img, - ) - .await - .context("Materialized page memoization failed") - { - return Err(PageReconstructError::from(e)); - } - } - Ok(img) } } diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 04435f8731..53a9aaa09e 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,7 +12,6 @@ //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; -use crate::page_cache::PageWriteGuard; use crate::tenant::TENANTS_SEGMENT_NAME; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; @@ -119,37 +118,6 @@ struct SlotInner { file: Option, } -/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. -struct PageWriteGuardBuf { - page: PageWriteGuard<'static>, - init_up_to: usize, -} -// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot, -// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved. 
-unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { - fn stable_ptr(&self) -> *const u8 { - self.page.as_ptr() - } - fn bytes_init(&self) -> usize { - self.init_up_to - } - fn bytes_total(&self) -> usize { - self.page.len() - } -} -// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access, -// hence it's safe to hand out the `stable_mut_ptr()`. -unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { - fn stable_mut_ptr(&mut self) -> *mut u8 { - self.page.as_mut_ptr() - } - - unsafe fn set_init(&mut self, pos: usize) { - assert!(pos <= self.page.len()); - self.init_up_to = pos; - } -} - impl OpenFiles { /// Find a slot to use, evicting an existing file descriptor if needed. /// @@ -559,21 +527,6 @@ impl VirtualFile { res.map(|()| buf) } - /// Like [`Self::read_exact_at`] but for [`PageWriteGuard`]. - pub async fn read_exact_at_page( - &self, - page: PageWriteGuard<'static>, - offset: u64, - ) -> Result, Error> { - let buf = PageWriteGuardBuf { - page, - init_up_to: 0, - }; - let res = self.read_exact_at(buf, offset).await; - res.map(|PageWriteGuardBuf { page, .. }| page) - .map_err(|e| Error::new(ErrorKind::Other, e)) - } - // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 pub async fn write_all_at(&self, mut buf: &[u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() {