Compare commits

...

13 Commits

Author SHA1 Message Date
Christian Schwarz
af87cb54b7 persist benchmarked config 2023-12-08 13:29:30 +00:00
Christian Schwarz
3e8f78c1fa virtual_file metrics: expose max size of the fd cache
And also leave a comment on how to determine current size.

Kind of follow-up to #6066

refs https://github.com/neondatabase/cloud/issues/8351
refs https://github.com/neondatabase/neon/issues/5479
2023-12-08 13:21:34 +00:00
Christian Schwarz
8f02bdf96d page cache: improve eviction-related metrics
These changes help with identifying thrashing.

The existing `pageserver_page_cache_find_victim_iters_total` is already
useful, but, it doesn't tell us how many individual find_victim() calls
are happening, only how many clock-LRU steps happened in the entire system,
without info about whether we needed to actually evict other data vs
just scan for a long time, e.g., because the cache is large.

The changes in this PR allows us to
1. count each possible outcome separately, esp evictions
2. compute mean iterations/outcome

I don't think anyone except me was paying close attention to
`pageserver_page_cache_find_victim_iters_total` before, so,
I think the slight behavior change of also counting iterations
for the 'iters exceeded' case is fine.

refs https://github.com/neondatabase/cloud/issues/8351
refs https://github.com/neondatabase/neon/issues/5479
2023-12-08 12:57:34 +00:00
Christian Schwarz
c06279a40f Revert "revert recent VirtualFile asyncification changes (#5291)"
This reverts commit ab1f37e908.

fixes #5479
2023-12-08 12:07:35 +00:00
Christian Schwarz
4bb6d893c9 test results
34e69cfc93 18:04:59 - 18:31:15 => 05:01 + 21:15 = 26:16 duration

That's ca 5min slower than what we saw without tokio-epoll-uring
(scratches head)
2023-12-08 12:06:38 +00:00
Christian Schwarz
07496b12d0 improve instructions 2023-12-08 12:06:38 +00:00
Christian Schwarz
ba295c68b0 usage instructions for generator script 2023-12-08 12:06:38 +00:00
Christian Schwarz
31bf1570ff many_tenants script now works 2023-12-08 12:06:38 +00:00
Christian Schwarz
0ea9503264 update many tenants script to use the new method for duplicating tenants (copy-paste from benchmarking WIP PR) 2023-12-08 12:06:38 +00:00
Christian Schwarz
35dc783592 Squashed commit of the following:
commit de90ba56d4
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Nov 27 14:47:26 2023 +0000

    expose generation number in API

commit ae2c7589f9
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Nov 27 14:53:13 2023 +0000

    pagectl: add subcommand to rewrite layer file history
2023-12-08 12:06:38 +00:00
Christian Schwarz
2a5ae8e356 measured BACKGROUND_RUNTIME performance using wrk
Launch wrk from command line 3-4 seconds after the load starts.
=> blocking of executor threads is clearly visible, my branch
  performs _much_ better.

baseline: commit 15b8618d25 (HEAD -> problame/loadtest-baseline, origin/problame/loadtest-baseline, main)
neon-main (compaction semaphore disabled!)

admin@ip-172-31-13-23:[~/neon]: wrk --latency http://localhost:2342
Running 10s test @ http://localhost:2342
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    71.42ms   15.97ms 125.18ms   70.82%
    Req/Sec    41.44     28.85   101.00     57.35%
  Latency Distribution
     50%   72.53ms
     75%   82.07ms
     90%   91.44ms
     99%  116.56ms
  291 requests in 10.01s, 22.73KB read
  Socket errors: connect 0, read 0, write 0, timeout 10
Requests/sec:     29.07
Transfer/sec:      2.27KB

this branch (comapction semaphore also disabled!):

admin@ip-172-31-13-23:[~/neon]: wrk --latency http://localhost:2342
Running 10s test @ http://localhost:2342
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    45.74ms   64.13ms 293.44ms   83.27%
    Req/Sec   442.81    258.18     1.32k    69.79%
  Latency Distribution
     50%    2.92ms
     75%   75.52ms
     90%  148.03ms
     99%  248.50ms
  8641 requests in 10.01s, 675.08KB read
