Compare commits

...

15 Commits

Author SHA1 Message Date
Christian Schwarz
f911050e31 Pull in squash of page_cache: find_victim: prevent starvation #5483
Squashed commit of the following:

commit 71bf9cf8ae
Author: Christian Schwarz <me@cschwarz.com>
Date:   Mon Oct 9 20:21:22 2023 +0000

    origin/problame/page-cache-forward-progress/3: trace spans and events only for tests

commit fd97c98dd9
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Oct 9 21:02:27 2023 +0200

    move into library

commit 05dbff7a18
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Oct 9 19:26:47 2023 +0200

    commented out the check for just-once-polled, works now, don't understand why though

commit 31632502aa
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Oct 9 17:54:44 2023 +0200

    fixes

commit 76d3e44588
Author: Christian Schwarz <christian@neon.tech>
Date:   Fri Oct 6 14:39:40 2023 +0200

    hand-roll it instead

commit a5912dcc1b
Author: Christian Schwarz <me@cschwarz.com>
Date:   Wed Oct 4 17:21:24 2023 +0000

    page_cache: find_victim: prevent starvation

commit da9a88a882
Author: Christian Schwarz <me@cschwarz.com>
Date:   Mon Oct 2 17:39:35 2023 +0000

    page_cache: ensure forward progress on cache miss
2023-11-29 11:51:24 +00:00
Christian Schwarz
2eb9f64978 results look good: with initial size calculation in background, we can revert the revert without incurrent significant overhead
with revert    49b63570c8 20:05:04 - 20:28:00 =>                22:56 duration
without revert e0ac820e87 11:46:38 - 12:09:05 => 13:22 + 9:05 = 22:30 duration
2023-11-29 11:37:26 +00:00
Christian Schwarz
e0ac820e87 Revert "Revert "revert recent VirtualFile asyncification changes (#5291)""
This reverts commit fef7018ec6.
2023-11-29 10:42:56 +00:00
Christian Schwarz
49b63570c8 idea: concurrency-limit initial logical size calculation
Before this patch, there was no concurrency limit on initial logical
size computations.

In an experiment with a PS with 20k tenants, 1 timeline each,
all tenants inactive in SKs / not present in storage broker,
all logical size calculations are spawned by MetricsCollection,
i.e., consumption metrics worker.

Before this patch, these timelines would all do their initial logical
size calculation in parallel, leading to extreme thrashing in page cache
and virtual file cache.

With this patch, the virtual file cache thrashing is reduced
signficantly (from 80k `open`-system-calls/second to ~500
`open`-system-calls/second during loading).

This patch uses the existing background tasks semaphore to limit
concurrency, which generally is the right call for background activity.
However, due to logical size's involvement in PageserverFeedback towards
safekeepers, I think we need a priority-boosting mechanism, e.g., if
we're still calculating but walreceiver is actively asking, skip the
semaphore. That's fairly easy to implement, but, want to some feedback
on the general idea first before implementing it.
See also the FIXME in the block comment added in this commit.

NB: when evaluating, keep in mind that consumption metrics worker
persists its interval across restarts; delete the state file on disk
to get predictable (and I believe worst-case in terms of concurrency
during PS restart) behavior.
2023-11-28 19:02:07 +00:00
Christian Schwarz
fef7018ec6 Revert "revert recent VirtualFile asyncification changes (#5291)"
This reverts commit ab1f37e908.

fixes #5479
2023-11-28 15:25:54 +00:00
Christian Schwarz
d16ff0d12c usage instructions for generator script 2023-11-28 15:24:58 +00:00
Christian Schwarz
b4b3f3e3b6 Revert "idea: concurrency-limit initial logical size calculation"
This reverts commit e94fdd9838c0abcb823b891c1e7389c2615e4f5a.
2023-11-28 15:24:58 +00:00
Christian Schwarz
537ef94146 idea: concurrency-limit initial logical size calculation
Before this patch, there was no concurrency limit on initial logical
size computations.

In an experiment with a PS with 20k tenants, 1 timeline each,
all tenants inactive in SKs / not present in storage broker,
all logical size calculations are spawned by MetricsCollection,
i.e., consumption metrics worker.

Before this patch, these timelines would all do their initial logical
size calculation in parallel, leading to extreme thrashing in page cache
and virtual file cache.

With this patch, the virtual file cache thrashing is reduced
signficantly (from 80k `open`-system-calls/second to ~500
`open`-system-calls/second during loading).

This patch uses the existing background tasks semaphore to limit
concurrency, which generally is the right call for background activity.
However, due to logical size's involvement in PageserverFeedback towards
safekeepers, I think we need a priority-boosting mechanism, e.g., if
we're still calculating but walreceiver is actively asking, skip the
semaphore. That's fairly easy to implement, but, want to some feedback
on the general idea first before implementing it.
See also the FIXME in the block comment added in this commit.

NB: when evaluating, keep in mind that consumption metrics worker
persists its interval across restarts; delete the state file on disk
to get predictable (and I believe worst-case in terms of concurrency
during PS restart) behavior.
2023-11-28 15:24:58 +00:00
Christian Schwarz
efe0d93bf5 many_tenants script now works 2023-11-27 16:11:05 +00:00
Christian Schwarz
306880081d test suite: add method for generation-aware detachment of a tenant 2023-11-27 16:06:54 +00:00
Christian Schwarz
9915597d3a update many tenants script to use the new method for duplicating tenants (copy-paste from benchmarking WIP PR) 2023-11-27 15:12:27 +00:00
Christian Schwarz
e01c0c989e Squashed commit of the following:
commit de90ba56d4
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Nov 27 14:47:26 2023 +0000

    expose generation number in API

commit ae2c7589f9
Author: Christian Schwarz <christian@neon.tech>
Date:   Mon Nov 27 14:53:13 2023 +0000

    pagectl: add subcommand to rewrite layer file history
2023-11-27 15:00:51 +00:00
Christian Schwarz
3a95fbcae9 measured BACKGROUND_RUNTIME performance using wrk
Launch wrk from command line 3-4 seconds after the load starts.
=> blocking of executor threads is clearly visible, my branch
  performs _much_ better.

baseline: commit 15b8618d25 (HEAD -> problame/loadtest-baseline, origin/problame/loadtest-baseline, main)
neon-main (compaction semaphore disabled!)

admin@ip-172-31-13-23:[~/neon]: wrk --latency http://localhost:2342
Running 10s test @ http://localhost:2342
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    71.42ms   15.97ms 125.18ms   70.82%
    Req/Sec    41.44     28.85   101.00     57.35%
  Latency Distribution
     50%   72.53ms
     75%   82.07ms
     90%   91.44ms
     99%  116.56ms
  291 requests in 10.01s, 22.73KB read
  Socket errors: connect 0, read 0, write 0, timeout 10
Requests/sec:     29.07
Transfer/sec:      2.27KB

this branch (comapction semaphore also disabled!):

admin@ip-172-31-13-23:[~/neon]: wrk --latency http://localhost:2342
Running 10s test @ http://localhost:2342
  2 threads and 10 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    45.74ms   64.13ms 293.44ms   83.27%
    Req/Sec   442.81    258.18     1.32k    69.79%
  Latency Distribution
     50%    2.92ms
     75%   75.52ms
     90%  148.03ms
     99%  248.50ms
  8641 requests in 10.01s, 675.08KB read
Requests/sec:    862.81
Transfer/sec:     67.41KB
2023-11-27 13:13:07 +00:00
Christian Schwarz
c9dc9e7d70 HACK: BACKGROUND_RUNTIME webserver to measure response time using wrk 2023-11-27 13:13:07 +00:00
Christian Schwarz
fc7403944e REPRO the problem: , uses 430GB of space; 4 seconds load time; constant 20kIOPS after ~20s 2023-11-27 13:13:07 +00:00
27 changed files with 857 additions and 83 deletions

12
Cargo.lock generated
View File

@@ -2610,6 +2610,17 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nostarve_queue"
version = "0.1.0"
dependencies = [
"futures",
"rand 0.8.5",
"scopeguard",
"tokio",
"tracing",
]
[[package]]
name = "notify"
version = "5.2.0"
@@ -2951,6 +2962,7 @@ dependencies = [
"itertools",
"metrics",
"nix 0.26.2",
"nostarve_queue",
"num-traits",
"num_cpus",
"once_cell",

View File

@@ -27,6 +27,7 @@ members = [
"libs/postgres_ffi/wal_craft",
"libs/vm_monitor",
"libs/walproposer",
"libs/nostarve_queue",
]
[workspace.package]
@@ -37,6 +38,7 @@ license = "Apache-2.0"
[workspace.dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-channel = "1.9.0"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.16"
azure_identity = "0.16"
@@ -191,6 +193,7 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
utils = { version = "0.1", path = "./libs/utils/" }
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
walproposer = { version = "0.1", path = "./libs/walproposer/" }
nostarve_queue = { path = "./libs/nostarve_queue" }
## Common library dependency
workspace_hack = { version = "0.1", path = "./workspace_hack/" }

View File

@@ -0,0 +1,39 @@
remote_storage ={local_path='/mnt/many_tenants/test_pageserver_startup_many_tenants/repo/local_fs_remote_storage/pageserver'}
id =1
pg_distrib_dir ='/home/admin/neon/pg_install'
http_auth_type ='Trust'
pg_auth_type ='Trust'
listen_http_addr ='localhost:15004'
listen_pg_addr ='localhost:15003'
broker_endpoint ='http://127.0.0.1:15001/'
control_plane_api ='http://127.0.0.1:15002/'
# Initial configuration file created by 'pageserver --init'
#listen_pg_addr = '127.0.0.1:64000'
#listen_http_addr = '127.0.0.1:9898'
#wait_lsn_timeout = '60 s'
#wal_redo_timeout = '60 s'
#max_file_descriptors = 100
# initial superuser role name to use when creating a new tenant
#initial_superuser_name = 'cloud_admin'
#broker_endpoint = 'http://127.0.0.1:50051'
#log_format = 'plain'
#concurrent_tenant_size_logical_size_queries = '1'
metric_collection_endpoint = "https://127.0.0.1:6666"
#metric_collection_interval = '10 min'
#cached_metric_collection_interval = '0s'
#synthetic_size_calculation_interval = '10 min'
#disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}
#background_task_maximum_delay = '10s'
[tenant_config]

View File

@@ -21,7 +21,7 @@ use pageserver_api::models::{
use pageserver_api::shard::TenantShardId;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
@@ -99,7 +99,7 @@ impl PageServerNode {
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
conf: conf.clone(),
env: env.clone(),
http_client: Client::new(),
http_client: ClientBuilder::new().timeout(None).build().unwrap(),
http_base_url: format!("http://{}/v1", conf.listen_http_addr),
}
}

View File

@@ -0,0 +1,14 @@
[package]
name = "nostarve_queue"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
scopeguard.workspace = true
tracing.workspace = true
[dev-dependencies]
futures.workspace = true
rand.workspace = true
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] }

View File

@@ -0,0 +1,316 @@
//! Synchronization primitive to prevent starvation among concurrent tasks that do the same work.
use std::{
collections::VecDeque,
fmt,
future::poll_fn,
sync::Mutex,
task::{Poll, Waker},
};
pub struct Queue<T> {
inner: Mutex<Inner<T>>,
}
struct Inner<T> {
waiters: VecDeque<usize>,
free: VecDeque<usize>,
slots: Vec<Option<(Option<Waker>, Option<T>)>>,
}
#[derive(Clone, Copy)]
pub struct Position<'q, T> {
idx: usize,
queue: &'q Queue<T>,
}
impl<T> fmt::Debug for Position<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Position").field("idx", &self.idx).finish()
}
}
impl<T> Inner<T> {
#[cfg(not(test))]
#[inline]
fn integrity_check(&self) {}
#[cfg(test)]
fn integrity_check(&self) {
use std::collections::HashSet;
let waiters = self.waiters.iter().copied().collect::<HashSet<_>>();
let free = self.free.iter().copied().collect::<HashSet<_>>();
for (slot_idx, slot) in self.slots.iter().enumerate() {
match slot {
None => {
assert!(!waiters.contains(&slot_idx));
assert!(free.contains(&slot_idx));
}
Some((None, None)) => {
assert!(waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
Some((Some(_), Some(_))) => {
assert!(!waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
Some((Some(_), None)) => {
assert!(waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
Some((None, Some(_))) => {
assert!(!waiters.contains(&slot_idx));
assert!(!free.contains(&slot_idx));
}
}
}
}
}
impl<T> Queue<T> {
pub fn new(size: usize) -> Self {
Queue {
inner: Mutex::new(Inner {
waiters: VecDeque::new(),
free: (0..size).collect(),
slots: {
let mut v = Vec::with_capacity(size);
v.resize_with(size, || None);
v
},
}),
}
}
pub fn begin(&self) -> Result<Position<T>, ()> {
#[cfg(test)]
tracing::trace!("get in line locking inner");
let mut inner = self.inner.lock().unwrap();
inner.integrity_check();
let my_waitslot_idx = inner
.free
.pop_front()
.expect("can't happen, len(slots) = len(waiters");
inner.waiters.push_back(my_waitslot_idx);
let prev = inner.slots[my_waitslot_idx].replace((None, None));
assert!(prev.is_none());
inner.integrity_check();
Ok(Position {
idx: my_waitslot_idx,
queue: &self,
})
}
}
impl<'q, T> Position<'q, T> {
pub fn complete_and_wait(self, datum: T) -> impl std::future::Future<Output = T> + 'q {
#[cfg(test)]
tracing::trace!("found victim locking waiters");
let mut inner = self.queue.inner.lock().unwrap();
inner.integrity_check();
let winner_idx = inner.waiters.pop_front().expect("we put ourselves in");
#[cfg(test)]
tracing::trace!(winner_idx, "putting victim into next waiters slot");
let winner_slot = inner.slots[winner_idx].as_mut().unwrap();
let prev = winner_slot.1.replace(datum);
assert!(
prev.is_none(),
"ensure we didn't mess up this simple ring buffer structure"
);
if let Some(waker) = winner_slot.0.take() {
#[cfg(test)]
tracing::trace!(winner_idx, "waking up winner");
waker.wake()
}
inner.integrity_check();
drop(inner); // the poll_fn locks it again
let mut poll_num = 0;
let mut drop_guard = Some(scopeguard::guard((), |()| {
panic!("must not drop this future until Ready");
}));
// take the victim that was found by someone else
poll_fn(move |cx| {
let my_waitslot_idx = self.idx;
poll_num += 1;
#[cfg(test)]
tracing::trace!(poll_num, "poll_fn locking waiters");
let mut inner = self.queue.inner.lock().unwrap();
inner.integrity_check();
let my_waitslot = inner.slots[self.idx].as_mut().unwrap();
// assert!(
// poll_num <= 2,
// "once we place the waker in the slot, next wakeup should have a result: {}",
// my_waitslot.1.is_some()
// );
if let Some(res) = my_waitslot.1.take() {
#[cfg(test)]
tracing::trace!(poll_num, "have cache slot");
// above .take() resets the waiters slot to None
debug_assert!(my_waitslot.0.is_none());
debug_assert!(my_waitslot.1.is_none());
inner.slots[my_waitslot_idx] = None;
inner.free.push_back(my_waitslot_idx);
let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap());
inner.integrity_check();
return Poll::Ready(res);
}
// assert_eq!(poll_num, 1);
if !my_waitslot
.0
.as_ref()
.map(|existing| cx.waker().will_wake(existing))
.unwrap_or(false)
{
let prev = my_waitslot.0.replace(cx.waker().clone());
#[cfg(test)]
tracing::trace!(poll_num, prev_is_some = prev.is_some(), "updating waker");
}
inner.integrity_check();
#[cfg(test)]
tracing::trace!(poll_num, "waiting to be woken up");
Poll::Pending
})
}
}
#[cfg(test)]
mod test {
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::Poll,
time::Duration,
};
use rand::RngCore;
#[tokio::test]
async fn in_order_completion_and_wait() {
let queue = super::Queue::new(2);
let q1 = queue.begin().unwrap();
let q2 = queue.begin().unwrap();
assert_eq!(q1.complete_and_wait(23).await, 23);
assert_eq!(q2.complete_and_wait(42).await, 42);
}
#[tokio::test]
async fn out_of_order_completion_and_wait() {
let queue = super::Queue::new(2);
let q1 = queue.begin().unwrap();
let q2 = queue.begin().unwrap();
let mut q2compfut = q2.complete_and_wait(23);
match futures::poll!(&mut q2compfut) {
Poll::Pending => {}
Poll::Ready(_) => panic!("should not be ready yet, it's queued after q1"),
}
let q1res = q1.complete_and_wait(42).await;
assert_eq!(q1res, 23);
let q2res = q2compfut.await;
assert_eq!(q2res, 42);
}
#[tokio::test]
async fn in_order_completion_out_of_order_wait() {
let queue = super::Queue::new(2);
let q1 = queue.begin().unwrap();
let q2 = queue.begin().unwrap();
let mut q1compfut = q1.complete_and_wait(23);
let mut q2compfut = q2.complete_and_wait(42);
match futures::poll!(&mut q2compfut) {
Poll::Pending => {
unreachable!("q2 should be ready, it wasn't first but q1 is serviced already")
}
Poll::Ready(x) => assert_eq!(x, 42),
}
assert_eq!(futures::poll!(&mut q1compfut), Poll::Ready(23));
}
#[tokio::test(flavor = "multi_thread")]
async fn stress() {
let ntasks = 8;
let queue_size = 8;
let queue = Arc::new(super::Queue::new(queue_size));
let stop = Arc::new(AtomicBool::new(false));
let mut tasks = vec![];
for i in 0..ntasks {
let jh = tokio::spawn({
let queue = Arc::clone(&queue);
let stop = Arc::clone(&stop);
async move {
while !stop.load(Ordering::Relaxed) {
let q = queue.begin().unwrap();
for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
std::hint::spin_loop();
}
q.complete_and_wait(i).await;
tokio::task::yield_now().await;
}
}
});
tasks.push(jh);
}
tokio::time::sleep(Duration::from_secs(10)).await;
stop.store(true, Ordering::Relaxed);
for t in tasks {
t.await.unwrap();
}
}
#[test]
fn stress_two_runtimes_shared_queue() {
std::thread::scope(|s| {
let ntasks = 8;
let queue_size = 8;
let queue = Arc::new(super::Queue::new(queue_size));
let stop = Arc::new(AtomicBool::new(false));
for i in 0..ntasks {
s.spawn({
let queue = Arc::clone(&queue);
let stop = Arc::clone(&stop);
move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
while !stop.load(Ordering::Relaxed) {
let q = queue.begin().unwrap();
for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
std::hint::spin_loop();
}
q.complete_and_wait(i).await;
tokio::task::yield_now().await;
}
});
}
});
}
std::thread::sleep(Duration::from_secs(10));
stop.store(true, Ordering::Relaxed);
});
}
}

View File

@@ -371,6 +371,8 @@ pub struct TenantInfo {
/// If a layer is present in both local FS and S3, it counts only once.
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
pub attachment_status: TenantAttachmentStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub generation: Option<u32>,
}
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
@@ -832,6 +834,7 @@ mod tests {
state: TenantState::Active,
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_active = json!({
"id": original_active.id.to_string(),
@@ -852,6 +855,7 @@ mod tests {
},
current_physical_size: Some(42),
attachment_status: TenantAttachmentStatus::Attached,
generation: None,
};
let expected_broken = json!({
"id": original_broken.id.to_string(),

View File

@@ -37,6 +37,7 @@ humantime-serde.workspace = true
hyper.workspace = true
itertools.workspace = true
nix.workspace = true
nostarve_queue.workspace = true
# hack to get the number of worker threads tokio uses
num_cpus = { version = "1.15" }
num-traits.workspace = true

View File

@@ -1,13 +1,15 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::Utf8Path;
use camino::{Utf8Path, Utf8PathBuf};
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use pageserver::{page_cache, virtual_file};
use pageserver::{
@@ -20,6 +22,7 @@ use pageserver::{
};
use std::fs;
use utils::bin_ser::BeSer;
use utils::id::{TenantId, TimelineId};
use crate::layer_map_analyzer::parse_filename;
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
/// The id from list-layer command
id: usize,
},
RewriteSummary {
layer_file_path: Utf8PathBuf,
#[clap(long)]
new_tenant_id: Option<TenantId>,
#[clap(long)]
new_timeline_id: Option<TimelineId>,
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
println!("- timeline {}", timeline.file_name().to_string_lossy());
}
}
Ok(())
}
LayerCmd::ListLayer {
path,
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::DumpLayer {
path,
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
idx += 1;
}
}
Ok(())
}
LayerCmd::RewriteSummary {
layer_file_path,
new_tenant_id,
new_timeline_id,
} => {
pageserver::virtual_file::init(10);
pageserver::page_cache::init(100);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
macro_rules! rewrite_closure {
($($summary_ty:tt)*) => {{
|summary| $($summary_ty)* {
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
..summary
}
}};
}
let res = ImageLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(image_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of image layer {layer_file_path}");
return Ok(());
}
Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(image_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
let res = DeltaLayer::rewrite_summary(
layer_file_path,
rewrite_closure!(delta_layer::Summary),
&ctx,
)
.await;
match res {
Ok(()) => {
println!("Successfully rewrote summary of delta layer {layer_file_path}");
return Ok(());
}
Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
Err(delta_layer::RewriteSummaryError::Other(e)) => {
return Err(e);
}
}
anyhow::bail!("not an image or delta layer: {layer_file_path}");
}
}
Ok(())
}

View File

@@ -668,6 +668,31 @@ fn start_pageserver(
);
}
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::BackgroundRuntimeTurnaroundMeasure,
None,
None,
"background runtime turnaround measure",
true,
async move {
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
let server = server
.serve(hyper::service::make_service_fn(|_| async move {
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
move |_: hyper::Request<hyper::Body>| async move {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
hyper::Body::from(format!("alive")),
))
},
))
}))
.with_graceful_shutdown(task_mgr::shutdown_watcher());
server.await?;
Ok(())
},
);
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.

View File

@@ -269,7 +269,7 @@ async fn calculate_synthetic_size_worker(
}
};
for (tenant_id, tenant_state) in tenants {
for (tenant_id, tenant_state, _gen) in tenants {
if tenant_state != TenantState::Active {
continue;
}

View File

@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
}
};
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
if state != TenantState::Active {
None
} else {

View File

@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
let mut candidates = Vec::new();
for (tenant_id, _state) in &tenants {
for (tenant_id, _state, _gen) in &tenants {
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}

View File

@@ -768,11 +768,12 @@ async fn tenant_list_handler(
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
})?
.iter()
.map(|(id, state)| TenantInfo {
.map(|(id, state, gen)| TenantInfo {
id: *id,
state: state.clone(),
current_physical_size: None,
attachment_status: state.attachment_status(),
generation: (*gen).into(),
})
.collect::<Vec<TenantInfo>>();
@@ -801,6 +802,7 @@ async fn tenant_status(
state: state.clone(),
current_physical_size: Some(current_physical_size),
attachment_status: state.attachment_status(),
generation: tenant.generation().into(),
})
}
.instrument(info_span!("tenant_status_handler", %tenant_id))

View File

@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
AcquirePinnedSlotTimeout,
EvictIterLimit,
}
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {

View File

@@ -83,6 +83,7 @@ use std::{
use anyhow::Context;
use once_cell::sync::OnceCell;
use tracing::instrument;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
@@ -252,6 +253,9 @@ pub struct PageCache {
next_evict_slot: AtomicUsize,
size_metrics: &'static PageCacheSizeMetrics,
find_victim_waiters:
nostarve_queue::Queue<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
}
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
@@ -430,8 +434,9 @@ impl PageCache {
///
/// Store an image of the given page in the cache.
///
#[cfg_attr(test, instrument(skip_all, level = "trace", fields(%key, %lsn)))]
pub async fn memorize_materialized_page(
&self,
&'static self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
@@ -522,8 +527,9 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
#[cfg_attr(test, instrument(skip_all, level = "trace", fields(?file_id, ?blkno)))]
pub async fn read_immutable_buf(
&self,
&'static self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
@@ -629,7 +635,7 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&self,
&'static self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
@@ -851,10 +857,15 @@ impl PageCache {
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
let nostarve_position = self.find_victim_waiters.begin()
.expect("we initialize the nostarve queue to the same size as the slots semaphore, and the caller is presenting a permit");
let span = tracing::info_span!("find_victim", ?nostarve_position);
let _enter = span.enter();
let mut iters = 0;
loop {
iters += 1;
@@ -866,41 +877,8 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > iter_limit {
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
// any particular number of iterations: other threads might race ahead and acquire and
// release pins just as we're scanning the array.
//
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
// slots. There are two threads running concurrently, A and B. A has just
// acquired the permit from the semaphore.
//
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
// B: Acquire permit.
// B: Look at slot 2, decrement its usage_count to zero and continue the search
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
// B: Acquire permit.
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
//
// Now we're back in the starting situation that both slots have
// usage_count 1, but A has now been through one iteration of the
// find_victim() loop. This can repeat indefinitely and on each
// iteration, A's iteration count increases by one.
//
// So, even though the semaphore for the permits is fair, the victim search
// itself happens in parallel and is not fair.
// Hence even with a permit, a task can theoretically be starved.
// To avoid this, we'd need tokio to give priority to tasks that are holding
// permits for longer.
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
}
continue;
}
@@ -911,7 +889,8 @@ impl PageCache {
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
return Ok(nostarve_position.complete_and_wait((slot_idx, inner)).await);
}
}
}
@@ -955,6 +934,7 @@ impl PageCache {
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
find_victim_waiters: ::nostarve_queue::Queue::new(num_pages),
}
}
}

View File

@@ -293,6 +293,8 @@ pub enum TaskKind {
DebugTool,
BackgroundRuntimeTurnaroundMeasure,
#[cfg(test)]
UnitTest,
}

View File

@@ -1714,6 +1714,10 @@ impl Tenant {
self.current_state() == TenantState::Active
}
pub fn generation(&self) -> Generation {
self.generation
}
/// Changes tenant status to active, unless shutdown was already requested.
///
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup

View File

@@ -1397,7 +1397,8 @@ pub(crate) enum TenantMapListError {
///
/// Get list of tenants, for the mgmt API
///
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
pub(crate) async fn list_tenants(
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
let tenants = TENANTS.read().unwrap();
let m = match &*tenants {
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
@@ -1405,12 +1406,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
};
Ok(m.iter()
.filter_map(|(id, tenant)| match tenant {
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
TenantSlot::Attached(tenant) => Some((id, tenant.current_state(), tenant.generation())),
TenantSlot::Secondary => None,
TenantSlot::InProgress(_) => None,
})
// TODO(sharding): make callers of this function shard-aware
.map(|(k, v)| (k.tenant_id, v))
.map(|(a, b, c)| (a.tenant_id, b, c))
.collect())
}

View File

@@ -2,7 +2,7 @@
pub mod delta_layer;
mod filename;
mod image_layer;
pub mod image_layer;
mod inmemory_layer;
mod layer;
mod layer_desc;

View File

@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Summary {
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn_range: Range<Lsn>,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn_range: Range<Lsn>,
/// Block number where the 'index' part of the file begins.
pub index_start_blk: u32,
@@ -611,6 +611,61 @@ impl Drop for DeltaLayerWriter {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl DeltaLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != DELTA_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in DeltaLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
@@ -640,11 +695,11 @@ impl DeltaLayerInner {
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
/// the 'index' starts at the block indicated by 'index_start_blk'
///
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub(super) struct Summary {
pub struct Summary {
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
magic: u16,
format_version: u16,
pub magic: u16,
pub format_version: u16,
tenant_id: TenantId,
timeline_id: TimelineId,
key_range: Range<Key>,
lsn: Lsn,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub key_range: Range<Key>,
pub lsn: Lsn,
/// Block number where the 'index' part of the file begins.
index_start_blk: u32,
pub index_start_blk: u32,
/// Block within the 'index', where the B-tree root page is stored
index_root_blk: u32,
pub index_root_blk: u32,
// the 'values' part starts after the summary header, on block 1.
}
@@ -296,6 +296,61 @@ impl ImageLayer {
}
}
#[derive(thiserror::Error, Debug)]
pub enum RewriteSummaryError {
#[error("magic mismatch")]
MagicMismatch,
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<std::io::Error> for RewriteSummaryError {
fn from(e: std::io::Error) -> Self {
Self::Other(anyhow::anyhow!(e))
}
}
impl ImageLayer {
pub async fn rewrite_summary<F>(
path: &Utf8Path,
rewrite: F,
ctx: &RequestContext,
) -> Result<(), RewriteSummaryError>
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
let mut file = file.file;
if actual_summary.magic != IMAGE_FILE_MAGIC {
return Err(RewriteSummaryError::MagicMismatch);
}
let new_summary = rewrite(actual_summary);
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
if buf.spilled() {
// The code in ImageLayerWriterInner just warn!()s for this.
// It should probably error out as well.
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
"Used more than one page size for summary buffer: {}",
buf.len()
)));
}
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&buf).await?;
Ok(())
}
}
impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
@@ -329,11 +384,11 @@ impl ImageLayerInner {
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}

View File

@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
Eviction,
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
}
impl BackgroundLoopKind {

View File

@@ -1822,6 +1822,29 @@ impl Timeline {
// delay will be terminated by a timeout regardless.
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
// In prod, initial logical size calucalation is spawned either by
// WalReceiverConnectionHandler if the timeline is active according to storage broker,
// or by the first consumption metrics worker (MetricsCollection).
// The latter runs every `metric_collection_interval` and checkpoints to disk, i.e.,
// if pageserver gets restarted, the consumption metrics worker will resume waiting
// for the correct remaining time, as if the pageserver had not been restarted.
//
// FIXME: with the current code, walreceiver requests would also hit this semaphore
// and get queued behind other background operations. That's bad because walreceiver_connection
// will push the not-precise value as `current_timeline_size` in the `PageserverFeedback`
// while this calculation is stuck.
// We need some way to priority-boost the initial size calculation if walreceiver is asking.
// Or, should we maybe revisit the use of logical size in `PageserverFeedback`?
// It seems broken the way it is.
//
// Example query to show different causes of initial size calculation spawning:
//
// https://neonprod.grafana.net/explore?panes=%7B%22wSx%22:%7B%22datasource%22:%22grafanacloud-logs%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22sum%20by%20%28task_kind%29%20%28count_over_time%28%7Bneon_service%3D%5C%22pageserver%5C%22,%20neon_region%3D%5C%22us-west-2%5C%22%7D%20%7C%3D%20%60logical%20size%20computation%20from%20context%20of%20task%20kind%60%20%7C%20regexp%20%60logical%20size%20computation%20from%20context%20of%20task%20kind%20%28%3FP%3Ctask_kind%3E.%2A%29%60%20%5B1m%5D%29%29%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22grafanacloud-logs%22%7D,%22editorMode%22:%22code%22,%22step%22:%221m%22%7D%5D,%22range%22:%7B%22from%22:%221700637500615%22,%22to%22:%221700639648743%22%7D%7D%7D&schemaVersion=1&orgId=1
let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(BackgroundLoopKind::InitialLogicalSizeCalculation,&background_ctx, &cancel).await {
Ok(permit) => permit,
Err(RateLimitError::Cancelled) => return Ok(()),
};
let calculated_size = match self_clone
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
.await

View File

@@ -1572,7 +1572,7 @@ class NeonAttachmentService:
self.running = False
return self
def attach_hook(self, tenant_id: TenantId, pageserver_id: int) -> int:
def attach_hook_issue(self, tenant_id: TenantId, pageserver_id: int) -> int:
response = requests.post(
f"{self.env.control_plane_api}/attach-hook",
json={"tenant_id": str(tenant_id), "node_id": pageserver_id},
@@ -1582,6 +1582,13 @@ class NeonAttachmentService:
assert isinstance(gen, int)
return gen
def attach_hook_drop(self, tenant_id: TenantId):
response = requests.post(
f"{self.env.control_plane_api}/attach-hook",
json={"tenant_id": str(tenant_id), "node_id": None},
)
response.raise_for_status()
def __enter__(self) -> "NeonAttachmentService":
return self
@@ -1781,13 +1788,20 @@ class NeonPageserver(PgProtocol):
to call into the pageserver HTTP client.
"""
if self.env.attachment_service is not None:
generation = self.env.attachment_service.attach_hook(tenant_id, self.id)
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
else:
generation = None
client = self.http_client()
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
def tenant_detach(self, tenant_id: TenantId):
if self.env.attachment_service is not None:
self.env.attachment_service.attach_hook_drop(tenant_id)
client = self.http_client()
return client.tenant_detach(tenant_id)
def append_pageserver_param_overrides(
params_to_update: List[str],

View File

@@ -0,0 +1,156 @@
import queue
import shutil
import subprocess
import threading
from pathlib import Path
from typing import List, Optional
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PgBin,
last_flush_lsn_upload,
)
from fixtures.pageserver.utils import wait_until_tenant_active
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.types import TenantId
def duplicate_tenant(
env: NeonEnv, remote_storage: LocalFsStorage, template_tenant: TenantId, new_tenant: TenantId
):
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
assert isinstance(remote_storage, LocalFsStorage)
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
for tl in src_timelines_dir.iterdir():
src_tl_dir = src_timelines_dir / tl.name
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
dst_tl_dir = dst_timelines_dir / tl.name
dst_tl_dir.mkdir(parents=False, exist_ok=False)
for file in tl.iterdir():
shutil.copy2(file, dst_tl_dir)
if "__" in file.name:
cmd: List[str] = [
str(
env.neon_binpath / "pagectl"
), # TODO: abstract this like the other binaries
"layer",
"rewrite-summary",
str(dst_tl_dir / file.name),
"--new-tenant-id",
str(new_tenant),
]
subprocess.run(cmd, check=True)
else:
# index_part etc need no patching
pass
return None
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"""
Usage
TEST_OUTPUT=/mnt/many_tenants NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
Then
export NEON_REPO_DIR=/mnt/many_tenants/test_pageserver_startup_many_tenants/repo
# edit $NEON_REPO_DIR/pageserver_1/pageserver.toml to use metric collection,
# with intervals from prod:
#
# metric_collection_endpoint = "https://127.0.0.1:6666"
# metric_collection_interval: 10min
# cached_metric_collection_interval: 0s
# run a fake metric collection endpoint in some other terminal using
# python3 -m http.server 6666
# then start pageserver
./target/release/neon_local start
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
neon_env_builder.enable_generations = True
env = neon_env_builder.init_start()
remote_storage = env.pageserver_remote_storage
assert isinstance(remote_storage, LocalFsStorage)
# cleanup initial tenant
env.pageserver.tenant_detach(env.initial_tenant)
# create our template tenant
tenant_config_mgmt_api = {
"gc_period": "0s",
"checkpoint_timeout": "3650 day",
"compaction_period": "20 s",
"compaction_threshold": 10,
"compaction_target_size": 134217728,
"checkpoint_distance": 268435456,
"image_creation_threshold": 3,
}
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
ps_http = env.pageserver.http_client()
template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
ep.safe_psql("create table foo(b text)")
for _i in range(0, 8):
ep.safe_psql("insert into foo(b) values ('some text')")
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
ep.stop_and_destroy()
env.pageserver.tenant_detach(template_tenant)
# duplicate the tenant in remote storage
def worker(queue: queue.Queue[Optional[TenantId]]):
while True:
tenant_id = queue.get()
if tenant_id is None:
return
assert isinstance(remote_storage, LocalFsStorage)
duplicate_tenant(env, remote_storage, template_tenant, tenant_id)
new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, 20_000)]
duplications: queue.Queue[Optional[TenantId]] = queue.Queue()
for t in new_tenants:
duplications.put(t)
workers = []
for _ in range(0, 8):
w = threading.Thread(target=worker, args=[duplications])
workers.append(w)
w.start()
duplications.put(None)
for w in workers:
w.join()
# for evaluation, use the same background loop periods as in prod
benchmark_tenant_config = {k: v for k, v in tenant_config_mgmt_api.items()}
del benchmark_tenant_config["compaction_period"]
del benchmark_tenant_config["gc_period"]
benchmark_tenant_config["eviction_policy"] = {
"kind": "LayerAccessThreshold",
"period": "10m",
# don't do evictions
"threshold": "1000d",
}
assert ps_http.tenant_list() == []
for tenant in new_tenants:
env.pageserver.tenant_attach(tenant, config=benchmark_tenant_config)
for tenant in new_tenants:
wait_until_tenant_active(ps_http, tenant)
# ensure all layers are resident for predictiable performance
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
for tenant in new_tenants:
ps_http.download_all_layers(tenant, template_timeline)

View File

@@ -282,7 +282,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
# Now advance the generation in the control plane: subsequent validations
# from the running pageserver will fail. No more deletions should happen.
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
generate_uploads_and_deletions(env, init=False)
assert_deletion_queue(ps_http, lambda n: n > 0)
@@ -397,7 +397,7 @@ def test_deletion_queue_recovery(
if keep_attachment == KeepAttachment.LOSE:
some_other_pageserver = 101010
assert env.attachment_service is not None
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
env.pageserver.start()