diff --git a/Cargo.lock b/Cargo.lock index 98daddbd96..9a5ac0b1d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2157,6 +2157,7 @@ dependencies = [ "svg_fmt", "tar", "tempfile", + "tenant_size_model", "thiserror", "tokio", "tokio-postgres", @@ -3533,6 +3534,13 @@ dependencies = [ "winapi", ] +[[package]] +name = "tenant_size_model" +version = "0.1.0" +dependencies = [ + "workspace_hack", +] + [[package]] name = "termcolor" version = "1.1.3" diff --git a/libs/tenant_size_model/.gitignore b/libs/tenant_size_model/.gitignore new file mode 100644 index 0000000000..15a65bec1e --- /dev/null +++ b/libs/tenant_size_model/.gitignore @@ -0,0 +1,3 @@ +*.dot +*.png +*.svg diff --git a/libs/tenant_size_model/Cargo.toml b/libs/tenant_size_model/Cargo.toml new file mode 100644 index 0000000000..1aabf5a4f9 --- /dev/null +++ b/libs/tenant_size_model/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "tenant_size_model" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +workspace_hack = { version = "0.1", path = "../../workspace_hack" } diff --git a/libs/tenant_size_model/Makefile b/libs/tenant_size_model/Makefile new file mode 100644 index 0000000000..1cffe81c10 --- /dev/null +++ b/libs/tenant_size_model/Makefile @@ -0,0 +1,13 @@ +all: 1.svg 2.svg 3.svg 4.svg 1.png 2.png 3.png 4.png + +../../target/debug/tenant_size_model: Cargo.toml src/main.rs src/lib.rs + cargo build --bin tenant_size_model + +%.svg: %.dot + dot -Tsvg $< > $@ + +%.png: %.dot + dot -Tpng $< > $@ + +%.dot: ../../target/debug/tenant_size_model + ../../target/debug/tenant_size_model $* > $@ diff --git a/libs/tenant_size_model/README.md b/libs/tenant_size_model/README.md new file mode 100644 index 0000000000..b850130d67 --- /dev/null +++ b/libs/tenant_size_model/README.md @@ -0,0 +1,7 @@ +# Logical size + WAL pricing + +This is a simulator to calculate the tenant size in different scenarios, +using the "Logical size + WAL" method. Makefile produces diagrams used in a +private presentation: + +https://docs.google.com/presentation/d/1OapE4k11xmcwMh7I7YvNWGC63yCRLh6udO9bXZ-fZmo/edit?usp=sharing diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs new file mode 100644 index 0000000000..c7ec1e8870 --- /dev/null +++ b/libs/tenant_size_model/src/lib.rs @@ -0,0 +1,349 @@ +use std::borrow::Cow; +use std::collections::HashMap; + +/// Pricing model or history size builder. +/// +/// Maintains knowledge of the branches and their modifications. Generic over the branch name key +/// type. +pub struct Storage { + segments: Vec, + + /// Mapping from the branch name to the index of a segment describing it's latest state. + branches: HashMap, +} + +/// Snapshot of a branch. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Segment { + /// Previous segment index into ['Storage::segments`], if any. + parent: Option, + + /// Description of how did we get to this state. + /// + /// Mainly used in the original scenarios 1..=4 with insert, delete and update. Not used when + /// modifying a branch directly. 
+ pub op: Cow<'static, str>, + + /// LSN before this state + start_lsn: u64, + + /// LSN at this state + pub end_lsn: u64, + + /// Logical size before this state + start_size: u64, + + /// Logical size at this state + pub end_size: u64, + + /// Indices to [`Storage::segments`] + /// + /// FIXME: this could be an Option + children_after: Vec, + + /// Determined by `retention_period` given to [`Storage::calculate`] + pub needed: bool, +} + +// +// +// +// +// *-g--*---D---> +// / +// / +// / *---b----*-B---> +// / / +// / / +// -----*--e---*-----f----* C +// E \ +// \ +// *--a---*---A--> +// +// If A and B need to be retained, is it cheaper to store +// snapshot at C+a+b, or snapshots at A and B ? +// +// If D also needs to be retained, which is cheaper: +// +// 1. E+g+e+f+a+b +// 2. D+C+a+b +// 3. D+A+B + +/// [`Segment`] which has had it's size calculated. +pub struct SegmentSize { + pub seg_id: usize, + + pub method: SegmentMethod, + + this_size: u64, + + pub children: Vec, +} + +impl SegmentSize { + fn total(&self) -> u64 { + self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total()) + } + + pub fn total_children(&self) -> u64 { + if self.method == SnapshotAfter { + self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total()) + } else { + self.children.iter().fold(0, |acc, x| acc + x.total()) + } + } +} + +/// Different methods to retain history from a particular state +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum SegmentMethod { + SnapshotAfter, + Wal, + WalNeeded, + Skipped, +} + +use SegmentMethod::*; + +impl Storage { + /// Creates a new storage with the given default branch name. + pub fn new(initial_branch: K) -> Storage { + let init_segment = Segment { + op: "".into(), + needed: false, + parent: None, + start_lsn: 0, + end_lsn: 0, + start_size: 0, + end_size: 0, + children_after: Vec::new(), + }; + + Storage { + segments: vec![init_segment], + branches: HashMap::from([(initial_branch, 0)]), + } + } + + /// Advances the branch with the named operation, by the relative LSN and logical size bytes. + pub fn modify_branch( + &mut self, + branch: &Q, + op: Cow<'static, str>, + lsn_bytes: u64, + size_bytes: i64, + ) where + K: std::borrow::Borrow, + Q: std::hash::Hash + Eq, + { + let lastseg_id = *self.branches.get(branch).unwrap(); + let newseg_id = self.segments.len(); + let lastseg = &mut self.segments[lastseg_id]; + + let newseg = Segment { + op, + parent: Some(lastseg_id), + start_lsn: lastseg.end_lsn, + end_lsn: lastseg.end_lsn + lsn_bytes, + start_size: lastseg.end_size, + end_size: (lastseg.end_size as i64 + size_bytes) as u64, + children_after: Vec::new(), + needed: false, + }; + lastseg.children_after.push(newseg_id); + + self.segments.push(newseg); + *self.branches.get_mut(branch).expect("read already") = newseg_id; + } + + pub fn insert(&mut self, branch: &Q, bytes: u64) + where + K: std::borrow::Borrow, + Q: std::hash::Hash + Eq, + { + self.modify_branch(branch, "insert".into(), bytes, bytes as i64); + } + + pub fn update(&mut self, branch: &Q, bytes: u64) + where + K: std::borrow::Borrow, + Q: std::hash::Hash + Eq, + { + self.modify_branch(branch, "update".into(), bytes, 0i64); + } + + pub fn delete(&mut self, branch: &Q, bytes: u64) + where + K: std::borrow::Borrow, + Q: std::hash::Hash + Eq, + { + self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)); + } + + /// Panics if the parent branch cannot be found. 
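+    ///
+    /// A sketch of how branching fits typical use of [`Storage`], mirroring the scenarios in
+    /// `main.rs` (sizes are illustrative only):
+    ///
+    /// ```ignore
+    /// let mut storage = Storage::new("main");
+    /// storage.insert("main", 5_000);
+    /// storage.branch("main", "child");
+    /// storage.update("child", 1_000);
+    /// let size = storage.calculate(1_000).total_children();
+    /// ```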
+ pub fn branch(&mut self, parent: &Q, name: K) + where + K: std::borrow::Borrow, + Q: std::hash::Hash + Eq, + { + // Find the right segment + let branchseg_id = *self + .branches + .get(parent) + .expect("should had found the parent by key"); + let _branchseg = &mut self.segments[branchseg_id]; + + // Create branch name for it + self.branches.insert(name, branchseg_id); + } + + pub fn calculate(&mut self, retention_period: u64) -> SegmentSize { + // Phase 1: Mark all the segments that need to be retained + for (_branch, &last_seg_id) in self.branches.iter() { + let last_seg = &self.segments[last_seg_id]; + let cutoff_lsn = last_seg.start_lsn.saturating_sub(retention_period); + let mut seg_id = last_seg_id; + loop { + let seg = &mut self.segments[seg_id]; + if seg.end_lsn < cutoff_lsn { + break; + } + seg.needed = true; + if let Some(prev_seg_id) = seg.parent { + seg_id = prev_seg_id; + } else { + break; + } + } + } + + // Phase 2: For each oldest segment in a chain that needs to be retained, + // calculate if we should store snapshot or WAL + self.size_from_snapshot_later(0) + } + + fn size_from_wal(&self, seg_id: usize) -> SegmentSize { + let seg = &self.segments[seg_id]; + + let this_size = seg.end_lsn - seg.start_lsn; + + let mut children = Vec::new(); + + // try both ways + for &child_id in seg.children_after.iter() { + // try each child both ways + let child = &self.segments[child_id]; + let p1 = self.size_from_wal(child_id); + + let p = if !child.needed { + let p2 = self.size_from_snapshot_later(child_id); + if p1.total() < p2.total() { + p1 + } else { + p2 + } + } else { + p1 + }; + children.push(p); + } + SegmentSize { + seg_id, + method: if seg.needed { WalNeeded } else { Wal }, + this_size, + children, + } + } + + fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize { + // If this is needed, then it's time to do the snapshot and continue + // with wal method. 
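+        //
+        // If this segment is not needed, there are two candidates: keep deferring the
+        // snapshot (Skipped), or snapshot right after this segment at a junction
+        // (SnapshotAfter); both alternatives are computed and the smaller total wins.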
+ let seg = &self.segments[seg_id]; + //eprintln!("snap: seg{}: {} needed: {}", seg_id, seg.children_after.len(), seg.needed); + if seg.needed { + let mut children = Vec::new(); + + for &child_id in seg.children_after.iter() { + // try each child both ways + let child = &self.segments[child_id]; + let p1 = self.size_from_wal(child_id); + + let p = if !child.needed { + let p2 = self.size_from_snapshot_later(child_id); + if p1.total() < p2.total() { + p1 + } else { + p2 + } + } else { + p1 + }; + children.push(p); + } + SegmentSize { + seg_id, + method: WalNeeded, + this_size: seg.start_size, + children, + } + } else { + // If any of the direct children are "needed", need to be able to reconstruct here + let mut children_needed = false; + for &child in seg.children_after.iter() { + let seg = &self.segments[child]; + if seg.needed { + children_needed = true; + break; + } + } + + let method1 = if !children_needed { + let mut children = Vec::new(); + for child in seg.children_after.iter() { + children.push(self.size_from_snapshot_later(*child)); + } + Some(SegmentSize { + seg_id, + method: Skipped, + this_size: 0, + children, + }) + } else { + None + }; + + // If this a junction, consider snapshotting here + let method2 = if children_needed || seg.children_after.len() >= 2 { + let mut children = Vec::new(); + for child in seg.children_after.iter() { + children.push(self.size_from_wal(*child)); + } + Some(SegmentSize { + seg_id, + method: SnapshotAfter, + this_size: seg.end_size, + children, + }) + } else { + None + }; + + match (method1, method2) { + (None, None) => panic!(), + (Some(method), None) => method, + (None, Some(method)) => method, + (Some(method1), Some(method2)) => { + if method1.total() < method2.total() { + method1 + } else { + method2 + } + } + } + } + } + + pub fn into_segments(self) -> Vec { + self.segments + } +} diff --git a/libs/tenant_size_model/src/main.rs b/libs/tenant_size_model/src/main.rs new file mode 100644 index 0000000000..47c0e8122f --- /dev/null +++ b/libs/tenant_size_model/src/main.rs @@ -0,0 +1,268 @@ +//! Tenant size model testing ground. +//! +//! Has a number of scenarios and a `main` for invoking these by number, calculating the history +//! size, outputs graphviz graph. Makefile in directory shows how to use graphviz to turn scenarios +//! into pngs. + +use tenant_size_model::{Segment, SegmentSize, Storage}; + +// Main branch only. Some updates on it. +fn scenario_1() -> (Vec, SegmentSize) { + // Create main branch + let mut storage = Storage::new("main"); + + // Bulk load 5 GB of data to it + storage.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + storage.update("main", 1_000); + } + + let size = storage.calculate(1000); + + (storage.into_segments(), size) +} + +// Main branch only. Some updates on it. 
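+// (Scenario 2 additionally branches off "child" after the updates, then updates both branches.)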
+fn scenario_2() -> (Vec, SegmentSize) { + // Create main branch + let mut storage = Storage::new("main"); + + // Bulk load 5 GB of data to it + storage.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + storage.update("main", 1_000); + } + + // Branch + storage.branch("main", "child"); + storage.update("child", 1_000); + + // More updates on parent + storage.update("main", 1_000); + + let size = storage.calculate(1000); + + (storage.into_segments(), size) +} + +// Like 2, but more updates on main +fn scenario_3() -> (Vec, SegmentSize) { + // Create main branch + let mut storage = Storage::new("main"); + + // Bulk load 5 GB of data to it + storage.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + storage.update("main", 1_000); + } + + // Branch + storage.branch("main", "child"); + storage.update("child", 1_000); + + // More updates on parent + for _ in 0..5 { + storage.update("main", 1_000); + } + + let size = storage.calculate(1000); + + (storage.into_segments(), size) +} + +// Diverged branches +fn scenario_4() -> (Vec, SegmentSize) { + // Create main branch + let mut storage = Storage::new("main"); + + // Bulk load 5 GB of data to it + storage.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + storage.update("main", 1_000); + } + + // Branch + storage.branch("main", "child"); + storage.update("child", 1_000); + + // More updates on parent + for _ in 0..8 { + storage.update("main", 1_000); + } + + let size = storage.calculate(1000); + + (storage.into_segments(), size) +} + +fn scenario_5() -> (Vec, SegmentSize) { + let mut storage = Storage::new("a"); + storage.insert("a", 5000); + storage.branch("a", "b"); + storage.update("b", 4000); + storage.update("a", 2000); + storage.branch("a", "c"); + storage.insert("c", 4000); + storage.insert("a", 2000); + + let size = storage.calculate(5000); + + (storage.into_segments(), size) +} + +fn scenario_6() -> (Vec, SegmentSize) { + use std::borrow::Cow; + + const NO_OP: Cow<'static, str> = Cow::Borrowed(""); + + let branches = [ + Some(0x7ff1edab8182025f15ae33482edb590a_u128), + Some(0xb1719e044db05401a05a2ed588a3ad3f), + Some(0xb68d6691c895ad0a70809470020929ef), + ]; + + // compared to other scenarios, this one uses bytes instead of kB + + let mut storage = Storage::new(None); + + storage.branch(&None, branches[0]); // at 0 + storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064 + storage.branch(&branches[0], branches[1]); // at 108951064 + storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472 + storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424 + storage.branch(&branches[0], branches[2]); // at 283415424 + storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616 + storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400 + + let size = storage.calculate(100_000); + + (storage.into_segments(), size) +} + +fn main() { + let args: Vec = std::env::args().collect(); + + let scenario = if args.len() < 2 { "1" } else { &args[1] }; + + let (segments, size) = match scenario { + "1" => scenario_1(), + "2" => scenario_2(), + "3" => scenario_3(), + "4" => scenario_4(), + "5" => scenario_5(), + "6" => scenario_6(), + other => { + eprintln!("invalid scenario {}", other); + std::process::exit(1); + } + }; + + graphviz_tree(&segments, &size); +} + +fn graphviz_recurse(segments: &[Segment], node: &SegmentSize) { + use tenant_size_model::SegmentMethod::*; + + let seg_id = node.seg_id; + let seg 
= segments.get(seg_id).unwrap(); + let lsn = seg.end_lsn; + let size = seg.end_size; + let method = node.method; + + println!(" {{"); + println!(" node [width=0.1 height=0.1 shape=oval]"); + + let tenant_size = node.total_children(); + + let penwidth = if seg.needed { 6 } else { 3 }; + let x = match method { + SnapshotAfter => + format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" style=filled penwidth={penwidth}"), + Wal => + format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"), + WalNeeded => + format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"), + Skipped => + format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"gray\" penwidth={penwidth}"), + }; + + println!(" \"seg{seg_id}\" [{x}]"); + println!(" }}"); + + // Recurse. Much of the data is actually on the edge + for child in node.children.iter() { + let child_id = child.seg_id; + graphviz_recurse(segments, child); + + let edge_color = match child.method { + SnapshotAfter => "gray", + Wal => "black", + WalNeeded => "black", + Skipped => "gray", + }; + + println!(" {{"); + println!(" edge [] "); + print!(" \"seg{seg_id}\" -> \"seg{child_id}\" ["); + print!("color={edge_color}"); + if child.method == WalNeeded { + print!(" penwidth=6"); + } + if child.method == Wal { + print!(" penwidth=3"); + } + + let next = segments.get(child_id).unwrap(); + + if next.op.is_empty() { + print!( + " label=\"{} / {}\"", + next.end_lsn - seg.end_lsn, + (next.end_size as i128 - seg.end_size as i128) + ); + } else { + print!(" label=\"{}: {}\"", next.op, next.end_lsn - seg.end_lsn); + } + println!("]"); + println!(" }}"); + } +} + +fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) { + println!("digraph G {{"); + println!(" fontname=\"Helvetica,Arial,sans-serif\""); + println!(" node [fontname=\"Helvetica,Arial,sans-serif\"]"); + println!(" edge [fontname=\"Helvetica,Arial,sans-serif\"]"); + println!(" graph [center=1 rankdir=LR]"); + println!(" edge [dir=none]"); + + graphviz_recurse(segments, tree); + + println!("}}"); +} + +#[test] +fn scenarios_return_same_size() { + type ScenarioFn = fn() -> (Vec, SegmentSize); + let truths: &[(u32, ScenarioFn, _)] = &[ + (line!(), scenario_1, 8000), + (line!(), scenario_2, 9000), + (line!(), scenario_3, 13000), + (line!(), scenario_4, 16000), + (line!(), scenario_5, 17000), + (line!(), scenario_6, 333_792_000), + ]; + + for (line, scenario, expected) in truths { + let (_, size) = scenario(); + assert_eq!(*expected, size.total_children(), "scenario on line {line}"); + } +} diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index 289cec12a8..39fed8670d 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -13,7 +13,7 @@ use crate::seqwait::MonotonicCounter; pub const XLOG_BLCKSZ: u32 = 8192; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr -#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] +#[derive(Clone, Copy, Eq, Ord, PartialEq, PartialOrd, Hash, Serialize, Deserialize)] #[serde(transparent)] pub struct Lsn(pub u64); diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 4262ca9820..7ce936ca27 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -63,6 +63,7 @@ etcd_broker = { path = "../libs/etcd_broker" } metrics = { path = "../libs/metrics" } utils = { path = "../libs/utils" } remote_storage = { path = "../libs/remote_storage" } +tenant_size_model = { 
path = "../libs/tenant_size_model" } workspace_hack = { version = "0.1", path = "../workspace_hack" } close_fds = "0.3.2" walkdir = "2.3.2" diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 6a372fb081..747e63af2b 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -9,6 +9,7 @@ use remote_storage::RemoteStorageConfig; use std::env; use utils::crashsafe::path_with_suffix_extension; +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; @@ -48,6 +49,9 @@ pub mod defaults { pub const DEFAULT_LOG_FORMAT: &str = "plain"; + pub const DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES: usize = + super::ConfigurableSemaphore::DEFAULT_INITIAL.get(); + /// /// Default built-in configuration file. /// @@ -67,6 +71,9 @@ pub mod defaults { #initial_superuser_name = '{DEFAULT_SUPERUSER}' #log_format = '{DEFAULT_LOG_FORMAT}' + +#concurrent_tenant_size_logical_size_queries = '{DEFAULT_CONCURRENT_TENANT_SIZE_LOGICAL_SIZE_QUERIES}' + # [tenant_config] #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT} @@ -132,6 +139,9 @@ pub struct PageServerConf { pub broker_endpoints: Vec, pub log_format: LogFormat, + + /// Number of concurrent [`Tenant::gather_size_inputs`] allowed. + pub concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore, } #[derive(Debug, Clone, PartialEq, Eq)] @@ -200,6 +210,8 @@ struct PageServerConfigBuilder { broker_endpoints: BuilderValue>, log_format: BuilderValue, + + concurrent_tenant_size_logical_size_queries: BuilderValue, } impl Default for PageServerConfigBuilder { @@ -228,6 +240,8 @@ impl Default for PageServerConfigBuilder { broker_etcd_prefix: Set(etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string()), broker_endpoints: Set(Vec::new()), log_format: Set(LogFormat::from_str(DEFAULT_LOG_FORMAT).unwrap()), + + concurrent_tenant_size_logical_size_queries: Set(ConfigurableSemaphore::default()), } } } @@ -304,6 +318,10 @@ impl PageServerConfigBuilder { self.log_format = BuilderValue::Set(log_format) } + pub fn concurrent_tenant_size_logical_size_queries(&mut self, u: ConfigurableSemaphore) { + self.concurrent_tenant_size_logical_size_queries = BuilderValue::Set(u); + } + pub fn build(self) -> anyhow::Result { let broker_endpoints = self .broker_endpoints @@ -349,6 +367,11 @@ impl PageServerConfigBuilder { .broker_etcd_prefix .ok_or(anyhow!("missing broker_etcd_prefix"))?, log_format: self.log_format.ok_or(anyhow!("missing log_format"))?, + concurrent_tenant_size_logical_size_queries: self + .concurrent_tenant_size_logical_size_queries + .ok_or(anyhow!( + "missing concurrent_tenant_size_logical_size_queries" + ))?, }) } } @@ -476,6 +499,12 @@ impl PageServerConf { "log_format" => builder.log_format( LogFormat::from_config(&parse_toml_string(key, item)?)? 
),
+            "concurrent_tenant_size_logical_size_queries" => builder.concurrent_tenant_size_logical_size_queries({
+                let input = parse_toml_string(key, item)?;
+                let permits = input
+                    .parse::<usize>()
+                    .with_context(|| format!("expected a number of initial permits, got {input:?}"))?;
+                let permits = NonZeroUsize::new(permits).context("initial semaphore permits out of range: 0, use other configuration to disable a feature")?;
+                ConfigurableSemaphore::new(permits)
+            }),
            _ => bail!("unrecognized pageserver option '{key}'"),
        }
    }
@@ -589,6 +618,7 @@ impl PageServerConf {
            broker_endpoints: Vec::new(),
            broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(),
            log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(),
+            concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(),
        }
    }
}
@@ -654,6 +684,58 @@ fn parse_toml_array(name: &str, item: &Item) -> anyhow::Result<Vec<String>> {
        .collect()
}

+/// Configurable semaphore permits setting.
+///
+/// Does not allow semaphore permits to be zero, because at runtime an initially-zero-permit
+/// semaphore and an empty semaphore cannot be distinguished, leading any feature using these to
+/// await forever (or until new permits are added).
+#[derive(Debug, Clone)]
+pub struct ConfigurableSemaphore {
+    initial_permits: NonZeroUsize,
+    inner: std::sync::Arc<tokio::sync::Semaphore>,
+}
+
+impl ConfigurableSemaphore {
+    pub const DEFAULT_INITIAL: NonZeroUsize = match NonZeroUsize::new(1) {
+        Some(x) => x,
+        None => panic!("const unwrap is not yet stable"),
+    };
+
+    /// Initializes using a non-zero amount of permits.
+    ///
+    /// A non-zero initial permit count is required, because using permits == 0 would be a crude
+    /// way to disable a feature such as [`Tenant::gather_size_inputs`]. Otherwise any future
+    /// using the semaphore will behave like [`futures::future::pending`], just waiting until new
+    /// permits are added.
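+    ///
+    /// A minimal usage sketch (hypothetical call site, assuming a tokio runtime):
+    ///
+    /// ```ignore
+    /// let sem = ConfigurableSemaphore::default();
+    /// let permit = sem.inner().clone().acquire_owned().await.expect("semaphore is never closed");
+    /// // ... do the concurrency-limited work while holding `permit` ...
+    /// ```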
+ pub fn new(initial_permits: NonZeroUsize) -> Self { + ConfigurableSemaphore { + initial_permits, + inner: std::sync::Arc::new(tokio::sync::Semaphore::new(initial_permits.get())), + } + } +} + +impl Default for ConfigurableSemaphore { + fn default() -> Self { + Self::new(Self::DEFAULT_INITIAL) + } +} + +impl PartialEq for ConfigurableSemaphore { + fn eq(&self, other: &Self) -> bool { + // the number of permits can be increased at runtime, so we cannot really fulfill the + // PartialEq value equality otherwise + self.initial_permits == other.initial_permits + } +} + +impl Eq for ConfigurableSemaphore {} + +impl ConfigurableSemaphore { + pub fn inner(&self) -> &std::sync::Arc { + &self.inner + } +} + #[cfg(test)] mod tests { use std::{ @@ -725,6 +807,7 @@ log_format = 'json' .expect("Failed to parse a valid broker endpoint URL")], broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), log_format: LogFormat::from_str(defaults::DEFAULT_LOG_FORMAT).unwrap(), + concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), }, "Correct defaults should be used when no config values are provided" ); @@ -770,6 +853,7 @@ log_format = 'json' .expect("Failed to parse a valid broker endpoint URL")], broker_etcd_prefix: etcd_broker::DEFAULT_NEON_BROKER_ETCD_PREFIX.to_string(), log_format: LogFormat::Json, + concurrent_tenant_size_logical_size_queries: ConfigurableSemaphore::default(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 89609f5674..1bb5f94f4e 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -354,6 +354,54 @@ paths: schema: $ref: "#/components/schemas/Error" + /v1/tenant/{tenant_id}/size: + parameters: + - name: tenant_id + in: path + required: true + schema: + type: string + format: hex + get: + description: | + Calculate tenant's size, which is a mixture of WAL (bytes) and logical_size (bytes). + responses: + "200": + description: OK, + content: + application/json: + schema: + type: object + required: + - id + - size + properties: + id: + type: string + format: hex + size: + type: integer + description: | + Size metric in bytes. 
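+                  # Illustrative example body (values are made up):
+                  #   {"id": "3aa8fcc61f6b3f4a917a1c8e38b4b25b", "size": 28389376}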
+ "401": + description: Unauthorized Error + content: + application/json: + schema: + $ref: "#/components/schemas/UnauthorizedError" + "403": + description: Forbidden Error + content: + application/json: + schema: + $ref: "#/components/schemas/ForbiddenError" + "500": + description: Generic operation error + content: + application/json: + schema: + $ref: "#/components/schemas/Error" + /v1/tenant/{tenant_id}/timeline/: parameters: - name: tenant_id diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index d88cf6e075..7087c68dbd 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -566,6 +566,44 @@ async fn tenant_status(request: Request) -> Result, ApiErro ) } +async fn tenant_size_handler(request: Request) -> Result, ApiError> { + let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; + check_permission(&request, Some(tenant_id))?; + + let tenant = tenant_mgr::get_tenant(tenant_id, false).map_err(ApiError::InternalServerError)?; + + // this can be long operation, it currently is not backed by any request coalescing or similar + let inputs = tenant + .gather_size_inputs() + .await + .map_err(ApiError::InternalServerError)?; + + let size = inputs.calculate().map_err(ApiError::InternalServerError)?; + + /// Private response type with the additional "unstable" `inputs` field. + /// + /// The type is described with `id` and `size` in the openapi_spec file, but the `inputs` is + /// intentionally left out. The type resides in the pageserver not to expose `ModelInputs`. + #[serde_with::serde_as] + #[derive(serde::Serialize)] + struct TenantHistorySize { + #[serde_as(as = "serde_with::DisplayFromStr")] + id: TenantId, + /// Size is a mixture of WAL and logical size, so the unit is bytes. + size: u64, + inputs: crate::tenant::size::ModelInputs, + } + + json_response( + StatusCode::OK, + TenantHistorySize { + id: tenant_id, + size, + inputs, + }, + ) +} + // Helper function to standardize the error messages we produce on bad durations // // Intended to be used with anyhow's `with_context`, e.g.: @@ -893,6 +931,7 @@ pub fn make_router( .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) .get("/v1/tenant/:tenant_id", tenant_status) + .get("/v1/tenant/:tenant_id/size", tenant_size_handler) .put("/v1/tenant/config", tenant_config_handler) .get("/v1/tenant/:tenant_id/timeline", timeline_list_handler) .post("/v1/tenant/:tenant_id/timeline", timeline_create_handler) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 7ae2d0f14c..43586b926d 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -31,6 +31,7 @@ const STORAGE_TIME_OPERATIONS: &[&str] = &[ "compact", "create images", "init logical size", + "logical size", "load layer map", "gc", ]; @@ -365,6 +366,7 @@ pub struct TimelineMetrics { pub compact_time_histo: Histogram, pub create_images_time_histo: Histogram, pub init_logical_size_histo: Histogram, + pub logical_size_histo: Histogram, pub load_layer_map_histo: Histogram, pub last_record_gauge: IntGauge, pub wait_lsn_time_histo: Histogram, @@ -397,6 +399,9 @@ impl TimelineMetrics { let init_logical_size_histo = STORAGE_TIME .get_metric_with_label_values(&["init logical size", &tenant_id, &timeline_id]) .unwrap(); + let logical_size_histo = STORAGE_TIME + .get_metric_with_label_values(&["logical size", &tenant_id, &timeline_id]) + .unwrap(); let load_layer_map_histo = STORAGE_TIME .get_metric_with_label_values(&["load layer map", &tenant_id, &timeline_id]) 
.unwrap(); @@ -428,6 +433,7 @@ impl TimelineMetrics { compact_time_histo, create_images_time_histo, init_logical_size_histo, + logical_size_histo, load_layer_map_histo, last_record_gauge, wait_lsn_time_histo, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index ed9d2e8c7a..4621482065 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -72,6 +72,8 @@ pub mod storage_layer; mod timeline; +pub mod size; + use storage_layer::Layer; pub use timeline::Timeline; @@ -120,6 +122,9 @@ pub struct Tenant { /// Makes every timeline to backup their files to remote storage. upload_layers: bool, + + /// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`]. + cached_logical_sizes: tokio::sync::Mutex>, } /// A timeline with some of its files on disk, being initialized. @@ -834,6 +839,7 @@ impl Tenant { remote_index, upload_layers, state, + cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()), } } @@ -955,8 +961,9 @@ impl Tenant { // +-----baz--------> // // - // 1. Grab 'gc_cs' mutex to prevent new timelines from being created - // 2. Scan all timelines, and on each timeline, make note of the + // 1. Grab 'gc_cs' mutex to prevent new timelines from being created while Timeline's + // `gc_infos` are being refreshed + // 2. Scan collected timelines, and on each timeline, make note of the // all the points where other timelines have been branched off. // We will refrain from removing page versions at those LSNs. // 3. For each timeline, scan all layer files on the timeline. @@ -977,6 +984,68 @@ impl Tenant { let mut totals: GcResult = Default::default(); let now = Instant::now(); + let gc_timelines = self.refresh_gc_info_internal(target_timeline_id, horizon, pitr)?; + + // Perform GC for each timeline. + // + // Note that we don't hold the GC lock here because we don't want + // to delay the branch creation task, which requires the GC lock. + // A timeline GC iteration can be slow because it may need to wait for + // compaction (both require `layer_removal_cs` lock), + // but the GC iteration can run concurrently with branch creation. + // + // See comments in [`Tenant::branch_timeline`] for more information + // about why branch creation task can run concurrently with timeline's GC iteration. + for timeline in gc_timelines { + if task_mgr::is_shutdown_requested() { + // We were requested to shut down. Stop and return with the progress we + // made. + break; + } + + // If requested, force flush all in-memory layers to disk first, + // so that they too can be garbage collected. That's + // used in tests, so we want as deterministic results as possible. + if checkpoint_before_gc { + timeline.checkpoint(CheckpointConfig::Forced)?; + info!( + "timeline {} checkpoint_before_gc done", + timeline.timeline_id + ); + } + + let result = timeline.gc()?; + totals += result; + } + + totals.elapsed = now.elapsed(); + Ok(totals) + } + + /// Refreshes the Timeline::gc_info for all timelines, returning the + /// vector of timelines which have [`Timeline::get_last_record_lsn`] past + /// [`Tenant::get_gc_horizon`]. + /// + /// This is usually executed as part of periodic gc, but can now be triggered more often. + pub fn refresh_gc_info(&self) -> anyhow::Result>> { + // since this method can now be called at different rates than the configured gc loop, it + // might be that these configuration values get applied faster than what it was previously, + // since these were only read from the gc task. 
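+        //
+        // Sharing refresh_gc_info_internal with gc_iteration keeps the cutoffs used for
+        // sizing consistent with what the next gc run would compute.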
+ let horizon = self.get_gc_horizon(); + let pitr = self.get_pitr_interval(); + + // refresh all timelines + let target_timeline_id = None; + + self.refresh_gc_info_internal(target_timeline_id, horizon, pitr) + } + + fn refresh_gc_info_internal( + &self, + target_timeline_id: Option, + horizon: u64, + pitr: Duration, + ) -> anyhow::Result>> { // grab mutex to prevent new timelines from being created here. let gc_cs = self.gc_cs.lock().unwrap(); @@ -995,9 +1064,6 @@ impl Tenant { timelines .iter() .map(|(timeline_id, timeline_entry)| { - // This is unresolved question for now, how to do gc in presence of remote timelines - // especially when this is combined with branching. - // Somewhat related: https://github.com/neondatabase/neon/issues/999 if let Some(ancestor_timeline_id) = &timeline_entry.get_ancestor_timeline_id() { // If target_timeline is specified, we only need to know branchpoints of its children if let Some(timeline_id) = target_timeline_id { @@ -1052,40 +1118,7 @@ impl Tenant { } drop(gc_cs); - // Perform GC for each timeline. - // - // Note that we don't hold the GC lock here because we don't want - // to delay the branch creation task, which requires the GC lock. - // A timeline GC iteration can be slow because it may need to wait for - // compaction (both require `layer_removal_cs` lock), - // but the GC iteration can run concurrently with branch creation. - // - // See comments in [`Tenant::branch_timeline`] for more information - // about why branch creation task can run concurrently with timeline's GC iteration. - for timeline in gc_timelines { - if task_mgr::is_shutdown_requested() { - // We were requested to shut down. Stop and return with the progress we - // made. - break; - } - - // If requested, force flush all in-memory layers to disk first, - // so that they too can be garbage collected. That's - // used in tests, so we want as deterministic results as possible. - if checkpoint_before_gc { - timeline.checkpoint(CheckpointConfig::Forced)?; - info!( - "timeline {} checkpoint_before_gc done", - timeline.timeline_id - ); - } - - let result = timeline.gc()?; - totals += result; - } - - totals.elapsed = now.elapsed(); - Ok(totals) + Ok(gc_timelines) } /// Branch an existing timeline @@ -1444,6 +1477,25 @@ impl Tenant { Ok(()) } + + /// Gathers inputs from all of the timelines to produce a sizing model input. + /// + /// Future is cancellation safe. Only one calculation can be running at once per tenant. + #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] + pub async fn gather_size_inputs(&self) -> anyhow::Result { + let logical_sizes_at_once = self + .conf + .concurrent_tenant_size_logical_size_queries + .inner(); + + // TODO: Having a single mutex block concurrent reads is unfortunate, but since the queries + // are for testing/experimenting, we tolerate this. + // + // See more for on the issue #2748 condenced out of the initial PR review. 
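+        // The shared cache maps (TimelineId, Lsn) to a calculated logical size;
+        // gather_inputs reuses hits and prunes entries that are no longer needed.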
+ let mut shared_cache = self.cached_logical_sizes.lock().await; + + size::gather_inputs(self, logical_sizes_at_once, &mut *shared_cache).await + } } /// Create the cluster temporarily in 'initdbpath' directory inside the repository diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs new file mode 100644 index 0000000000..f0c611ae39 --- /dev/null +++ b/pageserver/src/tenant/size.rs @@ -0,0 +1,454 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use anyhow::Context; +use tokio::sync::Semaphore; + +use super::Tenant; +use utils::id::TimelineId; +use utils::lsn::Lsn; + +use tracing::*; + +/// Inputs to the actual tenant sizing model +/// +/// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to +/// be a transferrable format between execution environments and developer. +#[serde_with::serde_as] +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct ModelInputs { + updates: Vec, + retention_period: u64, + #[serde_as(as = "HashMap")] + timeline_inputs: HashMap, +} + +/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as +/// part of [`ModelInputs`] from the HTTP api, explaining the inputs. +#[serde_with::serde_as] +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct TimelineInputs { + #[serde_as(as = "serde_with::DisplayFromStr")] + last_record: Lsn, + #[serde_as(as = "serde_with::DisplayFromStr")] + latest_gc_cutoff: Lsn, + #[serde_as(as = "serde_with::DisplayFromStr")] + horizon_cutoff: Lsn, + #[serde_as(as = "serde_with::DisplayFromStr")] + pitr_cutoff: Lsn, + #[serde_as(as = "serde_with::DisplayFromStr")] + next_gc_cutoff: Lsn, +} + +pub(super) async fn gather_inputs( + tenant: &Tenant, + limit: &Arc, + logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, +) -> anyhow::Result { + // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to + // our advantage with `?` error handling. + let mut joinset = tokio::task::JoinSet::new(); + + let timelines = tenant + .refresh_gc_info() + .context("Failed to refresh gc_info before gathering inputs")?; + + if timelines.is_empty() { + // All timelines are below tenant's gc_horizon; alternative would be to use + // Tenant::list_timelines but then those gc_info's would not be updated yet, possibly + // missing GcInfo::retain_lsns or having obsolete values for cutoff's. + return Ok(ModelInputs { + updates: vec![], + retention_period: 0, + timeline_inputs: HashMap::new(), + }); + } + + // record the used/inserted cache keys here, to remove extras not to start leaking + // after initial run the cache should be quite stable, but live timelines will eventually + // require new lsns to be inspected. 
+ let mut needed_cache = HashSet::<(TimelineId, Lsn)>::new(); + + let mut updates = Vec::new(); + + // record the per timline values used to determine `retention_period` + let mut timeline_inputs = HashMap::with_capacity(timelines.len()); + + // used to determine the `retention_period` for the size model + let mut max_cutoff_distance = None; + + // this will probably conflict with on-demand downloaded layers, or at least force them all + // to be downloaded + for timeline in timelines { + let last_record_lsn = timeline.get_last_record_lsn(); + + let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = { + // there's a race between the update (holding tenant.gc_lock) and this read but it + // might not be an issue, because it's not for Timeline::gc + let gc_info = timeline.gc_info.read().unwrap(); + + // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a + // new gc run, which we have no control over. + // maybe this should be moved to gc_info.next_gc_cutoff()? + let next_gc_cutoff = std::cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff); + + let maybe_cutoff = if next_gc_cutoff > timeline.get_ancestor_lsn() { + // only include these if they are after branching point; otherwise we would end up + // with duplicate updates before the actual branching. + Some((next_gc_cutoff, LsnKind::GcCutOff)) + } else { + None + }; + + // this assumes there are no other lsns than the branchpoints + let lsns = gc_info + .retain_lsns + .iter() + .inspect(|&&lsn| { + trace!( + timeline_id=%timeline.timeline_id, + "retained lsn: {lsn:?}, is_before_ancestor_lsn={}", + lsn < timeline.get_ancestor_lsn() + ) + }) + .filter(|&&lsn| lsn > timeline.get_ancestor_lsn()) + .copied() + .map(|lsn| (lsn, LsnKind::BranchPoint)) + .chain(maybe_cutoff) + .collect::>(); + + ( + lsns, + gc_info.horizon_cutoff, + gc_info.pitr_cutoff, + next_gc_cutoff, + ) + }; + + // update this to have a retention_period later for the tenant_size_model + // tenant_size_model compares this to the last segments start_lsn + if let Some(cutoff_distance) = last_record_lsn.checked_sub(next_gc_cutoff) { + match max_cutoff_distance.as_mut() { + Some(max) => { + *max = std::cmp::max(*max, cutoff_distance); + } + _ => { + max_cutoff_distance = Some(cutoff_distance); + } + } + } + + // all timelines branch from something, because it might be impossible to pinpoint + // which is the tenant_size_model's "default" branch. 
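+        // For a root timeline, get_ancestor_timeline_id() is None and get_ancestor_lsn()
+        // is zero, so this becomes BranchFrom(None) at Lsn(0), the model's root.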
+ updates.push(Update { + lsn: timeline.get_ancestor_lsn(), + command: Command::BranchFrom(timeline.get_ancestor_timeline_id()), + timeline_id: timeline.timeline_id, + }); + + for (lsn, _kind) in &interesting_lsns { + if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) { + updates.push(Update { + lsn: *lsn, + timeline_id: timeline.timeline_id, + command: Command::Update(*size), + }); + + needed_cache.insert((timeline.timeline_id, *lsn)); + } else { + let timeline = Arc::clone(&timeline); + let parallel_size_calcs = Arc::clone(limit); + joinset.spawn(calculate_logical_size(parallel_size_calcs, timeline, *lsn)); + } + } + + timeline_inputs.insert( + timeline.timeline_id, + TimelineInputs { + last_record: last_record_lsn, + // this is not used above, because it might not have updated recently enough + latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), + horizon_cutoff, + pitr_cutoff, + next_gc_cutoff, + }, + ); + } + + let mut have_any_error = false; + + while let Some(res) = joinset.join_next().await { + // each of these come with Result, JoinError> + // because of spawn + spawn_blocking + let res = res.and_then(|inner| inner); + match res { + Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size))) => { + debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated"); + + logical_size_cache.insert((timeline.timeline_id, lsn), size); + needed_cache.insert((timeline.timeline_id, lsn)); + + updates.push(Update { + lsn, + timeline_id: timeline.timeline_id, + command: Command::Update(size), + }); + } + Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error))) => { + warn!( + timeline_id=%timeline.timeline_id, + "failed to calculate logical size at {lsn}: {error:#}" + ); + have_any_error = true; + } + Err(join_error) if join_error.is_cancelled() => { + unreachable!("we are not cancelling any of the futures, nor should be"); + } + Err(join_error) => { + // cannot really do anything, as this panic is likely a bug + error!("logical size query panicked: {join_error:#}"); + have_any_error = true; + } + } + } + + // prune any keys not needed anymore; we record every used key and added key. + logical_size_cache.retain(|key, _| needed_cache.contains(key)); + + if have_any_error { + // we cannot complete this round, because we are missing data. + // we have however cached all we were able to request calculation on. + anyhow::bail!("failed to calculate some logical_sizes"); + } + + // the data gathered to updates is per lsn, regardless of the branch, so we can use it to + // our advantage, not requiring a sorted container or graph walk. + // + // for branch points, which come as multiple updates at the same LSN, the Command::Update + // is needed before a branch is made out of that branch Command::BranchFrom. this is + // handled by the variant order in `Command`. + updates.sort_unstable(); + + let retention_period = match max_cutoff_distance { + Some(max) => max.0, + None => { + anyhow::bail!("the first branch should have a gc_cutoff after it's branch point at 0") + } + }; + + Ok(ModelInputs { + updates, + retention_period, + timeline_inputs, + }) +} + +impl ModelInputs { + pub fn calculate(&self) -> anyhow::Result { + // Option is used for "naming" the branches because it is assumed to be + // impossible to always determine the a one main branch. + let mut storage = tenant_size_model::Storage::>::new(None); + + // tracking these not to require modifying the current implementation of the size model, + // which works in relative LSNs and sizes. 
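+        //
+        // For example (made-up numbers): observing (lsn: 200, size: 1500) on a branch whose
+        // last recorded state was (lsn: 120, size: 1000) is applied as a relative
+        // modify_branch(..) with lsn_bytes = 80 and size_diff = +500.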
+        let mut last_state: HashMap<TimelineId, (Lsn, u64)> = HashMap::new();
+
+        for update in &self.updates {
+            let Update {
+                lsn,
+                command: op,
+                timeline_id,
+            } = update;
+            match op {
+                Command::Update(sz) => {
+                    let latest = last_state.get_mut(timeline_id).ok_or_else(|| {
+                        anyhow::anyhow!(
+                            "ordering-mismatch: there must have been a previous state for {timeline_id}"
+                        )
+                    })?;
+
+                    let lsn_bytes = {
+                        let Lsn(now) = lsn;
+                        let Lsn(prev) = latest.0;
+                        debug_assert!(prev <= *now, "self.updates should have been sorted");
+                        now - prev
+                    };
+
+                    let size_diff =
+                        i64::try_from(*sz as i128 - latest.1 as i128).with_context(|| {
+                            format!("size difference i64 overflow for {timeline_id}")
+                        })?;
+
+                    storage.modify_branch(&Some(*timeline_id), "".into(), lsn_bytes, size_diff);
+                    *latest = (*lsn, *sz);
+                }
+                Command::BranchFrom(parent) => {
+                    storage.branch(parent, Some(*timeline_id));
+
+                    let size = parent
+                        .as_ref()
+                        .and_then(|id| last_state.get(id))
+                        .map(|x| x.1)
+                        .unwrap_or(0);
+                    last_state.insert(*timeline_id, (*lsn, size));
+                }
+            }
+        }
+
+        Ok(storage.calculate(self.retention_period).total_children())
+    }
+}
+
+/// A single size model update.
+///
+/// The sizing model works with relative increments over the latest branch state.
+/// Updates are absolute, so additional state needs to be tracked when applying them.
+#[serde_with::serde_as]
+#[derive(
+    Debug, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize,
+)]
+struct Update {
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    lsn: utils::lsn::Lsn,
+    command: Command,
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    timeline_id: TimelineId,
+}
+
+#[serde_with::serde_as]
+#[derive(PartialOrd, PartialEq, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize)]
+#[serde(rename_all = "snake_case")]
+enum Command {
+    Update(u64),
+    BranchFrom(#[serde_as(as = "Option<serde_with::DisplayFromStr>")] Option<TimelineId>),
+}
+
+impl std::fmt::Debug for Command {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // custom one-line implementation makes {:#?} output more enjoyable to read by
+        // avoiding three line breaks per variant
+        match self {
+            Self::Update(arg0) => write!(f, "Update({arg0})"),
+            Self::BranchFrom(arg0) => write!(f, "BranchFrom({arg0:?})"),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+enum LsnKind {
+    BranchPoint,
+    GcCutOff,
+}
+
+/// Newtype around the tuple that carries the timeline-at-lsn logical size calculation.
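+///
+/// Carries the timeline, the lsn the logical size was requested at, and the result of
+/// [`Timeline::calculate_logical_size`], so successes and failures are handled per timeline.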
+struct TimelineAtLsnSizeResult( + Arc, + utils::lsn::Lsn, + anyhow::Result, +); + +#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))] +async fn calculate_logical_size( + limit: Arc, + timeline: Arc, + lsn: utils::lsn::Lsn, +) -> Result { + let permit = tokio::sync::Semaphore::acquire_owned(limit) + .await + .expect("global semaphore should not had been closed"); + + tokio::task::spawn_blocking(move || { + let _permit = permit; + let size_res = timeline.calculate_logical_size(lsn); + TimelineAtLsnSizeResult(timeline, lsn, size_res) + }) + .await +} + +#[test] +fn updates_sort() { + use std::str::FromStr; + use utils::id::TimelineId; + use utils::lsn::Lsn; + + let ids = [ + TimelineId::from_str("7ff1edab8182025f15ae33482edb590a").unwrap(), + TimelineId::from_str("b1719e044db05401a05a2ed588a3ad3f").unwrap(), + TimelineId::from_str("b68d6691c895ad0a70809470020929ef").unwrap(), + ]; + + // try through all permutations + let ids = [ + [&ids[0], &ids[1], &ids[2]], + [&ids[0], &ids[2], &ids[1]], + [&ids[1], &ids[0], &ids[2]], + [&ids[1], &ids[2], &ids[0]], + [&ids[2], &ids[0], &ids[1]], + [&ids[2], &ids[1], &ids[0]], + ]; + + for ids in ids { + // apply a fixture which uses a permutation of ids + let commands = [ + Update { + lsn: Lsn(0), + command: Command::BranchFrom(None), + timeline_id: *ids[0], + }, + Update { + lsn: Lsn::from_str("0/67E7618").unwrap(), + command: Command::Update(43696128), + timeline_id: *ids[0], + }, + Update { + lsn: Lsn::from_str("0/67E7618").unwrap(), + command: Command::BranchFrom(Some(*ids[0])), + timeline_id: *ids[1], + }, + Update { + lsn: Lsn::from_str("0/76BE4F0").unwrap(), + command: Command::Update(41844736), + timeline_id: *ids[1], + }, + Update { + lsn: Lsn::from_str("0/10E49380").unwrap(), + command: Command::Update(42164224), + timeline_id: *ids[0], + }, + Update { + lsn: Lsn::from_str("0/10E49380").unwrap(), + command: Command::BranchFrom(Some(*ids[0])), + timeline_id: *ids[2], + }, + Update { + lsn: Lsn::from_str("0/11D74910").unwrap(), + command: Command::Update(42172416), + timeline_id: *ids[2], + }, + Update { + lsn: Lsn::from_str("0/12051E98").unwrap(), + command: Command::Update(42196992), + timeline_id: *ids[0], + }, + ]; + + let mut sorted = commands; + + // these must sort in the same order, regardless of how the ids sort + // which is why the timeline_id is the last field + sorted.sort_unstable(); + + assert_eq!(commands, sorted, "{:#?} vs. 
{:#?}", commands, sorted); + } +} + +#[test] +fn verify_size_for_multiple_branches() { + // this is generated from integration test test_tenant_size_with_multiple_branches, but this way + // it has the stable lsn's + let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072,"timeline_inputs":{"cd9d9409c216e64bf580904facedb01b":{"last_record":"0/18D5E40","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/18B5E40","pitr_cutoff":"0/18B5E40","next_gc_cutoff":"0/18B5E40"},"10b532a550540bc15385eac4edde416a":{"last_record":"0/1839818","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/1819818","pitr_cutoff":"0/1819818","next_gc_cutoff":"0/1819818"},"230fc9d756f7363574c0d66533564dcc":{"last_record":"0/222F438","latest_gc_cutoff":"0/169ACF0","horizon_cutoff":"0/220F438","pitr_cutoff":"0/220F438","next_gc_cutoff":"0/220F438"}}}"#; + + let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); + + assert_eq!(inputs.calculate().unwrap(), 36_409_872); +} diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index d63429ea6a..279da70128 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -272,6 +272,11 @@ impl LogicalSize { self.size_added_after_initial .fetch_add(delta, AtomicOrdering::SeqCst); } + + /// Returns the initialized (already calculated) value, if any. + fn initialized_size(&self) -> Option { + self.initial_logical_size.get().copied() + } } pub struct WalReceiverInfo { @@ -979,9 +984,26 @@ impl Timeline { /// Calculate the logical size of the database at the latest LSN. /// /// NOTE: counted incrementally, includes ancestors, this can be a slow operation. - fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result { - info!("Calculating logical size for timeline {}", self.timeline_id); - let timer = self.metrics.init_logical_size_histo.start_timer(); + pub fn calculate_logical_size(&self, up_to_lsn: Lsn) -> anyhow::Result { + info!( + "Calculating logical size for timeline {} at {}", + self.timeline_id, up_to_lsn + ); + let timer = if up_to_lsn == self.initdb_lsn { + if let Some(size) = self.current_logical_size.initialized_size() { + if size != 0 { + // non-zero size means that the size has already been calculated by this method + // after startup. if the logical size is for a new timeline without layers the + // size will be zero, and we cannot use that, or this caching strategy until + // pageserver restart. 
+ return Ok(size); + } + } + + self.metrics.init_logical_size_histo.start_timer() + } else { + self.metrics.logical_size_histo.start_timer() + }; let logical_size = self.get_current_logical_size_non_incremental(up_to_lsn)?; debug!("calculated logical size: {logical_size}"); timer.stop_and_record(); diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b62c80824a..63b809a786 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1047,6 +1047,21 @@ class PageserverHttpClient(requests.Session): assert isinstance(res_json, dict) return res_json + def tenant_size(self, tenant_id: TenantId) -> int: + """ + Returns the tenant size, together with the model inputs as the second tuple item. + """ + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/size") + self.verbose_error(res) + res = res.json() + assert isinstance(res, dict) + assert TenantId(res["id"]) == tenant_id + size = res["size"] + assert type(size) == int + # there are additional inputs, which are the collected raw information before being fed to the tenant_size_model + # there are no tests for those right now. + return size + def timeline_list(self, tenant_id: TenantId) -> List[Dict[str, Any]]: res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline") self.verbose_error(res) @@ -2742,12 +2757,12 @@ def wait_for_last_record_lsn( tenant: TenantId, timeline: TimelineId, lsn: Lsn, -): - """waits for pageserver to catch up to a certain lsn""" +) -> Lsn: + """waits for pageserver to catch up to a certain lsn, returns the last observed lsn.""" for i in range(10): current_lsn = last_record_lsn(pageserver_http_client, tenant, timeline) if current_lsn >= lsn: - return + return current_lsn log.info( "waiting for last_record_lsn to reach {}, now {}, iteration {}".format( lsn, current_lsn, i + 1 @@ -2759,10 +2774,12 @@ def wait_for_last_record_lsn( ) -def wait_for_last_flush_lsn(env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId): - """Wait for pageserver to catch up the latest flush LSN""" +def wait_for_last_flush_lsn( + env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId +) -> Lsn: + """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn.""" last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) - wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn) + return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn) def fork_at_current_lsn( diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py new file mode 100644 index 0000000000..ecf78499bb --- /dev/null +++ b/test_runner/regress/test_tenant_size.py @@ -0,0 +1,276 @@ +import time +from typing import List, Tuple + +from fixtures.log_helper import log +from fixtures.neon_fixtures import ( + NeonEnv, + NeonEnvBuilder, + PageserverApiException, + wait_for_last_flush_lsn, +) +from fixtures.types import Lsn + + +def test_empty_tenant_size(neon_simple_env: NeonEnv): + env = neon_simple_env + (tenant_id, _) = env.neon_cli.create_tenant() + http_client = env.pageserver.http_client() + size = http_client.tenant_size(tenant_id) + + # we should never have zero, because there should be the initdb however + # this is questionable if we should have anything in this case, as the + # gc_cutoff is negative + assert ( + size == 0 + ), "initial implementation returns zero tenant_size before 
last_record_lsn is past gc_horizon" + + with env.postgres.create_start("main", tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("SELECT 1") + row = cur.fetchone() + assert row is not None + assert row[0] == 1 + size = http_client.tenant_size(tenant_id) + assert size == 0, "starting idle compute should not change the tenant size" + + # the size should be the same, until we increase the size over the + # gc_horizon + size = http_client.tenant_size(tenant_id) + assert size == 0, "tenant_size should not be affected by shutdown of compute" + + +def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): + """ + Operate on single branch reading the tenants size after each transaction. + """ + + # gc and compaction is not wanted automatically + # the pitr_interval here is quite problematic, so we cannot really use it. + # it'd have to be calibrated per test executing env. + neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='1h', gc_period='1h', pitr_interval='0sec', gc_horizon={128 * 1024}}}" + # in this test we don't run gc or compaction, but the tenant size is + # expected to use the "next gc" cutoff, so the small amounts should work + + env = neon_env_builder.init_start() + + tenant_id = env.initial_tenant + branch_name, timeline_id = env.neon_cli.list_timelines(tenant_id)[0] + + http_client = env.pageserver.http_client() + + collected_responses: List[Tuple[Lsn, int]] = [] + + with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg: + with pg.cursor() as cur: + cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)") + + batch_size = 100 + + i = 0 + while True: + with pg.cursor() as cur: + cur.execute( + f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)", + (i, i), + ) + + i += 1 + + current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + size = http_client.tenant_size(tenant_id) + + if len(collected_responses) > 0: + prev = collected_responses[-1][1] + if size == 0: + assert prev == 0 + else: + assert size > prev + + collected_responses.append((current_lsn, size)) + + if len(collected_responses) > 2: + break + + while True: + with pg.cursor() as cur: + cur.execute( + f"UPDATE t0 SET i = -i WHERE i IN (SELECT i FROM t0 WHERE i > 0 LIMIT {batch_size})" + ) + updated = cur.rowcount + + if updated == 0: + break + + current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + size = http_client.tenant_size(tenant_id) + prev = collected_responses[-1][1] + assert size > prev, "tenant_size should grow with updates" + collected_responses.append((current_lsn, size)) + + while True: + with pg.cursor() as cur: + cur.execute(f"DELETE FROM t0 WHERE i IN (SELECT i FROM t0 LIMIT {batch_size})") + deleted = cur.rowcount + + if deleted == 0: + break + + current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + size = http_client.tenant_size(tenant_id) + prev = collected_responses[-1][1] + assert ( + size > prev + ), "even though rows have been deleted, the tenant_size should increase" + collected_responses.append((current_lsn, size)) + + with pg.cursor() as cur: + cur.execute("DROP TABLE t0") + + current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) + + size = http_client.tenant_size(tenant_id) + prev = collected_responses[-1][1] + assert size > prev, "dropping table grows tenant_size" + collected_responses.append((current_lsn, size)) + + # this isn't too many lines to forget for a while. 
+
+
+def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder):
+    """
+    Reported size goes up while branches or rows are being added, and goes down after removing branches.
+    """
+
+    gc_horizon = 128 * 1024
+
+    neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='1h', gc_period='1h', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
+
+    env = neon_env_builder.init_start()
+
+    tenant_id = env.initial_tenant
+    main_branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0]
+
+    http_client = env.pageserver.http_client()
+
+    main_pg = env.postgres.create_start(main_branch_name, tenant_id=tenant_id)
+
+    batch_size = 10000
+
+    with main_pg.cursor() as cur:
+        cur.execute(
+            f"CREATE TABLE t0 AS SELECT i::bigint n FROM generate_series(0, {batch_size}) s(i)"
+        )
+
+    wait_for_last_flush_lsn(env, main_pg, tenant_id, main_timeline_id)
+    size_at_branch = http_client.tenant_size(tenant_id)
+    assert size_at_branch > 0
+
+    first_branch_timeline_id = env.neon_cli.create_branch(
+        "first-branch", main_branch_name, tenant_id
+    )
+
+    # it is unclear why this happens, as the size difference is more than a
+    # page alignment
+    size_after_first_branch = http_client.tenant_size(tenant_id)
+    assert size_after_first_branch > size_at_branch
+    assert size_after_first_branch - size_at_branch == gc_horizon
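+    # (a plausible but unverified explanation: the size model may retain up
+    # to gc_horizon bytes of WAL history at each branch point, which would
+    # add exactly gc_horizon for the new branch)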
+
+    first_branch_pg = env.postgres.create_start("first-branch", tenant_id=tenant_id)
+
+    with first_branch_pg.cursor() as cur:
+        cur.execute(
+            f"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, {batch_size}) s(i)"
+        )
+
+    wait_for_last_flush_lsn(env, first_branch_pg, tenant_id, first_branch_timeline_id)
+    size_after_growing_first_branch = http_client.tenant_size(tenant_id)
+    assert size_after_growing_first_branch > size_after_first_branch
+
+    with main_pg.cursor() as cur:
+        cur.execute(
+            f"CREATE TABLE t1 AS SELECT i::bigint n FROM generate_series(0, 2*{batch_size}) s(i)"
+        )
+
+    wait_for_last_flush_lsn(env, main_pg, tenant_id, main_timeline_id)
+    size_after_continuing_on_main = http_client.tenant_size(tenant_id)
+    assert size_after_continuing_on_main > size_after_growing_first_branch
+
+    second_branch_timeline_id = env.neon_cli.create_branch(
+        "second-branch", main_branch_name, tenant_id
+    )
+    size_after_second_branch = http_client.tenant_size(tenant_id)
+    assert size_after_second_branch > size_after_continuing_on_main
+
+    second_branch_pg = env.postgres.create_start("second-branch", tenant_id=tenant_id)
+
+    with second_branch_pg.cursor() as cur:
+        cur.execute(
+            f"CREATE TABLE t2 AS SELECT i::bigint n FROM generate_series(0, 3*{batch_size}) s(i)"
+        )
+
+    wait_for_last_flush_lsn(env, second_branch_pg, tenant_id, second_branch_timeline_id)
+    size_after_growing_second_branch = http_client.tenant_size(tenant_id)
+    assert size_after_growing_second_branch > size_after_second_branch
+
+    with second_branch_pg.cursor() as cur:
+        cur.execute("DROP TABLE t0")
+        cur.execute("DROP TABLE t1")
+        cur.execute("VACUUM FULL")
+
+    wait_for_last_flush_lsn(env, second_branch_pg, tenant_id, second_branch_timeline_id)
+    size_after_thinning_branch = http_client.tenant_size(tenant_id)
+    assert (
+        size_after_thinning_branch > size_after_growing_second_branch
+    ), "tenant_size should grow even with dropped tables and a full vacuum"
+
+    first_branch_pg.stop_and_destroy()
+    second_branch_pg.stop_and_destroy()
+    main_pg.stop()
+    env.pageserver.stop()
+    env.pageserver.start()
+
+    # compaction and gc running at startup might have an effect on the
+    # tenant_size, but so far this has been reliable, even though at least gc
+    # and the tenant_size calculation race for the same locks
+    size_after = http_client.tenant_size(tenant_id)
+    assert size_after == size_after_thinning_branch
+
+    # teardown: delete the branches, after which the size should go down
+    deleted = False
+    for _ in range(10):
+        try:
+            http_client.timeline_delete(tenant_id, first_branch_timeline_id)
+            deleted = True
+            break
+        except PageserverApiException as e:
+            # a failure due to ongoing compaction is fine, just retry; related to #2442
+            if "cannot lock compaction critical section" in str(e):
+                time.sleep(1)
+                continue
+            raise
+
+    assert deleted
+
+    size_after_deleting_first = http_client.tenant_size(tenant_id)
+    assert size_after_deleting_first < size_after_thinning_branch
+
+    http_client.timeline_delete(tenant_id, second_branch_timeline_id)
+    size_after_deleting_second = http_client.tenant_size(tenant_id)
+    assert size_after_deleting_second < size_after_deleting_first
+
+    assert size_after_deleting_second < size_after_continuing_on_main
+    assert size_after_deleting_second > size_after_first_branch
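+
+
+# Editor's sketch: the retry loop above could be generalized if more tests
+# need to delete timelines while compaction may hold its critical section.
+# This helper is illustrative only and is not used by these tests.
+def _timeline_delete_with_retry(http_client, tenant_id, timeline_id, attempts: int = 10):
+    for _ in range(attempts):
+        try:
+            http_client.timeline_delete(tenant_id, timeline_id)
+            return
+        except PageserverApiException as e:
+            # see #2442: deletion can transiently fail while compaction holds the lock
+            if "cannot lock compaction critical section" in str(e):
+                time.sleep(1)
+                continue
+            raise
+    raise RuntimeError("timeline_delete did not succeed after retries")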