Requests/sec:    862.81
Transfer/sec:     67.41KB
2023-12-08 12:06:38 +00:00
Christian Schwarz
fa81f8507a HACK: BACKGROUND_RUNTIME webserver to measure response time using wrk 2023-12-08 12:06:38 +00:00
Christian Schwarz
8e59f200dc REPRO the problem: , uses 430GB of space; 4 seconds load time; constant 20kIOPS after ~20s 2023-12-08 12:06:38 +00:00
16 changed files with 459 additions and 93 deletions

View File

@@ -0,0 +1,39 @@
remote_storage ={local_path='/mnt/many_tenants/test_pageserver_startup_many_tenants/repo/local_fs_remote_storage/pageserver'}
id =1
pg_distrib_dir ='/home/admin/neon/pg_install'
http_auth_type ='Trust'
pg_auth_type ='Trust'
listen_http_addr ='localhost:15004'
listen_pg_addr ='localhost:15003'
broker_endpoint ='http://127.0.0.1:15001/'
control_plane_api ='http://127.0.0.1:15002/'
# Initial configuration file created by 'pageserver --init'
#listen_pg_addr = '127.0.0.1:64000'
#listen_http_addr = '127.0.0.1:9898'
#wait_lsn_timeout = '60 s'
#wal_redo_timeout = '60 s'
#max_file_descriptors = 100
# initial superuser role name to use when creating a new tenant
#initial_superuser_name = 'cloud_admin'
#broker_endpoint = 'http://127.0.0.1:50051'
#log_format = 'plain'
#concurrent_tenant_size_logical_size_queries = '1'
metric_collection_endpoint = "https://127.0.0.1:6666"
#metric_collection_interval = '10 min'
#cached_metric_collection_interval = '0s'
#synthetic_size_calculation_interval = '10 min'
#disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}
#background_task_maximum_delay = '10s'
[tenant_config]

View File

@@ -364,6 +364,8 @@ pub struct TenantInfo {
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -827,6 +829,7 @@ mod tests {
state: TenantState::Active,
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -847,6 +850,7 @@ mod tests {
},
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -641,6 +641,31 @@ fn start_pageserver(
);
}
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::BackgroundRuntimeTurnaroundMeasure,
None,
None,
"background runtime turnaround measure",
true,
async move {
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
let server = server
.serve(hyper::service::make_service_fn(|_| async move {
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
move |_: hyper::Request<hyper::Body>| async move {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
hyper::Body::from(format!("alive")),
))
},
))
}))
.with_graceful_shutdown(task_mgr::shutdown_watcher());
server.await?;
Ok(())
},
);
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.

View File

