mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 01:42:55 +00:00
implement request-scoped LRU cache
This commit is contained in:
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2716,6 +2716,15 @@ version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
@@ -3337,6 +3346,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper",
|
||||
"itertools",
|
||||
"lru",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -86,6 +86,7 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
lru = "0.12.2"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -284,4 +284,13 @@ impl Client {
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn set_request_lru_size(&self, size: usize) -> Result<()> {
|
||||
let uri = format!("{}/v1/set_req_lru_size", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, uri, size)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,6 +53,8 @@ pub(crate) struct Args {
|
||||
keyspace_cache: Option<Utf8PathBuf>,
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<String>,
|
||||
#[clap(long)]
|
||||
set_req_lru_size: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -109,6 +111,10 @@ async fn main_impl(
|
||||
mgmt_api_client.set_io_engine(engine_str).await?;
|
||||
}
|
||||
|
||||
if let Some(req_lru_size) = &args.set_req_lru_size {
|
||||
mgmt_api_client.set_request_lru_size(*req_lru_size).await?;
|
||||
}
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
|
||||
@@ -86,7 +86,9 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::{buffer_pool, page_cache, task_mgr::TaskKind};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -95,6 +97,8 @@ pub struct RequestContext {
|
||||
download_behavior: DownloadBehavior,
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
pub(crate) buf_cache:
|
||||
Option<Arc<Mutex<lru::LruCache<page_cache::CacheKey, Arc<buffer_pool::Buffer>>>>>,
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -150,6 +154,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: DownloadBehavior::Download,
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
buf_cache: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -163,6 +168,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
buf_cache: original.buf_cache.as_ref().map(Arc::clone),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2063,5 +2063,17 @@ pub fn make_router(
|
||||
}
|
||||
api_handler(r, set_io_engine_handler)
|
||||
})
|
||||
.put("/v1/set_req_lru_size", |r| {
|
||||
async fn set_req_lru_size_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let size: usize = json_request(&mut r).await?;
|
||||
crate::tenant::timeline::REQ_LRU_SIZE
|
||||
.store(size, std::sync::atomic::Ordering::Relaxed);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
api_handler(r, set_req_lru_size_handler)
|
||||
})
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -137,9 +137,9 @@ pub fn next_file_id() -> FileId {
|
||||
///
|
||||
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
|
||||
///
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum CacheKey {
|
||||
pub(crate) enum CacheKey {
|
||||
ImmutableFilePage { file_id: FileId, blkno: u32 },
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
|
||||
/// blocks, using the page cache
|
||||
@@ -36,7 +37,7 @@ where
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
BufferPool(crate::buffer_pool::Buffer),
|
||||
BufferPool(Arc<crate::buffer_pool::Buffer>),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
@@ -219,6 +220,16 @@ impl FileBlockReader {
|
||||
}
|
||||
crate::context::PageContentKind::ImageLayerValue
|
||||
| crate::context::PageContentKind::DeltaLayerValue => {
|
||||
let cache_key = page_cache::CacheKey::ImmutableFilePage {
|
||||
file_id: self.file_id,
|
||||
blkno: blknum,
|
||||
};
|
||||
if let Some(cache) = &ctx.buf_cache {
|
||||
let mut cache = cache.lock().unwrap();
|
||||
if let Some(cached) = cache.get(&cache_key) {
|
||||
return Ok(BlockLease::BufferPool(Arc::clone(cached)));
|
||||
};
|
||||
}
|
||||
let buf = crate::buffer_pool::get();
|
||||
// Read the page from disk into the buffer
|
||||
let buf = async move {
|
||||
@@ -234,6 +245,10 @@ impl FileBlockReader {
|
||||
)
|
||||
}
|
||||
.await?;
|
||||
let buf = Arc::new(buf);
|
||||
if let Some(cache) = &ctx.buf_cache {
|
||||
cache.lock().unwrap().put(cache_key, Arc::clone(&buf));
|
||||
}
|
||||
Ok(BlockLease::BufferPool(buf))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,8 @@ pub mod span;
|
||||
pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
pub(crate) static REQ_LRU_SIZE: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -33,8 +35,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
@@ -43,6 +43,14 @@ use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
|
||||
sync::atomic::AtomicUsize,
|
||||
};
|
||||
use std::{
|
||||
num::NonZeroUsize,
|
||||
ops::{Deref, Range},
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
@@ -2290,6 +2298,16 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
|
||||
let mut ctx = RequestContextBuilder::extend(ctx).build();
|
||||
ctx.buf_cache = match REQ_LRU_SIZE.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
0 => None,
|
||||
x => Some(Arc::new(Mutex::new(lru::LruCache::new(
|
||||
// SAFETY: we just checked for 0 above
|
||||
unsafe { NonZeroUsize::new_unchecked(x) },
|
||||
)))),
|
||||
};
|
||||
let ctx = &ctx;
|
||||
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
|
||||
Reference in New Issue
Block a user