Compare commits

...

25 Commits

Author SHA1 Message Date
Christian Schwarz
3cd4f8aa59 possibly found the place where we do all those allocations, will check tomorrow 2024-01-29 20:25:02 +00:00
Christian Schwarz
c98215674c avoid Vec::new() in walredo code path; still no dramatic improvement over before_scratch.svg 2024-01-29 20:10:03 +00:00
Christian Schwarz
0e3561f6d1 WIP: try to eliminate the raw_vec::finish_grow and bytes::promotable_even-drop
This one doesn't make a big difference.
2024-01-29 19:52:05 +00:00
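Editor's note: `raw_vec::finish_grow` shows up in profiles whenever a `Vec` reallocates while growing, so the usual fix is to preallocate with `with_capacity` on the hot path. A minimal std-only sketch of the difference (illustrative, not the actual walredo code):

```rust
fn main() {
    // Growing from empty: each push past the current capacity triggers a
    // reallocation (raw_vec::finish_grow) and a copy of the existing data.
    let mut grown: Vec<u8> = Vec::new();
    for i in 0..8192usize {
        grown.push(i as u8);
    }

    // Preallocated: one allocation up front, no regrowth while pushing.
    let mut prealloc: Vec<u8> = Vec::with_capacity(8192);
    let ptr_before = prealloc.as_ptr();
    for i in 0..8192usize {
        prealloc.push(i as u8);
    }
    // The backing allocation never moved, so no finish_grow calls happened.
    assert_eq!(ptr_before, prealloc.as_ptr());
    assert_eq!(prealloc.len(), 8192);
}
```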
Christian Schwarz
28a4247c97 rip out slot pinning, has about 5% speedup 2024-01-29 19:37:35 +00:00
Christian Schwarz
70bc01494c Revert "broken impl of a permit pool to shave off its allocations"
This reverts commit a1af2c7150.
2024-01-29 19:23:09 +00:00
Christian Schwarz
a1af2c7150 broken impl of a permit pool to shave off its allocations 2024-01-29 19:22:55 +00:00
Christian Schwarz
043ed5edea for posterity: RSS is about 18GB with previous bench at env.pageserver_config_override='page_cache_size=2097152;max_file_descriptors=500000;virtual_file_io_engine="tokio-epoll-uring"' 2024-01-29 18:35:07 +00:00
Christian Schwarz
6753ff089c results: req_lru_size=2 gives tokio-epoll-uring 16k GetPage/s @ 110k IOPS, std-fs: 9.5k GetPage/s @ 65k IOPS
RUST_BACKTRACE=1 ./target/release/pagebench get-page-latest-lsn --mgmt-api-endpoint http://localhost:15011 --page-service-connstring=postgresql://localhost:15010  --keyspace-cache keyspace.cache   --limit-to-first-n-targets 1000 --set-io-engine tokio-epoll-uring --set-req-lru-size 2 --runtime 2m

Biggest gain with lru_size from 0 to 1, yay.
Adding one more gives another 1-2k

cgroup mem.high unlimited
made sure global page cache is large enough to not have any misses

MAKE SURE TO WARM UP, IT TAKES A WHILE, STILL DON'T KNOW WHY WARMUP IS
THAT BADLY NEEDED

std-fs: 50% CPU, a lot of iowait
2024-01-29T18:25:52.923572Z  INFO all clients stopped
{
  "total": {
    "request_count": 1194213,
    "latency_mean": "68ms 343us",
    "latency_percentiles": {
      "p95": "152ms 63us",
      "p99": "201ms 215us",
      "p99.9": "260ms 991us",
      "p99.99": "314ms 623us"
    }
  }
}

tokio-epoll-uring: 100% CPU utilization
Disk isn't saturated.
We're CPU bound here.

{
  "total": {
    "request_count": 1927700,
    "latency_mean": "43ms 11us",
    "latency_percentiles": {
      "p95": "83ms 263us",
      "p99": "101ms 887us",
      "p99.9": "124ms 991us",
      "p99.99": "147ms 583us"
    }
  }
}
2024-01-29 18:33:04 +00:00
Christian Schwarz
49a5e411d6 implement request-scoped LRU cache 2024-01-29 18:22:00 +00:00
Christian Schwarz
21a11822e8 results: tokio-epoll-uring 3.3k GetPage/s @ 240k IOPS, std-fs: 1.2k GetPage/s @ 80k IOPS
We have immense read amplification, I think we read the same blk
multiple times during one getpage request.

Before the switch to O_DIRECT, we'd go to the kernel page cache
many times. std-fs has an edge there, it's more efficient than
tokio-epoll-uring for workloads that have a high kernel page cache hit
rate.

With O_DIRECT, we now go to the disk for each read, making the inefficiency apparent.
tokio-epoll-uring is much better there, as we can see it can drive up to
240k IOPS, which is ~2GiB/s of random 8k reads, which afaik is the max that
the EC2 NVMe allows.
CPU isn't near 100%.
So, we're IO bound.

Idea to try out to reduce the read amplification: request-local page cache.
2024-01-29 16:21:22 +00:00
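Editor's note: the request-local page cache idea could be sketched as follows. This is a simplified std-only model; the actual implementation in the next commit uses the `lru` crate keyed by `page_cache::CacheKey`:

```rust
use std::collections::VecDeque;

/// A tiny LRU mapping (file_id, blkno) -> page bytes, scoped to one request.
/// A capacity of 1-2 is enough to absorb re-reads of the same block within
/// one getpage request (the read amplification observed above).
struct ReqLru {
    cap: usize,
    entries: VecDeque<((u64, u32), Vec<u8>)>,
}

impl ReqLru {
    fn new(cap: usize) -> Self {
        ReqLru { cap, entries: VecDeque::new() }
    }

    fn get(&mut self, key: (u64, u32)) -> Option<&Vec<u8>> {
        if let Some(pos) = self.entries.iter().position(|(k, _)| *k == key) {
            // Move the hit to the front (most recently used).
            let e = self.entries.remove(pos).unwrap();
            self.entries.push_front(e);
            self.entries.front().map(|(_, v)| v)
        } else {
            None
        }
    }

    fn put(&mut self, key: (u64, u32), page: Vec<u8>) {
        if let Some(pos) = self.entries.iter().position(|(k, _)| *k == key) {
            self.entries.remove(pos);
        }
        self.entries.push_front((key, page));
        if self.entries.len() > self.cap {
            self.entries.pop_back(); // evict the least recently used entry
        }
    }
}

fn main() {
    let mut lru = ReqLru::new(2);
    lru.put((1, 0), vec![0xAA; 8192]);
    assert!(lru.get((1, 0)).is_some()); // re-read within the request: hit
    lru.put((1, 1), vec![0xBB; 8192]);
    lru.put((1, 2), vec![0xCC; 8192]); // exceeds capacity, evicts (1, 0)
    assert!(lru.get((1, 0)).is_none());
}
```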
Christian Schwarz
aca2d7bdea use O_DIRECT for VirtualFile reads 2024-01-29 16:21:14 +00:00
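Editor's note: O_DIRECT requires the user buffer (and offset/length) to be suitably aligned, which is why the buffer types elsewhere in this branch carry `#[repr(C, align(8192))]`. A minimal sketch of the open flag plus an aligned buffer; the `O_DIRECT` constant is the Linux x86_64 value, and real code would use `libc::O_DIRECT`:

```rust
use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;

// Linux x86_64 value of O_DIRECT (0o40000); real code uses libc::O_DIRECT.
const O_DIRECT: i32 = 0o40000;

// O_DIRECT I/O needs buffers aligned to at least the logical block size;
// aligning to the page size (or larger) covers common NVMe devices.
#[repr(C, align(8192))]
struct AlignedPage([u8; 8192]);

fn open_direct(path: &str) -> std::io::Result<std::fs::File> {
    OpenOptions::new()
        .read(true)
        .custom_flags(O_DIRECT)
        .open(path)
}

fn main() {
    let page = Box::new(AlignedPage([0; 8192]));
    // The allocation satisfies O_DIRECT's alignment requirement.
    assert_eq!(page.0.as_ptr() as usize % 8192, 0);
    // open_direct("/path/to/layer/file") would then read into `page.0`.
}
```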
Christian Schwarz
db44395ee2 rip out materialized page cache 2024-01-29 14:45:16 +00:00
Christian Schwarz
03874009ec add back page cache but not for DeltaLayerValue and ImageLayerValue 2024-01-29 14:44:55 +00:00
Christian Schwarz
0033b4c985 results: both tokio-epoll-uring and std-fs achieve about 4k GetPage/sec @ 60k IOPS 2024-01-29 13:53:13 +00:00
Christian Schwarz
a608667301 rip out page cache 2024-01-29 13:53:07 +00:00
Christian Schwarz
b9b7670a3a hack: use a single runtime in pageserver
doesn't seem to make a meaningful perf difference under
get-page-latest-lsn load
2024-01-29 12:23:45 +00:00
Christian Schwarz
62a3d87098 results under higher memory pressure show that tokio-epoll-uring pays off
setup:

admin@ip-172-31-13-23:[~/neon-main]: sudo mkdir /sys/fs/cgroup/benchmark
admin@ip-172-31-13-23:[~/neon-main]: sudo chown admin:admin /sys/fs/cgroup/benchmark
admin@ip-172-31-13-23:[~/neon-main]: sudo chown admin:admin /sys/fs/cgroup/benchmark/cgroup.procs
admin@ip-172-31-13-23:[~/neon-main]: echo THE_PID_OF_THE_SHELL_WHERE_WE_LAUNCH_PAGESERVER > /sys/fs/cgroup/benchmark/cgroup.procs
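Editor's note: the setup above creates the cgroup but leaves `memory.high` unlimited; applying memory pressure for the comparison would look something like the following (paths assume cgroup v2, and the 16G value is illustrative only):

```shell
# Cap the pageserver's memory before re-running the benchmark (illustrative value).
echo 16G > /sys/fs/cgroup/benchmark/memory.high

# Verify the limit and watch reclaim activity while the benchmark runs.
cat /sys/fs/cgroup/benchmark/memory.high
grep -E '^(pgscan|pgsteal)' /sys/fs/cgroup/benchmark/memory.stat
```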

from another shell, that's not in the cgroup, run pagebench

admin@ip-172-31-13-23:[~/neon-main]: RUST_BACKTRACE=1 ./target/release/pagebench get-page-latest-lsn --mgmt-api-endpoint http://localhost:15011 --page-service-connstring=postgresql://localhost:15010  --keyspace-cache keyspace.cache  --per-target-rate-limit 2000 --limit-to-first-n-targets 500  --set-io-engine YOUR_IO_ENGINE --runtime 10s

tokio-epoll-uring:

{
  "total": {
    "request_count": 63780,
    "latency_mean": "77ms 993us",
    "latency_percentiles": {
      "p95": "120ms 703us",
      "p99": "143ms 743us",
      "p99.9": "171ms 775us",
      "p99.99": "195ms 583us"
    }
  }
}

Does ca 85-90k IOPS to the NVMe.

std-fs

{
  "total": {
    "request_count": 49303,
    "latency_mean": "100ms 669us",
    "latency_percentiles": {
      "p95": "214ms 399us",
      "p99": "268ms 799us",
      "p99.9": "335ms 359us",
      "p99.99": "399ms 615us"
    }
  }
}

Does ca 70k IOPS to the NVMe.

with higher memory pre
2024-01-29 10:50:52 +00:00
Christian Schwarz
8d6ce71b29 hacky: ability to set io_engine via mgmt_api => pagebench 2024-01-29 10:50:20 +00:00
Christian Schwarz
d23ea718ee 2min, 3 tenants, 2000 req/s each; that is a 0 IOPS workload (all in PS/kernel page cache)
Very comparable.
tokio-epoll-uring
{
  "total": {
    "request_count": 719999,
    "latency_mean": "375us",
    "latency_percentiles": {
      "p95": "576us",
      "p99": "649us",
      "p99.9": "823us",
      "p99.99": "1ms 636us"
    }
  }
}

std-fs
{
  "total": {
    "request_count": 719997,
    "latency_mean": "341us",
    "latency_percentiles": {
      "p95": "543us",
      "p99": "618us",
      "p99.9": "748us",
      "p99.99": "1ms 358us"
    }
  }
}
2024-01-27 12:59:15 +00:00
Christian Schwarz
73a7ca38b3 same config, but, rate limit of 2/sec per tenant => bursty due to ticker behavior
RUST_BACKTRACE=1 ./target/release/pagebench get-page-latest-lsn --mgmt-api-endpoint http://localhost:15011 --page-service-connstring=postgresql://localhost:15010  --keyspace-cache keyspace.cache  --per-target-rate-limit 2 --runtime 2m

std-fs

{
  "total": {
    "request_count": 240001,
    "latency_mean": "73ms 562us",
    "latency_percentiles": {
      "p95": "101ms 311us",
      "p99": "106ms 431us",
      "p99.9": "115ms 455us",
      "p99.99": "129ms 407us"
    }
  }
}

tokio-epoll-uring

{
  "total": {
    "request_count": 240000,
    "latency_mean": "84ms 517us",
    "latency_percentiles": {
      "p95": "116ms 671us",
      "p99": "125ms 759us",
      "p99.9": "138ms 239us",
      "p99.99": "148ms 223us"
    }
  }
}
2024-01-27 12:51:11 +00:00
Christian Schwarz
7eb1d4cfa6 manual 2min test run including warmup
RUST_BACKTRACE=1 ./target/release/pagebench get-page-latest-lsn --mgmt-api-endpoint http://localhost:15011 --page-service-connstring=postgresql://localhost:15010  --keyspace-cache keyspace.cache --runtime 2m

2min std-fs
{
    "total": {
      "request_count": 1213184,
      "latency_mean": "67ms 793us",
      "latency_percentiles": {
        "p95": "153ms 471us",
        "p99": "197ms 247us",
        "p99.9": "246ms 399us",
        "p99.99": "288ms 255us"
      }
    }
  }

2min tokio-epoll-uring

{
  "total": {
    "request_count": 825637,
    "latency_mean": "108ms 702us",
    "latency_percentiles": {
      "p95": "136ms 959us",
      "p99": "191ms 615us",
      "p99.9": "9s 977ms 855us",
      "p99.99": "16s 334ms 847us"
    }
  }
}
2024-01-27 12:48:32 +00:00
Christian Schwarz
6ebd683327 TODO/workaround: walredo quiescing broken with compaction_period=0 2024-01-27 12:48:27 +00:00
Christian Schwarz
b1ecdfe099 WIP: async walredo 2024-01-27 12:47:29 +00:00
Christian Schwarz
82a74d0e77 pagebench: fix percentiles reporting 2024-01-27 12:46:36 +00:00
Christian Schwarz
49b43c75e2 run test_pageserver_max_throughput_getpage_at_latest_lsn with 1k tenants, compare std-fs with tokio-epoll-uring 2024-01-26 16:49:12 +00:00
27 changed files with 509 additions and 647 deletions

Cargo.lock generated
View File

@@ -2716,6 +2716,15 @@ version = "0.4.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
[[package]]
name = "lru"
version = "0.12.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db2c024b41519440580066ba82aab04092b333e09066a5eb86c7c4890df31f22"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@@ -3337,6 +3346,7 @@ dependencies = [
"humantime-serde",
"hyper",
"itertools",
"lru",
"md5",
"metrics",
"nix 0.27.1",

View File

@@ -86,6 +86,7 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
lru = "0.12.2"
[dev-dependencies]
criterion.workspace = true

View File

@@ -275,4 +275,22 @@ impl Client {
.await
.map_err(Error::ReceiveBody)
}
pub async fn set_io_engine(&self, engine_str: &str) -> Result<()> {
let uri = format!("{}/v1/set_io_engine", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, engine_str)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
pub async fn set_request_lru_size(&self, size: usize) -> Result<()> {
let uri = format!("{}/v1/set_req_lru_size", self.mgmt_api_endpoint);
self.request(Method::PUT, uri, size)
.await?
.json()
.await
.map_err(Error::ReceiveBody)
}
}

View File

@@ -51,6 +51,10 @@ pub(crate) struct Args {
/// It doesn't get invalidated if the keyspace changes under the hood, e.g., due to new ingested data or compaction.
#[clap(long)]
keyspace_cache: Option<Utf8PathBuf>,
#[clap(long)]
set_io_engine: Option<String>,
#[clap(long)]
set_req_lru_size: Option<usize>,
targets: Option<Vec<TenantTimelineId>>,
}
@@ -103,6 +107,14 @@ async fn main_impl(
args.pageserver_jwt.as_deref(),
));
if let Some(engine_str) = &args.set_io_engine {
mgmt_api_client.set_io_engine(engine_str).await?;
}
if let Some(req_lru_size) = &args.set_req_lru_size {
mgmt_api_client.set_request_lru_size(*req_lru_size).await?;
}
// discover targets
let timelines: Vec<TenantTimelineId> = crate::util::cli::targets::discover(
&mgmt_api_client,

View File

@@ -66,12 +66,12 @@ impl serde::Serialize for LatencyPercentiles {
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for p in LATENCY_PERCENTILES {
for (i, p) in LATENCY_PERCENTILES.iter().enumerate() {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[0])
&humantime::format_duration(self.latency_percentiles[i])
),
)?;
}

View File

@@ -0,0 +1,91 @@
use std::cell::RefCell;
use crate::tenant::disk_btree::PAGE_SZ;
#[repr(C, align(8192))]
struct BufferContent([u8; PAGE_SZ]);
impl BufferContent {
fn empty() -> Self {
BufferContent(std::array::from_fn(|_| 0))
}
}
pub struct Buffer(Option<Box<BufferContent>>);
// Thread-local list of re-usable buffers.
thread_local! {
static POOL: RefCell<Vec<Box<BufferContent>>> = RefCell::new(Vec::new());
}
pub(crate) fn get() -> Buffer {
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
match maybe {
Some(buf) => Buffer(Some(buf)),
None => Buffer(Some(Box::new(BufferContent::empty()))),
}
}
impl Drop for Buffer {
fn drop(&mut self) {
let buf = self.0.take().unwrap();
POOL.with(|rc| rc.borrow_mut().push(buf))
}
}
impl std::ops::Deref for Buffer {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
&self.0.as_ref().unwrap().as_ref().0
}
}
impl std::ops::DerefMut for Buffer {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0.as_mut().unwrap().as_mut().0
}
}
pub(crate) struct PageWriteGuardBuf {
page: Buffer,
init_up_to: usize,
}
impl PageWriteGuardBuf {
pub fn new(buf: Buffer) -> Self {
PageWriteGuardBuf {
page: buf,
init_up_to: 0,
}
}
pub fn assume_init(self) -> Buffer {
assert_eq!(self.init_up_to, PAGE_SZ);
self.page
}
}
// Safety: the buffer is a heap allocation owned exclusively by [`Self`],
// and its location remains stable even if [`Self`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.page.len()
}
}
// Safety: see above, plus: exclusive ownership of the buffer means
// it's safe to hand out the `stable_mut_ptr()`.
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.page.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}
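Editor's note: the thread-local pool above trades a pop/push per page for a heap allocation per page. A condensed standalone model of the same pattern (simplified types, not the pageserver's):

```rust
use std::cell::RefCell;

const PAGE_SZ: usize = 8192;

thread_local! {
    // Reusable boxed pages; popping here avoids a fresh heap allocation.
    static POOL: RefCell<Vec<Box<[u8; PAGE_SZ]>>> = RefCell::new(Vec::new());
}

struct Buffer(Option<Box<[u8; PAGE_SZ]>>);

fn get() -> Buffer {
    let reused = POOL.with(|p| p.borrow_mut().pop());
    Buffer(Some(reused.unwrap_or_else(|| Box::new([0; PAGE_SZ]))))
}

impl Drop for Buffer {
    fn drop(&mut self) {
        // Return the allocation to the pool instead of freeing it.
        let buf = self.0.take().unwrap();
        POOL.with(|p| p.borrow_mut().push(buf));
    }
}

fn main() {
    let first = get();
    let first_ptr = first.0.as_ref().unwrap().as_ptr();
    drop(first); // goes back into the thread-local pool
    let second = get(); // reuses the same allocation
    assert_eq!(first_ptr, second.0.as_ref().unwrap().as_ptr());
}
```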

View File

@@ -86,7 +86,9 @@
//! [`RequestContext`] argument. Functions in the middle of the call chain
//! only need to pass it on.
use crate::task_mgr::TaskKind;
use std::sync::{Arc, Mutex};
use crate::{buffer_pool, page_cache, task_mgr::TaskKind};
// The main structure of this module, see module-level comment.
#[derive(Clone, Debug)]
@@ -95,6 +97,8 @@ pub struct RequestContext {
download_behavior: DownloadBehavior,
access_stats_behavior: AccessStatsBehavior,
page_content_kind: PageContentKind,
pub(crate) buf_cache:
Option<Arc<Mutex<lru::LruCache<page_cache::CacheKey, Arc<buffer_pool::Buffer>>>>>,
}
/// The kind of access to the page cache.
@@ -150,6 +154,7 @@ impl RequestContextBuilder {
download_behavior: DownloadBehavior::Download,
access_stats_behavior: AccessStatsBehavior::Update,
page_content_kind: PageContentKind::Unknown,
buf_cache: None,
},
}
}
@@ -163,6 +168,7 @@ impl RequestContextBuilder {
download_behavior: original.download_behavior,
access_stats_behavior: original.access_stats_behavior,
page_content_kind: original.page_content_kind,
buf_cache: original.buf_cache.as_ref().map(Arc::clone),
},
}
}

View File

@@ -2052,5 +2052,28 @@ pub fn make_router(
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/keyspace",
|r| testing_api_handler("read out the keyspace", r, timeline_collect_keyspace),
)
.put("/v1/set_io_engine", |r| {
async fn set_io_engine_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let kind: crate::virtual_file::IoEngineKind = json_request(&mut r).await?;
crate::virtual_file::io_engine::set(kind);
json_response(StatusCode::OK, ())
}
api_handler(r, set_io_engine_handler)
})
.put("/v1/set_req_lru_size", |r| {
async fn set_req_lru_size_handler(
mut r: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let size: usize = json_request(&mut r).await?;
crate::tenant::timeline::REQ_LRU_SIZE
.store(size, std::sync::atomic::Ordering::Relaxed);
json_response(StatusCode::OK, ())
}
api_handler(r, set_req_lru_size_handler)
})
.any(handler_404))
}

View File

@@ -12,6 +12,7 @@ pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub use pageserver_api::keyspace;
pub(crate) mod buffer_pool;
pub mod metrics;
pub mod page_cache;
pub mod page_service;

View File

@@ -125,14 +125,6 @@ impl ReconstructTimeMetrics {
}
}
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
"Number of cache hits from materialized page cache without redo",
)
.expect("failed to define a metric")
});
pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_getpage_get_reconstruct_data_seconds",
@@ -142,14 +134,6 @@ pub(crate) static GET_RECONSTRUCT_DATA_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_total",
"Number of cache hits from materialized page cache",
)
.expect("failed to define a metric")
});
pub(crate) struct GetVectoredLatency {
map: EnumMap<TaskKind, Option<Histogram>>,
}
@@ -188,12 +172,8 @@ pub(crate) static GET_VECTORED_LATENCY: Lazy<GetVectoredLatency> = Lazy::new(||
});
pub(crate) struct PageCacheMetricsForTaskKind {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
}
pub(crate) struct PageCacheMetrics {
@@ -226,16 +206,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
let content_kind = <PageContentKind as enum_map::Enum>::from_usize(content_kind);
let content_kind: &'static str = content_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[task_kind, "immutable", content_kind])
@@ -247,28 +217,6 @@ pub(crate) static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMet
.get_metric_with_label_values(&[task_kind, "immutable", content_kind, "-"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"exact",
])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[
task_kind,
"materialized_page",
content_kind,
"older_lsn",
])
.unwrap()
},
}
}))
})),
@@ -386,7 +334,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[derive(IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
@@ -2402,8 +2349,6 @@ pub fn preinitialize_metrics() {
// counters
[
&MATERIALIZED_PAGE_CACHE_HIT,
&MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
&UNEXPECTED_ONDEMAND_DOWNLOADS,
&WALRECEIVER_STARTED_CONNECTIONS,
&WALRECEIVER_BROKER_UPDATES,

View File

@@ -74,17 +74,13 @@
use std::{
collections::{hash_map::Entry, HashMap},
convert::TryInto,
sync::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
time::Duration,
sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
};
use anyhow::Context;
use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use utils::{id::TimelineId, lsn::Lsn};
use utils::id::TimelineId;
use crate::{
context::RequestContext,
@@ -137,17 +133,10 @@ pub fn next_file_id() -> FileId {
///
/// CacheKey uniquely identifies a "thing" to cache in the page cache.
///
#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
#[allow(clippy::enum_variant_names)]
enum CacheKey {
MaterializedPage {
hash_key: MaterializedPageHashKey,
lsn: Lsn,
},
ImmutableFilePage {
file_id: FileId,
blkno: u32,
},
pub(crate) enum CacheKey {
ImmutableFilePage { file_id: FileId, blkno: u32 },
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
@@ -163,12 +152,6 @@ struct MaterializedPageHashKey {
key: Key,
}
#[derive(Clone)]
struct Version {
lsn: Lsn,
slot_idx: usize,
}
struct Slot {
inner: tokio::sync::RwLock<SlotInner>,
usage_count: AtomicU8,
@@ -177,8 +160,17 @@ struct Slot {
struct SlotInner {
key: Option<CacheKey>,
// for `coalesce_readers_permit`
permit: std::sync::Mutex<Weak<PinnedSlotsPermit>>,
buf: &'static mut [u8; PAGE_SZ],
buf: &'static mut SlotContents,
}
#[derive(Clone)]
#[repr(C, align(8192))]
struct SlotContents([u8; PAGE_SZ]);
impl SlotContents {
fn empty() -> Self {
Self(std::array::from_fn(|_| 0))
}
}
impl Slot {
@@ -220,41 +212,12 @@ impl Slot {
}
}
impl SlotInner {
/// If there is already a reader, drop our permit and share its permit, just like we share read access.
fn coalesce_readers_permit(&self, permit: PinnedSlotsPermit) -> Arc<PinnedSlotsPermit> {
let mut guard = self.permit.lock().unwrap();
if let Some(existing_permit) = guard.upgrade() {
drop(guard);
drop(permit);
existing_permit
} else {
let permit = Arc::new(permit);
*guard = Arc::downgrade(&permit);
permit
}
}
}
pub struct PageCache {
/// This contains the mapping from the cache key to buffer slot that currently
/// contains the page, if any.
///
/// TODO: This is protected by a single lock. If that becomes a bottleneck,
/// this HashMap can be replaced with a more concurrent version, there are
/// plenty of such crates around.
///
/// If you add support for caching different kinds of objects, each object kind
/// can have a separate mapping map, next to this field.
materialized_page_map: std::sync::RwLock<HashMap<MaterializedPageHashKey, Vec<Version>>>,
immutable_page_map: std::sync::RwLock<HashMap<(FileId, u32), usize>>,
/// The actual buffers with their metadata.
slots: Box<[Slot]>,
pinned_slots: Arc<tokio::sync::Semaphore>,
/// Index of the next candidate to evict, for the Clock replacement algorithm.
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
@@ -262,14 +225,11 @@ pub struct PageCache {
size_metrics: &'static PageCacheSizeMetrics,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
///
/// PageReadGuard is a "lease" on a buffer, for reading. The page is kept locked
/// until the guard is dropped.
///
pub struct PageReadGuard<'i> {
_permit: Arc<PinnedSlotsPermit>,
slot_guard: tokio::sync::RwLockReadGuard<'i, SlotInner>,
}
@@ -277,13 +237,13 @@ impl std::ops::Deref for PageReadGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.slot_guard.buf
&self.slot_guard.buf.0
}
}
impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
fn as_ref(&self) -> &[u8; PAGE_SZ] {
self.slot_guard.buf
&self.slot_guard.buf.0
}
}
@@ -301,7 +261,6 @@ pub struct PageWriteGuard<'i> {
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
}
@@ -309,7 +268,7 @@ enum PageWriteGuardState<'i> {
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner } => &mut inner.buf.0,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -320,7 +279,7 @@ impl std::ops::Deref for PageWriteGuard<'_> {
fn deref(&self) -> &Self::Target {
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => inner.buf,
PageWriteGuardState::Invalid { inner } => &inner.buf.0,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
@@ -332,10 +291,9 @@ impl<'a> PageWriteGuard<'a> {
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
PageWriteGuardState::Invalid { inner } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
@@ -352,7 +310,7 @@ impl Drop for PageWriteGuard<'_> {
///
fn drop(&mut self) {
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
PageWriteGuardState::Invalid { inner } => {
assert!(inner.key.is_some());
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
@@ -370,166 +328,6 @@ pub enum ReadBufResult<'a> {
}
impl PageCache {
//
// Section 1.1: Public interface functions for looking up and memorizing materialized page
// versions in the page cache
//
/// Look up a materialized page version.
///
/// The 'lsn' is an upper bound, this will return the latest version of
/// the given block, but not newer than 'lsn'. Returns the actual LSN of the
/// returned page.
pub async fn lookup_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
let Ok(permit) = self.try_get_pinned_slot_permit().await else {
return None;
};
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_accesses_materialized_page
.inc();
let mut cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key: *key,
},
lsn,
};
if let Some(guard) = self
.try_lock_for_read(&mut cache_key, &mut Some(permit))
.await
{
if let CacheKey::MaterializedPage {
hash_key: _,
lsn: available_lsn,
} = cache_key
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.for_ctx(ctx)
.read_hits_materialized_page_older_lsn
.inc();
}
Some((available_lsn, guard))
} else {
panic!("unexpected key type in slot");
}
} else {
None
}
}
///
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
key: Key,
lsn: Lsn,
img: &[u8],
) -> anyhow::Result<()> {
let cache_key = CacheKey::MaterializedPage {
hash_key: MaterializedPageHashKey {
tenant_shard_id,
timeline_id,
key,
},
lsn,
};
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
if let Some(slot_idx) = self.search_mapping_exact(&cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
if inner.key.as_ref() == Some(&cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
debug_assert_eq!(inner.buf.len(), img.len());
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(inner.buf == img);
return Ok(());
}
}
debug_assert!(permit.is_some());
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.await
.context("Failed to find evict victim")?;
// Insert mapping for this. At this point, we may find that another
// thread did the same thing concurrently. In that case, we evicted
// our victim buffer unnecessarily. Put it into the free list and
// continue with the slot that the other thread chose.
if let Some(_existing_slot_idx) = self.try_insert_mapping(&cache_key, slot_idx) {
// TODO: put to free list
// We now just loop back to start from beginning. This is not
// optimal, we'll perform the lookup in the mapping again, which
// is not really necessary because we already got
// 'existing_slot_idx'. But this shouldn't happen often enough
// to matter much.
continue;
}
// Make the slot ready
let slot = &self.slots[slot_idx];
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
// Create a write guard for the slot so we go through the expected motions.
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
let mut write_guard = PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
};
write_guard.copy_from_slice(img);
let _ = write_guard.mark_valid();
return Ok(());
}
}
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
file_id: FileId,
@@ -549,27 +347,6 @@ impl PageCache {
// "mappings" after this section. But the routines in this section should
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
Duration::from_secs(10),
Arc::clone(&self.pinned_slots).acquire_owned(),
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
Err(_timeout) => {
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
);
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
}
}
}
/// Look up a page in the cache.
///
/// If the search criteria is not exact, *cache_key is updated with the key
@@ -579,11 +356,7 @@ impl PageCache {
///
/// If no page is found, returns None and *cache_key is left unmodified.
///
async fn try_lock_for_read(
&self,
cache_key: &mut CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageReadGuard> {
async fn try_lock_for_read(&self, cache_key: &mut CacheKey) -> Option<PageReadGuard> {
let cache_key_orig = cache_key.clone();
if let Some(slot_idx) = self.search_mapping(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
@@ -593,10 +366,7 @@ impl PageCache {
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
return Some(PageReadGuard { slot_guard: inner });
} else {
// search_mapping might have modified the search key; restore it.
*cache_key = cache_key_orig;
@@ -639,12 +409,7 @@ impl PageCache {
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE
.for_ctx(ctx)
@@ -657,19 +422,17 @@ impl PageCache {
let mut is_first_iteration = true;
loop {
// First check if the key already exists in the cache.
if let Some(read_guard) = self.try_lock_for_read(cache_key, &mut permit).await {
debug_assert!(permit.is_none());
if let Some(read_guard) = self.try_lock_for_read(cache_key).await {
if is_first_iteration {
hit.inc();
}
return Ok(ReadBufResult::Found(read_guard));
}
debug_assert!(permit.is_some());
is_first_iteration = false;
// Not found. Find a victim buffer
let (slot_idx, mut inner) = self
.find_victim(permit.as_ref().unwrap())
.find_victim()
.await
.context("Failed to find evict victim")?;
@@ -693,19 +456,8 @@ impl PageCache {
inner.key = Some(cache_key.clone());
slot.set_usage_count(1);
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
state: PageWriteGuardState::Invalid { inner },
}));
}
}
@@ -726,42 +478,6 @@ impl PageCache {
///
fn search_mapping(&self, cache_key: &mut CacheKey) -> Option<usize> {
match cache_key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
let version_idx = match versions.binary_search_by_key(lsn, |v| v.lsn) {
Ok(version_idx) => version_idx,
Err(0) => return None,
Err(version_idx) => version_idx - 1,
};
let version = &versions[version_idx];
*lsn = version.lsn;
Some(version.slot_idx)
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
}
}
}
/// Search for a page in the cache using the given search key.
///
/// Like 'search_mapping, but performs an "exact" search. Used for
/// allocating a new buffer.
fn search_mapping_exact(&self, key: &CacheKey) -> Option<usize> {
match key {
CacheKey::MaterializedPage { hash_key, lsn } => {
let map = self.materialized_page_map.read().unwrap();
let versions = map.get(hash_key)?;
if let Ok(version_idx) = versions.binary_search_by_key(lsn, |v| v.lsn) {
Some(versions[version_idx].slot_idx)
} else {
None
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let map = self.immutable_page_map.read().unwrap();
Some(*map.get(&(*file_id, *blkno))?)
@@ -774,27 +490,6 @@ impl PageCache {
///
fn remove_mapping(&self, old_key: &CacheKey) {
match old_key {
CacheKey::MaterializedPage {
hash_key: old_hash_key,
lsn: old_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
if let Entry::Occupied(mut old_entry) = map.entry(old_hash_key.clone()) {
let versions = old_entry.get_mut();
if let Ok(version_idx) = versions.binary_search_by_key(old_lsn, |v| v.lsn) {
versions.remove(version_idx);
self.size_metrics
.current_bytes_materialized_page
.sub_page_sz(1);
if versions.is_empty() {
old_entry.remove_entry();
}
}
} else {
panic!("could not find old key in mapping")
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
map.remove(&(*file_id, *blkno))
@@ -811,30 +506,6 @@ impl PageCache {
/// of the existing mapping and leaves it untouched.
fn try_insert_mapping(&self, new_key: &CacheKey, slot_idx: usize) -> Option<usize> {
match new_key {
CacheKey::MaterializedPage {
hash_key: new_key,
lsn: new_lsn,
} => {
let mut map = self.materialized_page_map.write().unwrap();
let versions = map.entry(new_key.clone()).or_default();
match versions.binary_search_by_key(new_lsn, |v| v.lsn) {
Ok(version_idx) => Some(versions[version_idx].slot_idx),
Err(version_idx) => {
versions.insert(
version_idx,
Version {
lsn: *new_lsn,
slot_idx,
},
);
self.size_metrics
.current_bytes_materialized_page
.add_page_sz(1);
None
}
}
}
CacheKey::ImmutableFilePage { file_id, blkno } => {
let mut map = self.immutable_page_map.write().unwrap();
match map.entry((*file_id, *blkno)) {
@@ -858,7 +529,6 @@ impl PageCache {
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let mut iters = 0;
@@ -940,39 +610,29 @@ impl PageCache {
fn new(num_pages: usize) -> Self {
assert!(num_pages > 0, "page cache size must be > 0");
// We could use Vec::leak here, but that potentially also leaks
// uninitialized reserved capacity. With into_boxed_slice and Box::leak
// this is avoided.
let page_buffer = Box::leak(vec![0u8; num_pages * PAGE_SZ].into_boxed_slice());
let slot_contents = Box::leak(vec![SlotContents::empty(); num_pages].into_boxed_slice());
let size_metrics = &crate::metrics::PAGE_CACHE_SIZE;
size_metrics.max_bytes.set_page_sz(num_pages);
size_metrics.current_bytes_immutable.set_page_sz(0);
size_metrics.current_bytes_materialized_page.set_page_sz(0);
let slots = page_buffer
.chunks_exact_mut(PAGE_SZ)
.map(|chunk| {
let buf: &mut [u8; PAGE_SZ] = chunk.try_into().unwrap();
Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf,
permit: std::sync::Mutex::new(Weak::new()),
}),
usage_count: AtomicU8::new(0),
}
let slots = slot_contents
.into_iter()
.map(|slot_contents| Slot {
inner: tokio::sync::RwLock::new(SlotInner {
key: None,
buf: slot_contents,
}),
usage_count: AtomicU8::new(0),
})
.collect();
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
slots,
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
}
}
}
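The comment in `PageCache::new` about `Vec::leak` vs `into_boxed_slice` + `Box::leak` can be demonstrated in isolation: `into_boxed_slice` shrinks capacity to length before the leak, so exactly `num_pages * PAGE_SZ` bytes stay allocated, and `chunks_exact_mut` then hands out one fixed-size `&'static mut` buffer per slot. A minimal sketch with toy sizes (not the real `PAGE_SZ`):

```rust
/// Leak a zeroed arena of `num_pages` pages and split it into per-page
/// &'static mut buffers, following the pattern in PageCache::new.
fn leak_page_buffers(num_pages: usize, page_sz: usize) -> Vec<&'static mut [u8]> {
    // into_boxed_slice drops any excess capacity, so Box::leak leaks
    // exactly num_pages * page_sz bytes and nothing more.
    let arena: &'static mut [u8] =
        Box::leak(vec![0u8; num_pages * page_sz].into_boxed_slice());
    arena.chunks_exact_mut(page_sz).collect()
}
```

The memory is intentionally never freed; that is the point of `Box::leak` here, since the page cache lives for the lifetime of the process.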


@@ -104,29 +104,29 @@ use crate::shutdown_pageserver;
// other operations, if the upload tasks e.g. get blocked on locks. It shouldn't
// happen, but still.
//
pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("compute request worker")
.enable_all()
.build()
.expect("Failed to create compute request runtime")
});
// pub static COMPUTE_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
// tokio::runtime::Builder::new_multi_thread()
// .thread_name("compute request worker")
// .enable_all()
// .build()
// .expect("Failed to create compute request runtime")
// });
pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("mgmt request worker")
.enable_all()
.build()
.expect("Failed to create mgmt request runtime")
});
// pub static MGMT_REQUEST_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
// tokio::runtime::Builder::new_multi_thread()
// .thread_name("mgmt request worker")
// .enable_all()
// .build()
// .expect("Failed to create mgmt request runtime")
// });
pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
.thread_name("walreceiver worker")
.enable_all()
.build()
.expect("Failed to create walreceiver runtime")
});
// pub static WALRECEIVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
// tokio::runtime::Builder::new_multi_thread()
// .thread_name("walreceiver worker")
// .enable_all()
// .build()
// .expect("Failed to create walreceiver runtime")
// });
pub static BACKGROUND_RUNTIME: Lazy<Runtime> = Lazy::new(|| {
tokio::runtime::Builder::new_multi_thread()
@@ -150,6 +150,10 @@ pub(crate) static BACKGROUND_RUNTIME_WORKER_THREADS: Lazy<usize> = Lazy::new(||
.unwrap_or_else(|_e| usize::max(2, num_cpus::get()))
});
pub static COMPUTE_REQUEST_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
pub static MGMT_REQUEST_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
pub static WALRECEIVER_RUNTIME: &once_cell::sync::Lazy<Runtime> = &BACKGROUND_RUNTIME;
#[derive(Debug, Clone, Copy)]
pub struct PageserverTaskId(u64);


@@ -25,6 +25,7 @@ impl<'a> BlockCursor<'a> {
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
// TODO: use pooled allocation instead; this path is used by ImageLayer::get_value_reconstruct_data
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)


@@ -5,10 +5,11 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::Deref;
use std::sync::Arc;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -36,6 +37,7 @@ where
/// Reference to an in-memory copy of an immutable on-disk block.
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
BufferPool(Arc<crate::buffer_pool::Buffer>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
@@ -62,6 +64,7 @@ impl<'a> Deref for BlockLease<'a> {
fn deref(&self) -> &Self::Target {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::BufferPool(buf) => buf.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
@@ -174,17 +177,6 @@ impl FileBlockReader {
FileBlockReader { file_id, file }
}
/// Read a page from the underlying file into given buffer.
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
///
/// Returns a "lease" object that can be used to
@@ -195,21 +187,69 @@ impl FileBlockReader {
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
match ctx.page_content_kind() {
crate::context::PageContentKind::InMemoryLayer => {
unreachable!("InMemoryLayer reads are served by inmemory_layer.rs, not FileBlockReader")
}
crate::context::PageContentKind::Unknown
| crate::context::PageContentKind::DeltaLayerBtreeNode
| crate::context::PageContentKind::ImageLayerBtreeNode => {
let cache = page_cache::get();
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
let write_guard = async move {
assert!(write_guard.len() == PAGE_SZ);
self.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await
}
.await?;
Ok(write_guard.mark_valid().into())
}
}
}
crate::context::PageContentKind::ImageLayerValue
| crate::context::PageContentKind::DeltaLayerValue => {
let cache_key = page_cache::CacheKey::ImmutableFilePage {
file_id: self.file_id,
blkno: blknum,
};
if let Some(cache) = &ctx.buf_cache {
let mut cache = cache.lock().unwrap();
if let Some(cached) = cache.get(&cache_key) {
return Ok(BlockLease::BufferPool(Arc::clone(cached)));
};
}
let buf = crate::buffer_pool::get();
// Read the page from disk into the buffer
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
let buf = async move {
assert_eq!(buf.len(), PAGE_SZ);
std::io::Result::Ok(
self.file
.read_exact_at(
crate::buffer_pool::PageWriteGuardBuf::new(buf),
blknum as u64 * PAGE_SZ as u64,
)
.await?
.assume_init(),
)
}
.await?;
let buf = Arc::new(buf);
if let Some(cache) = &ctx.buf_cache {
cache.lock().unwrap().put(cache_key, Arc::clone(&buf));
}
Ok(BlockLease::BufferPool(buf))
}
}
}


@@ -65,6 +65,7 @@ where
pub struct ValueReconstructState {
pub records: Vec<(Lsn, NeonWalRecord)>,
pub img: Option<(Lsn, Bytes)>,
pub(crate) scratch: Vec<u8>,
}
/// Return value from [`Layer::get_value_reconstruct_data`]


@@ -688,7 +688,14 @@ impl DeltaLayerInner {
summary: Option<Summary>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new()
.read(true)
.custom_flags(nix::libc::O_DIRECT),
)
.await
{
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
@@ -771,15 +778,17 @@ impl DeltaLayerInner {
// Ok, 'offsets' now contains the offsets of all the entries we need to read
let cursor = file.block_cursor();
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor
.read_blob_into_buf(pos, &mut buf, ctx)
.read_blob_into_buf(pos, &mut reconstruct_state.scratch, ctx)
.await
.with_context(|| {
format!("Failed to read blob from virtual file {}", file.file.path)
})?;
let val = Value::des(&buf).with_context(|| {
// TODO: this one is super costly, it's allocating a Vec<> for the inner Bytes every time.
// That's on avg 200 allocations.
// Can we re-use the Vec from a buffer pool?
let val = Value::des(&reconstruct_state.scratch).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path


@@ -367,7 +367,14 @@ impl ImageLayerInner {
summary: Option<Summary>,
ctx: &RequestContext,
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
let file = match VirtualFile::open_with_options(
path,
virtual_file::OpenOptions::new()
.read(true)
.custom_flags(nix::libc::O_DIRECT),
)
.await
{
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};


@@ -199,7 +199,7 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
tenant.walredo_mgr.maybe_quiesce(period * 10);
tenant.walredo_mgr.maybe_quiesce(period * 10); // TODO: broken with compaction_period 0
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())


@@ -7,6 +7,8 @@ pub mod span;
pub mod uninit;
mod walreceiver;
pub(crate) static REQ_LRU_SIZE: AtomicUsize = AtomicUsize::new(0);
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
@@ -33,8 +35,6 @@ use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::sync::gate::Gate;
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -43,6 +43,14 @@ use std::{
cmp::{max, min, Ordering},
ops::ControlFlow,
};
use std::{
collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
sync::atomic::AtomicUsize,
};
use std::{
num::NonZeroUsize,
ops::{Deref, Range},
};
use crate::tenant::timeline::logical_size::CurrentLogicalSize;
use crate::tenant::{
@@ -70,9 +78,7 @@ use crate::{pgdatadir_mapping::LsnForTimestamp, tenant::tasks::BackgroundLoopKin
use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
};
use crate::metrics::TimelineMetrics;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::config::TenantConfOpt;
use pageserver_api::key::{is_inherited_key, is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -592,30 +598,10 @@ impl Timeline {
ctx.task_kind()
);
// Check the page cache. We will get back the most recent page with lsn <= `lsn`.
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
Ordering::Equal => {
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.inc();
return Ok(cached_img); // exact LSN match, return the image
}
Ordering::Greater => {
unreachable!("the returned lsn should never be after the requested lsn")
}
}
Some((cached_lsn, cached_img))
}
None => None,
};
let mut reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
img: None,
scratch: Vec::with_capacity(2 * 8192), // for good measure
};
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
@@ -2313,6 +2299,16 @@ impl Timeline {
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
let mut ctx = RequestContextBuilder::extend(ctx).build();
ctx.buf_cache = match REQ_LRU_SIZE.load(std::sync::atomic::Ordering::Relaxed) {
0 => None,
x => Some(Arc::new(Mutex::new(lru::LruCache::new(
// SAFETY: we just checked for 0 above
unsafe { NonZeroUsize::new_unchecked(x) },
)))),
};
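The `unsafe { NonZeroUsize::new_unchecked(x) }` above is guarded by the zero check in the match arms, but the same logic can be expressed without `unsafe`: `NonZeroUsize::new` returns `None` exactly when the configured size is zero, which maps directly onto the cache-disabled case. A sketch of the equivalent safe construction (function name is illustrative, not from the source):

```rust
use std::num::NonZeroUsize;

/// Safe equivalent of the REQ_LRU_SIZE match above: None disables the
/// per-request cache, Some(n) would be passed to LruCache::new.
fn req_lru_capacity(configured: usize) -> Option<NonZeroUsize> {
    NonZeroUsize::new(configured)
}
```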
let ctx = &ctx;
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2351,7 +2347,6 @@ impl Timeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(traversal_path);
}
if prev_lsn <= cont_lsn {
@@ -2555,26 +2550,6 @@ impl Timeline {
}
}
/// # Cancel-safety
///
/// This method is cancellation-safe.
async fn lookup_cached_page(
&self,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_shard_id, self.timeline_id, key, lsn, ctx)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
}
fn get_ancestor_timeline(&self) -> anyhow::Result<Arc<Timeline>> {
let ancestor = self.ancestor_timeline.as_ref().with_context(|| {
format!(
@@ -4398,8 +4373,6 @@ impl Timeline {
trace!("found {} WAL records that will init the page for {} at {}, performing WAL redo", data.records.len(), key, request_lsn);
};
let last_rec_lsn = data.records.last().unwrap().0;
let img = match self
.walredo_mgr
.request_redo(key, request_lsn, data.img, data.records, self.pg_version)
@@ -4410,23 +4383,6 @@ impl Timeline {
Err(e) => return Err(PageReconstructError::WalRedo(e)),
};
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();
if let Err(e) = cache
.memorize_materialized_page(
self.tenant_shard_id,
self.timeline_id,
key,
last_rec_lsn,
&img,
)
.await
.context("Materialized page memoization failed")
{
return Err(PageReconstructError::from(e));
}
}
Ok(img)
}
}


@@ -28,7 +28,7 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
mod io_engine;
pub(crate) mod io_engine;
mod open_options;
pub use io_engine::IoEngineKind;
pub(crate) use open_options::*;


@@ -26,23 +26,31 @@ pub enum IoEngineKind {
TokioEpollUring,
}
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
static IO_ENGINE: std::sync::RwLock<Option<IoEngineKind>> = std::sync::RwLock::new(None);
pub(crate) fn set(engine: IoEngineKind) {
let mut guard = IO_ENGINE.write().unwrap();
*guard = Some(engine);
let metric = &crate::metrics::virtual_file_io_engine::KIND;
metric.reset();
metric.with_label_values(&[&format!("{engine}")]).set(1);
drop(guard);
}
#[cfg(not(test))]
pub(super) fn init(engine: IoEngineKind) {
if IO_ENGINE.set(engine).is_err() {
panic!("called twice");
}
crate::metrics::virtual_file_io_engine::KIND
.with_label_values(&[&format!("{engine}")])
.set(1);
set(engine);
}
pub(super) fn get() -> &'static IoEngineKind {
pub(super) fn get() -> IoEngineKind {
#[cfg(test)]
{
let env_var_name = "NEON_PAGESERVER_UNIT_TEST_VIRTUAL_FILE_IOENGINE";
IO_ENGINE.get_or_init(|| match std::env::var(env_var_name) {
let mut guard = IO_ENGINE.write().unwrap();
if let Some(v) = *guard {
return v;
}
*guard = Some(match std::env::var(env_var_name) {
Ok(v) => match v.parse::<IoEngineKind>() {
Ok(engine_kind) => engine_kind,
Err(e) => {
@@ -57,10 +65,13 @@ pub(super) fn get() -> &'static IoEngineKind {
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var {env_var_name} is not unicode");
}
})
});
}
#[cfg(not(test))]
IO_ENGINE.get().unwrap()
IO_ENGINE
.read()
.unwrap()
.expect("should have called set() or init() before")
}
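The `RwLock<Option<_>>` pattern used for `IO_ENGINE` — a set-once global that is written via `set()` and read everywhere else — can be modeled with std alone. A minimal sketch, with a toy `u8` payload standing in for `IoEngineKind` and without the metrics side effects:

```rust
use std::sync::RwLock;

// Set-once global, written via set_engine() and read via get_engine(),
// mirroring IO_ENGINE above. RwLock::new is const since Rust 1.63.
static ENGINE: RwLock<Option<u8>> = RwLock::new(None);

fn set_engine(v: u8) {
    *ENGINE.write().unwrap() = Some(v);
}

fn get_engine() -> u8 {
    // Copy the value out; panics if set_engine() was never called.
    (*ENGINE.read().unwrap()).expect("should have called set_engine() first")
}
```

Compared to `OnceCell`, the `RwLock<Option<_>>` form allows the value to be overwritten later, which is what the bench harness's `--set-io-engine` flag needs.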
use std::os::unix::prelude::FileExt;


@@ -1,7 +1,12 @@
//! Enum-dispatch to the `OpenOptions` type of the respective [`super::IoEngineKind`];
use nix::libc;
use super::IoEngineKind;
use std::{os::fd::OwnedFd, path::Path};
use std::{
os::{fd::OwnedFd, unix::fs::OpenOptionsExt},
path::Path,
};
#[derive(Debug, Clone)]
pub enum OpenOptions {
@@ -92,6 +97,18 @@ impl OpenOptions {
self
}
pub fn custom_flags(&mut self, custom_flags: libc::c_int) -> &mut OpenOptions {
match self {
OpenOptions::StdFs(x) => {
let _ = x.custom_flags(custom_flags);
}
OpenOptions::TokioEpollUring(x) => {
let _ = x.custom_flags(custom_flags);
}
}
self
}
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
match self {
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),


@@ -21,7 +21,6 @@
use anyhow::Context;
use byteorder::{ByteOrder, LittleEndian};
use bytes::{BufMut, Bytes, BytesMut};
use nix::poll::*;
use pageserver_api::shard::TenantShardId;
use serde::Serialize;
use std::collections::VecDeque;
@@ -31,10 +30,11 @@ use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::CommandExt;
use std::process::Stdio;
use std::process::{Child, ChildStdin, ChildStdout, Command};
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use std::process::{Child, Command};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::Instant;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::*;
use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
@@ -73,12 +73,12 @@ pub(crate) struct BufferTag {
}
struct ProcessInput {
stdin: ChildStdin,
stdin: tokio::process::ChildStdin,
n_requests: usize,
}
struct ProcessOutput {
stdout: ChildStdout,
stdout: tokio::process::ChildStdout,
pending_responses: VecDeque<Option<Bytes>>,
n_processed_responses: usize,
}
@@ -112,6 +112,8 @@ fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
}
}
mod writebuf_pool;
///
/// Public interface of WAL redo manager
///
@@ -157,6 +159,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
};
img = Some(result?);
@@ -177,6 +180,7 @@ impl PostgresRedoManager {
self.conf.wal_redo_timeout,
pg_version,
)
.await
}
}
}
@@ -217,7 +221,7 @@ impl PostgresRedoManager {
/// Process one request for WAL redo using wal-redo postgres
///
#[allow(clippy::too_many_arguments)]
fn apply_batch_postgres(
async fn apply_batch_postgres(
&self,
key: Key,
lsn: Lsn,
@@ -270,6 +274,7 @@ impl PostgresRedoManager {
let buf_tag = BufferTag { rel, blknum };
let result = proc
.apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
.await
.context("apply_wal_records");
let duration = started_at.elapsed();
@@ -647,8 +652,8 @@ struct WalRedoProcess {
tenant_shard_id: TenantShardId,
// Some() on construction, only becomes None on Drop.
child: Option<NoLeakChild>,
stdout: Mutex<ProcessOutput>,
stdin: Mutex<ProcessInput>,
stdout: tokio::sync::Mutex<ProcessOutput>,
stdin: tokio::sync::Mutex<ProcessInput>,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
@@ -754,12 +759,12 @@ impl WalRedoProcess {
conf,
tenant_shard_id,
child: Some(child),
stdin: Mutex::new(ProcessInput {
stdin,
stdin: tokio::sync::Mutex::new(ProcessInput {
stdin: tokio::process::ChildStdin::from_std(stdin).unwrap(), // TODO error handling
n_requests: 0,
}),
stdout: Mutex::new(ProcessOutput {
stdout,
stdout: tokio::sync::Mutex::new(ProcessOutput {
stdout: tokio::process::ChildStdout::from_std(stdout).unwrap(), // TODO error handling
pending_responses: VecDeque::new(),
n_processed_responses: 0,
}),
@@ -779,15 +784,13 @@ impl WalRedoProcess {
// new page image.
//
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
fn apply_wal_records(
async fn apply_wal_records(
&self,
tag: BufferTag,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().unwrap();
// Serialize all the messages to send the WAL redo process first.
//
// This could be problematic if there are millions of records to replay,
@@ -797,7 +800,8 @@ impl WalRedoProcess {
// Most requests start with a before-image with BLCKSZ bytes, followed by
// by some other WAL records. Start with a buffer that can hold that
// comfortably.
let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
// TODO replace with allocation pool
let mut writebuf: writebuf_pool::PooledVecU8 = writebuf_pool::get();
build_begin_redo_for_block_msg(tag, &mut writebuf);
if let Some(img) = base_img {
build_push_page_msg(tag, img, &mut writebuf);
@@ -816,7 +820,7 @@ impl WalRedoProcess {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
let res = self.apply_wal_records0(&writebuf, wal_redo_timeout).await;
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
@@ -827,38 +831,17 @@ impl WalRedoProcess {
res
}
fn apply_wal_records0(
async fn apply_wal_records0(
&self,
writebuf: &[u8],
input: MutexGuard<ProcessInput>,
wal_redo_timeout: Duration,
_wal_redo_timeout: Duration, // TODO respect
) -> anyhow::Result<Bytes> {
let input = self.stdin.lock().await;
let mut proc = { input }; // TODO: remove this legacy rename, but it keeps the patch small.
let mut nwrite = 0usize;
while nwrite < writebuf.len() {
let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
let n = loop {
match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
proc.stdin.write_all(writebuf).await.unwrap(); // TODO: bring back timeout & error handling
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If 'stdin' is writeable, do write.
let in_revents = stdin_pollfds[0].revents().unwrap();
if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
nwrite += proc.stdin.write(&writebuf[nwrite..])?;
}
if in_revents.contains(PollFlags::POLLHUP) {
// We still have more data to write, but the process closed the pipe.
anyhow::bail!("WAL redo process closed its stdin unexpectedly");
}
}
let request_no = proc.n_requests;
proc.n_requests += 1;
drop(proc);
@@ -875,40 +858,13 @@ impl WalRedoProcess {
// pending responses ring buffer and truncate all empty elements from the front,
// advancing processed responses number.
let mut output = self.stdout.lock().unwrap();
let mut output = self.stdout.lock().await;
let n_processed_responses = output.n_processed_responses;
while n_processed_responses + output.pending_responses.len() <= request_no {
// We expect the WAL redo process to respond with an 8k page image. We read it
// into this buffer.
let mut resultbuf = vec![0; BLCKSZ.into()];
let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
while nresult < BLCKSZ.into() {
let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
// We do two things simultaneously: reading response from stdout
// and forward any logging information that the child writes to its stderr to the page server's log.
let n = loop {
match nix::poll::poll(
&mut stdout_pollfds[..],
wal_redo_timeout.as_millis() as i32,
) {
Err(nix::errno::Errno::EINTR) => continue,
res => break res,
}
}?;
if n == 0 {
anyhow::bail!("WAL redo timed out");
}
// If we have some data in stdout, read it to the result buffer.
let out_revents = stdout_pollfds[0].revents().unwrap();
if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
nresult += output.stdout.read(&mut resultbuf[nresult..])?;
}
if out_revents.contains(PollFlags::POLLHUP) {
anyhow::bail!("WAL redo process closed its stdout unexpectedly");
}
}
output.stdout.read_exact(&mut resultbuf).await.unwrap();
output
.pending_responses
.push_back(Some(Bytes::from(resultbuf)));
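The `pending_responses` bookkeeping above — slots indexed by request number, consumed slots trimmed from the front while `n_processed_responses` advances — can be modeled with std types only. A simplified, synchronous sketch where an iterator stands in for the process stdout (type and method names are illustrative):

```rust
use std::collections::VecDeque;

/// Simplified model of the walredo response bookkeeping: responses arrive
/// in request order, each waiter takes the slot for its own request
/// number, and fully consumed slots are trimmed from the front.
struct Output {
    pending: VecDeque<Option<Vec<u8>>>,
    n_processed: usize,
}

impl Output {
    fn new() -> Self {
        Output { pending: VecDeque::new(), n_processed: 0 }
    }

    /// Pull responses from `incoming` (standing in for the process stdout)
    /// until the slot for `request_no` exists, then take that response.
    fn take(
        &mut self,
        request_no: usize,
        incoming: &mut impl Iterator<Item = Vec<u8>>,
    ) -> Vec<u8> {
        while self.n_processed + self.pending.len() <= request_no {
            let buf = incoming.next().expect("process closed stdout");
            self.pending.push_back(Some(buf));
        }
        let resp = self.pending[request_no - self.n_processed]
            .take()
            .expect("response taken twice");
        // Trim slots that have already been handed out, advancing the
        // processed-responses counter.
        while let Some(None) = self.pending.front() {
            self.pending.pop_front();
            self.n_processed += 1;
        }
        resp
    }
}
```

This shows why the ring buffer is needed at all: a waiter for request 1 may have to read (and buffer) the response for request 0 before its own arrives.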


@@ -0,0 +1,40 @@
use std::cell::RefCell;
use postgres_ffi::BLCKSZ;
pub struct PooledVecU8(Option<Vec<u8>>);
// Thread-local list of re-usable buffers.
thread_local! {
static POOL: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::new());
}
pub(crate) fn get() -> PooledVecU8 {
let maybe = POOL.with(|rc| rc.borrow_mut().pop());
match maybe {
Some(buf) => PooledVecU8(Some(buf)),
None => PooledVecU8(Some(Vec::with_capacity((BLCKSZ as usize) * 3))),
}
}
impl Drop for PooledVecU8 {
fn drop(&mut self) {
let mut buf = self.0.take().unwrap();
buf.clear();
POOL.with(|rc| rc.borrow_mut().push(buf))
}
}
impl std::ops::Deref for PooledVecU8 {
type Target = Vec<u8>;
fn deref(&self) -> &Self::Target {
self.0.as_ref().unwrap()
}
}
impl std::ops::DerefMut for PooledVecU8 {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.as_mut().unwrap()
}
}
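The point of the pool above is that a buffer returned via `Drop` comes back cleared but with its capacity intact, so steady-state requests allocate nothing. A self-contained variant demonstrating that reuse (with `BLCKSZ` hardcoded to the usual Postgres block size of 8192, since `postgres_ffi` is not available here):

```rust
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};

const BLCKSZ: usize = 8192; // assumption: standard Postgres block size

pub struct PooledVecU8(Option<Vec<u8>>);

thread_local! {
    // Thread-local free list of re-usable buffers, as in writebuf_pool.
    static POOL: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::new());
}

pub fn get() -> PooledVecU8 {
    match POOL.with(|rc| rc.borrow_mut().pop()) {
        Some(buf) => PooledVecU8(Some(buf)),
        None => PooledVecU8(Some(Vec::with_capacity(BLCKSZ * 3))),
    }
}

impl Drop for PooledVecU8 {
    fn drop(&mut self) {
        let mut buf = self.0.take().unwrap();
        buf.clear(); // emptied, but the capacity is preserved for reuse
        POOL.with(|rc| rc.borrow_mut().push(buf));
    }
}

impl Deref for PooledVecU8 {
    type Target = Vec<u8>;
    fn deref(&self) -> &Vec<u8> {
        self.0.as_ref().unwrap()
    }
}

impl DerefMut for PooledVecU8 {
    fn deref_mut(&mut self) -> &mut Vec<u8> {
        self.0.as_mut().unwrap()
    }
}
```

Being thread-local, the pool is lock-free but only helps when requests are served on a bounded set of threads; a work-stealing runtime can still end up with one pool per worker.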

results.txt Normal file

@@ -0,0 +1,36 @@
run on i3en.3xlarge
admin@ip-172-31-13-23:[~/neon-main]: du -hs /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
225G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-1000-6/snapshot/local_fs_remote_storage/pageserver/tenants
=> ~2.25x main memory
admin@ip-172-31-13-23:[~/neon-main]: NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS=1 DEFAULT_PG_VERSION=15 BUILD_TYPE=release ./scripts/pytest test_runner/performance/pageserver/pagebench/test_pageserver_max_throughput_getpage_at_latest_lsn.py
--------------------------------------------------------------------------------- Benchmark results ---------------------------------------------------------------------------------
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.STD_FS
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2321
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 8,785.440 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-std-fs].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 20,234.239 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.n_tenants: 1000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pgbench_scale: 6
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.duration: 30 s
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.page_cache_size: 134217728 byte
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config_override.max_file_descriptors: 500000
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.pageserver_config.override.virtual_file_io_engine: IoEngine.TOKIO_EPOLL_URING
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.request_count: 2200
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_mean: 9,046.271 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p95: 16,457.727 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99: 16,457.727 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.9: 16,457.727 ms
test_pageserver_max_throughput_getpage_at_latest_lsn[1000-6-30-tokio-epoll-uring].pageserver_max_throughput_getpage_at_latest_lsn.latency_percentiles.p99.99: 16,457.727 ms
=========================================================================== 2 passed in 142.33s (0:02:22) ===========================================================================

View File

@@ -20,10 +20,10 @@ fi
 # do all the on-disk initialization work now instead of a background kernel thread
 # so that we're ready for benchmarking right after this line
-sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
+#sudo mkfs.ext4 -E lazy_itable_init=0,lazy_journal_init=0 /dev/nvme1n1
 MOUNTPOINT=/instance_store
-sudo mkdir "$MOUNTPOINT"
+sudo rmdir "$MOUNTPOINT" || sudo mkdir "$MOUNTPOINT"
 sudo mount /dev/nvme1n1 "$MOUNTPOINT"
 sudo chown -R "$(id -u)":"$(id -g)" "$MOUNTPOINT"
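Side note on the `sudo rmdir "$MOUNTPOINT" || sudo mkdir "$MOUNTPOINT"` line above: if the directory already exists and is empty, `rmdir` succeeds, the `|| mkdir` branch never runs, and the mount point is gone right before `mount` needs it. A small demo (hypothetical temp directory, no sudo, not part of the commit) contrasting that idiom with the idempotent `mkdir -p`:

```shell
# Demo only: a throwaway directory stands in for /instance_store.
DIR="$(mktemp -d)/mp"

# The idiom from the diff: when DIR exists and is empty, rmdir succeeds,
# so the `|| mkdir` branch is skipped and DIR ends up missing.
mkdir "$DIR"
rmdir "$DIR" || mkdir "$DIR"
test -d "$DIR" || echo "mount point is gone"    # prints: mount point is gone

# mkdir -p is idempotent: it creates DIR if absent and is a no-op otherwise,
# so the script could be re-run safely before `mount`.
mkdir -p "$DIR"
mkdir -p "$DIR"
test -d "$DIR" && echo "mount point exists"     # prints: mount point exists
```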
@@ -40,7 +40,7 @@ To run your local neon.git build on the instance store volume,
 run the following commands from the top of the neon.git checkout
 # raise file descriptor limit of your shell and its child processes
-sudo prlimit -p $$ --nofile=800000:800000
+sudo prlimit -p \$\$ --nofile=800000:800000
 # test suite run
 export TEST_OUTPUT="$TEST_OUTPUT"
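The `$$` → `\$\$` change above makes sense if these instructions are emitted from an unquoted heredoc (an assumption; the surrounding script isn't shown): there, a bare `$$` expands to the PID of the shell generating the text, so the printed command would carry a stale PID instead of the literal `$$` the user should run. A minimal sketch:

```shell
# Unquoted heredoc: $-expansion happens, so \$\$ is needed for a literal $$.
instructions=$(cat <<EOF
sudo prlimit -p \$\$ --nofile=800000:800000
EOF
)
echo "$instructions"   # prints: sudo prlimit -p $$ --nofile=800000:800000

# Without the escape, $$ becomes this shell's PID at generation time:
stale=$(cat <<EOF
sudo prlimit -p $$ --nofile=800000:800000
EOF
)
echo "$stale"          # prints e.g.: sudo prlimit -p 12345 --nofile=800000:800000
```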

View File

@@ -1,7 +1,10 @@
+import enum
 import json
 from pathlib import Path
 from typing import Any, Dict, Tuple
+import toml
 import fixtures.pageserver.many_tenants as many_tenants
 import pytest
 from fixtures.benchmark_fixture import MetricReport, NeonBenchmarker
@@ -17,6 +20,10 @@ from fixtures.utils import get_scale_for_db, humantime_to_ms
 from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
+class IoEngine(str, enum.Enum):
+    STD_FS = "std-fs"
+    TOKIO_EPOLL_URING = "tokio-epoll-uring"
 # For reference, the space usage of the snapshots:
 # admin@ip-172-31-13-23:[~/neon-main]: sudo du -hs /instance_store/test_output/shared-snapshots
 # 137G /instance_store/test_output/shared-snapshots
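A note on the new `IoEngine(str, enum.Enum)` class (the sketch below is illustrative, not part of the commit): mixing in `str` makes members compare equal to their plain-string values, which is convenient for building config strings and readable pytest IDs. One portability caveat: Python 3.11 changed how mixed-in enums render in f-strings, so `.value` is the stable way to get the raw string:

```python
import enum

class IoEngine(str, enum.Enum):
    STD_FS = "std-fs"
    TOKIO_EPOLL_URING = "tokio-epoll-uring"

# str mixin: members are interchangeable with their string values
assert IoEngine.STD_FS == "std-fs"
assert IoEngine("tokio-epoll-uring") is IoEngine.TOKIO_EPOLL_URING

# .value is stable across Python versions; f"{member}" is not
# (3.11 changed Enum.__format__ to include the class name, "IoEngine.STD_FS").
assert IoEngine.STD_FS.value == "std-fs"
```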
@@ -27,9 +34,10 @@ from performance.pageserver.util import ensure_pageserver_ready_for_benchmarking
 # 5.1G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-10-6
 # 76G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-13
 # 46G /instance_store/test_output/shared-snapshots/max_throughput_latest_lsn-100-6
+@pytest.mark.parametrize("ioengine", [IoEngine.STD_FS, IoEngine.TOKIO_EPOLL_URING])
 @pytest.mark.parametrize("duration", [30])
-@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100, 200]])
-@pytest.mark.parametrize("n_tenants", [1, 10])
+@pytest.mark.parametrize("pgbench_scale", [get_scale_for_db(s) for s in [100]])
+@pytest.mark.parametrize("n_tenants", [1000])
 @pytest.mark.timeout(
     10000
 )  # TODO: this value is just "a really high number"; have this per instance type
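For reference (not part of the commit): stacked `@pytest.mark.parametrize` decorators multiply, so the values above yield 2 × 1 × 1 × 1 = 2 test cases, matching the `2 passed` summary and the `[1000-6-30-std-fs]` / `[1000-6-30-tokio-epoll-uring]` IDs in the output above. The cross product can be sketched as:

```python
from itertools import product

ioengines = ["std-fs", "tokio-epoll-uring"]
durations = [30]
pgbench_scales = [6]   # assumed: get_scale_for_db(100) resolves to 6, per the test IDs
n_tenants = [1000]

# pytest runs one test per element of the cartesian product
cases = list(product(n_tenants, pgbench_scales, durations, ioengines))
assert len(cases) == 2
assert cases[0] == (1000, 6, 30, "std-fs")
```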
@@ -40,6 +48,7 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
     n_tenants: int,
     pgbench_scale: int,
     duration: int,
+    ioengine: IoEngine,
 ):
     def record(metric, **kwargs):
         zenbenchmark.record(
@@ -60,9 +69,12 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
     # configure cache sizes like in prod
     page_cache_size = 16384
     max_file_descriptors = 500000
-    neon_env_builder.pageserver_config_override = (
-        f"page_cache_size={page_cache_size}; max_file_descriptors={max_file_descriptors}"
-    )
+    pageserver_config_override = {
+        "page_cache_size": f"{page_cache_size}",
+        "max_file_descriptors": f"{max_file_descriptors}",
+        "virtual_file_io_engine": f"\"{ioengine}\"",
+    }
+    neon_env_builder.pageserver_config_override = ";".join([f"{k}={v}" for k, v in pageserver_config_override.items()])
     params.update(
         {
             "pageserver_config_override.page_cache_size": (
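Working through the new override construction above by hand (a sketch, not part of the commit; `.value` is used here to keep the enum rendering version-independent), the string handed to `neon_env_builder.pageserver_config_override` comes out as:

```python
page_cache_size = 16384
max_file_descriptors = 500000
ioengine = "tokio-epoll-uring"  # i.e. IoEngine.TOKIO_EPOLL_URING.value

pageserver_config_override = {
    "page_cache_size": f"{page_cache_size}",
    "max_file_descriptors": f"{max_file_descriptors}",
    # the value is a TOML string, so it must carry its own quotes
    "virtual_file_io_engine": f'"{ioengine}"',
}
override = ";".join(f"{k}={v}" for k, v in pageserver_config_override.items())
assert override == 'page_cache_size=16384;max_file_descriptors=500000;virtual_file_io_engine="tokio-epoll-uring"'
```

This matches the shape of the config string in the commit message above (`page_cache_size=2097152;max_file_descriptors=500000;virtual_file_io_engine="tokio-epoll-uring"`, with a larger cache).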
@@ -70,12 +82,17 @@ def test_pageserver_max_throughput_getpage_at_latest_lsn(
                 {"unit": "byte"},
             ),
             "pageserver_config_override.max_file_descriptors": (max_file_descriptors, {"unit": ""}),
+            "pageserver_config_override.virtual_file_io_engine": (ioengine, {"unit": ""}),
         }
     )
     for param, (value, kwargs) in params.items():
         record(param, metric_value=value, report=MetricReport.TEST_PARAM, **kwargs)
     env = setup_pageserver_with_pgbench_tenants(neon_env_builder, pg_bin, n_tenants, pgbench_scale)
+    ps_http = env.pageserver.http_client()
+    for tenant_info in ps_http.tenant_list():
+        tenant_id = tenant_info["id"]
+        ps_http.patch_tenant_config_client_side(tenant_id, {"compaction_period": "10s"})
     run_benchmark_max_throughput_latest_lsn(env, pg_bin, record, duration)