@@ -269,7 +269,7 @@ async fn calculate_synthetic_size_worker(
}
};
for (tenant_id, tenant_state) in tenants {
for (tenant_id, tenant_state, _gen) in tenants {
if tenant_state != TenantState::Active {
continue;
}

View File

@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
if state != TenantState::Active {
None
} else {

View File

@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
for (tenant_id, _state, _gen) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}

View File

@@ -790,11 +790,12 @@ async fn tenant_list_handler(
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state)| TenantInfo {
.map(|(id, state, gen)| TenantInfo {
id: *id,
state: state.clone(),
current_physical_size: None,
attachment_status: state.attachment_status(),
generation: (*gen).into(),
})
.collect::<Vec<TenantInfo>>();
@@ -823,6 +824,7 @@ async fn tenant_status(
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
})
}
.instrument(info_span!("tenant_status_handler", %tenant_id))

View File

@@ -285,6 +285,63 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});
pub(crate) mod page_cache_eviction_metrics {
use std::num::NonZeroUsize;
use metrics::{register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
#[derive(Clone, Copy)]
pub(crate) enum Outcome {
FoundSlotUnused { iters: NonZeroUsize },
FoundSlotEvicted { iters: NonZeroUsize },
ItersExceeded { iters: NonZeroUsize },
}
static ITERS_TOTAL_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
&["outcome"],
)
.expect("failed to define a metric")
});
static CALLS_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_find_victim_calls",
"Incremented at the end of each find_victim() call.\
Filter by outcome to get e.g., eviction rate.",
&["outcome"]
)
.unwrap()
});
pub(crate) fn observe(outcome: Outcome) {
macro_rules! dry {
($label:literal, $iters:expr) => {{
static LABEL: &'static str = $label;
static ITERS_TOTAL: Lazy<IntCounter> =
Lazy::new(|| ITERS_TOTAL_VEC.with_label_values(&[LABEL]));
static CALLS: Lazy<IntCounter> =
Lazy::new(|| CALLS_VEC.with_label_values(&[LABEL]));
ITERS_TOTAL.inc_by(($iters.get()) as u64);
CALLS.inc();
}};
}
match outcome {
Outcome::FoundSlotUnused { iters } => dry!("found_empty", iters),
Outcome::FoundSlotEvicted { iters } => {
dry!("found_evicted", iters)
}
Outcome::ItersExceeded { iters } => {
dry!("err_iters_exceeded", iters);
super::page_cache_errors_inc(super::PageCacheErrorKind::EvictIterLimit);
}
}
}
}
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_cache_acquire_pinned_slot_seconds",
@@ -294,14 +351,6 @@ pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
)
.expect("failed to define a metric")
});
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"page_cache_errors_total",
@@ -842,6 +891,25 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.expect("failed to define a metric")
});
pub(crate) mod virtual_file_descriptor_cache {
use super::*;
pub(crate) static SIZE_MAX: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_virtual_file_descriptor_cache_size_max",
"Maximum number of open file descriptors in the cache."
).unwrap()
});
// SIZE_CURRENT: derive it like so:
// ```
// sum (pageserver_io_operations_seconds_count{operation=~"^(open|open-after-replace)$")
// -ignoring(operation)
// sum(pageserver_io_operations_seconds_count{operation=~"^(close|close-by-replace)$"}
// ```
}
#[derive(Debug)]
struct GlobalAndPerTimelineHistogram {
global: Histogram,

View File

@@ -88,7 +88,11 @@ use utils::{
lsn::Lsn,
};
use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key};
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
repository::Key,
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
@@ -897,8 +901,10 @@ impl PageCache {
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::ItersExceeded {
iters: iters.try_into().unwrap(),
},
);
anyhow::bail!("exceeded evict iter limit");
}
@@ -909,8 +915,18 @@ impl PageCache {
// remove mapping for old buffer
self.remove_mapping(old_key);
inner.key = None;
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
iters: iters.try_into().unwrap(),
},
);
} else {
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::FoundSlotUnused {
iters: iters.try_into().unwrap(),
},
);
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
}
}

View File

@@ -293,6 +293,8 @@ pub enum TaskKind {
DebugTool,
BackgroundRuntimeTurnaroundMeasure,
#[cfg(test)]
UnitTest,
}

View File

@@ -1756,6 +1756,10 @@ impl Tenant {
self.current_state() == TenantState::Active
}
pub fn generation(&self) -> Generation {
self.generation
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup

View File

@@ -1542,7 +1542,8 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1550,12 +1551,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Attached(tenant) => Some((id, tenant.current_state(), tenant.generation())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(k, v)| (k.tenant_id, v))
.map(|(a, b, c)| (a.tenant_id, b, c))
.collect())
}

View File

@@ -708,11 +708,11 @@ impl DeltaLayerInner {
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -390,11 +390,11 @@ impl ImageLayerInner {
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -18,7 +18,8 @@ use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
///
@@ -111,7 +112,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -143,7 +144,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
slot_guard = slot.inner.write().await;
index = next;
break;
}
@@ -154,7 +155,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -250,6 +251,29 @@ impl<T> MaybeFatalIo<T> for std::io::Result<T> {
}
}
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
@@ -286,14 +310,12 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
// where our caller doesn't get to use the returned VirtualFile before its
// slot gets re-used by someone else.
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -366,22 +388,24 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
let open_files = get_open_files();
let mut handle_guard = {
@@ -391,27 +415,23 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
let mut handle = *self.handle.read().await;
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
let handle_guard = self.handle.write().await;
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -425,20 +445,16 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
// Re-open the physical file.
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
// case from StorageIoOperation::Open. This helps with identifying thrashing
// of the virtual file descriptor cache.
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::OpenAfterReplace)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
let file = observe_duration!(
StorageIoOperation::OpenAfterReplace,
self.open_options.open(&self.path)
)?;
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -446,7 +462,9 @@ impl VirtualFile {
*handle_guard = handle;
Ok(result)
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
}
pub fn remove(self) {
@@ -461,11 +479,9 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -553,9 +569,9 @@ impl VirtualFile {
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
@@ -565,9 +581,9 @@ impl VirtualFile {
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -577,6 +593,18 @@ impl VirtualFile {
}
}
struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -609,22 +637,41 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
let handle = self.handle.get_mut();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
if let Some(fd) = slot_guard.file.take() {
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(fd));
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
if let Some(fd) = slot_guard.file.take() {
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(fd));
}
}
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
}
}
@@ -654,6 +701,7 @@ pub fn init(num_slots: usize) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;

View File

@@ -0,0 +1,157 @@
import queue
import shutil
import subprocess
import threading
from pathlib import Path
from typing import List, Optional
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId
def duplicate_tenant(
env: NeonEnv, remote_storage: LocalFsStorage, template_tenant: TenantId, new_tenant: TenantId
):
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
assert isinstance(remote_storage, LocalFsStorage)
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
cmd: List[str] = [
str(
env.neon_binpath / "pagectl"
), # TODO: abstract this like the other binaries
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
subprocess.run(cmd, check=True)
else:
# index_part etc need no patching
pass
return None
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Usage
TEST_OUTPUT=/mnt/many_tenants NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
Then
export NEON_REPO_DIR=/mnt/many_tenants/test_pageserver_startup_many_tenants/repo
# edit $NEON_REPO_DIR/pageserver_1/pageserver.toml to use metric collection,
# with intervals from prod:
#
# metric_collection_endpoint = "https://127.0.0.1:6666"
# metric_collection_interval: 10min
# cached_metric_collection_interval: 0s
# run a fake metric collection endpoint in some other terminal using
# python3 -m http.server 6666 > /dev/null 2>&1
# then start pageserver
ulimit -SH -n 100000
./target/release/neon_local start
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.enable_generations = True
env = neon_env_builder.init_start()
remote_storage = env.pageserver_remote_storage
assert isinstance(remote_storage, LocalFsStorage)
# cleanup initial tenant
env.pageserver.tenant_detach(env.initial_tenant)
# create our template tenant
tenant_config_mgmt_api = {
"gc_period": "0s",
"checkpoint_timeout": "3650 day",
"compaction_period": "20 s",
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
ps_http = env.pageserver.http_client()
template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
ep.safe_psql("create table foo(b text)")
for _i in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ep.stop_and_destroy()
env.pageserver.tenant_detach(template_tenant)
# duplicate the tenant in remote storage
def worker(queue: queue.Queue[Optional[TenantId]]):
while True:
tenant_id = queue.get()
if tenant_id is None:
return
assert isinstance(remote_storage, LocalFsStorage)
duplicate_tenant(env, remote_storage, template_tenant, tenant_id)
new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, 20_000)]
duplications: queue.Queue[Optional[TenantId]] = queue.Queue()
for t in new_tenants:
duplications.put(t)
workers = []
for _ in range(0, 8):
w = threading.Thread(target=worker, args=[duplications])
workers.append(w)
w.start()
duplications.put(None)
for w in workers:
w.join()
# for evaluation, use the same background loop periods as in prod
benchmark_tenant_config = {k: v for k, v in tenant_config_mgmt_api.items()}
del benchmark_tenant_config["compaction_period"]
del benchmark_tenant_config["gc_period"]
benchmark_tenant_config["eviction_policy"] = {
"kind": "LayerAccessThreshold",
"period": "10m",
# don't do evictions
"threshold": "1000d",
}
assert ps_http.tenant_list() == []
for tenant in new_tenants:
env.pageserver.tenant_attach(tenant, config=benchmark_tenant_config)
for tenant in new_tenants:
wait_until_tenant_active(ps_http, tenant)
# ensure all layers are resident for predictiable performance
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
for tenant in new_tenants:
ps_http.download_all_layers(tenant, template_timeline)