mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-16 20:50:37 +00:00
Compare commits
25 Commits
compress-p
...
problame/i
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3cd4f8aa59 | ||
|
|
c98215674c | ||
|
|
0e3561f6d1 | ||
|
|
28a4247c97 | ||
|
|
70bc01494c | ||
|
|
a1af2c7150 | ||
|
|
043ed5edea | ||
|
|
6753ff089c | ||
|
|
49a5e411d6 | ||
|
|
21a11822e8 | ||
|
|
aca2d7bdea | ||
|
|
db44395ee2 | ||
|
|
03874009ec | ||
|
|
0033b4c985 | ||
|
|
a608667301 | ||
|
|
b9b7670a3a | ||
|
|
62a3d87098 | ||
|
|
8d6ce71b29 | ||
|
|
d23ea718ee | ||
|
|
73a7ca38b3 | ||
|
|
7eb1d4cfa6 | ||
|
|
6ebd683327 | ||
|
|
b1ecdfe099 | ||
|
|
82a74d0e77 | ||
|
|
49b43c75e2 |
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2716,6 +2716,15 @@ version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "lru"
|
||||
version = "0.12.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22"
|
||||
dependencies = [
|
||||
"hashbrown 0.14.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "match_cfg"
|
||||
version = "0.1.0"
|
||||
@@ -3337,6 +3346,7 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"hyper",
|
||||
"itertools",
|
||||
"lru",
|
||||
"md5",
|
||||
"metrics",
|
||||
"nix 0.27.1",
|
||||
|
||||
@@ -86,6 +86,7 @@ enum-map.workspace = true
|
||||
enumset.workspace = true
|
||||
strum.workspace = true
|
||||
strum_macros.workspace = true
|
||||
lru = "0.12.2"
|
||||
|
||||
[dev-dependencies]
|
||||
criterion.workspace = true
|
||||
|
||||
@@ -275,4 +275,22 @@ impl Client {
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn set_io_engine(&self, engine_str: &str) -> Result<()> {
|
||||
let uri = format!("{}/v1/set_io_engine", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, uri, engine_str)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
|
||||
pub async fn set_request_lru_size(&self, size: usize) -> Result<()> {
|
||||
let uri = format!("{}/v1/set_req_lru_size", self.mgmt_api_endpoint);
|
||||
self.request(Method::PUT, uri, size)
|
||||
.await?
|
||||
.json()
|
||||
.await
|
||||
.map_err(Error::ReceiveBody)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,10 @@ pub(crate) struct Args {
|
||||
/// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
|
||||
#[clap(long)]
|
||||
keyspace_cache: Option<Utf8PathBuf>,
|
||||
#[clap(long)]
|
||||
set_io_engine: Option<String>,
|
||||
#[clap(long)]
|
||||
set_req_lru_size: Option<usize>,
|
||||
targets: Option<Vec<TenantTimelineId>>,
|
||||
}
|
||||
|
||||
@@ -103,6 +107,14 @@ async fn main_impl(
|
||||
args.pageserver_jwt.as_deref(),
|
||||
));
|
||||
|
||||
if let Some(engine_str) = &args.set_io_engine {
|
||||
mgmt_api_client.set_io_engine(engine_str).await?;
|
||||
}
|
||||
|
||||
if let Some(req_lru_size) = &args.set_req_lru_size {
|
||||
mgmt_api_client.set_request_lru_size(*req_lru_size).await?;
|
||||
}
|
||||
|
||||
// discover targets
|
||||
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
|
||||
&mgmt_api_client,
|
||||
|
||||
@@ -66,12 +66,12 @@ impl serde::Serialize for LatencyPercentiles {
|
||||
{
|
||||
use serde::ser::SerializeMap;
|
||||
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
|
||||
for p in LATENCY_PERCENTILES {
|
||||
for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
|
||||
ser.serialize_entry(
|
||||
&format!("p{p}"),
|
||||
&format!(
|
||||
"{}",
|
||||
&humantime::format_duration(self.latency_percentiles[0])
|
||||
&humantime::format_duration(self.latency_percentiles[i])
|
||||
),
|
||||
)?;
|
||||
}
|
||||
|
||||
91
pageserver/src/buffer_pool.rs
Normal file
91
pageserver/src/buffer_pool.rs
Normal file
@@ -0,0 +1,91 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
use crate::tenant::disk_btree::PAGE_SZ;
|
||||
|
||||
#[repr(C, align(8192))]
|
||||
struct BufferContent([u8; PAGE_SZ]);
|
||||
|
||||
impl BufferContent {
|
||||
fn empty() -> Self {
|
||||
BufferContent(std::array::from_fn(|_| 0))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Buffer(Option<Box<BufferContent>>);
|
||||
|
||||
// Thread-local list of re-usable buffers.
|
||||
thread_local! {
|
||||
static POOL: RefCell<Vec<Box<BufferContent>>> = RefCell::new(Vec::new());
|
||||
}
|
||||
|
||||
pub(crate) fn get() -> Buffer {
|
||||
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
|
||||
match maybe {
|
||||
Some(buf) => Buffer(Some(buf)),
|
||||
None => Buffer(Some(Box::new(BufferContent::empty()))),
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer {
|
||||
fn drop(&mut self) {
|
||||
let buf = self.0.take().unwrap();
|
||||
POOL.with(|rc| rc.borrow_mut().push(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Buffer {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0.as_ref().unwrap().as_ref().0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for Buffer {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0.as_mut().unwrap().as_mut().0
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct PageWriteGuardBuf {
|
||||
page: Buffer,
|
||||
init_up_to: usize,
|
||||
}
|
||||
impl PageWriteGuardBuf {
|
||||
pub fn new(buf: Buffer) -> Self {
|
||||
PageWriteGuardBuf {
|
||||
page: buf,
|
||||
init_up_to: 0,
|
||||
}
|
||||
}
|
||||
pub fn assume_init(self) -> Buffer {
|
||||
assert_eq!(self.init_up_to, PAGE_SZ);
|
||||
self.page
|
||||
}
|
||||
}
|
||||
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
}
|
||||
}
|
||||
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
|
||||
// hence it's safe to hand out the `stable_mut_ptr()`.
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.page.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
@@ -86,7 +86,9 @@
|
||||
//! [`RequestContext`] argument. Functions in the middle of the call chain
|
||||
//! only need to pass it on.
|
||||
|
||||
use crate::task_mgr::TaskKind;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::{buffer_pool, page_cache, task_mgr::TaskKind};
|
||||
|
||||
// The main structure of this module, see module-level comment.
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -95,6 +97,8 @@ pub struct RequestContext {
|
||||
download_behavior: DownloadBehavior,
|
||||
access_stats_behavior: AccessStatsBehavior,
|
||||
page_content_kind: PageContentKind,
|
||||
pub(crate) buf_cache:
|
||||
Option<Arc<Mutex<lru::LruCache<page_cache::CacheKey, Arc<buffer_pool::Buffer>>>>>,
|
||||
}
|
||||
|
||||
/// The kind of access to the page cache.
|
||||
@@ -150,6 +154,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: DownloadBehavior::Download,
|
||||
access_stats_behavior: AccessStatsBehavior::Update,
|
||||
page_content_kind: PageContentKind::Unknown,
|
||||
buf_cache: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -163,6 +168,7 @@ impl RequestContextBuilder {
|
||||
download_behavior: original.download_behavior,
|
||||
access_stats_behavior: original.access_stats_behavior,
|
||||
page_content_kind: original.page_content_kind,
|
||||
buf_cache: original.buf_cache.as_ref().map(Arc::clone),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2052,5 +2052,28 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|
||||
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
|
||||
)
|
||||
.put("/v1/set_io_engine", |r| {
|
||||
async fn set_io_engine_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
|
||||
crate::virtual_file::io_engine::set(kind);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
api_handler(r, set_io_engine_handler)
|
||||
})
|
||||
.put("/v1/set_req_lru_size", |r| {
|
||||
async fn set_req_lru_size_handler(
|
||||
mut r: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let size: usize = json_request(&mut r).await?;
|
||||
crate::tenant::timeline::REQ_LRU_SIZE
|
||||
.store(size, std::sync::atomic::Ordering::Relaxed);
|
||||
json_response(StatusCode::OK, ())
|
||||
}
|
||||
api_handler(r, set_req_lru_size_handler)
|
||||
})
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ pub mod disk_usage_eviction_task;
|
||||
pub mod http;
|
||||
pub mod import_datadir;
|
||||
pub use pageserver_api::keyspace;
|
||||
pub(crate) mod buffer_pool;
|
||||
pub mod metrics;
|
||||
pub mod page_cache;
|
||||
pub mod page_service;
|
||||
|
||||
@@ -125,14 +125,6 @@ impl ReconstructTimeMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_materialized_cache_hits_direct_total",
|
||||
"Number of cache hits from materialized page cache without redo",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
register_histogram!(
|
||||
"pageserver_getpage_get_reconstruct_data_seconds",
|
||||
@@ -142,14 +134,6 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
|
||||
register_int_counter!(
|
||||
"pageserver_materialized_cache_hits_total",
|
||||
"Number of cache hits from materialized page cache",
|
||||
)
|
||||
.expect("failed to define a metric")
|
||||
});
|
||||
|
||||
pub(crate) struct GetVectoredLatency {
|
||||
map: EnumMap<TaskKind, Option<Histogram>>,
|
||||
}
|
||||
@@ -188,12 +172,8 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
|
||||
});
|
||||
|
||||
pub(crate) struct PageCacheMetricsForTaskKind {
|
||||
pub read_accesses_materialized_page: IntCounter,
|
||||
pub read_accesses_immutable: IntCounter,
|
||||
|
||||
pub read_hits_immutable: IntCounter,
|
||||
pub read_hits_materialized_page_exact: IntCounter,
|
||||
pub read_hits_materialized_page_older_lsn: IntCounter,
|
||||
}
|
||||
|
||||
pub(crate) struct PageCacheMetrics {
|
||||
@@ -226,16 +206,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
|
||||
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
|
||||
let content_kind: &'static str = content_kind.into();
|
||||
PageCacheMetricsForTaskKind {
|
||||
read_accesses_materialized_page: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_accesses_immutable: {
|
||||
PAGE_CACHE_READ_ACCESSES
|
||||
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
|
||||
@@ -247,28 +217,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
|
||||
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_exact: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
"exact",
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
|
||||
read_hits_materialized_page_older_lsn: {
|
||||
PAGE_CACHE_READ_HITS
|
||||
.get_metric_with_label_values(&[
|
||||
task_kind,
|
||||
"materialized_page",
|
||||
content_kind,
|
||||
"older_lsn",
|
||||
])
|
||||
.unwrap()
|
||||
},
|
||||
}
|
||||
}))
|
||||
})),
|
||||
@@ -386,7 +334,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
#[derive(IntoStaticStr)]
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
@@ -2402,8 +2349,6 @@ pub fn preinitialize_metrics() {
|
||||
|
||||
// counters
|
||||
[
|
||||
&MATERIALIZED_PAGE_CACHE_HIT,
|
||||
&MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
&UNEXPECTED_ONDEMAND_DOWNLOADS,
|
||||
&WALRECEIVER_STARTED_CONNECTIONS,
|
||||
&WALRECEIVER_BROKER_UPDATES,
|
||||
|
||||
@@ -74,17 +74,13 @@
|
||||
use std::{
|
||||
collections::{hash_map::Entry, HashMap},
|
||||
convert::TryInto,
|
||||
sync::{
|
||||
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
Arc, Weak,
|
||||
},
|
||||
time::Duration,
|
||||
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use utils::{id::TimelineId, lsn::Lsn};
|
||||
use utils::id::TimelineId;
|
||||
|
||||
use crate::{
|
||||
context::RequestContext,
|
||||
@@ -137,17 +133,10 @@ pub fn next_file_id() -> FileId {
|
||||
///
|
||||
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
|
||||
///
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
enum CacheKey {
|
||||
MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey,
|
||||
lsn: Lsn,
|
||||
},
|
||||
ImmutableFilePage {
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
},
|
||||
pub(crate) enum CacheKey {
|
||||
ImmutableFilePage { file_id: FileId, blkno: u32 },
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
|
||||
@@ -163,12 +152,6 @@ struct MaterializedPageHashKey {
|
||||
key: Key,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Version {
|
||||
lsn: Lsn,
|
||||
slot_idx: usize,
|
||||
}
|
||||
|
||||
struct Slot {
|
||||
inner: tokio::sync::RwLock<SlotInner>,
|
||||
usage_count: AtomicU8,
|
||||
@@ -177,8 +160,17 @@ struct Slot {
|
||||
struct SlotInner {
|
||||
key: Option<CacheKey>,
|
||||
// for `coalesce_readers_permit`
|
||||
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
|
||||
buf: &'static mut [u8; PAGE_SZ],
|
||||
buf: &'static mut SlotContents,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
#[repr(C, align(8192))]
|
||||
struct SlotContents([u8; PAGE_SZ]);
|
||||
|
||||
impl SlotContents {
|
||||
fn empty() -> Self {
|
||||
Self(std::array::from_fn(|_| 0))
|
||||
}
|
||||
}
|
||||
|
||||
impl Slot {
|
||||
@@ -220,41 +212,12 @@ impl Slot {
|
||||
}
|
||||
}
|
||||
|
||||
impl SlotInner {
|
||||
/// If there is aready a reader, drop our permit and share its permit, just like we share read access.
|
||||
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
|
||||
let mut guard = self.permit.lock().unwrap();
|
||||
if let Some(existing_permit) = guard.upgrade() {
|
||||
drop(guard);
|
||||
drop(permit);
|
||||
existing_permit
|
||||
} else {
|
||||
let permit = Arc::new(permit);
|
||||
*guard = Arc::downgrade(&permit);
|
||||
permit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PageCache {
|
||||
/// This contains the mapping from the cache key to buffer slot that currently
|
||||
/// contains the page, if any.
|
||||
///
|
||||
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
|
||||
/// this HashMap can be replaced with a more concurrent version, there are
|
||||
/// plenty of such crates around.
|
||||
///
|
||||
/// If you add support for caching different kinds of objects, each object kind
|
||||
/// can have a separate mapping map, next to this field.
|
||||
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
|
||||
|
||||
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
|
||||
|
||||
/// The actual buffers with their metadata.
|
||||
slots: Box<[Slot]>,
|
||||
|
||||
pinned_slots: Arc<tokio::sync::Semaphore>,
|
||||
|
||||
/// Index of the next candidate to evict, for the Clock replacement algorithm.
|
||||
/// This is interpreted modulo the page cache size.
|
||||
next_evict_slot: AtomicUsize,
|
||||
@@ -262,14 +225,11 @@ pub struct PageCache {
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
|
||||
///
|
||||
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
|
||||
/// until the guard is dropped.
|
||||
///
|
||||
pub struct PageReadGuard<'i> {
|
||||
_permit: Arc<PinnedSlotsPermit>,
|
||||
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
|
||||
}
|
||||
|
||||
@@ -277,13 +237,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
|
||||
type Target = [u8; PAGE_SZ];
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.slot_guard.buf
|
||||
&self.slot_guard.buf.0
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
|
||||
fn as_ref(&self) -> &[u8; PAGE_SZ] {
|
||||
self.slot_guard.buf
|
||||
&self.slot_guard.buf.0
|
||||
}
|
||||
}
|
||||
|
||||
@@ -301,7 +261,6 @@ pub struct PageWriteGuard<'i> {
|
||||
enum PageWriteGuardState<'i> {
|
||||
Invalid {
|
||||
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
|
||||
_permit: PinnedSlotsPermit,
|
||||
},
|
||||
Downgraded,
|
||||
}
|
||||
@@ -309,7 +268,7 @@ enum PageWriteGuardState<'i> {
|
||||
impl std::ops::DerefMut for PageWriteGuard<'_> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Invalid { inner } => &mut inner.buf.0,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -320,7 +279,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match &self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
|
||||
PageWriteGuardState::Invalid { inner } => &inner.buf.0,
|
||||
PageWriteGuardState::Downgraded => unreachable!(),
|
||||
}
|
||||
}
|
||||
@@ -332,10 +291,9 @@ impl<'a> PageWriteGuard<'a> {
|
||||
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
|
||||
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
|
||||
match prev {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
PageWriteGuardState::Invalid { inner } => {
|
||||
assert!(inner.key.is_some());
|
||||
PageReadGuard {
|
||||
_permit: Arc::new(_permit),
|
||||
slot_guard: inner.downgrade(),
|
||||
}
|
||||
}
|
||||
@@ -352,7 +310,7 @@ impl Drop for PageWriteGuard<'_> {
|
||||
///
|
||||
fn drop(&mut self) {
|
||||
match &mut self.state {
|
||||
PageWriteGuardState::Invalid { inner, _permit } => {
|
||||
PageWriteGuardState::Invalid { inner } => {
|
||||
assert!(inner.key.is_some());
|
||||
let self_key = inner.key.as_ref().unwrap();
|
||||
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
|
||||
@@ -370,166 +328,6 @@ pub enum ReadBufResult<'a> {
|
||||
}
|
||||
|
||||
impl PageCache {
|
||||
//
|
||||
// Section 1.1: Public interface functions for looking up and memorizing materialized page
|
||||
// versions in the page cache
|
||||
//
|
||||
|
||||
/// Look up a materialized page version.
|
||||
///
|
||||
/// The 'lsn' is an upper bound, this will return the latest version of
|
||||
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
|
||||
/// returned page.
|
||||
pub async fn lookup_materialized_page(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, PageReadGuard)> {
|
||||
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
|
||||
return None;
|
||||
};
|
||||
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_accesses_materialized_page
|
||||
.inc();
|
||||
|
||||
let mut cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key: *key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
if let Some(guard) = self
|
||||
.try_lock_for_read(&mut cache_key, &mut Some(permit))
|
||||
.await
|
||||
{
|
||||
if let CacheKey::MaterializedPage {
|
||||
hash_key: _,
|
||||
lsn: available_lsn,
|
||||
} = cache_key
|
||||
{
|
||||
if available_lsn == lsn {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_hits_materialized_page_exact
|
||||
.inc();
|
||||
} else {
|
||||
crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
.read_hits_materialized_page_older_lsn
|
||||
.inc();
|
||||
}
|
||||
Some((available_lsn, guard))
|
||||
} else {
|
||||
panic!("unexpected key type in slot");
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
img: &[u8],
|
||||
) -> anyhow::Result<()> {
|
||||
let cache_key = CacheKey::MaterializedPage {
|
||||
hash_key: MaterializedPageHashKey {
|
||||
tenant_shard_id,
|
||||
timeline_id,
|
||||
key,
|
||||
},
|
||||
lsn,
|
||||
};
|
||||
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
// that it's still what we expected (because we don't released the mapping
|
||||
// lock already, another thread could have evicted the page)
|
||||
let slot = &self.slots[slot_idx];
|
||||
let inner = slot.inner.write().await;
|
||||
if inner.key.as_ref() == Some(&cache_key) {
|
||||
slot.inc_usage_count();
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
debug_assert_eq!(inner.buf.len(), img.len());
|
||||
// We already had it in cache. Another thread must've put it there
|
||||
// concurrently. Check that it had the same contents that we
|
||||
// replayed.
|
||||
assert!(inner.buf == img);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
// Insert mapping for this. At this point, we may find that another
|
||||
// thread did the same thing concurrently. In that case, we evicted
|
||||
// our victim buffer unnecessarily. Put it into the free list and
|
||||
// continue with the slot that the other thread chose.
|
||||
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
|
||||
// TODO: put to free list
|
||||
|
||||
// We now just loop back to start from beginning. This is not
|
||||
// optimal, we'll perform the lookup in the mapping again, which
|
||||
// is not really necessary because we already got
|
||||
// 'existing_slot_idx'. But this shouldn't happen often enough
|
||||
// to matter much.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make the slot ready
|
||||
let slot = &self.slots[slot_idx];
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
// Create a write guard for the slot so we go through the expected motions.
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
let mut write_guard = PageWriteGuard {
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
};
|
||||
write_guard.copy_from_slice(img);
|
||||
let _ = write_guard.mark_valid();
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
file_id: FileId,
|
||||
@@ -549,27 +347,6 @@ impl PageCache {
|
||||
// "mappings" after this section. But the routines in this section should
|
||||
// not require changes.
|
||||
|
||||
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
|
||||
match tokio::time::timeout(
|
||||
// Choose small timeout, neon_smgr does its own retries.
|
||||
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
|
||||
Duration::from_secs(10),
|
||||
Arc::clone(&self.pinned_slots).acquire_owned(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(res) => Ok(PinnedSlotsPermit(
|
||||
res.expect("this semaphore is never closed"),
|
||||
)),
|
||||
Err(_timeout) => {
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
|
||||
);
|
||||
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up a page in the cache.
|
||||
///
|
||||
/// If the search criteria is not exact, *cache_key is updated with the key
|
||||
@@ -579,11 +356,7 @@ impl PageCache {
|
||||
///
|
||||
/// If no page is found, returns None and *cache_key is left unmodified.
|
||||
///
|
||||
async fn try_lock_for_read(
|
||||
&self,
|
||||
cache_key: &mut CacheKey,
|
||||
permit: &mut Option<PinnedSlotsPermit>,
|
||||
) -> Option<PageReadGuard> {
|
||||
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
|
||||
let cache_key_orig = cache_key.clone();
|
||||
if let Some(slot_idx) = self.search_mapping(cache_key) {
|
||||
// The page was found in the mapping. Lock the slot, and re-check
|
||||
@@ -593,10 +366,7 @@ impl PageCache {
|
||||
let inner = slot.inner.read().await;
|
||||
if inner.key.as_ref() == Some(cache_key) {
|
||||
slot.inc_usage_count();
|
||||
return Some(PageReadGuard {
|
||||
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
|
||||
slot_guard: inner,
|
||||
});
|
||||
return Some(PageReadGuard { slot_guard: inner });
|
||||
} else {
|
||||
// search_mapping might have modified the search key; restore it.
|
||||
*cache_key = cache_key_orig;
|
||||
@@ -639,12 +409,7 @@ impl PageCache {
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
|
||||
|
||||
let (read_access, hit) = match cache_key {
|
||||
CacheKey::MaterializedPage { .. } => {
|
||||
unreachable!("Materialized pages use lookup_materialized_page")
|
||||
}
|
||||
CacheKey::ImmutableFilePage { .. } => (
|
||||
&crate::metrics::PAGE_CACHE
|
||||
.for_ctx(ctx)
|
||||
@@ -657,19 +422,17 @@ impl PageCache {
|
||||
let mut is_first_iteration = true;
|
||||
loop {
|
||||
// First check if the key already exists in the cache.
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
|
||||
debug_assert!(permit.is_none());
|
||||
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
|
||||
if is_first_iteration {
|
||||
hit.inc();
|
||||
}
|
||||
return Ok(ReadBufResult::Found(read_guard));
|
||||
}
|
||||
debug_assert!(permit.is_some());
|
||||
is_first_iteration = false;
|
||||
|
||||
// Not found. Find a victim buffer
|
||||
let (slot_idx, mut inner) = self
|
||||
.find_victim(permit.as_ref().unwrap())
|
||||
.find_victim()
|
||||
.await
|
||||
.context("Failed to find evict victim")?;
|
||||
|
||||
@@ -693,19 +456,8 @@ impl PageCache {
|
||||
inner.key = Some(cache_key.clone());
|
||||
slot.set_usage_count(1);
|
||||
|
||||
debug_assert!(
|
||||
{
|
||||
let guard = inner.permit.lock().unwrap();
|
||||
guard.upgrade().is_none()
|
||||
},
|
||||
"we hold a write lock, so, no one else should have a permit"
|
||||
);
|
||||
|
||||
return Ok(ReadBufResult::NotFound(PageWriteGuard {
|
||||
state: PageWriteGuardState::Invalid {
|
||||
_permit: permit.take().unwrap(),
|
||||
inner,
|
||||
},
|
||||
state: PageWriteGuardState::Invalid { inner },
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -726,42 +478,6 @@ impl PageCache {
|
||||
///
|
||||
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
|
||||
match cache_key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => version_idx,
|
||||
Err(0) => return None,
|
||||
Err(version_idx) => version_idx - 1,
|
||||
};
|
||||
let version = &versions[version_idx];
|
||||
*lsn = version.lsn;
|
||||
Some(version.slot_idx)
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Search for a page in the cache using the given search key.
|
||||
///
|
||||
/// Like 'search_mapping, but performs an "exact" search. Used for
|
||||
/// allocating a new buffer.
|
||||
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
|
||||
match key {
|
||||
CacheKey::MaterializedPage { hash_key, lsn } => {
|
||||
let map = self.materialized_page_map.read().unwrap();
|
||||
let versions = map.get(hash_key)?;
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
|
||||
Some(versions[version_idx].slot_idx)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let map = self.immutable_page_map.read().unwrap();
|
||||
Some(*map.get(&(*file_id, *blkno))?)
|
||||
@@ -774,27 +490,6 @@ impl PageCache {
|
||||
///
|
||||
fn remove_mapping(&self, old_key: &CacheKey) {
|
||||
match old_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: old_hash_key,
|
||||
lsn: old_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
|
||||
let versions = old_entry.get_mut();
|
||||
|
||||
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
|
||||
versions.remove(version_idx);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.sub_page_sz(1);
|
||||
if versions.is_empty() {
|
||||
old_entry.remove_entry();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
panic!("could not find old key in mapping")
|
||||
}
|
||||
}
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
map.remove(&(*file_id, *blkno))
|
||||
@@ -811,30 +506,6 @@ impl PageCache {
|
||||
/// of the existing mapping and leaves it untouched.
|
||||
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
|
||||
match new_key {
|
||||
CacheKey::MaterializedPage {
|
||||
hash_key: new_key,
|
||||
lsn: new_lsn,
|
||||
} => {
|
||||
let mut map = self.materialized_page_map.write().unwrap();
|
||||
let versions = map.entry(new_key.clone()).or_default();
|
||||
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
|
||||
Ok(version_idx) => Some(versions[version_idx].slot_idx),
|
||||
Err(version_idx) => {
|
||||
versions.insert(
|
||||
version_idx,
|
||||
Version {
|
||||
lsn: *new_lsn,
|
||||
slot_idx,
|
||||
},
|
||||
);
|
||||
self.size_metrics
|
||||
.current_bytes_materialized_page
|
||||
.add_page_sz(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CacheKey::ImmutableFilePage { file_id, blkno } => {
|
||||
let mut map = self.immutable_page_map.write().unwrap();
|
||||
match map.entry((*file_id, *blkno)) {
|
||||
@@ -858,7 +529,6 @@ impl PageCache {
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let mut iters = 0;
|
||||
@@ -940,39 +610,29 @@ impl PageCache {
|
||||
fn new(num_pages: usize) -> Self {
|
||||
assert!(num_pages > 0, "page cache size must be > 0");
|
||||
|
||||
// We could use Vec::leak here, but that potentially also leaks
|
||||
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
|
||||
// this is avoided.
|
||||
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
|
||||
let slot_contents = Box::leak(vec![SlotContents::empty(); num_pages].into_boxed_slice());
|
||||
|
||||
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
|
||||
size_metrics.max_bytes.set_page_sz(num_pages);
|
||||
size_metrics.current_bytes_immutable.set_page_sz(0);
|
||||
size_metrics.current_bytes_materialized_page.set_page_sz(0);
|
||||
|
||||
let slots = page_buffer
|
||||
.chunks_exact_mut(PAGE_SZ)
|
||||
.map(|chunk| {
|
||||
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
|
||||
|
||||
Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf,
|
||||
permit: std::sync::Mutex::new(Weak::new()),
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
}
|
||||
let slots = slot_contents
|
||||
.into_iter()
|
||||
.map(|slot_contents| Slot {
|
||||
inner: tokio::sync::RwLock::new(SlotInner {
|
||||
key: None,
|
||||
buf: slot_contents,
|
||||
}),
|
||||
usage_count: AtomicU8::new(0),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Self {
|
||||
materialized_page_map: Default::default(),
|
||||
immutable_page_map: Default::default(),
|
||||
slots,
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,29 +104,29 @@ use crate::shutdown_pageserver;
|
||||
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
|
||||
// happen, but still.
|
||||
//
|
||||
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("compute request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create compute request runtime")
|
||||
});
|
||||
// pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
// tokio::runtime::Builder::new_multi_thread()
|
||||
// .thread_name("compute request worker")
|
||||
// .enable_all()
|
||||
// .build()
|
||||
// .expect("Failed to create compute request runtime")
|
||||
// });
|
||||
|
||||
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("mgmt request worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create mgmt request runtime")
|
||||
});
|
||||
// pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
// tokio::runtime::Builder::new_multi_thread()
|
||||
// .thread_name("mgmt request worker")
|
||||
// .enable_all()
|
||||
// .build()
|
||||
// .expect("Failed to create mgmt request runtime")
|
||||
// });
|
||||
|
||||
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
.thread_name("walreceiver worker")
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("Failed to create walreceiver runtime")
|
||||
});
|
||||
// pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
// tokio::runtime::Builder::new_multi_thread()
|
||||
// .thread_name("walreceiver worker")
|
||||
// .enable_all()
|
||||
// .build()
|
||||
// .expect("Failed to create walreceiver runtime")
|
||||
// });
|
||||
|
||||
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
@@ -150,6 +150,10 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
|
||||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
|
||||
});
|
||||
|
||||
pub static COMPUTE_REQUEST_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
|
||||
pub static MGMT_REQUEST_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
|
||||
pub static WALRECEIVER_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub struct PageserverTaskId(u64);
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ impl<'a> BlockCursor<'a> {
|
||||
offset: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<u8>, std::io::Error> {
|
||||
// TODO: used pooled allocation instead, used by ImageLayer::get_value_reconstruct_data
|
||||
let mut buf = Vec::new();
|
||||
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
|
||||
Ok(buf)
|
||||
|
||||
@@ -5,10 +5,11 @@
|
||||
use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use bytes::Bytes;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
|
||||
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
|
||||
/// blocks, using the page cache
|
||||
@@ -36,6 +37,7 @@ where
|
||||
/// Reference to an in-memory copy of an immutable on-disk block.
|
||||
pub enum BlockLease<'a> {
|
||||
PageReadGuard(PageReadGuard<'static>),
|
||||
BufferPool(Arc<crate::buffer_pool::Buffer>),
|
||||
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
|
||||
#[cfg(test)]
|
||||
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
|
||||
@@ -62,6 +64,7 @@ impl<'a> Deref for BlockLease<'a> {
|
||||
fn deref(&self) -> &Self::Target {
|
||||
match self {
|
||||
BlockLease::PageReadGuard(v) => v.deref(),
|
||||
BlockLease::BufferPool(buf) => buf.deref(),
|
||||
BlockLease::EphemeralFileMutableTail(v) => v,
|
||||
#[cfg(test)]
|
||||
BlockLease::Arc(v) => v.deref(),
|
||||
@@ -174,17 +177,6 @@ impl FileBlockReader {
|
||||
FileBlockReader { file_id, file }
|
||||
}
|
||||
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(
|
||||
&self,
|
||||
buf: PageWriteGuard<'static>,
|
||||
blkno: u32,
|
||||
) -> Result<PageWriteGuard<'static>, std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
/// Returns a "lease" object that can be used to
|
||||
@@ -195,21 +187,69 @@ impl FileBlockReader {
|
||||
blknum: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<BlockLease, std::io::Error> {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(write_guard) => {
|
||||
match ctx.page_content_kind() {
|
||||
crate::context::PageContentKind::InMemoryLayer => {
|
||||
unreachable!("this happens in inmemory_layer.rs")
|
||||
}
|
||||
crate::context::PageContentKind::Unknown
|
||||
| crate::context::PageContentKind::DeltaLayerBtreeNode
|
||||
| crate::context::PageContentKind::ImageLayerBtreeNode => {
|
||||
let cache = page_cache::get();
|
||||
match cache
|
||||
.read_immutable_buf(self.file_id, blknum, ctx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Failed to read immutable buf: {e:#}"),
|
||||
)
|
||||
})? {
|
||||
ReadBufResult::Found(guard) => Ok(guard.into()),
|
||||
ReadBufResult::NotFound(write_guard) => {
|
||||
// Read the page from disk into the buffer
|
||||
let write_guard = async move {
|
||||
assert!(write_guard.len() == PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
.await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
}
|
||||
}
|
||||
}
|
||||
crate::context::PageContentKind::ImageLayerValue
|
||||
| crate::context::PageContentKind::DeltaLayerValue => {
|
||||
let cache_key = page_cache::CacheKey::ImmutableFilePage {
|
||||
file_id: self.file_id,
|
||||
blkno: blknum,
|
||||
};
|
||||
if let Some(cache) = &ctx.buf_cache {
|
||||
let mut cache = cache.lock().unwrap();
|
||||
if let Some(cached) = cache.get(&cache_key) {
|
||||
return Ok(BlockLease::BufferPool(Arc::clone(cached)));
|
||||
};
|
||||
}
|
||||
let buf = crate::buffer_pool::get();
|
||||
// Read the page from disk into the buffer
|
||||
let write_guard = self.fill_buffer(write_guard, blknum).await?;
|
||||
Ok(write_guard.mark_valid().into())
|
||||
let buf = async move {
|
||||
assert_eq!(buf.len(), PAGE_SZ);
|
||||
std::io::Result::Ok(
|
||||
self.file
|
||||
.read_exact_at(
|
||||
crate::buffer_pool::PageWriteGuardBuf::new(buf),
|
||||
blknum as u64 * PAGE_SZ as u64,
|
||||
)
|
||||
.await?
|
||||
.assume_init(),
|
||||
)
|
||||
}
|
||||
.await?;
|
||||
let buf = Arc::new(buf);
|
||||
if let Some(cache) = &ctx.buf_cache {
|
||||
cache.lock().unwrap().put(cache_key, Arc::clone(&buf));
|
||||
}
|
||||
Ok(BlockLease::BufferPool(buf))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,7 @@ where
|
||||
pub struct ValueReconstructState {
|
||||
pub records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub img: Option<(Lsn, Bytes)>,
|
||||
pub(crate) scratch: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Return value from [`Layer::get_value_reconstruct_data`]
|
||||
|
||||
@@ -688,7 +688,14 @@ impl DeltaLayerInner {
|
||||
summary: Option<Summary>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
let file = match VirtualFile::open_with_options(
|
||||
path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.custom_flags(nix::libc::O_DIRECT),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
@@ -771,15 +778,17 @@ impl DeltaLayerInner {
|
||||
|
||||
// Ok, 'offsets' now contains the offsets of all the entries we need to read
|
||||
let cursor = file.block_cursor();
|
||||
let mut buf = Vec::new();
|
||||
for (entry_lsn, pos) in offsets {
|
||||
cursor
|
||||
.read_blob_into_buf(pos, &mut buf, ctx)
|
||||
.read_blob_into_buf(pos, &mut reconstruct_state.scratch, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to read blob from virtual file {}", file.file.path)
|
||||
})?;
|
||||
let val = Value::des(&buf).with_context(|| {
|
||||
// TODO: this one is super costly, it's allocating a Vec<> for the inner Bytes every time.
|
||||
// That's on avg 200 allocations.
|
||||
// Can we re-use the Vec from a buffer pool?
|
||||
let val = Value::des(&reconstruct_state.scratch).with_context(|| {
|
||||
format!(
|
||||
"Failed to deserialize file blob from virtual file {}",
|
||||
file.file.path
|
||||
|
||||
@@ -367,7 +367,14 @@ impl ImageLayerInner {
|
||||
summary: Option<Summary>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
|
||||
let file = match VirtualFile::open(path).await {
|
||||
let file = match VirtualFile::open_with_options(
|
||||
path,
|
||||
virtual_file::OpenOptions::new()
|
||||
.read(true)
|
||||
.custom_flags(nix::libc::O_DIRECT),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(file) => file,
|
||||
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
|
||||
};
|
||||
|
||||
@@ -199,7 +199,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
|
||||
|
||||
// Perhaps we did no work and the walredo process has been idle for some time:
|
||||
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
|
||||
tenant.walredo_mgr.maybe_quiesce(period * 10);
|
||||
tenant.walredo_mgr.maybe_quiesce(period * 10); // TODO: broken with compaction_period 0
|
||||
|
||||
// Sleep
|
||||
if tokio::time::timeout(sleep_duration, cancel.cancelled())
|
||||
|
||||
@@ -7,6 +7,8 @@ pub mod span;
|
||||
pub mod uninit;
|
||||
mod walreceiver;
|
||||
|
||||
pub(crate) static REQ_LRU_SIZE: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
use anyhow::{anyhow, bail, ensure, Context, Result};
|
||||
use bytes::Bytes;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
@@ -33,8 +35,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::*;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
|
||||
use std::ops::{Deref, Range};
|
||||
use std::pin::pin;
|
||||
use std::sync::atomic::Ordering as AtomicOrdering;
|
||||
use std::sync::{Arc, Mutex, RwLock, Weak};
|
||||
@@ -43,6 +43,14 @@ use std::{
|
||||
cmp::{max, min, Ordering},
|
||||
ops::ControlFlow,
|
||||
};
|
||||
use std::{
|
||||
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
|
||||
sync::atomic::AtomicUsize,
|
||||
};
|
||||
use std::{
|
||||
num::NonZeroUsize,
|
||||
ops::{Deref, Range},
|
||||
};
|
||||
|
||||
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
|
||||
use crate::tenant::{
|
||||
@@ -70,9 +78,7 @@ use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKin
|
||||
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
|
||||
use crate::metrics::{
|
||||
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
|
||||
};
|
||||
use crate::metrics::TimelineMetrics;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
use crate::tenant::config::TenantConfOpt;
|
||||
use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
|
||||
@@ -592,30 +598,10 @@ impl Timeline {
|
||||
ctx.task_kind()
|
||||
);
|
||||
|
||||
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
|
||||
// The cached image can be returned directly if there is no WAL between the cached image
|
||||
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
|
||||
// for redo.
|
||||
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
|
||||
Some((cached_lsn, cached_img)) => {
|
||||
match cached_lsn.cmp(&lsn) {
|
||||
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
|
||||
Ordering::Equal => {
|
||||
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
|
||||
return Ok(cached_img); // exact LSN match, return the image
|
||||
}
|
||||
Ordering::Greater => {
|
||||
unreachable!("the returned lsn should never be after the requested lsn")
|
||||
}
|
||||
}
|
||||
Some((cached_lsn, cached_img))
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
let mut reconstruct_state = ValueReconstructState {
|
||||
records: Vec::new(),
|
||||
img: cached_page_img,
|
||||
img: None,
|
||||
scratch: Vec::with_capacity(2 * 8192), // for good measure
|
||||
};
|
||||
|
||||
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
|
||||
@@ -2313,6 +2299,16 @@ impl Timeline {
|
||||
reconstruct_state: &mut ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
|
||||
let mut ctx = RequestContextBuilder::extend(ctx).build();
|
||||
ctx.buf_cache = match REQ_LRU_SIZE.load(std::sync::atomic::Ordering::Relaxed) {
|
||||
0 => None,
|
||||
x => Some(Arc::new(Mutex::new(lru::LruCache::new(
|
||||
// SAFETY: we just checked for 0 above
|
||||
unsafe { NonZeroUsize::new_unchecked(x) },
|
||||
)))),
|
||||
};
|
||||
let ctx = &ctx;
|
||||
|
||||
// Start from the current timeline.
|
||||
let mut timeline_owned;
|
||||
let mut timeline = self;
|
||||
@@ -2351,7 +2347,6 @@ impl Timeline {
|
||||
ValueReconstructResult::Continue => {
|
||||
// If we reached an earlier cached page image, we're done.
|
||||
if cont_lsn == cached_lsn + 1 {
|
||||
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
|
||||
return Ok(traversal_path);
|
||||
}
|
||||
if prev_lsn <= cont_lsn {
|
||||
@@ -2555,26 +2550,6 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
/// # Cancel-safety
|
||||
///
|
||||
/// This method is cancellation-safe.
|
||||
async fn lookup_cached_page(
|
||||
&self,
|
||||
key: &Key,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Option<(Lsn, Bytes)> {
|
||||
let cache = page_cache::get();
|
||||
|
||||
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
|
||||
// We should look at the key to determine if it's a cacheable object
|
||||
let (lsn, read_guard) = cache
|
||||
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
|
||||
.await?;
|
||||
let img = Bytes::from(read_guard.to_vec());
|
||||
Some((lsn, img))
|
||||
}
|
||||
|
||||
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
|
||||
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
|
||||
format!(
|
||||
@@ -4398,8 +4373,6 @@ impl Timeline {
|
||||
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
|
||||
};
|
||||
|
||||
let last_rec_lsn = data.records.last().unwrap().0;
|
||||
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
|
||||
@@ -4410,23 +4383,6 @@ impl Timeline {
|
||||
Err(e) => return Err(PageReconstructError::WalRedo(e)),
|
||||
};
|
||||
|
||||
if img.len() == page_cache::PAGE_SZ {
|
||||
let cache = page_cache::get();
|
||||
if let Err(e) = cache
|
||||
.memorize_materialized_page(
|
||||
self.tenant_shard_id,
|
||||
self.timeline_id,
|
||||
key,
|
||||
last_rec_lsn,
|
||||
&img,
|
||||
)
|
||||
.await
|
||||
.context("Materialized page memoization failed")
|
||||
{
|
||||
return Err(PageReconstructError::from(e));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(img)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
mod io_engine;
|
||||
pub(crate) mod io_engine;
|
||||
mod open_options;
|
||||
pub use io_engine::IoEngineKind;
|
||||
pub(crate) use open_options::*;
|
||||
|
||||
@@ -26,23 +26,31 @@ pub enum IoEngineKind {
|
||||
TokioEpollUring,
|
||||
}
|
||||
|
||||
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
|
||||
static IO_ENGINE: std::sync::RwLock<Option<IoEngineKind>> = std::sync::RwLock::new(None);
|
||||
|
||||
pub(crate) fn set(engine: IoEngineKind) {
|
||||
let mut guard = IO_ENGINE.write().unwrap();
|
||||
*guard = Some(engine);
|
||||
let metric = &crate::metrics::virtual_file_io_engine::KIND;
|
||||
metric.reset();
|
||||
metric.with_label_values(&[&format!("{engine}")]).set(1);
|
||||
drop(guard);
|
||||
}
|
||||
|
||||
#[cfg(not(test))]
|
||||
pub(super) fn init(engine: IoEngineKind) {
|
||||
if IO_ENGINE.set(engine).is_err() {
|
||||
panic!("called twice");
|
||||
}
|
||||
crate::metrics::virtual_file_io_engine::KIND
|
||||
.with_label_values(&[&format!("{engine}")])
|
||||
.set(1);
|
||||
set(engine);
|
||||
}
|
||||
|
||||
pub(super) fn get() -> &'static IoEngineKind {
|
||||
pub(super) fn get() -> IoEngineKind {
|
||||
#[cfg(test)]
|
||||
{
|
||||
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
|
||||
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
|
||||
let guard = IO_ENGINE.read().unwrap();
|
||||
if let Some(v) = guard.is_some() {
|
||||
return v;
|
||||
}
|
||||
*guard = Some(match std::env::var(env_var_name) {
|
||||
Ok(v) => match v.parse::<IoEngineKind>() {
|
||||
Ok(engine_kind) => engine_kind,
|
||||
Err(e) => {
|
||||
@@ -57,10 +65,13 @@ pub(super) fn get() -> &'static IoEngineKind {
|
||||
Err(std::env::VarError::NotUnicode(_)) => {
|
||||
panic!("env var {env_var_name} is not unicode");
|
||||
}
|
||||
})
|
||||
});
|
||||
}
|
||||
#[cfg(not(test))]
|
||||
IO_ENGINE.get().unwrap()
|
||||
IO_ENGINE
|
||||
.read()
|
||||
.unwrap()
|
||||
.expect("should have called set() or init() before")
|
||||
}
|
||||
|
||||
use std::os::unix::prelude::FileExt;
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
|
||||
|
||||
use nix::libc;
|
||||
|
||||
use super::IoEngineKind;
|
||||
use std::{os::fd::OwnedFd, path::Path};
|
||||
use std::{
|
||||
os::{fd::OwnedFd, unix::fs::OpenOptionsExt},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OpenOptions {
|
||||
@@ -92,6 +97,18 @@ impl OpenOptions {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn custom_flags(&mut self, custom_flags: libc::c_int) -> &mut OpenOptions {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.custom_flags(custom_flags);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(custom_flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
use anyhow::Context;
|
||||
use byteorder::{ByteOrder, LittleEndian};
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use nix::poll::*;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use serde::Serialize;
|
||||
use std::collections::VecDeque;
|
||||
@@ -31,10 +30,11 @@ use std::ops::{Deref, DerefMut};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::os::unix::prelude::CommandExt;
|
||||
use std::process::Stdio;
|
||||
use std::process::{Child, ChildStdin, ChildStdout, Command};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
|
||||
use std::process::{Child, Command};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tracing::*;
|
||||
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
|
||||
|
||||
@@ -73,12 +73,12 @@ pub(crate) struct BufferTag {
|
||||
}
|
||||
|
||||
struct ProcessInput {
|
||||
stdin: ChildStdin,
|
||||
stdin: tokio::process::ChildStdin,
|
||||
n_requests: usize,
|
||||
}
|
||||
|
||||
struct ProcessOutput {
|
||||
stdout: ChildStdout,
|
||||
stdout: tokio::process::ChildStdout,
|
||||
pending_responses: VecDeque<Option<Bytes>>,
|
||||
n_processed_responses: usize,
|
||||
}
|
||||
@@ -112,6 +112,8 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
|
||||
}
|
||||
}
|
||||
|
||||
mod writebuf_pool;
|
||||
|
||||
///
|
||||
/// Public interface of WAL redo manager
|
||||
///
|
||||
@@ -157,6 +159,7 @@ impl PostgresRedoManager {
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
.await
|
||||
};
|
||||
img = Some(result?);
|
||||
|
||||
@@ -177,6 +180,7 @@ impl PostgresRedoManager {
|
||||
self.conf.wal_redo_timeout,
|
||||
pg_version,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -217,7 +221,7 @@ impl PostgresRedoManager {
|
||||
/// Process one request for WAL redo using wal-redo postgres
|
||||
///
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn apply_batch_postgres(
|
||||
async fn apply_batch_postgres(
|
||||
&self,
|
||||
key: Key,
|
||||
lsn: Lsn,
|
||||
@@ -270,6 +274,7 @@ impl PostgresRedoManager {
|
||||
let buf_tag = BufferTag { rel, blknum };
|
||||
let result = proc
|
||||
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
|
||||
.await
|
||||
.context("apply_wal_records");
|
||||
|
||||
let duration = started_at.elapsed();
|
||||
@@ -647,8 +652,8 @@ struct WalRedoProcess {
|
||||
tenant_shard_id: TenantShardId,
|
||||
// Some() on construction, only becomes None on Drop.
|
||||
child: Option<NoLeakChild>,
|
||||
stdout: Mutex<ProcessOutput>,
|
||||
stdin: Mutex<ProcessInput>,
|
||||
stdout: tokio::sync::Mutex<ProcessOutput>,
|
||||
stdin: tokio::sync::Mutex<ProcessInput>,
|
||||
/// Counter to separate same sized walredo inputs failing at the same millisecond.
|
||||
#[cfg(feature = "testing")]
|
||||
dump_sequence: AtomicUsize,
|
||||
@@ -754,12 +759,12 @@ impl WalRedoProcess {
|
||||
conf,
|
||||
tenant_shard_id,
|
||||
child: Some(child),
|
||||
stdin: Mutex::new(ProcessInput {
|
||||
stdin,
|
||||
stdin: tokio::sync::Mutex::new(ProcessInput {
|
||||
stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling
|
||||
n_requests: 0,
|
||||
}),
|
||||
stdout: Mutex::new(ProcessOutput {
|
||||
stdout,
|
||||
stdout: tokio::sync::Mutex::new(ProcessOutput {
|
||||
stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling
|
||||
pending_responses: VecDeque::new(),
|
||||
n_processed_responses: 0,
|
||||
}),
|
||||
@@ -779,15 +784,13 @@ impl WalRedoProcess {
|
||||
// new page image.
|
||||
//
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
|
||||
fn apply_wal_records(
|
||||
async fn apply_wal_records(
|
||||
&self,
|
||||
tag: BufferTag,
|
||||
base_img: &Option<Bytes>,
|
||||
records: &[(Lsn, NeonWalRecord)],
|
||||
wal_redo_timeout: Duration,
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let input = self.stdin.lock().unwrap();
|
||||
|
||||
// Serialize all the messages to send the WAL redo process first.
|
||||
//
|
||||
// This could be problematic if there are millions of records to replay,
|
||||
@@ -797,7 +800,8 @@ impl WalRedoProcess {
|
||||
// Most requests start with a before-image with BLCKSZ bytes, followed by
|
||||
// by some other WAL records. Start with a buffer that can hold that
|
||||
// comfortably.
|
||||
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
|
||||
// TODO replace with allocation pool
|
||||
let mut writebuf: writebuf_pool::PooledVecU8 = writebuf_pool::get();
|
||||
build_begin_redo_for_block_msg(tag, &mut writebuf);
|
||||
if let Some(img) = base_img {
|
||||
build_push_page_msg(tag, img, &mut writebuf);
|
||||
@@ -816,7 +820,7 @@ impl WalRedoProcess {
|
||||
build_get_page_msg(tag, &mut writebuf);
|
||||
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
|
||||
|
||||
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
|
||||
let res = self.apply_wal_records0(&writebuf, wal_redo_timeout).await;
|
||||
|
||||
if res.is_err() {
|
||||
// not all of these can be caused by this particular input, however these are so rare
|
||||
@@ -827,38 +831,17 @@ impl WalRedoProcess {
|
||||
res
|
||||
}
|
||||
|
||||
fn apply_wal_records0(
|
||||
async fn apply_wal_records0(
|
||||
&self,
|
||||
writebuf: &[u8],
|
||||
input: MutexGuard<ProcessInput>,
|
||||
wal_redo_timeout: Duration,
|
||||
_wal_redo_timeout: Duration, // TODO respect
|
||||
) -> anyhow::Result<Bytes> {
|
||||
let input = self.stdin.lock().await;
|
||||
|
||||
let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
|
||||
let mut nwrite = 0usize;
|
||||
|
||||
while nwrite < writebuf.len() {
|
||||
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
|
||||
let n = loop {
|
||||
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
proc.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling
|
||||
|
||||
if n == 0 {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If 'stdin' is writeable, do write.
|
||||
let in_revents = stdin_pollfds[0].revents().unwrap();
|
||||
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
|
||||
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
|
||||
}
|
||||
if in_revents.contains(PollFlags::POLLHUP) {
|
||||
// We still have more data to write, but the process closed the pipe.
|
||||
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
|
||||
}
|
||||
}
|
||||
let request_no = proc.n_requests;
|
||||
proc.n_requests += 1;
|
||||
drop(proc);
|
||||
@@ -875,40 +858,13 @@ impl WalRedoProcess {
|
||||
// pending responses ring buffer and truncate all empty elements from the front,
|
||||
// advancing processed responses number.
|
||||
|
||||
let mut output = self.stdout.lock().unwrap();
|
||||
let mut output = self.stdout.lock().await;
|
||||
let n_processed_responses = output.n_processed_responses;
|
||||
while n_processed_responses + output.pending_responses.len() <= request_no {
|
||||
// We expect the WAL redo process to respond with an 8k page image. We read it
|
||||
// into this buffer.
|
||||
let mut resultbuf = vec![0; BLCKSZ.into()];
|
||||
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
|
||||
while nresult < BLCKSZ.into() {
|
||||
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
|
||||
// We do two things simultaneously: reading response from stdout
|
||||
// and forward any logging information that the child writes to its stderr to the page server's log.
|
||||
let n = loop {
|
||||
match nix::poll::poll(
|
||||
&mut stdout_pollfds[..],
|
||||
wal_redo_timeout.as_millis() as i32,
|
||||
) {
|
||||
Err(nix::errno::Errno::EINTR) => continue,
|
||||
res => break res,
|
||||
}
|
||||
}?;
|
||||
|
||||
if n == 0 {
|
||||
anyhow::bail!("WAL redo timed out");
|
||||
}
|
||||
|
||||
// If we have some data in stdout, read it to the result buffer.
|
||||
let out_revents = stdout_pollfds[0].revents().unwrap();
|
||||
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
|
||||
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
|
||||
}
|
||||
if out_revents.contains(PollFlags::POLLHUP) {
|
||||
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
|
||||
}
|
||||
}
|
||||
output.stdout.read_exact(&mut resultbuf).await.unwrap();
|
||||
output
|
||||
.pending_responses
|
||||
.push_back(Some(Bytes::from(resultbuf)));
|
||||
|
||||
40
pageserver/src/walredo/writebuf_pool.rs
Normal file
40
pageserver/src/walredo/writebuf_pool.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use std::cell::RefCell;
|
||||
|
||||
use postgres_ffi::BLCKSZ;
|
||||
|
||||
pub struct PooledVecU8(Option<Vec<u8>>);
|
||||
|
||||
// Thread-local list of re-usable buffers.
|
||||
thread_local! {
|
||||
static POOL: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::new());
|
||||
}
|
||||
|
||||
pub(crate) fn get() -> PooledVecU8 {
|
||||
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
|
||||
match maybe {
|
||||
Some(buf) => PooledVecU8(Some(buf)),
|
||||
None => PooledVecU8(Some(Vec::with_capacity((BLCKSZ as usize) * 3))),
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PooledVecU8 {
|
||||
fn drop(&mut self) {
|
||||
let mut buf = self.0.take().unwrap();
|
||||
buf.clear();
|
||||
POOL.with(|rc| rc.borrow_mut().push(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PooledVecU8 {
|
||||
type Target = Vec<u8>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::DerefMut for PooledVecU8 {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
self.0.as_mut().unwrap()
|
||||
}
|
||||
}
|
||||
36
results.txt
Normal file
36
results.txt
Normal file
@@ -0,0 +1,36 @@
|
||||
run on i3en.3xlarge
|
||||
|
||||
admin@ip-172-31-13-23:[~/neon-main]: du -hs /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
|
||||
225G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
|
||||
|
||||
=> ~2.25x main memory
|
||||
|
||||
admin@ip-172-31-13-23:[~/neon-main]: NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
|
||||
|
||||
--------------------------------------------------------------------------------- Benchmark results ---------------------------------------------------------------------------------
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.STD_FS
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2321
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 8,785.440 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 20,234.239 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.TOKIO_EPOLL_URING
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2200
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 9,046.271 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 16,457.727 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 16,457.727 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 16,457.727 ms
|
||||
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 16,457.727 ms
|
||||
|
||||
=========================================================================== 2 passed in 142.33s (0:02:22) ===========================================================================
|
||||
@@ -20,10 +20,10 @@ fi
|
||||
|
||||
# do all the on-disk initialization work now instead of a background kernel thread
|
||||
# so that we're ready for benchmarking right after this line
|
||||
sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
|
||||
#sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
|
||||
|
||||
MOUNTPOINT=/instance_store
|
||||
sudo mkdir "$MOUNTPOINT"
|
||||
sudo rmdir "$MOUNTPOINT" || sudo mkdir "$MOUNTPOINT"
|
||||
sudo mount /dev/nvme1n1 "$MOUNTPOINT"
|
||||
sudo chown -R "$(id -u)":"$(id -g)" "$MOUNTPOINT"
|
||||
|
||||
@@ -40,7 +40,7 @@ To run your local neon.git build on the instance store volume,
|
||||
run the following commands from the top of the neon.git checkout
|
||||
|
||||
# raise file descriptor limit of your shell and its child processes
|
||||
sudo prlimit -p $$ --nofile=800000:800000
|
||||
sudo prlimit -p \$\$ --nofile=800000:800000
|
||||
|
||||
# test suite run
|
||||
export TEST_OUTPUT="$TEST_OUTPUT"
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
import enum
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Tuple
|
||||
|
||||
import toml
|
||||
|
||||
import fixtures.pageserver.many_tenants as many_tenants
|
||||
import pytest
|
||||
from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
|
||||
@@ -17,6 +20,10 @@ from fixtures.utils import get_scale_for_db, humantime_to_ms
|
||||
from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
|
||||
|
||||
class IoEngine(str, enum.Enum):
|
||||
STD_FS = "std-fs"
|
||||
TOKIO_EPOLL_URING = "tokio-epoll-uring"
|
||||
|
||||
# For reference, the space usage of the snapshots:
|
||||
# admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
|
||||
# 137G /instance_store/test_output/shared-snapshots
|
||||
@@ -27,9 +34,10 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
|
||||
# 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
|
||||
# 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
|
||||
# 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
|
||||
@pytest.mark.parametrize("ioengine", [IoEngine.STD_FS, IoEngine.TOKIO_EPOLL_URING])
|
||||
@pytest.mark.parametrize("duration", [30])
|
||||
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
|
||||
@pytest.mark.parametrize("n_tenants", [1, 10])
|
||||
@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100]])
|
||||
@pytest.mark.parametrize("n_tenants", [1000])
|
||||
@pytest.mark.timeout(
|
||||
10000
|
||||
) # TODO: this value is just "a really high number"; have this per instance type
|
||||
@@ -40,6 +48,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
n_tenants: int,
|
||||
pgbench_scale: int,
|
||||
duration: int,
|
||||
ioengine: IoEngine,
|
||||
):
|
||||
def record(metric, **kwargs):
|
||||
zenbenchmark.record(
|
||||
@@ -60,9 +69,12 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
# configure cache sizes like in prod
|
||||
page_cache_size = 16384
|
||||
max_file_descriptors = 500000
|
||||
neon_env_builder.pageserver_config_override = (
|
||||
f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
|
||||
)
|
||||
pageserver_config_override = {
|
||||
"page_cache_size": f"{page_cache_size}",
|
||||
"max_file_descriptors": f"{max_file_descriptors}",
|
||||
"virtual_file_io_engine": f"\"{ioengine}\"",
|
||||
}
|
||||
neon_env_builder.pageserver_config_override = ";".join([f"{k}={v}" for k, v in pageserver_config_override.items()])
|
||||
params.update(
|
||||
{
|
||||
"pageserver_config_override.page_cache_size": (
|
||||
@@ -70,12 +82,17 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
|
||||
{"unit": "byte"},
|
||||
),
|
||||
"pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
|
||||
"pageserver_config.override.virtual_file_io_engine": (ioengine, {"unit": ""}),
|
||||
}
|
||||
)
|
||||
|
||||
for param, (value, kwargs) in params.items():
|
||||
record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
|
||||
env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
|
||||
ps_http =env.pageserver.http_client()
|
||||
for tenant_info in ps_http.tenant_list():
|
||||
tenant_id = tenant_info["id"]
|
||||
ps_http.patch_tenant_config_client_side(tenant_id, {"compaction_period": "10s"})
|
||||
run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user