mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 22:20:37 +00:00
Compare commits
15 Commits
parameteri
...
problame/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f911050e31 | ||
|
|
2eb9f64978 | ||
|
|
e0ac820e87 | ||
|
|
49b63570c8 | ||
|
|
fef7018ec6 | ||
|
|
d16ff0d12c | ||
|
|
b4b3f3e3b6 | ||
|
|
537ef94146 | ||
|
|
efe0d93bf5 | ||
|
|
306880081d | ||
|
|
9915597d3a | ||
|
|
e01c0c989e | ||
|
|
3a95fbcae9 | ||
|
|
c9dc9e7d70 | ||
|
|
fc7403944e |
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -2610,6 +2610,17 @@ dependencies = [
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nostarve_queue"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"rand 0.8.5",
|
||||
"scopeguard",
|
||||
"tokio",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "notify"
|
||||
version = "5.2.0"
|
||||
@@ -2951,6 +2962,7 @@ dependencies = [
|
||||
"itertools",
|
||||
"metrics",
|
||||
"nix 0.26.2",
|
||||
"nostarve_queue",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"once_cell",
|
||||
|
||||
@@ -27,6 +27,7 @@ members = [
|
||||
"libs/postgres_ffi/wal_craft",
|
||||
"libs/vm_monitor",
|
||||
"libs/walproposer",
|
||||
"libs/nostarve_queue",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -37,6 +38,7 @@ license = "Apache-2.0"
|
||||
[workspace.dependencies]
|
||||
anyhow = { version = "1.0", features = ["backtrace"] }
|
||||
arc-swap = "1.6"
|
||||
async-channel = "1.9.0"
|
||||
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
|
||||
azure_core = "0.16"
|
||||
azure_identity = "0.16"
|
||||
@@ -191,6 +193,7 @@ tracing-utils = { version = "0.1", path = "./libs/tracing-utils/" }
|
||||
utils = { version = "0.1", path = "./libs/utils/" }
|
||||
vm_monitor = { version = "0.1", path = "./libs/vm_monitor/" }
|
||||
walproposer = { version = "0.1", path = "./libs/walproposer/" }
|
||||
nostarve_queue = { path = "./libs/nostarve_queue" }
|
||||
|
||||
## Common library dependency
|
||||
workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
|
||||
39
benchmarked_ps_config.toml
Normal file
39
benchmarked_ps_config.toml
Normal file
@@ -0,0 +1,39 @@
|
||||
remote_storage ={local_path='/mnt/many_tenants/test_pageserver_startup_many_tenants/repo/local_fs_remote_storage/pageserver'}
|
||||
id =1
|
||||
pg_distrib_dir ='/home/admin/neon/pg_install'
|
||||
http_auth_type ='Trust'
|
||||
pg_auth_type ='Trust'
|
||||
listen_http_addr ='localhost:15004'
|
||||
listen_pg_addr ='localhost:15003'
|
||||
broker_endpoint ='http://127.0.0.1:15001/'
|
||||
control_plane_api ='http://127.0.0.1:15002/'
|
||||
|
||||
# Initial configuration file created by 'pageserver --init'
|
||||
#listen_pg_addr = '127.0.0.1:64000'
|
||||
#listen_http_addr = '127.0.0.1:9898'
|
||||
|
||||
#wait_lsn_timeout = '60 s'
|
||||
#wal_redo_timeout = '60 s'
|
||||
|
||||
#max_file_descriptors = 100
|
||||
|
||||
# initial superuser role name to use when creating a new tenant
|
||||
#initial_superuser_name = 'cloud_admin'
|
||||
|
||||
#broker_endpoint = 'http://127.0.0.1:50051'
|
||||
|
||||
#log_format = 'plain'
|
||||
|
||||
#concurrent_tenant_size_logical_size_queries = '1'
|
||||
|
||||
metric_collection_endpoint = "https://127.0.0.1:6666"
|
||||
#metric_collection_interval = '10 min'
|
||||
#cached_metric_collection_interval = '0s'
|
||||
#synthetic_size_calculation_interval = '10 min'
|
||||
|
||||
#disk_usage_based_eviction = { max_usage_pct = .., min_avail_bytes = .., period = "10s"}
|
||||
|
||||
#background_task_maximum_delay = '10s'
|
||||
|
||||
[tenant_config]
|
||||
|
||||
@@ -21,7 +21,7 @@ use pageserver_api::models::{
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use postgres_backend::AuthType;
|
||||
use postgres_connection::{parse_host_port, PgConnectionConfig};
|
||||
use reqwest::blocking::{Client, RequestBuilder, Response};
|
||||
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
|
||||
use reqwest::{IntoUrl, Method};
|
||||
use thiserror::Error;
|
||||
use utils::auth::{Claims, Scope};
|
||||
@@ -99,7 +99,7 @@ impl PageServerNode {
|
||||
pg_connection_config: PgConnectionConfig::new_host_port(host, port),
|
||||
conf: conf.clone(),
|
||||
env: env.clone(),
|
||||
http_client: Client::new(),
|
||||
http_client: ClientBuilder::new().timeout(None).build().unwrap(),
|
||||
http_base_url: format!("http://{}/v1", conf.listen_http_addr),
|
||||
}
|
||||
}
|
||||
|
||||
14
libs/nostarve_queue/Cargo.toml
Normal file
14
libs/nostarve_queue/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "nostarve_queue"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
scopeguard.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
futures.workspace = true
|
||||
rand.workspace = true
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time"] }
|
||||
316
libs/nostarve_queue/src/lib.rs
Normal file
316
libs/nostarve_queue/src/lib.rs
Normal file
@@ -0,0 +1,316 @@
|
||||
//! Synchronization primitive to prevent starvation among concurrent tasks that do the same work.
|
||||
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
fmt,
|
||||
future::poll_fn,
|
||||
sync::Mutex,
|
||||
task::{Poll, Waker},
|
||||
};
|
||||
|
||||
pub struct Queue<T> {
|
||||
inner: Mutex<Inner<T>>,
|
||||
}
|
||||
|
||||
struct Inner<T> {
|
||||
waiters: VecDeque<usize>,
|
||||
free: VecDeque<usize>,
|
||||
slots: Vec<Option<(Option<Waker>, Option<T>)>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Position<'q, T> {
|
||||
idx: usize,
|
||||
queue: &'q Queue<T>,
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for Position<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Position").field("idx", &self.idx).finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Inner<T> {
|
||||
#[cfg(not(test))]
|
||||
#[inline]
|
||||
fn integrity_check(&self) {}
|
||||
|
||||
#[cfg(test)]
|
||||
fn integrity_check(&self) {
|
||||
use std::collections::HashSet;
|
||||
let waiters = self.waiters.iter().copied().collect::<HashSet<_>>();
|
||||
let free = self.free.iter().copied().collect::<HashSet<_>>();
|
||||
for (slot_idx, slot) in self.slots.iter().enumerate() {
|
||||
match slot {
|
||||
None => {
|
||||
assert!(!waiters.contains(&slot_idx));
|
||||
assert!(free.contains(&slot_idx));
|
||||
}
|
||||
Some((None, None)) => {
|
||||
assert!(waiters.contains(&slot_idx));
|
||||
assert!(!free.contains(&slot_idx));
|
||||
}
|
||||
Some((Some(_), Some(_))) => {
|
||||
assert!(!waiters.contains(&slot_idx));
|
||||
assert!(!free.contains(&slot_idx));
|
||||
}
|
||||
Some((Some(_), None)) => {
|
||||
assert!(waiters.contains(&slot_idx));
|
||||
assert!(!free.contains(&slot_idx));
|
||||
}
|
||||
Some((None, Some(_))) => {
|
||||
assert!(!waiters.contains(&slot_idx));
|
||||
assert!(!free.contains(&slot_idx));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Queue<T> {
|
||||
pub fn new(size: usize) -> Self {
|
||||
Queue {
|
||||
inner: Mutex::new(Inner {
|
||||
waiters: VecDeque::new(),
|
||||
free: (0..size).collect(),
|
||||
slots: {
|
||||
let mut v = Vec::with_capacity(size);
|
||||
v.resize_with(size, || None);
|
||||
v
|
||||
},
|
||||
}),
|
||||
}
|
||||
}
|
||||
pub fn begin(&self) -> Result<Position<T>, ()> {
|
||||
#[cfg(test)]
|
||||
tracing::trace!("get in line locking inner");
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.integrity_check();
|
||||
let my_waitslot_idx = inner
|
||||
.free
|
||||
.pop_front()
|
||||
.expect("can't happen, len(slots) = len(waiters");
|
||||
inner.waiters.push_back(my_waitslot_idx);
|
||||
let prev = inner.slots[my_waitslot_idx].replace((None, None));
|
||||
assert!(prev.is_none());
|
||||
inner.integrity_check();
|
||||
Ok(Position {
|
||||
idx: my_waitslot_idx,
|
||||
queue: &self,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<'q, T> Position<'q, T> {
|
||||
pub fn complete_and_wait(self, datum: T) -> impl std::future::Future<Output = T> + 'q {
|
||||
#[cfg(test)]
|
||||
tracing::trace!("found victim locking waiters");
|
||||
let mut inner = self.queue.inner.lock().unwrap();
|
||||
inner.integrity_check();
|
||||
let winner_idx = inner.waiters.pop_front().expect("we put ourselves in");
|
||||
#[cfg(test)]
|
||||
tracing::trace!(winner_idx, "putting victim into next waiters slot");
|
||||
let winner_slot = inner.slots[winner_idx].as_mut().unwrap();
|
||||
let prev = winner_slot.1.replace(datum);
|
||||
assert!(
|
||||
prev.is_none(),
|
||||
"ensure we didn't mess up this simple ring buffer structure"
|
||||
);
|
||||
if let Some(waker) = winner_slot.0.take() {
|
||||
#[cfg(test)]
|
||||
tracing::trace!(winner_idx, "waking up winner");
|
||||
waker.wake()
|
||||
}
|
||||
inner.integrity_check();
|
||||
drop(inner); // the poll_fn locks it again
|
||||
|
||||
let mut poll_num = 0;
|
||||
let mut drop_guard = Some(scopeguard::guard((), |()| {
|
||||
panic!("must not drop this future until Ready");
|
||||
}));
|
||||
|
||||
// take the victim that was found by someone else
|
||||
poll_fn(move |cx| {
|
||||
let my_waitslot_idx = self.idx;
|
||||
poll_num += 1;
|
||||
#[cfg(test)]
|
||||
tracing::trace!(poll_num, "poll_fn locking waiters");
|
||||
let mut inner = self.queue.inner.lock().unwrap();
|
||||
inner.integrity_check();
|
||||
let my_waitslot = inner.slots[self.idx].as_mut().unwrap();
|
||||
// assert!(
|
||||
// poll_num <= 2,
|
||||
// "once we place the waker in the slot, next wakeup should have a result: {}",
|
||||
// my_waitslot.1.is_some()
|
||||
// );
|
||||
if let Some(res) = my_waitslot.1.take() {
|
||||
#[cfg(test)]
|
||||
tracing::trace!(poll_num, "have cache slot");
|
||||
// above .take() resets the waiters slot to None
|
||||
debug_assert!(my_waitslot.0.is_none());
|
||||
debug_assert!(my_waitslot.1.is_none());
|
||||
inner.slots[my_waitslot_idx] = None;
|
||||
inner.free.push_back(my_waitslot_idx);
|
||||
let _ = scopeguard::ScopeGuard::into_inner(drop_guard.take().unwrap());
|
||||
inner.integrity_check();
|
||||
return Poll::Ready(res);
|
||||
}
|
||||
// assert_eq!(poll_num, 1);
|
||||
if !my_waitslot
|
||||
.0
|
||||
.as_ref()
|
||||
.map(|existing| cx.waker().will_wake(existing))
|
||||
.unwrap_or(false)
|
||||
{
|
||||
let prev = my_waitslot.0.replace(cx.waker().clone());
|
||||
#[cfg(test)]
|
||||
tracing::trace!(poll_num, prev_is_some = prev.is_some(), "updating waker");
|
||||
}
|
||||
inner.integrity_check();
|
||||
#[cfg(test)]
|
||||
tracing::trace!(poll_num, "waiting to be woken up");
|
||||
Poll::Pending
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::Poll,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use rand::RngCore;
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_order_completion_and_wait() {
|
||||
let queue = super::Queue::new(2);
|
||||
|
||||
let q1 = queue.begin().unwrap();
|
||||
let q2 = queue.begin().unwrap();
|
||||
|
||||
assert_eq!(q1.complete_and_wait(23).await, 23);
|
||||
assert_eq!(q2.complete_and_wait(42).await, 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn out_of_order_completion_and_wait() {
|
||||
let queue = super::Queue::new(2);
|
||||
|
||||
let q1 = queue.begin().unwrap();
|
||||
let q2 = queue.begin().unwrap();
|
||||
|
||||
let mut q2compfut = q2.complete_and_wait(23);
|
||||
|
||||
match futures::poll!(&mut q2compfut) {
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(_) => panic!("should not be ready yet, it's queued after q1"),
|
||||
}
|
||||
|
||||
let q1res = q1.complete_and_wait(42).await;
|
||||
assert_eq!(q1res, 23);
|
||||
|
||||
let q2res = q2compfut.await;
|
||||
assert_eq!(q2res, 42);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_order_completion_out_of_order_wait() {
|
||||
let queue = super::Queue::new(2);
|
||||
|
||||
let q1 = queue.begin().unwrap();
|
||||
let q2 = queue.begin().unwrap();
|
||||
|
||||
let mut q1compfut = q1.complete_and_wait(23);
|
||||
|
||||
let mut q2compfut = q2.complete_and_wait(42);
|
||||
|
||||
match futures::poll!(&mut q2compfut) {
|
||||
Poll::Pending => {
|
||||
unreachable!("q2 should be ready, it wasn't first but q1 is serviced already")
|
||||
}
|
||||
Poll::Ready(x) => assert_eq!(x, 42),
|
||||
}
|
||||
|
||||
assert_eq!(futures::poll!(&mut q1compfut), Poll::Ready(23));
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn stress() {
|
||||
let ntasks = 8;
|
||||
let queue_size = 8;
|
||||
let queue = Arc::new(super::Queue::new(queue_size));
|
||||
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let mut tasks = vec![];
|
||||
for i in 0..ntasks {
|
||||
let jh = tokio::spawn({
|
||||
let queue = Arc::clone(&queue);
|
||||
let stop = Arc::clone(&stop);
|
||||
async move {
|
||||
while !stop.load(Ordering::Relaxed) {
|
||||
let q = queue.begin().unwrap();
|
||||
for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
q.complete_and_wait(i).await;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
}
|
||||
});
|
||||
tasks.push(jh);
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
||||
|
||||
stop.store(true, Ordering::Relaxed);
|
||||
|
||||
for t in tasks {
|
||||
t.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stress_two_runtimes_shared_queue() {
|
||||
std::thread::scope(|s| {
|
||||
let ntasks = 8;
|
||||
let queue_size = 8;
|
||||
let queue = Arc::new(super::Queue::new(queue_size));
|
||||
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
|
||||
for i in 0..ntasks {
|
||||
s.spawn({
|
||||
let queue = Arc::clone(&queue);
|
||||
let stop = Arc::clone(&stop);
|
||||
move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(async move {
|
||||
while !stop.load(Ordering::Relaxed) {
|
||||
let q = queue.begin().unwrap();
|
||||
for _ in 0..(rand::thread_rng().next_u32() % 10_000) {
|
||||
std::hint::spin_loop();
|
||||
}
|
||||
q.complete_and_wait(i).await;
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
std::thread::sleep(Duration::from_secs(10));
|
||||
|
||||
stop.store(true, Ordering::Relaxed);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -371,6 +371,8 @@ pub struct TenantInfo {
|
||||
/// If a layer is present in both local FS and S3, it counts only once.
|
||||
pub current_physical_size: Option<u64>, // physical size is only included in `tenant_status` endpoint
|
||||
pub attachment_status: TenantAttachmentStatus,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub generation: Option<u32>,
|
||||
}
|
||||
|
||||
/// This represents the output of the "timeline_detail" and "timeline_list" API calls.
|
||||
@@ -832,6 +834,7 @@ mod tests {
|
||||
state: TenantState::Active,
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_active = json!({
|
||||
"id": original_active.id.to_string(),
|
||||
@@ -852,6 +855,7 @@ mod tests {
|
||||
},
|
||||
current_physical_size: Some(42),
|
||||
attachment_status: TenantAttachmentStatus::Attached,
|
||||
generation: None,
|
||||
};
|
||||
let expected_broken = json!({
|
||||
"id": original_broken.id.to_string(),
|
||||
|
||||
@@ -37,6 +37,7 @@ humantime-serde.workspace = true
|
||||
hyper.workspace = true
|
||||
itertools.workspace = true
|
||||
nix.workspace = true
|
||||
nostarve_queue.workspace = true
|
||||
# hack to get the number of worker threads tokio uses
|
||||
num_cpus = { version = "1.15" }
|
||||
num-traits.workspace = true
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use anyhow::Result;
|
||||
use camino::Utf8Path;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use clap::Subcommand;
|
||||
use pageserver::context::{DownloadBehavior, RequestContext};
|
||||
use pageserver::task_mgr::TaskKind;
|
||||
use pageserver::tenant::block_io::BlockCursor;
|
||||
use pageserver::tenant::disk_btree::DiskBtreeReader;
|
||||
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
|
||||
use pageserver::tenant::storage_layer::{delta_layer, image_layer};
|
||||
use pageserver::tenant::storage_layer::{DeltaLayer, ImageLayer};
|
||||
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
|
||||
use pageserver::{page_cache, virtual_file};
|
||||
use pageserver::{
|
||||
@@ -20,6 +22,7 @@ use pageserver::{
|
||||
};
|
||||
use std::fs;
|
||||
use utils::bin_ser::BeSer;
|
||||
use utils::id::{TenantId, TimelineId};
|
||||
|
||||
use crate::layer_map_analyzer::parse_filename;
|
||||
|
||||
@@ -45,6 +48,13 @@ pub(crate) enum LayerCmd {
|
||||
/// The id from list-layer command
|
||||
id: usize,
|
||||
},
|
||||
RewriteSummary {
|
||||
layer_file_path: Utf8PathBuf,
|
||||
#[clap(long)]
|
||||
new_tenant_id: Option<TenantId>,
|
||||
#[clap(long)]
|
||||
new_timeline_id: Option<TimelineId>,
|
||||
},
|
||||
}
|
||||
|
||||
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
|
||||
@@ -100,6 +110,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
println!("- timeline {}", timeline.file_name().to_string_lossy());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::ListLayer {
|
||||
path,
|
||||
@@ -128,6 +139,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::DumpLayer {
|
||||
path,
|
||||
@@ -168,7 +180,63 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
LayerCmd::RewriteSummary {
|
||||
layer_file_path,
|
||||
new_tenant_id,
|
||||
new_timeline_id,
|
||||
} => {
|
||||
pageserver::virtual_file::init(10);
|
||||
pageserver::page_cache::init(100);
|
||||
|
||||
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
|
||||
|
||||
macro_rules! rewrite_closure {
|
||||
($($summary_ty:tt)*) => {{
|
||||
|summary| $($summary_ty)* {
|
||||
tenant_id: new_tenant_id.unwrap_or(summary.tenant_id),
|
||||
timeline_id: new_timeline_id.unwrap_or(summary.timeline_id),
|
||||
..summary
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
let res = ImageLayer::rewrite_summary(
|
||||
layer_file_path,
|
||||
rewrite_closure!(image_layer::Summary),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
println!("Successfully rewrote summary of image layer {layer_file_path}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(image_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
|
||||
Err(image_layer::RewriteSummaryError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let res = DeltaLayer::rewrite_summary(
|
||||
layer_file_path,
|
||||
rewrite_closure!(delta_layer::Summary),
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
match res {
|
||||
Ok(()) => {
|
||||
println!("Successfully rewrote summary of delta layer {layer_file_path}");
|
||||
return Ok(());
|
||||
}
|
||||
Err(delta_layer::RewriteSummaryError::MagicMismatch) => (), // fallthrough
|
||||
Err(delta_layer::RewriteSummaryError::Other(e)) => {
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
anyhow::bail!("not an image or delta layer: {layer_file_path}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -668,6 +668,31 @@ fn start_pageserver(
|
||||
);
|
||||
}
|
||||
|
||||
task_mgr::spawn(
|
||||
BACKGROUND_RUNTIME.handle(),
|
||||
TaskKind::BackgroundRuntimeTurnaroundMeasure,
|
||||
None,
|
||||
None,
|
||||
"background runtime turnaround measure",
|
||||
true,
|
||||
async move {
|
||||
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
|
||||
let server = server
|
||||
.serve(hyper::service::make_service_fn(|_| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
|
||||
move |_: hyper::Request<hyper::Body>| async move {
|
||||
Ok::<_, std::convert::Infallible>(hyper::Response::new(
|
||||
hyper::Body::from(format!("alive")),
|
||||
))
|
||||
},
|
||||
))
|
||||
}))
|
||||
.with_graceful_shutdown(task_mgr::shutdown_watcher());
|
||||
server.await?;
|
||||
Ok(())
|
||||
},
|
||||
);
|
||||
|
||||
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
|
||||
|
||||
// All started up! Now just sit and wait for shutdown signal.
|
||||
|
||||
@@ -269,7 +269,7 @@ async fn calculate_synthetic_size_worker(
|
||||
}
|
||||
};
|
||||
|
||||
for (tenant_id, tenant_state) in tenants {
|
||||
for (tenant_id, tenant_state, _gen) in tenants {
|
||||
if tenant_state != TenantState::Active {
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -197,7 +197,7 @@ pub(super) async fn collect_all_metrics(
|
||||
}
|
||||
};
|
||||
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state)| async move {
|
||||
let tenants = futures::stream::iter(tenants).filter_map(|(id, state, _)| async move {
|
||||
if state != TenantState::Active {
|
||||
None
|
||||
} else {
|
||||
|
||||
@@ -541,7 +541,7 @@ async fn collect_eviction_candidates(
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
|
||||
for (tenant_id, _state) in &tenants {
|
||||
for (tenant_id, _state, _gen) in &tenants {
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
|
||||
@@ -768,11 +768,12 @@ async fn tenant_list_handler(
|
||||
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".into())
|
||||
})?
|
||||
.iter()
|
||||
.map(|(id, state)| TenantInfo {
|
||||
.map(|(id, state, gen)| TenantInfo {
|
||||
id: *id,
|
||||
state: state.clone(),
|
||||
current_physical_size: None,
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: (*gen).into(),
|
||||
})
|
||||
.collect::<Vec<TenantInfo>>();
|
||||
|
||||
@@ -801,6 +802,7 @@ async fn tenant_status(
|
||||
state: state.clone(),
|
||||
current_physical_size: Some(current_physical_size),
|
||||
attachment_status: state.attachment_status(),
|
||||
generation: tenant.generation().into(),
|
||||
})
|
||||
}
|
||||
.instrument(info_span!("tenant_status_handler", %tenant_id))
|
||||
|
||||
@@ -314,7 +314,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum PageCacheErrorKind {
|
||||
AcquirePinnedSlotTimeout,
|
||||
EvictIterLimit,
|
||||
}
|
||||
|
||||
pub(crate) fn page_cache_errors_inc(error_kind: PageCacheErrorKind) {
|
||||
|
||||
@@ -83,6 +83,7 @@ use std::{
|
||||
|
||||
use anyhow::Context;
|
||||
use once_cell::sync::OnceCell;
|
||||
use tracing::instrument;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
lsn::Lsn,
|
||||
@@ -252,6 +253,9 @@ pub struct PageCache {
|
||||
next_evict_slot: AtomicUsize,
|
||||
|
||||
size_metrics: &'static PageCacheSizeMetrics,
|
||||
|
||||
find_victim_waiters:
|
||||
nostarve_queue::Queue<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
|
||||
}
|
||||
|
||||
struct PinnedSlotsPermit(tokio::sync::OwnedSemaphorePermit);
|
||||
@@ -430,8 +434,9 @@ impl PageCache {
|
||||
///
|
||||
/// Store an image of the given page in the cache.
|
||||
///
|
||||
#[cfg_attr(test, instrument(skip_all, level = "trace", fields(%key, %lsn)))]
|
||||
pub async fn memorize_materialized_page(
|
||||
&self,
|
||||
&'static self,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key: Key,
|
||||
@@ -522,8 +527,9 @@ impl PageCache {
|
||||
|
||||
// Section 1.2: Public interface functions for working with immutable file pages.
|
||||
|
||||
#[cfg_attr(test, instrument(skip_all, level = "trace", fields(?file_id, ?blkno)))]
|
||||
pub async fn read_immutable_buf(
|
||||
&self,
|
||||
&'static self,
|
||||
file_id: FileId,
|
||||
blkno: u32,
|
||||
ctx: &RequestContext,
|
||||
@@ -629,7 +635,7 @@ impl PageCache {
|
||||
/// ```
|
||||
///
|
||||
async fn lock_for_read(
|
||||
&self,
|
||||
&'static self,
|
||||
cache_key: &mut CacheKey,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ReadBufResult> {
|
||||
@@ -851,10 +857,15 @@ impl PageCache {
|
||||
///
|
||||
/// On return, the slot is empty and write-locked.
|
||||
async fn find_victim(
|
||||
&self,
|
||||
&'static self,
|
||||
_permit_witness: &PinnedSlotsPermit,
|
||||
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
|
||||
let iter_limit = self.slots.len() * 10;
|
||||
let nostarve_position = self.find_victim_waiters.begin()
|
||||
.expect("we initialize the nostarve queue to the same size as the slots semaphore, and the caller is presenting a permit");
|
||||
|
||||
let span = tracing::info_span!("find_victim", ?nostarve_position);
|
||||
let _enter = span.enter();
|
||||
|
||||
let mut iters = 0;
|
||||
loop {
|
||||
iters += 1;
|
||||
@@ -866,41 +877,8 @@ impl PageCache {
|
||||
let mut inner = match slot.inner.try_write() {
|
||||
Ok(inner) => inner,
|
||||
Err(_err) => {
|
||||
if iters > iter_limit {
|
||||
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
|
||||
// any particular number of iterations: other threads might race ahead and acquire and
|
||||
// release pins just as we're scanning the array.
|
||||
//
|
||||
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
|
||||
// slots. There are two threads running concurrently, A and B. A has just
|
||||
// acquired the permit from the semaphore.
|
||||
//
|
||||
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2, decrement its usage_count to zero and continue the search
|
||||
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
// B: Acquire permit.
|
||||
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
|
||||
// B: Release pin and permit again
|
||||
//
|
||||
// Now we're back in the starting situation that both slots have
|
||||
// usage_count 1, but A has now been through one iteration of the
|
||||
// find_victim() loop. This can repeat indefinitely and on each
|
||||
// iteration, A's iteration count increases by one.
|
||||
//
|
||||
// So, even though the semaphore for the permits is fair, the victim search
|
||||
// itself happens in parallel and is not fair.
|
||||
// Hence even with a permit, a task can theoretically be starved.
|
||||
// To avoid this, we'd need tokio to give priority to tasks that are holding
|
||||
// permits for longer.
|
||||
// Note that just yielding to tokio during iteration without such
|
||||
// priority boosting is likely counter-productive. We'd just give more opportunities
|
||||
// for B to bump usage count, further starving A.
|
||||
crate::metrics::page_cache_errors_inc(
|
||||
crate::metrics::PageCacheErrorKind::EvictIterLimit,
|
||||
);
|
||||
anyhow::bail!("exceeded evict iter limit");
|
||||
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
|
||||
unreachable!("find_victim_waiters prevents starvation");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@@ -911,7 +889,8 @@ impl PageCache {
|
||||
inner.key = None;
|
||||
}
|
||||
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
|
||||
return Ok((slot_idx, inner));
|
||||
|
||||
return Ok(nostarve_position.complete_and_wait((slot_idx, inner)).await);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -955,6 +934,7 @@ impl PageCache {
|
||||
next_evict_slot: AtomicUsize::new(0),
|
||||
size_metrics,
|
||||
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
|
||||
find_victim_waiters: ::nostarve_queue::Queue::new(num_pages),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -293,6 +293,8 @@ pub enum TaskKind {
|
||||
|
||||
DebugTool,
|
||||
|
||||
BackgroundRuntimeTurnaroundMeasure,
|
||||
|
||||
#[cfg(test)]
|
||||
UnitTest,
|
||||
}
|
||||
|
||||
@@ -1714,6 +1714,10 @@ impl Tenant {
|
||||
self.current_state() == TenantState::Active
|
||||
}
|
||||
|
||||
pub fn generation(&self) -> Generation {
|
||||
self.generation
|
||||
}
|
||||
|
||||
/// Changes tenant status to active, unless shutdown was already requested.
|
||||
///
|
||||
/// `background_jobs_can_start` is an optional barrier set to a value during pageserver startup
|
||||
|
||||
@@ -1397,7 +1397,8 @@ pub(crate) enum TenantMapListError {
|
||||
///
|
||||
/// Get list of tenants, for the mgmt API
|
||||
///
|
||||
pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, TenantMapListError> {
|
||||
pub(crate) async fn list_tenants(
|
||||
) -> Result<Vec<(TenantId, TenantState, Generation)>, TenantMapListError> {
|
||||
let tenants = TENANTS.read().unwrap();
|
||||
let m = match &*tenants {
|
||||
TenantsMap::Initializing => return Err(TenantMapListError::Initializing),
|
||||
@@ -1405,12 +1406,12 @@ pub(crate) async fn list_tenants() -> Result<Vec<(TenantId, TenantState)>, Tenan
|
||||
};
|
||||
Ok(m.iter()
|
||||
.filter_map(|(id, tenant)| match tenant {
|
||||
TenantSlot::Attached(tenant) => Some((id, tenant.current_state())),
|
||||
TenantSlot::Attached(tenant) => Some((id, tenant.current_state(), tenant.generation())),
|
||||
TenantSlot::Secondary => None,
|
||||
TenantSlot::InProgress(_) => None,
|
||||
})
|
||||
// TODO(sharding): make callers of this function shard-aware
|
||||
.map(|(k, v)| (k.tenant_id, v))
|
||||
.map(|(a, b, c)| (a.tenant_id, b, c))
|
||||
.collect())
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
pub mod delta_layer;
|
||||
mod filename;
|
||||
mod image_layer;
|
||||
pub mod image_layer;
|
||||
mod inmemory_layer;
|
||||
mod layer;
|
||||
mod layer_desc;
|
||||
|
||||
@@ -69,13 +69,13 @@ use super::{AsLayerDesc, LayerAccessStats, PersistentLayerDesc, ResidentLayer};
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct Summary {
|
||||
/// Magic value to identify this as a neon delta file. Always DELTA_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
pub magic: u16,
|
||||
pub format_version: u16,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn_range: Range<Lsn>,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn_range: Range<Lsn>,
|
||||
|
||||
/// Block number where the 'index' part of the file begins.
|
||||
pub index_start_blk: u32,
|
||||
@@ -611,6 +611,61 @@ impl Drop for DeltaLayerWriter {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
MagicMismatch,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for RewriteSummaryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayer {
|
||||
pub async fn rewrite_summary<F>(
|
||||
path: &Utf8Path,
|
||||
rewrite: F,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RewriteSummaryError>
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != DELTA_FILE_MAGIC {
|
||||
return Err(RewriteSummaryError::MagicMismatch);
|
||||
}
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in DeltaLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl DeltaLayerInner {
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
@@ -640,11 +695,11 @@ impl DeltaLayerInner {
|
||||
expected_summary.index_start_blk = actual_summary.index_start_blk;
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -67,20 +67,20 @@ use super::{AsLayerDesc, Layer, PersistentLayerDesc, ResidentLayer};
|
||||
/// the 'index' starts at the block indicated by 'index_start_blk'
|
||||
///
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub(super) struct Summary {
|
||||
pub struct Summary {
|
||||
/// Magic value to identify this as a neon image file. Always IMAGE_FILE_MAGIC.
|
||||
magic: u16,
|
||||
format_version: u16,
|
||||
pub magic: u16,
|
||||
pub format_version: u16,
|
||||
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
key_range: Range<Key>,
|
||||
lsn: Lsn,
|
||||
pub tenant_id: TenantId,
|
||||
pub timeline_id: TimelineId,
|
||||
pub key_range: Range<Key>,
|
||||
pub lsn: Lsn,
|
||||
|
||||
/// Block number where the 'index' part of the file begins.
|
||||
index_start_blk: u32,
|
||||
pub index_start_blk: u32,
|
||||
/// Block within the 'index', where the B-tree root page is stored
|
||||
index_root_blk: u32,
|
||||
pub index_root_blk: u32,
|
||||
// the 'values' part starts after the summary header, on block 1.
|
||||
}
|
||||
|
||||
@@ -296,6 +296,61 @@ impl ImageLayer {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum RewriteSummaryError {
|
||||
#[error("magic mismatch")]
|
||||
MagicMismatch,
|
||||
#[error(transparent)]
|
||||
Other(#[from] anyhow::Error),
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for RewriteSummaryError {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::Other(anyhow::anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayer {
|
||||
pub async fn rewrite_summary<F>(
|
||||
path: &Utf8Path,
|
||||
rewrite: F,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), RewriteSummaryError>
|
||||
where
|
||||
F: Fn(Summary) -> Summary,
|
||||
{
|
||||
let file = VirtualFile::open_with_options(
|
||||
path,
|
||||
&*std::fs::OpenOptions::new().read(true).write(true),
|
||||
)
|
||||
.await
|
||||
.with_context(|| format!("Failed to open file '{}'", path))?;
|
||||
let file = FileBlockReader::new(file);
|
||||
let summary_blk = file.read_blk(0, ctx).await?;
|
||||
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
|
||||
let mut file = file.file;
|
||||
if actual_summary.magic != IMAGE_FILE_MAGIC {
|
||||
return Err(RewriteSummaryError::MagicMismatch);
|
||||
}
|
||||
|
||||
let new_summary = rewrite(actual_summary);
|
||||
|
||||
let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new();
|
||||
Summary::ser_into(&new_summary, &mut buf).context("serialize")?;
|
||||
if buf.spilled() {
|
||||
// The code in ImageLayerWriterInner just warn!()s for this.
|
||||
// It should probably error out as well.
|
||||
return Err(RewriteSummaryError::Other(anyhow::anyhow!(
|
||||
"Used more than one page size for summary buffer: {}",
|
||||
buf.len()
|
||||
)));
|
||||
}
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&buf).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ImageLayerInner {
|
||||
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
|
||||
/// - inner has the success or transient failure
|
||||
@@ -329,11 +384,11 @@ impl ImageLayerInner {
|
||||
expected_summary.index_root_blk = actual_summary.index_root_blk;
|
||||
|
||||
if actual_summary != expected_summary {
|
||||
bail!(
|
||||
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
actual_summary,
|
||||
expected_summary
|
||||
);
|
||||
// bail!(
|
||||
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
|
||||
// actual_summary,
|
||||
// expected_summary
|
||||
// );
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -44,6 +44,7 @@ pub(crate) enum BackgroundLoopKind {
|
||||
Eviction,
|
||||
ConsumptionMetricsCollectMetrics,
|
||||
ConsumptionMetricsSyntheticSizeWorker,
|
||||
InitialLogicalSizeCalculation,
|
||||
}
|
||||
|
||||
impl BackgroundLoopKind {
|
||||
|
||||
@@ -1822,6 +1822,29 @@ impl Timeline {
|
||||
// delay will be terminated by a timeout regardless.
|
||||
let _completion = { self_clone.initial_logical_size_attempt.lock().expect("unexpected initial_logical_size_attempt poisoned").take() };
|
||||
|
||||
// In prod, initial logical size calucalation is spawned either by
|
||||
// WalReceiverConnectionHandler if the timeline is active according to storage broker,
|
||||
// or by the first consumption metrics worker (MetricsCollection).
|
||||
// The latter runs every `metric_collection_interval` and checkpoints to disk, i.e.,
|
||||
// if pageserver gets restarted, the consumption metrics worker will resume waiting
|
||||
// for the correct remaining time, as if the pageserver had not been restarted.
|
||||
//
|
||||
// FIXME: with the current code, walreceiver requests would also hit this semaphore
|
||||
// and get queued behind other background operations. That's bad because walreceiver_connection
|
||||
// will push the not-precise value as `current_timeline_size` in the `PageserverFeedback`
|
||||
// while this calculation is stuck.
|
||||
// We need some way to priority-boost the initial size calculation if walreceiver is asking.
|
||||
// Or, should we maybe revisit the use of logical size in `PageserverFeedback`?
|
||||
// It seems broken the way it is.
|
||||
//
|
||||
// Example query to show different causes of initial size calculation spawning:
|
||||
//
|
||||
// https://neonprod.grafana.net/explore?panes=%7B%22wSx%22:%7B%22datasource%22:%22grafanacloud-logs%22,%22queries%22:%5B%7B%22refId%22:%22A%22,%22expr%22:%22sum%20by%20%28task_kind%29%20%28count_over_time%28%7Bneon_service%3D%5C%22pageserver%5C%22,%20neon_region%3D%5C%22us-west-2%5C%22%7D%20%7C%3D%20%60logical%20size%20computation%20from%20context%20of%20task%20kind%60%20%7C%20regexp%20%60logical%20size%20computation%20from%20context%20of%20task%20kind%20%28%3FP%3Ctask_kind%3E.%2A%29%60%20%5B1m%5D%29%29%22,%22queryType%22:%22range%22,%22datasource%22:%7B%22type%22:%22loki%22,%22uid%22:%22grafanacloud-logs%22%7D,%22editorMode%22:%22code%22,%22step%22:%221m%22%7D%5D,%22range%22:%7B%22from%22:%221700637500615%22,%22to%22:%221700639648743%22%7D%7D%7D&schemaVersion=1&orgId=1
|
||||
let _permit = match crate::tenant::tasks::concurrent_background_tasks_rate_limit(BackgroundLoopKind::InitialLogicalSizeCalculation,&background_ctx, &cancel).await {
|
||||
Ok(permit) => permit,
|
||||
Err(RateLimitError::Cancelled) => return Ok(()),
|
||||
};
|
||||
|
||||
let calculated_size = match self_clone
|
||||
.logical_size_calculation_task(lsn, LogicalSizeCalculationCause::Initial, &background_ctx)
|
||||
.await
|
||||
|
||||
@@ -1572,7 +1572,7 @@ class NeonAttachmentService:
|
||||
self.running = False
|
||||
return self
|
||||
|
||||
def attach_hook(self, tenant_id: TenantId, pageserver_id: int) -> int:
|
||||
def attach_hook_issue(self, tenant_id: TenantId, pageserver_id: int) -> int:
|
||||
response = requests.post(
|
||||
f"{self.env.control_plane_api}/attach-hook",
|
||||
json={"tenant_id": str(tenant_id), "node_id": pageserver_id},
|
||||
@@ -1582,6 +1582,13 @@ class NeonAttachmentService:
|
||||
assert isinstance(gen, int)
|
||||
return gen
|
||||
|
||||
def attach_hook_drop(self, tenant_id: TenantId):
|
||||
response = requests.post(
|
||||
f"{self.env.control_plane_api}/attach-hook",
|
||||
json={"tenant_id": str(tenant_id), "node_id": None},
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
def __enter__(self) -> "NeonAttachmentService":
|
||||
return self
|
||||
|
||||
@@ -1781,13 +1788,20 @@ class NeonPageserver(PgProtocol):
|
||||
to call into the pageserver HTTP client.
|
||||
"""
|
||||
if self.env.attachment_service is not None:
|
||||
generation = self.env.attachment_service.attach_hook(tenant_id, self.id)
|
||||
generation = self.env.attachment_service.attach_hook_issue(tenant_id, self.id)
|
||||
else:
|
||||
generation = None
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_attach(tenant_id, config, config_null, generation=generation)
|
||||
|
||||
def tenant_detach(self, tenant_id: TenantId):
|
||||
if self.env.attachment_service is not None:
|
||||
self.env.attachment_service.attach_hook_drop(tenant_id)
|
||||
|
||||
client = self.http_client()
|
||||
return client.tenant_detach(tenant_id)
|
||||
|
||||
|
||||
def append_pageserver_param_overrides(
|
||||
params_to_update: List[str],
|
||||
|
||||
156
test_runner/performance/test_pageserver_startup_many_tenants.py
Normal file
156
test_runner/performance/test_pageserver_startup_many_tenants.py
Normal file
@@ -0,0 +1,156 @@
|
||||
import queue
|
||||
import shutil
|
||||
import subprocess
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PgBin,
|
||||
last_flush_lsn_upload,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
|
||||
from fixtures.types import TenantId
|
||||
|
||||
|
||||
def duplicate_tenant(
|
||||
env: NeonEnv, remote_storage: LocalFsStorage, template_tenant: TenantId, new_tenant: TenantId
|
||||
):
|
||||
src_timelines_dir: Path = remote_storage.tenant_path(template_tenant) / "timelines"
|
||||
assert src_timelines_dir.is_dir(), f"{src_timelines_dir} is not a directory"
|
||||
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
dst_timelines_dir: Path = remote_storage.tenant_path(new_tenant) / "timelines"
|
||||
dst_timelines_dir.parent.mkdir(parents=False, exist_ok=False)
|
||||
dst_timelines_dir.mkdir(parents=False, exist_ok=False)
|
||||
|
||||
for tl in src_timelines_dir.iterdir():
|
||||
src_tl_dir = src_timelines_dir / tl.name
|
||||
assert src_tl_dir.is_dir(), f"{src_tl_dir} is not a directory"
|
||||
dst_tl_dir = dst_timelines_dir / tl.name
|
||||
dst_tl_dir.mkdir(parents=False, exist_ok=False)
|
||||
for file in tl.iterdir():
|
||||
shutil.copy2(file, dst_tl_dir)
|
||||
if "__" in file.name:
|
||||
cmd: List[str] = [
|
||||
str(
|
||||
env.neon_binpath / "pagectl"
|
||||
), # TODO: abstract this like the other binaries
|
||||
"layer",
|
||||
"rewrite-summary",
|
||||
str(dst_tl_dir / file.name),
|
||||
"--new-tenant-id",
|
||||
str(new_tenant),
|
||||
]
|
||||
subprocess.run(cmd, check=True)
|
||||
else:
|
||||
# index_part etc need no patching
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
"""
|
||||
Usage
|
||||
|
||||
TEST_OUTPUT=/mnt/many_tenants NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
|
||||
|
||||
Then
|
||||
|
||||
export NEON_REPO_DIR=/mnt/many_tenants/test_pageserver_startup_many_tenants/repo
|
||||
|
||||
# edit $NEON_REPO_DIR/pageserver_1/pageserver.toml to use metric collection,
|
||||
# with intervals from prod:
|
||||
#
|
||||
# metric_collection_endpoint = "https://127.0.0.1:6666"
|
||||
# metric_collection_interval: 10min
|
||||
# cached_metric_collection_interval: 0s
|
||||
|
||||
# run a fake metric collection endpoint in some other terminal using
|
||||
# python3 -m http.server 6666
|
||||
|
||||
# then start pageserver
|
||||
./target/release/neon_local start
|
||||
|
||||
|
||||
|
||||
"""
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
neon_env_builder.enable_generations = True
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
remote_storage = env.pageserver_remote_storage
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
|
||||
# cleanup initial tenant
|
||||
env.pageserver.tenant_detach(env.initial_tenant)
|
||||
|
||||
# create our template tenant
|
||||
tenant_config_mgmt_api = {
|
||||
"gc_period": "0s",
|
||||
"checkpoint_timeout": "3650 day",
|
||||
"compaction_period": "20 s",
|
||||
"compaction_threshold": 10,
|
||||
"compaction_target_size": 134217728,
|
||||
"checkpoint_distance": 268435456,
|
||||
"image_creation_threshold": 3,
|
||||
}
|
||||
tenant_config_cli = {k: str(v) for k, v in tenant_config_mgmt_api.items()}
|
||||
|
||||
ps_http = env.pageserver.http_client()
|
||||
|
||||
template_tenant, template_timeline = env.neon_cli.create_tenant(conf=tenant_config_cli)
|
||||
ep = env.endpoints.create_start("main", tenant_id=template_tenant)
|
||||
ep.safe_psql("create table foo(b text)")
|
||||
for _i in range(0, 8):
|
||||
ep.safe_psql("insert into foo(b) values ('some text')")
|
||||
last_flush_lsn_upload(env, ep, template_tenant, template_timeline)
|
||||
ep.stop_and_destroy()
|
||||
env.pageserver.tenant_detach(template_tenant)
|
||||
|
||||
# duplicate the tenant in remote storage
|
||||
def worker(queue: queue.Queue[Optional[TenantId]]):
|
||||
while True:
|
||||
tenant_id = queue.get()
|
||||
if tenant_id is None:
|
||||
return
|
||||
assert isinstance(remote_storage, LocalFsStorage)
|
||||
duplicate_tenant(env, remote_storage, template_tenant, tenant_id)
|
||||
|
||||
new_tenants: List[TenantId] = [TenantId.generate() for _ in range(0, 20_000)]
|
||||
duplications: queue.Queue[Optional[TenantId]] = queue.Queue()
|
||||
for t in new_tenants:
|
||||
duplications.put(t)
|
||||
workers = []
|
||||
for _ in range(0, 8):
|
||||
w = threading.Thread(target=worker, args=[duplications])
|
||||
workers.append(w)
|
||||
w.start()
|
||||
duplications.put(None)
|
||||
for w in workers:
|
||||
w.join()
|
||||
|
||||
# for evaluation, use the same background loop periods as in prod
|
||||
benchmark_tenant_config = {k: v for k, v in tenant_config_mgmt_api.items()}
|
||||
del benchmark_tenant_config["compaction_period"]
|
||||
del benchmark_tenant_config["gc_period"]
|
||||
benchmark_tenant_config["eviction_policy"] = {
|
||||
"kind": "LayerAccessThreshold",
|
||||
"period": "10m",
|
||||
# don't do evictions
|
||||
"threshold": "1000d",
|
||||
}
|
||||
|
||||
assert ps_http.tenant_list() == []
|
||||
for tenant in new_tenants:
|
||||
env.pageserver.tenant_attach(tenant, config=benchmark_tenant_config)
|
||||
for tenant in new_tenants:
|
||||
wait_until_tenant_active(ps_http, tenant)
|
||||
|
||||
# ensure all layers are resident for predictiable performance
|
||||
# TODO: ensure all kinds of eviction are disabled (per-tenant, disk-usage-based)
|
||||
for tenant in new_tenants:
|
||||
ps_http.download_all_layers(tenant, template_timeline)
|
||||
@@ -282,7 +282,7 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
# Now advance the generation in the control plane: subsequent validations
|
||||
# from the running pageserver will fail. No more deletions should happen.
|
||||
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
generate_uploads_and_deletions(env, init=False)
|
||||
|
||||
assert_deletion_queue(ps_http, lambda n: n > 0)
|
||||
@@ -397,7 +397,7 @@ def test_deletion_queue_recovery(
|
||||
if keep_attachment == KeepAttachment.LOSE:
|
||||
some_other_pageserver = 101010
|
||||
assert env.attachment_service is not None
|
||||
env.attachment_service.attach_hook(env.initial_tenant, some_other_pageserver)
|
||||
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
|
||||
|
||||
env.pageserver.start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user