diff --git a/pageserver/ctl/src/layer_map_analyzer.rs b/pageserver/ctl/src/layer_map_analyzer.rs index daf529b4e7..f970cc1c56 100644 --- a/pageserver/ctl/src/layer_map_analyzer.rs +++ b/pageserver/ctl/src/layer_map_analyzer.rs @@ -8,7 +8,7 @@ use std::collections::BinaryHeap; use std::ops::Range; use std::{fs, path::Path, str}; -use pageserver::page_cache::PAGE_SZ; +use crate::tenant::disk_btree::PAGE_SZ; use pageserver::repository::{Key, KEY_SIZE}; use pageserver::tenant::block_io::FileBlockReader; use pageserver::tenant::disk_btree::{DiskBtreeReader, VisitDirection}; diff --git a/pageserver/src/buffer_pool.rs b/pageserver/src/buffer_pool.rs new file mode 100644 index 0000000000..00ba42abec --- /dev/null +++ b/pageserver/src/buffer_pool.rs @@ -0,0 +1,13 @@ +use crate::tenant::disk_btree::PAGE_SZ; + + + +pub(crate) type Buffer = Box<[u8; PAGE_SZ]>; + +pub(crate) fn get() -> Buffer { + todo!() +} + +pub(crate) fn put(buf: Buffer) { + todo!() +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index cb20caba1f..99efe1d225 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -8,7 +8,6 @@ pub mod http; pub mod import_datadir; pub mod keyspace; pub mod metrics; -pub mod page_cache; pub mod page_service; pub mod pgdatadir_mapping; pub mod repository; @@ -28,6 +27,8 @@ use std::path::Path; use crate::task_mgr::TaskKind; use tracing::info; +pub mod buffer_pool; + /// Current storage format version /// /// This is embedded in the header of all the layer files. diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs deleted file mode 100644 index 241de208b2..0000000000 --- a/pageserver/src/page_cache.rs +++ /dev/null @@ -1,852 +0,0 @@ -//! -//! Global page cache -//! -//! The page cache uses up most of the memory in the page server. It is shared -//! by all tenants, and it is used to store different kinds of pages. Sharing -//! the cache allows memory to be dynamically allocated where it's needed the -//! most. -//! -//! The page cache consists of fixed-size buffers, 8 kB each to match the -//! PostgreSQL buffer size, and a Slot struct for each buffer to contain -//! information about what's stored in the buffer. -//! -//! # Types Of Pages -//! -//! [`PageCache`] only supports immutable pages. -//! Hence there is no need to worry about coherency. -//! -//! Two types of pages are supported: -//! -//! * **Materialized pages**, filled & used by page reconstruction -//! * **Immutable File pages**, filled & used by [`crate::tenant::block_io`] and [`crate::tenant::ephemeral_file`]. -//! -//! Note that [`crate::tenant::ephemeral_file::EphemeralFile`] is generally mutable, but, it's append-only. -//! It uses the page cache only for the blocks that are already fully written and immutable. -//! -//! # Filling The Page Cache -//! -//! Page cache maps from a cache key to a buffer slot. -//! The cache key uniquely identifies the piece of data that is being cached. -//! -//! The cache key for **materialized pages** is [`TenantId`], [`TimelineId`], [`Key`], and [`Lsn`]. -//! Use [`PageCache::memorize_materialized_page`] and [`PageCache::lookup_materialized_page`] for fill & access. -//! -//! The cache key for **immutable file** pages is [`FileId`] and a block number. -//! Users of page cache that wish to page-cache an arbitrary (immutable!) on-disk file do the following: -//! * Have a mechanism to deterministically associate the on-disk file with a [`FileId`]. -//! * Get a [`FileId`] using [`next_file_id`]. -//! * Use the mechanism to associate the on-disk file with the returned [`FileId`]. -//! * Use [`PageCache::read_immutable_buf`] to get a [`ReadBufResult`]. -//! * If the page was already cached, it'll be the [`ReadBufResult::Found`] variant that contains -//! a read guard for the page. Just use it. -//! * If the page was not cached, it'll be the [`ReadBufResult::NotFound`] variant that contains -//! a write guard for the page. Fill the page with the contents of the on-disk file. -//! Then call [`PageWriteGuard::mark_valid`] to mark the page as valid. -//! Then try again to [`PageCache::read_immutable_buf`]. -//! Unless there's high cache pressure, the page should now be cached. -//! (TODO: allow downgrading the write guard to a read guard to ensure forward progress.) -//! -//! # Locking -//! -//! There are two levels of locking involved: There's one lock for the "mapping" -//! from page identifier (tenant ID, timeline ID, rel, block, LSN) to the buffer -//! slot, and a separate lock on each slot. To read or write the contents of a -//! slot, you must hold the lock on the slot in read or write mode, -//! respectively. To change the mapping of a slot, i.e. to evict a page or to -//! assign a buffer for a page, you must hold the mapping lock and the lock on -//! the slot at the same time. -//! -//! Whenever you need to hold both locks simultaneously, the slot lock must be -//! acquired first. This consistent ordering avoids deadlocks. To look up a page -//! in the cache, you would first look up the mapping, while holding the mapping -//! lock, and then lock the slot. You must release the mapping lock in between, -//! to obey the lock ordering and avoid deadlock. -//! -//! A slot can momentarily have invalid contents, even if it's already been -//! inserted to the mapping, but you must hold the write-lock on the slot until -//! the contents are valid. If you need to release the lock without initializing -//! the contents, you must remove the mapping first. We make that easy for the -//! callers with PageWriteGuard: when lock_for_write() returns an uninitialized -//! page, the caller must explicitly call guard.mark_valid() after it has -//! initialized it. If the guard is dropped without calling mark_valid(), the -//! mapping is automatically removed and the slot is marked free. -//! - -use std::{ - collections::{hash_map::Entry, HashMap}, - convert::TryInto, - sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering}, - sync::RwLock as SyncRwLock, -}; - -use anyhow::Context; -use once_cell::sync::OnceCell; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use utils::{ - id::{TenantId, TimelineId}, - lsn::Lsn, -}; - -use crate::{metrics::PageCacheSizeMetrics, repository::Key}; - -static PAGE_CACHE: OnceCell = OnceCell::new(); -const TEST_PAGE_CACHE_SIZE: usize = 50; - -/// -/// Initialize the page cache. This must be called once at page server startup. -/// -pub fn init(size: usize) { - if PAGE_CACHE.set(PageCache::new(size)).is_err() { - panic!("page cache already initialized"); - } -} - -/// -/// Get a handle to the page cache. -/// -pub fn get() -> &'static PageCache { - // - // In unit tests, page server startup doesn't happen and no one calls - // page_cache::init(). Initialize it here with a tiny cache, so that the - // page cache is usable in unit tests. - // - if cfg!(test) { - PAGE_CACHE.get_or_init(|| PageCache::new(TEST_PAGE_CACHE_SIZE)) - } else { - PAGE_CACHE.get().expect("page cache not initialized") - } -} - -pub const PAGE_SZ: usize = postgres_ffi::BLCKSZ as usize; -const MAX_USAGE_COUNT: u8 = 5; - -/// See module-level comment. -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] -pub struct FileId(u64); - -static NEXT_ID: AtomicU64 = AtomicU64::new(1); - -/// See module-level comment. -pub fn next_file_id() -> FileId { - FileId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) -} - -/// -/// CacheKey uniquely identifies a "thing" to cache in the page cache. -/// -#[derive(Debug, PartialEq, Eq, Clone)] -#[allow(clippy::enum_variant_names)] -enum CacheKey { - MaterializedPage { - hash_key: MaterializedPageHashKey, - lsn: Lsn, - }, - ImmutableFilePage { - file_id: FileId, - blkno: u32, - }, -} - -#[derive(Debug, PartialEq, Eq, Hash, Clone)] -struct MaterializedPageHashKey { - tenant_id: TenantId, - timeline_id: TimelineId, - key: Key, -} - -#[derive(Clone)] -struct Version { - lsn: Lsn, - slot_idx: usize, -} - -struct Slot { - inner: RwLock, - usage_count: AtomicU8, -} - -struct SlotInner { - key: Option, - buf: &'static mut [u8; PAGE_SZ], -} - -impl Slot { - /// Increment usage count on the buffer, with ceiling at MAX_USAGE_COUNT. - fn inc_usage_count(&self) { - let _ = self - .usage_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { - if val == MAX_USAGE_COUNT { - None - } else { - Some(val + 1) - } - }); - } - - /// Decrement usage count on the buffer, unless it's already zero. Returns - /// the old usage count. - fn dec_usage_count(&self) -> u8 { - let count_res = - self.usage_count - .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |val| { - if val == 0 { - None - } else { - Some(val - 1) - } - }); - - match count_res { - Ok(usage_count) => usage_count, - Err(usage_count) => usage_count, - } - } -} - -pub struct PageCache { - /// This contains the mapping from the cache key to buffer slot that currently - /// contains the page, if any. - /// - /// TODO: This is protected by a single lock. If that becomes a bottleneck, - /// this HashMap can be replaced with a more concurrent version, there are - /// plenty of such crates around. - /// - /// If you add support for caching different kinds of objects, each object kind - /// can have a separate mapping map, next to this field. - materialized_page_map: SyncRwLock>>, - - immutable_page_map: SyncRwLock>, - - /// The actual buffers with their metadata. - slots: Box<[Slot]>, - - /// Index of the next candidate to evict, for the Clock replacement algorithm. - /// This is interpreted modulo the page cache size. - next_evict_slot: AtomicUsize, - - size_metrics: &'static PageCacheSizeMetrics, -} - -/// -/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked -/// until the guard is dropped. -/// -pub struct PageReadGuard<'i>(RwLockReadGuard<'i, SlotInner>); - -impl std::ops::Deref for PageReadGuard<'_> { - type Target = [u8; PAGE_SZ]; - - fn deref(&self) -> &Self::Target { - self.0.buf - } -} - -impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> { - fn as_ref(&self) -> &[u8; PAGE_SZ] { - self.0.buf - } -} - -/// -/// PageWriteGuard is a lease on a buffer for modifying it. The page is kept locked -/// until the guard is dropped. -/// -/// Counterintuitively, this is used even for a read, if the requested page is not -/// currently found in the page cache. In that case, the caller of lock_for_read() -/// is expected to fill in the page contents and call mark_valid(). Similarly -/// lock_for_write() can return an invalid buffer that the caller is expected to -/// to initialize. -/// -pub struct PageWriteGuard<'i> { - inner: RwLockWriteGuard<'i, SlotInner>, - - // Are the page contents currently valid? - valid: bool, -} - -impl std::ops::DerefMut for PageWriteGuard<'_> { - fn deref_mut(&mut self) -> &mut Self::Target { - self.inner.buf - } -} - -impl std::ops::Deref for PageWriteGuard<'_> { - type Target = [u8; PAGE_SZ]; - - fn deref(&self) -> &Self::Target { - self.inner.buf - } -} - -impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> { - fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] { - self.inner.buf - } -} - -impl PageWriteGuard<'_> { - /// Mark that the buffer contents are now valid. - pub fn mark_valid(&mut self) { - assert!(self.inner.key.is_some()); - assert!( - !self.valid, - "mark_valid called on a buffer that was already valid" - ); - self.valid = true; - } -} - -impl Drop for PageWriteGuard<'_> { - /// - /// If the buffer was allocated for a page that was not already in the - /// cache, but the lock_for_read/write() caller dropped the buffer without - /// initializing it, remove the mapping from the page cache. - /// - fn drop(&mut self) { - assert!(self.inner.key.is_some()); - if !self.valid { - let self_key = self.inner.key.as_ref().unwrap(); - PAGE_CACHE.get().unwrap().remove_mapping(self_key); - self.inner.key = None; - } - } -} - -/// lock_for_read() return value -pub enum ReadBufResult<'a> { - Found(PageReadGuard<'a>), - NotFound(PageWriteGuard<'a>), -} - -/// lock_for_write() return value -pub enum WriteBufResult<'a> { - Found(PageWriteGuard<'a>), - NotFound(PageWriteGuard<'a>), -} - -impl PageCache { - // - // Section 1.1: Public interface functions for looking up and memorizing materialized page - // versions in the page cache - // - - /// Look up a materialized page version. - /// - /// The 'lsn' is an upper bound, this will return the latest version of - /// the given block, but not newer than 'lsn'. Returns the actual LSN of the - /// returned page. - pub async fn lookup_materialized_page( - &self, - tenant_id: TenantId, - timeline_id: TimelineId, - key: &Key, - lsn: Lsn, - ) -> Option<(Lsn, PageReadGuard)> { - crate::metrics::PAGE_CACHE - .read_accesses_materialized_page - .inc(); - - let mut cache_key = CacheKey::MaterializedPage { - hash_key: MaterializedPageHashKey { - tenant_id, - timeline_id, - key: *key, - }, - lsn, - }; - - if let Some(guard) = self.try_lock_for_read(&mut cache_key).await { - if let CacheKey::MaterializedPage { - hash_key: _, - lsn: available_lsn, - } = cache_key - { - if available_lsn == lsn { - crate::metrics::PAGE_CACHE - .read_hits_materialized_page_exact - .inc(); - } else { - crate::metrics::PAGE_CACHE - .read_hits_materialized_page_older_lsn - .inc(); - } - Some((available_lsn, guard)) - } else { - panic!("unexpected key type in slot"); - } - } else { - None - } - } - - /// - /// Store an image of the given page in the cache. - /// - pub async fn memorize_materialized_page( - &self, - tenant_id: TenantId, - timeline_id: TimelineId, - key: Key, - lsn: Lsn, - img: &[u8], - ) -> anyhow::Result<()> { - let cache_key = CacheKey::MaterializedPage { - hash_key: MaterializedPageHashKey { - tenant_id, - timeline_id, - key, - }, - lsn, - }; - - match self.lock_for_write(&cache_key).await? { - WriteBufResult::Found(write_guard) => { - // We already had it in cache. Another thread must've put it there - // concurrently. Check that it had the same contents that we - // replayed. - assert!(*write_guard == img); - } - WriteBufResult::NotFound(mut write_guard) => { - write_guard.copy_from_slice(img); - write_guard.mark_valid(); - } - } - - Ok(()) - } - - // Section 1.2: Public interface functions for working with immutable file pages. - - pub async fn read_immutable_buf( - &self, - file_id: FileId, - blkno: u32, - ) -> anyhow::Result { - let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno }; - - self.lock_for_read(&mut cache_key).await - } - - // - // Section 2: Internal interface functions for lookup/update. - // - // To add support for a new kind of "thing" to cache, you will need - // to add public interface routines above, and code to deal with the - // "mappings" after this section. But the routines in this section should - // not require changes. - - /// Look up a page in the cache. - /// - /// If the search criteria is not exact, *cache_key is updated with the key - /// for exact key of the returned page. (For materialized pages, that means - /// that the LSN in 'cache_key' is updated with the LSN of the returned page - /// version.) - /// - /// If no page is found, returns None and *cache_key is left unmodified. - /// - async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option { - let cache_key_orig = cache_key.clone(); - if let Some(slot_idx) = self.search_mapping(cache_key).await { - // The page was found in the mapping. Lock the slot, and re-check - // that it's still what we expected (because we released the mapping - // lock already, another thread could have evicted the page) - let slot = &self.slots[slot_idx]; - let inner = slot.inner.read().await; - if inner.key.as_ref() == Some(cache_key) { - slot.inc_usage_count(); - return Some(PageReadGuard(inner)); - } else { - // search_mapping might have modified the search key; restore it. - *cache_key = cache_key_orig; - } - } - None - } - - /// Return a locked buffer for given block. - /// - /// Like try_lock_for_read(), if the search criteria is not exact and the - /// page is already found in the cache, *cache_key is updated. - /// - /// If the page is not found in the cache, this allocates a new buffer for - /// it. The caller may then initialize the buffer with the contents, and - /// call mark_valid(). - /// - /// Example usage: - /// - /// ```ignore - /// let cache = page_cache::get(); - /// - /// match cache.lock_for_read(&key) { - /// ReadBufResult::Found(read_guard) => { - /// // The page was found in cache. Use it - /// }, - /// ReadBufResult::NotFound(write_guard) => { - /// // The page was not found in cache. Read it from disk into the - /// // buffer. - /// //read_my_page_from_disk(write_guard); - /// - /// // The buffer contents are now valid. Tell the page cache. - /// write_guard.mark_valid(); - /// }, - /// } - /// ``` - /// - async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result { - let (read_access, hit) = match cache_key { - CacheKey::MaterializedPage { .. } => { - unreachable!("Materialized pages use lookup_materialized_page") - } - CacheKey::ImmutableFilePage { .. } => ( - &crate::metrics::PAGE_CACHE.read_accesses_immutable, - &crate::metrics::PAGE_CACHE.read_hits_immutable, - ), - }; - read_access.inc(); - - let mut is_first_iteration = true; - loop { - // First check if the key already exists in the cache. - if let Some(read_guard) = self.try_lock_for_read(cache_key).await { - if is_first_iteration { - hit.inc(); - } - return Ok(ReadBufResult::Found(read_guard)); - } - is_first_iteration = false; - - // Not found. Find a victim buffer - let (slot_idx, mut inner) = self - .find_victim() - .await - .context("Failed to find evict victim")?; - - // Insert mapping for this. At this point, we may find that another - // thread did the same thing concurrently. In that case, we evicted - // our victim buffer unnecessarily. Put it into the free list and - // continue with the slot that the other thread chose. - if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) { - // TODO: put to free list - - // We now just loop back to start from beginning. This is not - // optimal, we'll perform the lookup in the mapping again, which - // is not really necessary because we already got - // 'existing_slot_idx'. But this shouldn't happen often enough - // to matter much. - continue; - } - - // Make the slot ready - let slot = &self.slots[slot_idx]; - inner.key = Some(cache_key.clone()); - slot.usage_count.store(1, Ordering::Relaxed); - - return Ok(ReadBufResult::NotFound(PageWriteGuard { - inner, - valid: false, - })); - } - } - - /// Look up a page in the cache and lock it in write mode. If it's not - /// found, returns None. - /// - /// When locking a page for writing, the search criteria is always "exact". - async fn try_lock_for_write(&self, cache_key: &CacheKey) -> Option { - if let Some(slot_idx) = self.search_mapping_for_write(cache_key) { - // The page was found in the mapping. Lock the slot, and re-check - // that it's still what we expected (because we don't released the mapping - // lock already, another thread could have evicted the page) - let slot = &self.slots[slot_idx]; - let inner = slot.inner.write().await; - if inner.key.as_ref() == Some(cache_key) { - slot.inc_usage_count(); - return Some(PageWriteGuard { inner, valid: true }); - } - } - None - } - - /// Return a write-locked buffer for given block. - /// - /// Similar to lock_for_read(), but the returned buffer is write-locked and - /// may be modified by the caller even if it's already found in the cache. - async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result { - loop { - // First check if the key already exists in the cache. - if let Some(write_guard) = self.try_lock_for_write(cache_key).await { - return Ok(WriteBufResult::Found(write_guard)); - } - - // Not found. Find a victim buffer - let (slot_idx, mut inner) = self - .find_victim() - .await - .context("Failed to find evict victim")?; - - // Insert mapping for this. At this point, we may find that another - // thread did the same thing concurrently. In that case, we evicted - // our victim buffer unnecessarily. Put it into the free list and - // continue with the slot that the other thread chose. - if let Some(_existing_slot_idx) = self.try_insert_mapping(cache_key, slot_idx) { - // TODO: put to free list - - // We now just loop back to start from beginning. This is not - // optimal, we'll perform the lookup in the mapping again, which - // is not really necessary because we already got - // 'existing_slot_idx'. But this shouldn't happen often enough - // to matter much. - continue; - } - - // Make the slot ready - let slot = &self.slots[slot_idx]; - inner.key = Some(cache_key.clone()); - slot.usage_count.store(1, Ordering::Relaxed); - - return Ok(WriteBufResult::NotFound(PageWriteGuard { - inner, - valid: false, - })); - } - } - - // - // Section 3: Mapping functions - // - - /// Search for a page in the cache using the given search key. - /// - /// Returns the slot index, if any. If the search criteria is not exact, - /// *cache_key is updated with the actual key of the found page. - /// - /// NOTE: We don't hold any lock on the mapping on return, so the slot might - /// get recycled for an unrelated page immediately after this function - /// returns. The caller is responsible for re-checking that the slot still - /// contains the page with the same key before using it. - /// - async fn search_mapping(&self, cache_key: &mut CacheKey) -> Option { - match cache_key { - CacheKey::MaterializedPage { hash_key, lsn } => { - let map = self.materialized_page_map.read().unwrap(); - let versions = map.get(hash_key)?; - - let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) { - Ok(version_idx) => version_idx, - Err(0) => return None, - Err(version_idx) => version_idx - 1, - }; - let version = &versions[version_idx]; - *lsn = version.lsn; - Some(version.slot_idx) - } - CacheKey::ImmutableFilePage { file_id, blkno } => { - let map = self.immutable_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } - } - } - - /// Search for a page in the cache using the given search key. - /// - /// Like 'search_mapping, but performs an "exact" search. Used for - /// allocating a new buffer. - fn search_mapping_for_write(&self, key: &CacheKey) -> Option { - match key { - CacheKey::MaterializedPage { hash_key, lsn } => { - let map = self.materialized_page_map.read().unwrap(); - let versions = map.get(hash_key)?; - - if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) { - Some(versions[version_idx].slot_idx) - } else { - None - } - } - CacheKey::ImmutableFilePage { file_id, blkno } => { - let map = self.immutable_page_map.read().unwrap(); - Some(*map.get(&(*file_id, *blkno))?) - } - } - } - - /// - /// Remove mapping for given key. - /// - fn remove_mapping(&self, old_key: &CacheKey) { - match old_key { - CacheKey::MaterializedPage { - hash_key: old_hash_key, - lsn: old_lsn, - } => { - let mut map = self.materialized_page_map.write().unwrap(); - if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) { - let versions = old_entry.get_mut(); - - if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) { - versions.remove(version_idx); - self.size_metrics - .current_bytes_materialized_page - .sub_page_sz(1); - if versions.is_empty() { - old_entry.remove_entry(); - } - } - } else { - panic!("could not find old key in mapping") - } - } - CacheKey::ImmutableFilePage { file_id, blkno } => { - let mut map = self.immutable_page_map.write().unwrap(); - map.remove(&(*file_id, *blkno)) - .expect("could not find old key in mapping"); - self.size_metrics.current_bytes_immutable.sub_page_sz(1); - } - } - } - - /// - /// Insert mapping for given key. - /// - /// If a mapping already existed for the given key, returns the slot index - /// of the existing mapping and leaves it untouched. - fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option { - match new_key { - CacheKey::MaterializedPage { - hash_key: new_key, - lsn: new_lsn, - } => { - let mut map = self.materialized_page_map.write().unwrap(); - let versions = map.entry(new_key.clone()).or_default(); - match versions.binary_search_by_key(new_lsn, |v| v.lsn) { - Ok(version_idx) => Some(versions[version_idx].slot_idx), - Err(version_idx) => { - versions.insert( - version_idx, - Version { - lsn: *new_lsn, - slot_idx, - }, - ); - self.size_metrics - .current_bytes_materialized_page - .add_page_sz(1); - None - } - } - } - - CacheKey::ImmutableFilePage { file_id, blkno } => { - let mut map = self.immutable_page_map.write().unwrap(); - match map.entry((*file_id, *blkno)) { - Entry::Occupied(entry) => Some(*entry.get()), - Entry::Vacant(entry) => { - entry.insert(slot_idx); - self.size_metrics.current_bytes_immutable.add_page_sz(1); - None - } - } - } - } - } - - // - // Section 4: Misc internal helpers - // - - /// Find a slot to evict. - /// - /// On return, the slot is empty and write-locked. - async fn find_victim(&self) -> anyhow::Result<(usize, RwLockWriteGuard)> { - let iter_limit = self.slots.len() * 10; - let mut iters = 0; - loop { - iters += 1; - let slot_idx = self.next_evict_slot.fetch_add(1, Ordering::Relaxed) % self.slots.len(); - - let slot = &self.slots[slot_idx]; - - if slot.dec_usage_count() == 0 { - let mut inner = match slot.inner.try_write() { - Ok(inner) => inner, - Err(_err) => { - // If we have looped through the whole buffer pool 10 times - // and still haven't found a victim buffer, something's wrong. - // Maybe all the buffers were in locked. That could happen in - // theory, if you have more threads holding buffers locked than - // there are buffers in the pool. In practice, with a reasonably - // large buffer pool it really shouldn't happen. - if iters > iter_limit { - anyhow::bail!("exceeded evict iter limit"); - } - continue; - } - }; - if let Some(old_key) = &inner.key { - // remove mapping for old buffer - self.remove_mapping(old_key); - inner.key = None; - } - return Ok((slot_idx, inner)); - } - } - } - - /// Initialize a new page cache - /// - /// This should be called only once at page server startup. - fn new(num_pages: usize) -> Self { - assert!(num_pages > 0, "page cache size must be > 0"); - - let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice()); - - let size_metrics = &crate::metrics::PAGE_CACHE_SIZE; - size_metrics.max_bytes.set_page_sz(num_pages); - size_metrics.current_bytes_immutable.set_page_sz(0); - size_metrics.current_bytes_materialized_page.set_page_sz(0); - - let slots = page_buffer - .chunks_exact_mut(PAGE_SZ) - .map(|chunk| { - let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap(); - - Slot { - inner: RwLock::new(SlotInner { key: None, buf }), - usage_count: AtomicU8::new(0), - } - }) - .collect(); - - Self { - materialized_page_map: Default::default(), - immutable_page_map: Default::default(), - slots, - next_evict_slot: AtomicUsize::new(0), - size_metrics, - } - } -} - -trait PageSzBytesMetric { - fn set_page_sz(&self, count: usize); - fn add_page_sz(&self, count: usize); - fn sub_page_sz(&self, count: usize); -} - -#[inline(always)] -fn count_times_page_sz(count: usize) -> u64 { - u64::try_from(count).unwrap() * u64::try_from(PAGE_SZ).unwrap() -} - -impl PageSzBytesMetric for metrics::UIntGauge { - fn set_page_sz(&self, count: usize) { - self.set(count_times_page_sz(count)); - } - fn add_page_sz(&self, count: usize) { - self.add(count_times_page_sz(count)); - } - fn sub_page_sz(&self, count: usize) { - self.sub(count_times_page_sz(count)); - } -} diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index f5ff15b50c..fa40d65433 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -11,11 +11,12 @@ //! len < 128: 0XXXXXXX //! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX //! -use crate::page_cache::PAGE_SZ; use crate::tenant::block_io::BlockCursor; use std::cmp::min; use std::io::{Error, ErrorKind}; +use super::disk_btree::PAGE_SZ; + impl<'a> BlockCursor<'a> { /// Read a blob into a new buffer. pub async fn read_blob(&self, offset: u64) -> Result, std::io::Error> { diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index fd13227843..7f01f81a41 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -4,7 +4,7 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; -use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; +use crate::tenant::disk_btree::PAGE_SZ; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::fs::File; @@ -36,14 +36,14 @@ where /// Reference to an in-memory copy of an immutable on-disk block. pub enum BlockLease<'a> { - PageReadGuard(PageReadGuard<'static>), + PageReadGuard(crate::buffer_pool::Buffer), EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), } -impl From> for BlockLease<'static> { - fn from(value: PageReadGuard<'static>) -> BlockLease<'static> { +impl From for BlockLease<'static> { + fn from(value: crate::buffer_pool::Buffer) -> BlockLease<'static> { BlockLease::PageReadGuard(value) } } @@ -158,15 +158,20 @@ impl FileBlockReader { } } -use crate::page_cache::PageWriteGuard; macro_rules! impls { (FileBlockReader<$ty:ty>) => { impl FileBlockReader<$ty> { /// Read a page from the underlying file into given buffer. - async fn fill_buffer(&self, buf: PageWriteGuard<'static>, blkno: u32) -> Result, std::io::Error> { + async fn fill_buffer( + &self, + buf: crate::buffer_pool::Buffer, + blkno: u32, + ) -> Result { assert!(buf.len() == PAGE_SZ); - self.file.read_exact_at_async(buf, blkno as u64 * PAGE_SZ as u64).await + self.file + .read_exact_at_async(buf, blkno as u64 * PAGE_SZ as u64) + .await } /// Read a block. /// @@ -174,29 +179,10 @@ macro_rules! impls { /// access to the contents of the page. (For the page cache, the /// lease object represents a lock on the buffer.) pub async fn read_blk(&self, blknum: u32) -> Result { - // Look up the right page - let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.file_id, blknum) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Failed to read immutable buf: {e:#}"), - ) - })? { - ReadBufResult::Found(guard) => break Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { - // Read the page from disk into the buffer - let mut write_guard = self.fill_buffer(write_guard, blknum).await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; - } + let buf = crate::buffer_pool::get(); + // Read the page from disk into the buffer + let mut write_guard = self.fill_buffer(buf, blknum).await?; + todo!() } } }; diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index de0c8c4a36..fbb2042aba 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -2,8 +2,8 @@ //! used to keep in-memory layers spilled on disk. use crate::config::PageServerConf; -use crate::page_cache::{self, PAGE_SZ, PageWriteGuard}; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; +use crate::tenant::disk_btree::PAGE_SZ; use crate::virtual_file::VirtualFile; use std::cmp::min; use std::fs::OpenOptions; @@ -63,39 +63,14 @@ impl EphemeralFile { pub(crate) async fn read_blk(&self, blknum: u32) -> Result { let flushed_blknums = 0..self.len / PAGE_SZ as u64; if flushed_blknums.contains(&(blknum as u64)) { - let cache = page_cache::get(); - loop { - match cache - .read_immutable_buf(self.page_cache_file_id, blknum) - .await - .map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - // order path before error because error is anyhow::Error => might have many contexts - format!( - "ephemeral file: read immutable page #{}: {}: {:#}", - blknum, - self.file.path.display(), - e, - ), - ) - })? { - page_cache::ReadBufResult::Found(guard) => { - return Ok(BlockLease::PageReadGuard(guard)) - } - page_cache::ReadBufResult::NotFound(write_guard) => { - let mut write_guard: PageWriteGuard<'static> = write_guard; - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - let mut write_guard = self.file - .read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64).await?; - write_guard.mark_valid(); - - // Swap for read lock - continue; - } - }; - } + let mut write_guard: crate::buffer_pool::Buffer = crate::buffer_pool::get(); + let buf: &mut [u8] = write_guard.deref_mut(); + debug_assert_eq!(buf.len(), PAGE_SZ); + let mut buf = self + .file + .read_exact_at_async(write_guard, blknum as u64 * PAGE_SZ as u64) + .await?; + Ok(BlockLease::PageReadGuard(buf)) } else { debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64); Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail)) @@ -133,32 +108,6 @@ impl EphemeralFile { self.blknum as u64 * PAGE_SZ as u64, ) { Ok(_) => { - // Pre-warm the page cache with what we just wrote. - // This isn't necessary for coherency/correctness, but it's how we've always done it. - let cache = page_cache::get(); - match cache - .read_immutable_buf( - self.ephemeral_file.page_cache_file_id, - self.blknum, - ) - .await - { - Ok(page_cache::ReadBufResult::Found(_guard)) => { - // This function takes &mut self, so, it shouldn't be possible to reach this point. - unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum); - } - Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - buf.copy_from_slice(&self.ephemeral_file.mutable_tail); - write_guard.mark_valid(); - // pre-warm successful - } - Err(e) => { - error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); - // fail gracefully, it's not the end of the world if we can't pre-warm the cache here - } - } // Zero the buffer for re-use. // Zeroing is critical for correcntess because the write_blob code below // and similarly read_blk expect zeroed pages. diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index b0d3942354..a044e2997c 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -29,7 +29,7 @@ //! use crate::config::PageServerConf; use crate::context::RequestContext; -use crate::page_cache::PAGE_SZ; +use crate::tenant::disk_btree::PAGE_SZ; use crate::repository::{Key, Value, KEY_SIZE}; use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockCursor, BlockLease, BlockReader, FileBlockReader}; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 27aa239f6f..71b0f55d22 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -25,7 +25,7 @@ //! actual page images are stored in the "values" part. use crate::config::PageServerConf; use crate::context::RequestContext; -use crate::page_cache::PAGE_SZ; +use crate::tenant::disk_btree::PAGE_SZ; use crate::repository::{Key, KEY_SIZE}; use crate::tenant::blob_io::{BlobWriter, WriteBlobWriter}; use crate::tenant::block_io::{BlockBuf, BlockReader, FileBlockReader}; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 96b17a8c32..4054475277 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -38,6 +38,7 @@ use std::time::{Duration, Instant, SystemTime}; use crate::context::{ AccessStatsBehavior, DownloadBehavior, RequestContext, RequestContextBuilder, }; +use crate::tenant::disk_btree::PAGE_SZ; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::storage_layer::delta_layer::DeltaEntry; use crate::tenant::storage_layer::{ @@ -73,7 +74,6 @@ use utils::{ simple_rcu::{Rcu, RcuReadGuard}, }; -use crate::page_cache; use crate::repository::GcResult; use crate::repository::{Key, Value}; use crate::task_mgr; @@ -3368,7 +3368,7 @@ impl Timeline { // Determine N largest holes where N is number of compacted layers. let max_holes = deltas_to_compact.len(); let last_record_lsn = self.get_last_record_lsn(); - let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128; + let min_hole_range = (target_file_size / PAGE_SZ as u64) as i128; let min_hole_coverage_size = 3; // TODO: something more flexible? // min-heap (reserve space for one more element added before eviction) @@ -3604,7 +3604,7 @@ impl Timeline { // Add two pages for potential overhead. This should in theory be already // accounted for in the target calculation, but for very small targets, // we still might easily hit the limit otherwise. - let warn_limit = target_file_size * 2 + page_cache::PAGE_SZ as u64 * 2; + let warn_limit = target_file_size * 2 + PAGE_SZ as u64 * 2; for layer in new_layers.iter() { if layer.layer_desc().file_size > warn_limit { warn!( @@ -4192,7 +4192,7 @@ impl Timeline { Err(e) => return Err(PageReconstructError::from(e)), }; - if img.len() == page_cache::PAGE_SZ { + if img.len() == PAGE_SZ { let cache = page_cache::get(); if let Err(e) = cache .memorize_materialized_page( diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 5929a594ae..b16ba2b5be 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,7 +11,6 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; -use crate::page_cache::PageWriteGuard; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom, Write}; @@ -284,9 +283,9 @@ impl VirtualFile { // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 pub async fn read_exact_at_async( &self, - mut write_guard: PageWriteGuard<'static>, + mut write_guard: crate::buffer_pool::Buffer, offset: u64, - ) -> Result, Error> { + ) -> Result { let file = self.handle.lock().unwrap().take().unwrap(); let put_back = AtomicBool::new(false); let put_back_ref = &put_back; @@ -297,7 +296,7 @@ impl VirtualFile { }; let system = tokio_epoll_uring::thread_local_system().await; struct PageWriteGuardBuf { - buf: PageWriteGuard<'static>, + buf: crate::buffer_pool::Buffer, init_up_to: usize, } unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {