diff --git a/Cargo.lock b/Cargo.lock index 6e91363de8..b0de01d14d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2716,6 +2716,15 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "lru" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -3337,6 +3346,7 @@ dependencies = [ "humantime-serde", "hyper", "itertools", + "lru", "md5", "metrics", "nix 0.27.1", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index e44501d1ed..0dd9fcd0ae 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -86,6 +86,7 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +lru = "0.12.2" [dev-dependencies] criterion.workspace = true diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index c7a6079e2c..81f248e8ed 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -284,4 +284,13 @@ impl Client { .await .map_err(Error::ReceiveBody) } + + pub async fn set_request_lru_size(&self, size: usize) -> Result<()> { + let uri = format!("{}/v1/set_req_lru_size", self.mgmt_api_endpoint); + self.request(Method::PUT, uri, size) + .await? + .json() + .await + .map_err(Error::ReceiveBody) + } } diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index e54e3f6e35..f9559d1823 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -53,6 +53,8 @@ pub(crate) struct Args { keyspace_cache: Option, #[clap(long)] set_io_engine: Option, + #[clap(long)] + set_req_lru_size: Option, targets: Option>, } @@ -109,6 +111,10 @@ async fn main_impl( mgmt_api_client.set_io_engine(engine_str).await?; } + if let Some(req_lru_size) = &args.set_req_lru_size { + mgmt_api_client.set_request_lru_size(*req_lru_size).await?; + } + // discover targets let timelines: Vec = crate::util::cli::targets::discover( &mgmt_api_client, diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index ee331ea154..14f38683a8 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -86,7 +86,9 @@ //! [`RequestContext`] argument. Functions in the middle of the call chain //! only need to pass it on. -use crate::task_mgr::TaskKind; +use std::sync::{Arc, Mutex}; + +use crate::{buffer_pool, page_cache, task_mgr::TaskKind}; // The main structure of this module, see module-level comment. #[derive(Clone, Debug)] @@ -95,6 +97,8 @@ pub struct RequestContext { download_behavior: DownloadBehavior, access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, + pub(crate) buf_cache: + Option>>>>, } /// The kind of access to the page cache. @@ -150,6 +154,7 @@ impl RequestContextBuilder { download_behavior: DownloadBehavior::Download, access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, + buf_cache: None, }, } } @@ -163,6 +168,7 @@ impl RequestContextBuilder { download_behavior: original.download_behavior, access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, + buf_cache: original.buf_cache.as_ref().map(Arc::clone), }, } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e820dd6dce..793e6e2dc9 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -2063,5 +2063,17 @@ pub fn make_router( } api_handler(r, set_io_engine_handler) }) + .put("/v1/set_req_lru_size", |r| { + async fn set_req_lru_size_handler( + mut r: Request, + _cancel: CancellationToken, + ) -> Result, ApiError> { + let size: usize = json_request(&mut r).await?; + crate::tenant::timeline::REQ_LRU_SIZE + .store(size, std::sync::atomic::Ordering::Relaxed); + json_response(StatusCode::OK, ()) + } + api_handler(r, set_req_lru_size_handler) + }) .any(handler_404)) } diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 064df52f9e..708f2b02df 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -137,9 +137,9 @@ pub fn next_file_id() -> FileId { /// /// CacheKey uniquely identifies a "thing" to cache in the page cache. /// -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, Hash)] #[allow(clippy::enum_variant_names)] -enum CacheKey { +pub(crate) enum CacheKey { ImmutableFilePage { file_id: FileId, blkno: u32 }, } diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index b17bbbc6e5..a88a06410d 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -9,6 +9,7 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; use std::ops::Deref; +use std::sync::Arc; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -36,7 +37,7 @@ where /// Reference to an in-memory copy of an immutable on-disk block. pub enum BlockLease<'a> { PageReadGuard(PageReadGuard<'static>), - BufferPool(crate::buffer_pool::Buffer), + BufferPool(Arc), EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), @@ -219,6 +220,16 @@ impl FileBlockReader { } crate::context::PageContentKind::ImageLayerValue | crate::context::PageContentKind::DeltaLayerValue => { + let cache_key = page_cache::CacheKey::ImmutableFilePage { + file_id: self.file_id, + blkno: blknum, + }; + if let Some(cache) = &ctx.buf_cache { + let mut cache = cache.lock().unwrap(); + if let Some(cached) = cache.get(&cache_key) { + return Ok(BlockLease::BufferPool(Arc::clone(cached))); + }; + } let buf = crate::buffer_pool::get(); // Read the page from disk into the buffer let buf = async move { @@ -234,6 +245,10 @@ impl FileBlockReader { ) } .await?; + let buf = Arc::new(buf); + if let Some(cache) = &ctx.buf_cache { + cache.lock().unwrap().put(cache_key, Arc::clone(&buf)); + } Ok(BlockLease::BufferPool(buf)) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index f617073dcd..95f5d0eeec 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -7,6 +7,8 @@ pub mod span; pub mod uninit; mod walreceiver; +pub(crate) static REQ_LRU_SIZE: AtomicUsize = AtomicUsize::new(0); + use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; @@ -33,8 +35,6 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::sync::gate::Gate; -use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet}; -use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -43,6 +43,14 @@ use std::{ cmp::{max, min, Ordering}, ops::ControlFlow, }; +use std::{ + collections::{BTreeMap, BinaryHeap, HashMap, HashSet}, + sync::atomic::AtomicUsize, +}; +use std::{ + num::NonZeroUsize, + ops::{Deref, Range}, +}; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ @@ -2290,6 +2298,16 @@ impl Timeline { reconstruct_state: &mut ValueReconstructState, ctx: &RequestContext, ) -> Result, PageReconstructError> { + let mut ctx = RequestContextBuilder::extend(ctx).build(); + ctx.buf_cache = match REQ_LRU_SIZE.load(std::sync::atomic::Ordering::Relaxed) { + 0 => None, + x => Some(Arc::new(Mutex::new(lru::LruCache::new( + // SAFETY: we just checked for 0 above + unsafe { NonZeroUsize::new_unchecked(x) }, + )))), + }; + let ctx = &ctx; + // Start from the current timeline. let mut timeline_owned; let mut timeline = self;