From ddbdcdddd7886c72da740b990471dea9ed001413 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 16 Feb 2023 10:53:46 +0200 Subject: [PATCH] Tenant size calculation: refactor, rewrite, and add SVG (#2817) Refactor the tenant_size_model code. Segment now contains just the minimum amount of information needed to calculate the size. Other information that is useful for building up the segment tree, and for display purposes, is now kept elsewhere. The code in 'main.rs' has a new ScenarioBuilder struct for that. Calculating which Segments are "needed" is now the responsibility of the caller of tenant_size_mode, not part of the calculation itself. So it's up to the caller to make all the decisions with retention periods for each branch. The output of the sizing calculation is now a Vec of SizeResults, rather than a tree. It uses a tree representation internally, when doing the calculation, but it's not exposed to the caller anymore. Refactor the way the recursive calculation is performed. Rewrite the code in size.rs that builds the Segment model. Get rid of the intermediate representation with Update structs. Build the Segments directly, with some local HashMaps and Vecs to track branch points to help with that. retention_period is now an input to gather_inputs(), rather than an output. Update pageserver http API: rename /size endpoint to /synthetic_size with following parameters: - /synthetic_size?inputs_only to get debug info; - /synthetic_size?retention_period=0 to override cutoff that is used to calculate the size; pass header -H "Accept: text/html" to get HTML output, otherwise JSON is returned Update python tests and openapi spec. --------- Co-authored-by: Anastasia Lubennikova Co-authored-by: Joonas Koivunen --- Cargo.lock | 2 + libs/tenant_size_model/Cargo.toml | 2 + libs/tenant_size_model/src/calculation.rs | 219 ++++ libs/tenant_size_model/src/lib.rs | 427 +------- libs/tenant_size_model/src/main.rs | 269 ----- libs/tenant_size_model/src/svg.rs | 193 ++++ libs/tenant_size_model/tests/tests.rs | 313 ++++++ pageserver/src/http/openapi_spec.yml | 7 + pageserver/src/http/routes.rs | 109 +- pageserver/src/tenant.rs | 40 +- pageserver/src/tenant/size.rs | 1159 +++++++++------------ pageserver/src/walredo.rs | 3 +- test_runner/fixtures/neon_fixtures.py | 12 +- test_runner/regress/test_tenant_size.py | 180 +++- 14 files changed, 1581 insertions(+), 1354 deletions(-) create mode 100644 libs/tenant_size_model/src/calculation.rs delete mode 100644 libs/tenant_size_model/src/main.rs create mode 100644 libs/tenant_size_model/src/svg.rs create mode 100644 libs/tenant_size_model/tests/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 67e54d3833..98c4dca09b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3809,6 +3809,8 @@ name = "tenant_size_model" version = "0.1.0" dependencies = [ "anyhow", + "serde", + "serde_json", "workspace_hack", ] diff --git a/libs/tenant_size_model/Cargo.toml b/libs/tenant_size_model/Cargo.toml index a5f0160f35..15e78932a8 100644 --- a/libs/tenant_size_model/Cargo.toml +++ b/libs/tenant_size_model/Cargo.toml @@ -7,5 +7,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +serde.workspace = true +serde_json.workspace = true workspace_hack.workspace = true diff --git a/libs/tenant_size_model/src/calculation.rs b/libs/tenant_size_model/src/calculation.rs new file mode 100644 index 0000000000..093b053675 --- /dev/null +++ b/libs/tenant_size_model/src/calculation.rs @@ -0,0 +1,219 @@ +use crate::{SegmentMethod, SegmentSizeResult, SizeResult, StorageModel}; + +// +// *-g--*---D---> +// / +// / +// / *---b----*-B---> +// / / +// / / +// -----*--e---*-----f----* C +// E \ +// \ +// *--a---*---A--> +// +// If A and B need to be retained, is it cheaper to store +// snapshot at C+a+b, or snapshots at A and B ? +// +// If D also needs to be retained, which is cheaper: +// +// 1. E+g+e+f+a+b +// 2. D+C+a+b +// 3. D+A+B + +/// [`Segment`] which has had it's size calculated. +#[derive(Clone, Debug)] +struct SegmentSize { + method: SegmentMethod, + + // calculated size of this subtree, using this method + accum_size: u64, + + seg_id: usize, + children: Vec, +} + +struct SizeAlternatives { + // cheapest alternative if parent is available. + incremental: SegmentSize, + + // cheapest alternative if parent node is not available + non_incremental: Option, +} + +impl StorageModel { + pub fn calculate(&self) -> SizeResult { + // Build adjacency list. 'child_list' is indexed by segment id. Each entry + // contains a list of all child segments of the segment. + let mut roots: Vec = Vec::new(); + let mut child_list: Vec> = Vec::new(); + child_list.resize(self.segments.len(), Vec::new()); + + for (seg_id, seg) in self.segments.iter().enumerate() { + if let Some(parent_id) = seg.parent { + child_list[parent_id].push(seg_id); + } else { + roots.push(seg_id); + } + } + + let mut segment_results = Vec::new(); + segment_results.resize( + self.segments.len(), + SegmentSizeResult { + method: SegmentMethod::Skipped, + accum_size: 0, + }, + ); + + let mut total_size = 0; + for root in roots { + if let Some(selected) = self.size_here(root, &child_list).non_incremental { + StorageModel::fill_selected_sizes(&selected, &mut segment_results); + total_size += selected.accum_size; + } else { + // Couldn't find any way to get this root. Error? + } + } + + SizeResult { + total_size, + segments: segment_results, + } + } + + fn fill_selected_sizes(selected: &SegmentSize, result: &mut Vec) { + result[selected.seg_id] = SegmentSizeResult { + method: selected.method, + accum_size: selected.accum_size, + }; + // recurse to children + for child in selected.children.iter() { + StorageModel::fill_selected_sizes(child, result); + } + } + + // + // This is the core of the sizing calculation. + // + // This is a recursive function, that for each Segment calculates the best way + // to reach all the Segments that are marked as needed in this subtree, under two + // different conditions: + // a) when the parent of this segment is available (as a snaphot or through WAL), and + // b) when the parent of this segment is not available. + // + fn size_here(&self, seg_id: usize, child_list: &Vec>) -> SizeAlternatives { + let seg = &self.segments[seg_id]; + // First figure out the best way to get each child + let mut children = Vec::new(); + for child_id in &child_list[seg_id] { + children.push(self.size_here(*child_id, child_list)) + } + + // Method 1. If this node is not needed, we can skip it as long as we + // take snapshots later in each sub-tree + let snapshot_later = if !seg.needed { + let mut snapshot_later = SegmentSize { + seg_id, + method: SegmentMethod::Skipped, + accum_size: 0, + children: Vec::new(), + }; + + let mut possible = true; + for child in children.iter() { + if let Some(non_incremental) = &child.non_incremental { + snapshot_later.accum_size += non_incremental.accum_size; + snapshot_later.children.push(non_incremental.clone()) + } else { + possible = false; + break; + } + } + if possible { + Some(snapshot_later) + } else { + None + } + } else { + None + }; + + // Method 2. Get a snapshot here. This assumed to be possible, if the 'size' of + // this Segment was given. + let snapshot_here = if !seg.needed || seg.parent.is_none() { + if let Some(snapshot_size) = seg.size { + let mut snapshot_here = SegmentSize { + seg_id, + method: SegmentMethod::SnapshotHere, + accum_size: snapshot_size, + children: Vec::new(), + }; + for child in children.iter() { + snapshot_here.accum_size += child.incremental.accum_size; + snapshot_here.children.push(child.incremental.clone()) + } + Some(snapshot_here) + } else { + None + } + } else { + None + }; + + // Method 3. Use WAL to get here from parent + let wal_here = { + let mut wal_here = SegmentSize { + seg_id, + method: SegmentMethod::Wal, + accum_size: if let Some(parent_id) = seg.parent { + seg.lsn - self.segments[parent_id].lsn + } else { + 0 + }, + children: Vec::new(), + }; + for child in children { + wal_here.accum_size += child.incremental.accum_size; + wal_here.children.push(child.incremental) + } + wal_here + }; + + // If the parent is not available, what's the cheapest method involving + // a snapshot here or later? + let mut cheapest_non_incremental: Option = None; + if let Some(snapshot_here) = snapshot_here { + cheapest_non_incremental = Some(snapshot_here); + } + if let Some(snapshot_later) = snapshot_later { + // Use <=, to prefer skipping if the size is equal + if let Some(parent) = &cheapest_non_incremental { + if snapshot_later.accum_size <= parent.accum_size { + cheapest_non_incremental = Some(snapshot_later); + } + } else { + cheapest_non_incremental = Some(snapshot_later); + } + } + + // And what's the cheapest method, if the parent is available? + let cheapest_incremental = if let Some(cheapest_non_incremental) = &cheapest_non_incremental + { + // Is it cheaper to use a snapshot here or later, anyway? + // Use <, to prefer Wal over snapshot if the cost is the same + if wal_here.accum_size < cheapest_non_incremental.accum_size { + wal_here + } else { + cheapest_non_incremental.clone() + } + } else { + wal_here + }; + + SizeAlternatives { + incremental: cheapest_incremental, + non_incremental: cheapest_non_incremental, + } + } +} diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs index b156e1be9d..c151e3b42c 100644 --- a/libs/tenant_size_model/src/lib.rs +++ b/libs/tenant_size_model/src/lib.rs @@ -1,401 +1,70 @@ -use std::borrow::Cow; -use std::collections::HashMap; +//! Synthetic size calculation -use anyhow::Context; +mod calculation; +pub mod svg; -/// Pricing model or history size builder. +/// StorageModel is the input to the synthetic size calculation. It represents +/// a tree of timelines, with just the information that's needed for the +/// calculation. This doesn't track timeline names or where each timeline +/// begins and ends, for example. Instead, it consists of "points of interest" +/// on the timelines. A point of interest could be the timeline start or end point, +/// the oldest point on a timeline that needs to be retained because of PITR +/// cutoff, or snapshot points named by the user. For each such point, and the +/// edge connecting the points (implicit in Segment), we store information about +/// whether we need to be able to recover to the point, and if known, the logical +/// size at the point. /// -/// Maintains knowledge of the branches and their modifications. Generic over the branch name key -/// type. -pub struct Storage { - segments: Vec, - - /// Mapping from the branch name to the index of a segment describing it's latest state. - branches: HashMap, +/// The segments must form a well-formed tree, with no loops. +#[derive(serde::Serialize)] +pub struct StorageModel { + pub segments: Vec, } -/// Snapshot of a branch. -#[derive(Clone, Debug, Eq, PartialEq)] +/// Segment represents one point in the tree of branches, *and* the edge that leads +/// to it (if any). We don't need separate structs for points and edges, because each +/// point can have only one parent. +/// +/// When 'needed' is true, it means that we need to be able to reconstruct +/// any version between 'parent.lsn' and 'lsn'. If you want to represent that only +/// a single point is needed, create two Segments with the same lsn, and mark only +/// the child as needed. +/// +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub struct Segment { /// Previous segment index into ['Storage::segments`], if any. - parent: Option, + pub parent: Option, - /// Description of how did we get to this state. - /// - /// Mainly used in the original scenarios 1..=4 with insert, delete and update. Not used when - /// modifying a branch directly. - pub op: Cow<'static, str>, + /// LSN at this point + pub lsn: u64, - /// LSN before this state - start_lsn: u64, + /// Logical size at this node, if known. + pub size: Option, - /// LSN at this state - pub end_lsn: u64, - - /// Logical size before this state - start_size: u64, - - /// Logical size at this state. Can be None in the last Segment of a branch. - pub end_size: Option, - - /// Indices to [`Storage::segments`] - /// - /// FIXME: this could be an Option - children_after: Vec, - - /// Determined by `retention_period` given to [`Storage::calculate`] + /// If true, the segment from parent to this node is needed by `retention_period` pub needed: bool, } -// -// -// -// -// *-g--*---D---> -// / -// / -// / *---b----*-B---> -// / / -// / / -// -----*--e---*-----f----* C -// E \ -// \ -// *--a---*---A--> -// -// If A and B need to be retained, is it cheaper to store -// snapshot at C+a+b, or snapshots at A and B ? -// -// If D also needs to be retained, which is cheaper: -// -// 1. E+g+e+f+a+b -// 2. D+C+a+b -// 3. D+A+B +/// Result of synthetic size calculation. Returned by StorageModel::calculate() +pub struct SizeResult { + pub total_size: u64, -/// [`Segment`] which has had it's size calculated. -pub struct SegmentSize { - pub seg_id: usize, - - pub method: SegmentMethod, - - this_size: u64, - - pub children: Vec, + // This has same length as the StorageModel::segments vector in the input. + // Each entry in this array corresponds to the entry with same index in + // StorageModel::segments. + pub segments: Vec, } -impl SegmentSize { - fn total(&self) -> u64 { - self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total()) - } - - pub fn total_children(&self) -> u64 { - if self.method == SnapshotAfter { - self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total()) - } else { - self.children.iter().fold(0, |acc, x| acc + x.total()) - } - } +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct SegmentSizeResult { + pub method: SegmentMethod, + // calculated size of this subtree, using this method + pub accum_size: u64, } /// Different methods to retain history from a particular state -#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub enum SegmentMethod { - SnapshotAfter, - Wal, - WalNeeded, + SnapshotHere, // A logical snapshot is needed after this segment + Wal, // Keep WAL leading up to this node Skipped, } - -use SegmentMethod::*; - -impl Storage { - /// Creates a new storage with the given default branch name. - pub fn new(initial_branch: K) -> Storage { - let init_segment = Segment { - op: "".into(), - needed: false, - parent: None, - start_lsn: 0, - end_lsn: 0, - start_size: 0, - end_size: Some(0), - children_after: Vec::new(), - }; - - Storage { - segments: vec![init_segment], - branches: HashMap::from([(initial_branch, 0)]), - } - } - - /// Advances the branch with a new point, at given LSN. - pub fn insert_point( - &mut self, - branch: &Q, - op: Cow<'static, str>, - lsn: u64, - size: Option, - ) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; - let newseg_id = self.segments.len(); - let lastseg = &mut self.segments[lastseg_id]; - - assert!(lsn > lastseg.end_lsn); - - let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; - - let newseg = Segment { - op, - parent: Some(lastseg_id), - start_lsn: lastseg.end_lsn, - end_lsn: lsn, - start_size, - end_size: size, - children_after: Vec::new(), - needed: false, - }; - lastseg.children_after.push(newseg_id); - - self.segments.push(newseg); - *self.branches.get_mut(branch).expect("read already") = newseg_id; - - Ok(()) - } - - /// Advances the branch with the named operation, by the relative LSN and logical size bytes. - pub fn modify_branch( - &mut self, - branch: &Q, - op: Cow<'static, str>, - lsn_bytes: u64, - size_bytes: i64, - ) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; - let newseg_id = self.segments.len(); - let lastseg = &mut self.segments[lastseg_id]; - - let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; - - let newseg = Segment { - op, - parent: Some(lastseg_id), - start_lsn: lastseg.end_lsn, - end_lsn: lastseg.end_lsn + lsn_bytes, - start_size: last_end_size, - end_size: Some((last_end_size as i64 + size_bytes) as u64), - children_after: Vec::new(), - needed: false, - }; - lastseg.children_after.push(newseg_id); - - self.segments.push(newseg); - *self.branches.get_mut(branch).expect("read already") = newseg_id; - Ok(()) - } - - pub fn insert(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - self.modify_branch(branch, "insert".into(), bytes, bytes as i64) - } - - pub fn update(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - self.modify_branch(branch, "update".into(), bytes, 0i64) - } - - pub fn delete(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)) - } - - pub fn branch(&mut self, parent: &Q, name: K) -> anyhow::Result<()> - where - K: std::borrow::Borrow + std::fmt::Debug, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - // Find the right segment - let branchseg_id = *self.branches.get(parent).with_context(|| { - format!( - "should had found the parent {:?} by key. in branches {:?}", - parent, self.branches - ) - })?; - - let _branchseg = &mut self.segments[branchseg_id]; - - // Create branch name for it - self.branches.insert(name, branchseg_id); - Ok(()) - } - - pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result { - // Phase 1: Mark all the segments that need to be retained - for (_branch, &last_seg_id) in self.branches.iter() { - let last_seg = &self.segments[last_seg_id]; - let cutoff_lsn = last_seg.start_lsn.saturating_sub(retention_period); - let mut seg_id = last_seg_id; - loop { - let seg = &mut self.segments[seg_id]; - if seg.end_lsn < cutoff_lsn { - break; - } - seg.needed = true; - if let Some(prev_seg_id) = seg.parent { - seg_id = prev_seg_id; - } else { - break; - } - } - } - - // Phase 2: For each oldest segment in a chain that needs to be retained, - // calculate if we should store snapshot or WAL - self.size_from_snapshot_later(0) - } - - fn size_from_wal(&self, seg_id: usize) -> anyhow::Result { - let seg = &self.segments[seg_id]; - - let this_size = seg.end_lsn - seg.start_lsn; - - let mut children = Vec::new(); - - // try both ways - for &child_id in seg.children_after.iter() { - // try each child both ways - let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id)?; - - let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id)?; - if p1.total() < p2.total() { - p1 - } else { - p2 - } - } else { - p1 - }; - children.push(p); - } - Ok(SegmentSize { - seg_id, - method: if seg.needed { WalNeeded } else { Wal }, - this_size, - children, - }) - } - - fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result { - // If this is needed, then it's time to do the snapshot and continue - // with wal method. - let seg = &self.segments[seg_id]; - //eprintln!("snap: seg{}: {} needed: {}", seg_id, seg.children_after.len(), seg.needed); - if seg.needed { - let mut children = Vec::new(); - - for &child_id in seg.children_after.iter() { - // try each child both ways - let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id)?; - - let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id)?; - if p1.total() < p2.total() { - p1 - } else { - p2 - } - } else { - p1 - }; - children.push(p); - } - Ok(SegmentSize { - seg_id, - method: WalNeeded, - this_size: seg.start_size, - children, - }) - } else { - // If any of the direct children are "needed", need to be able to reconstruct here - let mut children_needed = false; - for &child in seg.children_after.iter() { - let seg = &self.segments[child]; - if seg.needed { - children_needed = true; - break; - } - } - - let method1 = if !children_needed { - let mut children = Vec::new(); - for child in seg.children_after.iter() { - children.push(self.size_from_snapshot_later(*child)?); - } - Some(SegmentSize { - seg_id, - method: Skipped, - this_size: 0, - children, - }) - } else { - None - }; - - // If this a junction, consider snapshotting here - let method2 = if children_needed || seg.children_after.len() >= 2 { - let mut children = Vec::new(); - for child in seg.children_after.iter() { - children.push(self.size_from_wal(*child)?); - } - let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") }; - Some(SegmentSize { - seg_id, - method: SnapshotAfter, - this_size, - children, - }) - } else { - None - }; - - Ok(match (method1, method2) { - (None, None) => anyhow::bail!( - "neither method was applicable: children_after={}, children_needed={}", - seg.children_after.len(), - children_needed - ), - (Some(method), None) => method, - (None, Some(method)) => method, - (Some(method1), Some(method2)) => { - if method1.total() < method2.total() { - method1 - } else { - method2 - } - } - }) - } - } - - pub fn into_segments(self) -> Vec { - self.segments - } -} diff --git a/libs/tenant_size_model/src/main.rs b/libs/tenant_size_model/src/main.rs deleted file mode 100644 index e32dd055f4..0000000000 --- a/libs/tenant_size_model/src/main.rs +++ /dev/null @@ -1,269 +0,0 @@ -//! Tenant size model testing ground. -//! -//! Has a number of scenarios and a `main` for invoking these by number, calculating the history -//! size, outputs graphviz graph. Makefile in directory shows how to use graphviz to turn scenarios -//! into pngs. - -use tenant_size_model::{Segment, SegmentSize, Storage}; - -// Main branch only. Some updates on it. -fn scenario_1() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -// Main branch only. Some updates on it. -fn scenario_2() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - // Branch - storage.branch("main", "child")?; - storage.update("child", 1_000)?; - - // More updates on parent - storage.update("main", 1_000)?; - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -// Like 2, but more updates on main -fn scenario_3() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - // Branch - storage.branch("main", "child")?; - storage.update("child", 1_000)?; - - // More updates on parent - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -// Diverged branches -fn scenario_4() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - // Branch - storage.branch("main", "child")?; - storage.update("child", 1_000)?; - - // More updates on parent - for _ in 0..8 { - storage.update("main", 1_000)?; - } - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -fn scenario_5() -> anyhow::Result<(Vec, SegmentSize)> { - let mut storage = Storage::new("a"); - storage.insert("a", 5000)?; - storage.branch("a", "b")?; - storage.update("b", 4000)?; - storage.update("a", 2000)?; - storage.branch("a", "c")?; - storage.insert("c", 4000)?; - storage.insert("a", 2000)?; - - let size = storage.calculate(5000)?; - - Ok((storage.into_segments(), size)) -} - -fn scenario_6() -> anyhow::Result<(Vec, SegmentSize)> { - use std::borrow::Cow; - - const NO_OP: Cow<'static, str> = Cow::Borrowed(""); - - let branches = [ - Some(0x7ff1edab8182025f15ae33482edb590a_u128), - Some(0xb1719e044db05401a05a2ed588a3ad3f), - Some(0xb68d6691c895ad0a70809470020929ef), - ]; - - // compared to other scenarios, this one uses bytes instead of kB - - let mut storage = Storage::new(None); - - storage.branch(&None, branches[0])?; // at 0 - storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064 - storage.branch(&branches[0], branches[1])?; // at 108951064 - storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472 - storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424 - storage.branch(&branches[0], branches[2])?; // at 283415424 - storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616 - storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400 - - let size = storage.calculate(100_000)?; - - Ok((storage.into_segments(), size)) -} - -fn main() { - let args: Vec = std::env::args().collect(); - - let scenario = if args.len() < 2 { "1" } else { &args[1] }; - - let (segments, size) = match scenario { - "1" => scenario_1(), - "2" => scenario_2(), - "3" => scenario_3(), - "4" => scenario_4(), - "5" => scenario_5(), - "6" => scenario_6(), - other => { - eprintln!("invalid scenario {}", other); - std::process::exit(1); - } - } - .unwrap(); - - graphviz_tree(&segments, &size); -} - -fn graphviz_recurse(segments: &[Segment], node: &SegmentSize) { - use tenant_size_model::SegmentMethod::*; - - let seg_id = node.seg_id; - let seg = segments.get(seg_id).unwrap(); - let lsn = seg.end_lsn; - let size = seg.end_size.unwrap_or(0); - let method = node.method; - - println!(" {{"); - println!(" node [width=0.1 height=0.1 shape=oval]"); - - let tenant_size = node.total_children(); - - let penwidth = if seg.needed { 6 } else { 3 }; - let x = match method { - SnapshotAfter => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" style=filled penwidth={penwidth}"), - Wal => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"), - WalNeeded => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"), - Skipped => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"gray\" penwidth={penwidth}"), - }; - - println!(" \"seg{seg_id}\" [{x}]"); - println!(" }}"); - - // Recurse. Much of the data is actually on the edge - for child in node.children.iter() { - let child_id = child.seg_id; - graphviz_recurse(segments, child); - - let edge_color = match child.method { - SnapshotAfter => "gray", - Wal => "black", - WalNeeded => "black", - Skipped => "gray", - }; - - println!(" {{"); - println!(" edge [] "); - print!(" \"seg{seg_id}\" -> \"seg{child_id}\" ["); - print!("color={edge_color}"); - if child.method == WalNeeded { - print!(" penwidth=6"); - } - if child.method == Wal { - print!(" penwidth=3"); - } - - let next = segments.get(child_id).unwrap(); - - if next.op.is_empty() { - print!( - " label=\"{} / {}\"", - next.end_lsn - seg.end_lsn, - (next.end_size.unwrap_or(0) as i128 - seg.end_size.unwrap_or(0) as i128) - ); - } else { - print!(" label=\"{}: {}\"", next.op, next.end_lsn - seg.end_lsn); - } - println!("]"); - println!(" }}"); - } -} - -fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) { - println!("digraph G {{"); - println!(" fontname=\"Helvetica,Arial,sans-serif\""); - println!(" node [fontname=\"Helvetica,Arial,sans-serif\"]"); - println!(" edge [fontname=\"Helvetica,Arial,sans-serif\"]"); - println!(" graph [center=1 rankdir=LR]"); - println!(" edge [dir=none]"); - - graphviz_recurse(segments, tree); - - println!("}}"); -} - -#[test] -fn scenarios_return_same_size() { - type ScenarioFn = fn() -> anyhow::Result<(Vec, SegmentSize)>; - let truths: &[(u32, ScenarioFn, _)] = &[ - (line!(), scenario_1, 8000), - (line!(), scenario_2, 9000), - (line!(), scenario_3, 13000), - (line!(), scenario_4, 16000), - (line!(), scenario_5, 17000), - (line!(), scenario_6, 333_792_000), - ]; - - for (line, scenario, expected) in truths { - let (_, size) = scenario().unwrap(); - assert_eq!(*expected, size.total_children(), "scenario on line {line}"); - } -} diff --git a/libs/tenant_size_model/src/svg.rs b/libs/tenant_size_model/src/svg.rs new file mode 100644 index 0000000000..f26d3aa79d --- /dev/null +++ b/libs/tenant_size_model/src/svg.rs @@ -0,0 +1,193 @@ +use crate::{SegmentMethod, SegmentSizeResult, SizeResult, StorageModel}; +use std::fmt::Write; + +const SVG_WIDTH: f32 = 500.0; + +struct SvgDraw<'a> { + storage: &'a StorageModel, + branches: &'a [String], + seg_to_branch: &'a [usize], + sizes: &'a [SegmentSizeResult], + + // layout + xscale: f32, + min_lsn: u64, + seg_coordinates: Vec<(f32, f32)>, +} + +fn draw_legend(result: &mut String) -> anyhow::Result<()> { + writeln!( + result, + "" + )?; + writeln!(result, "logical snapshot")?; + writeln!( + result, + "" + )?; + writeln!( + result, + "WAL within retention period" + )?; + writeln!( + result, + "" + )?; + writeln!( + result, + "WAL retained to avoid copy" + )?; + writeln!( + result, + "" + )?; + writeln!(result, "WAL not retained")?; + Ok(()) +} + +pub fn draw_svg( + storage: &StorageModel, + branches: &[String], + seg_to_branch: &[usize], + sizes: &SizeResult, +) -> anyhow::Result { + let mut draw = SvgDraw { + storage, + branches, + seg_to_branch, + sizes: &sizes.segments, + + xscale: 0.0, + min_lsn: 0, + seg_coordinates: Vec::new(), + }; + + let mut result = String::new(); + + writeln!(result, "")?; + + draw.calculate_svg_layout(); + + // Draw the tree + for (seg_id, _seg) in storage.segments.iter().enumerate() { + draw.draw_seg_phase1(seg_id, &mut result)?; + } + + // Draw snapshots + for (seg_id, _seg) in storage.segments.iter().enumerate() { + draw.draw_seg_phase2(seg_id, &mut result)?; + } + + draw_legend(&mut result)?; + + write!(result, "")?; + + Ok(result) +} + +impl<'a> SvgDraw<'a> { + fn calculate_svg_layout(&mut self) { + // Find x scale + let segments = &self.storage.segments; + let min_lsn = segments.iter().map(|s| s.lsn).fold(u64::MAX, std::cmp::min); + let max_lsn = segments.iter().map(|s| s.lsn).fold(0, std::cmp::max); + + // Start with 1 pixel = 1 byte. Double the scale until it fits into the image + let mut xscale = 1.0; + while (max_lsn - min_lsn) as f32 / xscale > SVG_WIDTH { + xscale *= 2.0; + } + + // Layout the timelines on Y dimension. + // TODO + let mut y = 100.0; + let mut branch_y_coordinates = Vec::new(); + for _branch in self.branches { + branch_y_coordinates.push(y); + y += 40.0; + } + + // Calculate coordinates for each point + let seg_coordinates = std::iter::zip(segments, self.seg_to_branch) + .map(|(seg, branch_id)| { + let x = (seg.lsn - min_lsn) as f32 / xscale; + let y = branch_y_coordinates[*branch_id]; + (x, y) + }) + .collect(); + + self.xscale = xscale; + self.min_lsn = min_lsn; + self.seg_coordinates = seg_coordinates; + } + + /// Draws lines between points + fn draw_seg_phase1(&self, seg_id: usize, result: &mut String) -> anyhow::Result<()> { + let seg = &self.storage.segments[seg_id]; + + let wal_bytes = if let Some(parent_id) = seg.parent { + seg.lsn - self.storage.segments[parent_id].lsn + } else { + 0 + }; + + let style = match self.sizes[seg_id].method { + SegmentMethod::SnapshotHere => "stroke-width=\"1\" stroke=\"gray\"", + SegmentMethod::Wal if seg.needed && wal_bytes > 0 => { + "stroke-width=\"6\" stroke=\"black\"" + } + SegmentMethod::Wal => "stroke-width=\"3\" stroke=\"black\"", + SegmentMethod::Skipped => "stroke-width=\"1\" stroke=\"gray\"", + }; + if let Some(parent_id) = seg.parent { + let (x1, y1) = self.seg_coordinates[parent_id]; + let (x2, y2) = self.seg_coordinates[seg_id]; + + writeln!( + result, + "", + )?; + writeln!( + result, + " {wal_bytes} bytes of WAL (seg {seg_id})" + )?; + writeln!(result, "")?; + } else { + // draw a little dash to mark the starting point of this branch + let (x, y) = self.seg_coordinates[seg_id]; + let (x1, y1) = (x, y - 5.0); + let (x2, y2) = (x, y + 5.0); + + writeln!( + result, + "", + )?; + writeln!(result, " (seg {seg_id})")?; + writeln!(result, "")?; + } + + Ok(()) + } + + /// Draw circles where snapshots are taken + fn draw_seg_phase2(&self, seg_id: usize, result: &mut String) -> anyhow::Result<()> { + let seg = &self.storage.segments[seg_id]; + + // draw a snapshot point if it's needed + let (coord_x, coord_y) = self.seg_coordinates[seg_id]; + if self.sizes[seg_id].method == SegmentMethod::SnapshotHere { + writeln!( + result, + "", + )?; + writeln!( + result, + " logical size {}", + seg.size.unwrap() + )?; + write!(result, "")?; + } + + Ok(()) + } +} diff --git a/libs/tenant_size_model/tests/tests.rs b/libs/tenant_size_model/tests/tests.rs new file mode 100644 index 0000000000..7660d41c56 --- /dev/null +++ b/libs/tenant_size_model/tests/tests.rs @@ -0,0 +1,313 @@ +//! Tenant size model tests. + +use tenant_size_model::{Segment, SizeResult, StorageModel}; + +use std::collections::HashMap; + +struct ScenarioBuilder { + segments: Vec, + + /// Mapping from the branch name to the index of a segment describing its latest state. + branches: HashMap, +} + +impl ScenarioBuilder { + /// Creates a new storage with the given default branch name. + pub fn new(initial_branch: &str) -> ScenarioBuilder { + let init_segment = Segment { + parent: None, + lsn: 0, + size: Some(0), + needed: false, // determined later + }; + + ScenarioBuilder { + segments: vec![init_segment], + branches: HashMap::from([(initial_branch.into(), 0)]), + } + } + + /// Advances the branch with the named operation, by the relative LSN and logical size bytes. + pub fn modify_branch(&mut self, branch: &str, lsn_bytes: u64, size_bytes: i64) { + let lastseg_id = *self.branches.get(branch).unwrap(); + let newseg_id = self.segments.len(); + let lastseg = &mut self.segments[lastseg_id]; + + let newseg = Segment { + parent: Some(lastseg_id), + lsn: lastseg.lsn + lsn_bytes, + size: Some((lastseg.size.unwrap() as i64 + size_bytes) as u64), + needed: false, + }; + + self.segments.push(newseg); + *self.branches.get_mut(branch).expect("read already") = newseg_id; + } + + pub fn insert(&mut self, branch: &str, bytes: u64) { + self.modify_branch(branch, bytes, bytes as i64); + } + + pub fn update(&mut self, branch: &str, bytes: u64) { + self.modify_branch(branch, bytes, 0i64); + } + + pub fn _delete(&mut self, branch: &str, bytes: u64) { + self.modify_branch(branch, bytes, -(bytes as i64)); + } + + /// Panics if the parent branch cannot be found. + pub fn branch(&mut self, parent: &str, name: &str) { + // Find the right segment + let branchseg_id = *self + .branches + .get(parent) + .expect("should had found the parent by key"); + let _branchseg = &mut self.segments[branchseg_id]; + + // Create branch name for it + self.branches.insert(name.to_string(), branchseg_id); + } + + pub fn calculate(&mut self, retention_period: u64) -> (StorageModel, SizeResult) { + // Phase 1: Mark all the segments that need to be retained + for (_branch, &last_seg_id) in self.branches.iter() { + let last_seg = &self.segments[last_seg_id]; + let cutoff_lsn = last_seg.lsn.saturating_sub(retention_period); + let mut seg_id = last_seg_id; + loop { + let seg = &mut self.segments[seg_id]; + if seg.lsn <= cutoff_lsn { + break; + } + seg.needed = true; + if let Some(prev_seg_id) = seg.parent { + seg_id = prev_seg_id; + } else { + break; + } + } + } + + // Perform the calculation + let storage_model = StorageModel { + segments: self.segments.clone(), + }; + let size_result = storage_model.calculate(); + (storage_model, size_result) + } +} + +// Main branch only. Some updates on it. +#[test] +fn scenario_1() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Calculate the synthetic size with retention horizon 1000 + let (_model, result) = scenario.calculate(1000); + + // The end of the branch is at LSN 10000. Need to retain + // a logical snapshot at LSN 9000, plus the WAL between 9000-10000. + // The logical snapshot has size 5000. + assert_eq!(result.total_size, 5000 + 1000); +} + +// Main branch only. Some updates on it. +#[test] +fn scenario_2() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Branch + scenario.branch("main", "child"); + scenario.update("child", 1_000); + + // More updates on parent + scenario.update("main", 1_000); + + // + // The history looks like this now: + // + // 10000 11000 + // *----*----*--------------* main + // | + // | 11000 + // +-------------- child + // + // + // With retention horizon 1000, we need to retain logical snapshot + // at the branch point, size 5000, and the WAL from 10000-11000 on + // both branches. + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 5000 + 1000 + 1000); +} + +// Like 2, but more updates on main +#[test] +fn scenario_3() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Branch + scenario.branch("main", "child"); + scenario.update("child", 1_000); + + // More updates on parent + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // + // The history looks like this now: + // + // 10000 15000 + // *----*----*------------------------------------* main + // | + // | 11000 + // +-------------- child + // + // + // With retention horizon 1000, it's still cheapest to retain + // - snapshot at branch point (size 5000) + // - WAL on child between 10000-11000 + // - WAL on main between 10000-15000 + // + // This is in total 5000 + 1000 + 5000 + // + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 5000 + 1000 + 5000); +} + +// Diverged branches +#[test] +fn scenario_4() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Branch + scenario.branch("main", "child"); + scenario.update("child", 1_000); + + // More updates on parent + for _ in 0..8 { + scenario.update("main", 1_000); + } + + // + // The history looks like this now: + // + // 10000 18000 + // *----*----*------------------------------------* main + // | + // | 11000 + // +-------------- child + // + // + // With retention horizon 1000, it's now cheapest to retain + // separate snapshots on both branches: + // - snapshot on main branch at LSN 17000 (size 5000) + // - WAL on main between 17000-18000 + // - snapshot on child branch at LSN 10000 (size 5000) + // - WAL on child between 10000-11000 + // + // This is in total 5000 + 1000 + 5000 + 1000 = 12000 + // + // (If we used the the method from the previous scenario, and + // kept only snapshot at the branch point, we'd need to keep + // all the WAL between 10000-18000 on the main branch, so + // the total size would be 5000 + 1000 + 8000 = 14000. The + // calculation always picks the cheapest alternative) + + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 5000 + 1000 + 5000 + 1000); +} + +#[test] +fn scenario_5() { + let mut scenario = ScenarioBuilder::new("a"); + scenario.insert("a", 5000); + scenario.branch("a", "b"); + scenario.update("b", 4000); + scenario.update("a", 2000); + scenario.branch("a", "c"); + scenario.insert("c", 4000); + scenario.insert("a", 2000); + + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 17000); +} + +#[test] +fn scenario_6() { + let branches = [ + "7ff1edab8182025f15ae33482edb590a", + "b1719e044db05401a05a2ed588a3ad3f", + "0xb68d6691c895ad0a70809470020929ef", + ]; + + // compared to other scenarios, this one uses bytes instead of kB + + let mut scenario = ScenarioBuilder::new(""); + + scenario.branch("", branches[0]); // at 0 + scenario.modify_branch(branches[0], 108951064, 43696128); // at 108951064 + scenario.branch(branches[0], branches[1]); // at 108951064 + scenario.modify_branch(branches[1], 15560408, -1851392); // at 124511472 + scenario.modify_branch(branches[0], 174464360, -1531904); // at 283415424 + scenario.branch(branches[0], branches[2]); // at 283415424 + scenario.modify_branch(branches[2], 15906192, 8192); // at 299321616 + scenario.modify_branch(branches[0], 18909976, 32768); // at 302325400 + + let (model, result) = scenario.calculate(100_000); + + // FIXME: We previously calculated 333_792_000. But with this PR, we get + // a much lower number. At a quick look at the model output and the + // calculations here, the new result seems correct to me. + eprintln!( + " MODEL: {}", + serde_json::to_string(&model.segments).unwrap() + ); + eprintln!( + "RESULT: {}", + serde_json::to_string(&result.segments).unwrap() + ); + + assert_eq!(result.total_size, 136_236_928); +} diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index fc271fe83b..e68ceb2dc6 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -437,6 +437,13 @@ paths: type: boolean description: | When true, skip calculation and only provide the model inputs (for debugging). Defaults to false. + - name: retention_period + in: query + required: false + schema: + type: integer + description: | + Override the default retention period (in bytes) used for size calculation. get: description: | Calculate tenant's size, which is a mixture of WAL (bytes) and logical_size (bytes). diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 6a9232e097..7cd7e81fe1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -7,6 +7,7 @@ use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest; use remote_storage::GenericRemoteStorage; +use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; use utils::http::request::{get_request_param, must_get_query_param, parse_query_param}; @@ -20,6 +21,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; use crate::tenant::mgr::TenantMapInsertError; +use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::{PageReconstructError, Timeline}; use crate::{config::PageServerConf, tenant::mgr}; @@ -479,11 +481,19 @@ async fn tenant_status(request: Request) -> Result, ApiErro /// to debug any of the calculations. Requires `tenant_id` request parameter, supports /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model /// values. +/// +/// 'retention_period' query parameter overrides the cutoff that is used to calculate the size +/// (only if it is shorter than the real cutoff). +/// +/// Note: we don't update the cached size and prometheus metric here. +/// The retention period might be different, and it's nice to have a method to just calculate it +/// without modifying anything anyway. async fn tenant_size_handler(request: Request) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let inputs_only: Option = parse_query_param(&request, "inputs_only")?; + let retention_period: Option = parse_query_param(&request, "retention_period")?; + let headers = request.headers(); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let tenant = mgr::get_tenant(tenant_id, true) @@ -492,24 +502,29 @@ async fn tenant_size_handler(request: Request) -> Result, A // this can be long operation let inputs = tenant - .gather_size_inputs(&ctx) + .gather_size_inputs(retention_period, &ctx) .await .map_err(ApiError::InternalServerError)?; - let size = if !inputs_only.unwrap_or(false) { - Some( - tenant - .calc_and_update_cached_synthetic_size(&inputs) - .map_err(ApiError::InternalServerError)?, - ) - } else { - None - }; + let mut sizes = None; + if !inputs_only.unwrap_or(false) { + let storage_model = inputs + .calculate_model() + .map_err(ApiError::InternalServerError)?; + let size = storage_model.calculate(); - /// Private response type with the additional "unstable" `inputs` field. - /// - /// The type is described with `id` and `size` in the openapi_spec file, but the `inputs` is - /// intentionally left out. The type resides in the pageserver not to expose `ModelInputs`. + // If request header expects html, return html + if headers["Accept"] == "text/html" { + return synthetic_size_html_response(inputs, storage_model, size); + } + sizes = Some(size); + } else if headers["Accept"] == "text/html" { + return Err(ApiError::BadRequest(anyhow!( + "inputs_only parameter is incompatible with html output request" + ))); + } + + /// The type resides in the pageserver not to expose `ModelInputs`. #[serde_with::serde_as] #[derive(serde::Serialize)] struct TenantHistorySize { @@ -519,6 +534,9 @@ async fn tenant_size_handler(request: Request) -> Result, A /// /// Will be none if `?inputs_only=true` was given. size: Option, + /// Size of each segment used in the model. + /// Will be null if `?inputs_only=true` was given. + segment_sizes: Option>, inputs: crate::tenant::size::ModelInputs, } @@ -526,7 +544,8 @@ async fn tenant_size_handler(request: Request) -> Result, A StatusCode::OK, TenantHistorySize { id: tenant_id, - size, + size: sizes.as_ref().map(|x| x.total_size), + segment_sizes: sizes.map(|x| x.segments), inputs, }, ) @@ -591,6 +610,62 @@ async fn evict_timeline_layer_handler(request: Request) -> Result Result, ApiError> { + let mut timeline_ids: Vec = Vec::new(); + let mut timeline_map: HashMap = HashMap::new(); + for (index, ti) in inputs.timeline_inputs.iter().enumerate() { + timeline_map.insert(ti.timeline_id, index); + timeline_ids.push(ti.timeline_id.to_string()); + } + let seg_to_branch: Vec = inputs + .segments + .iter() + .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap()) + .collect(); + + let svg = + tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes) + .map_err(ApiError::InternalServerError)?; + + let mut response = String::new(); + + use std::fmt::Write; + write!(response, "\n\n").unwrap(); + write!(response, "
\n{svg}\n
").unwrap(); + writeln!(response, "Project size: {}", sizes.total_size).unwrap(); + writeln!(response, "
").unwrap();
+    writeln!(
+        response,
+        "{}",
+        serde_json::to_string_pretty(&inputs).unwrap()
+    )
+    .unwrap();
+    writeln!(
+        response,
+        "{}",
+        serde_json::to_string_pretty(&sizes.segments).unwrap()
+    )
+    .unwrap();
+    writeln!(response, "
").unwrap(); + write!(response, "\n\n").unwrap(); + + html_response(StatusCode::OK, response) +} + +pub fn html_response(status: StatusCode, data: String) -> Result, ApiError> { + let response = Response::builder() + .status(status) + .header(hyper::header::CONTENT_TYPE, "text/html") + .body(Body::from(data.as_bytes().to_vec())) + .map_err(|e| ApiError::InternalServerError(e.into()))?; + Ok(response) +} + // Helper function to standardize the error messages we produce on bad durations // // Intended to be used with anyhow's `with_context`, e.g.: @@ -1019,7 +1094,7 @@ pub fn make_router( .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) .get("/v1/tenant/:tenant_id", tenant_status) - .get("/v1/tenant/:tenant_id/size", tenant_size_handler) + .get("/v1/tenant/:tenant_id/synthetic_size", tenant_size_handler) .put("/v1/tenant/config", update_tenant_config_handler) .get("/v1/tenant/:tenant_id/config", get_tenant_config_handler) .get("/v1/tenant/:tenant_id/timeline", timeline_list_handler) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 23210b98d5..9e9c98ad62 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2418,6 +2418,9 @@ impl Tenant { #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] pub async fn gather_size_inputs( &self, + // `max_retention_period` overrides the cutoff that is used to calculate the size + // (only if it is shorter than the real cutoff). + max_retention_period: Option, ctx: &RequestContext, ) -> anyhow::Result { let logical_sizes_at_once = self @@ -2425,32 +2428,41 @@ impl Tenant { .concurrent_tenant_size_logical_size_queries .inner(); - // TODO: Having a single mutex block concurrent reads is unfortunate, but since the queries - // are for testing/experimenting, we tolerate this. + // TODO: Having a single mutex block concurrent reads is not great for performance. + // + // But the only case where we need to run multiple of these at once is when we + // request a size for a tenant manually via API, while another background calculation + // is in progress (which is not a common case). // // See more for on the issue #2748 condenced out of the initial PR review. let mut shared_cache = self.cached_logical_sizes.lock().await; - size::gather_inputs(self, logical_sizes_at_once, &mut shared_cache, ctx).await + size::gather_inputs( + self, + logical_sizes_at_once, + max_retention_period, + &mut shared_cache, + ctx, + ) + .await } - /// Calculate synthetic tenant size + /// Calculate synthetic tenant size and cache the result. /// This is periodically called by background worker. /// result is cached in tenant struct #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result { - let inputs = self.gather_size_inputs(ctx).await?; + let inputs = self.gather_size_inputs(None, ctx).await?; - self.calc_and_update_cached_synthetic_size(&inputs) - } - - /// Calculate synthetic size , cache it and set metric value - pub fn calc_and_update_cached_synthetic_size( - &self, - inputs: &size::ModelInputs, - ) -> anyhow::Result { let size = inputs.calculate()?; + self.set_cached_synthetic_size(size); + + Ok(size) + } + + /// Cache given synthetic size and update the metric value + pub fn set_cached_synthetic_size(&self, size: u64) { self.cached_synthetic_tenant_size .store(size, Ordering::Relaxed); @@ -2458,8 +2470,6 @@ impl Tenant { .get_metric_with_label_values(&[&self.tenant_id.to_string()]) .unwrap() .set(size); - - Ok(size) } pub fn get_cached_synthetic_size(&self) -> u64 { diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 2fed4f88b3..2c5efe283b 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -1,8 +1,9 @@ use std::cmp; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::Context; +use anyhow::{bail, Context}; use tokio::sync::oneshot::error::RecvError; use tokio::sync::Semaphore; @@ -10,35 +11,80 @@ use crate::context::RequestContext; use crate::pgdatadir_mapping::CalculateLogicalSizeError; use super::Tenant; +use crate::tenant::Timeline; use utils::id::TimelineId; use utils::lsn::Lsn; use tracing::*; +use tenant_size_model::{Segment, StorageModel}; + /// Inputs to the actual tenant sizing model /// /// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to /// be a transferrable format between execution environments and developer. +/// +/// This tracks more information than the actual StorageModel that calculation +/// needs. We will convert this into a StorageModel when it's time to perform +/// the calculation. +/// #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ModelInputs { - updates: Vec, - retention_period: u64, + pub segments: Vec, + pub timeline_inputs: Vec, +} - /// Relevant lsns per timeline. - /// - /// This field is not required for deserialization purposes, which is mostly used in tests. The - /// LSNs explain the outcome (updates) but are not needed in size calculation. - #[serde_as(as = "HashMap")] - #[serde(default)] - timeline_inputs: HashMap, +/// A [`Segment`], with some extra information for display purposes +#[serde_with::serde_as] +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct SegmentMeta { + pub segment: Segment, + #[serde_as(as = "serde_with::DisplayFromStr")] + pub timeline_id: TimelineId, + pub kind: LsnKind, +} + +impl SegmentMeta { + fn size_needed(&self) -> bool { + match self.kind { + LsnKind::BranchStart => { + // If we don't have a later GcCutoff point on this branch, and + // no ancestor, calculate size for the branch start point. + self.segment.needed && self.segment.parent.is_none() + } + LsnKind::BranchPoint => true, + LsnKind::GcCutOff => true, + LsnKind::BranchEnd => false, + } + } +} + +#[derive( + Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize, +)] +pub enum LsnKind { + /// A timeline starting here + BranchStart, + /// A child timeline branches off from here + BranchPoint, + /// GC cutoff point + GcCutOff, + /// Last record LSN + BranchEnd, } /// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as /// part of [`ModelInputs`] from the HTTP api, explaining the inputs. #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] -struct TimelineInputs { +pub struct TimelineInputs { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub timeline_id: TimelineId, + + #[serde_as(as = "Option")] + pub ancestor_id: Option, + #[serde_as(as = "serde_with::DisplayFromStr")] ancestor_lsn: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] @@ -49,118 +95,14 @@ struct TimelineInputs { horizon_cutoff: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] pitr_cutoff: Lsn, + + /// Cutoff point based on GC settings #[serde_as(as = "serde_with::DisplayFromStr")] next_gc_cutoff: Lsn, -} -// Adjust BranchFrom sorting so that we always process ancestor -// before descendants. This is needed to correctly calculate size of -// descendant timelines. -// -// Note that we may have multiple BranchFroms at the same LSN, so we -// need to sort them in the tree order. -// -// see updates_sort_with_branches_at_same_lsn test below -fn sort_updates_in_tree_order(updates: Vec) -> anyhow::Result> { - let mut sorted_updates = Vec::with_capacity(updates.len()); - let mut known_timelineids = HashSet::new(); - let mut i = 0; - while i < updates.len() { - let curr_upd = &updates[i]; - - if let Command::BranchFrom(parent_id) = curr_upd.command { - let parent_id = match parent_id { - Some(parent_id) if known_timelineids.contains(&parent_id) => { - // we have already processed ancestor - // process this BranchFrom Update normally - known_timelineids.insert(curr_upd.timeline_id); - sorted_updates.push(*curr_upd); - i += 1; - continue; - } - None => { - known_timelineids.insert(curr_upd.timeline_id); - sorted_updates.push(*curr_upd); - i += 1; - continue; - } - Some(parent_id) => parent_id, - }; - - let mut j = i; - - // we have not processed ancestor yet. - // there is a chance that it is at the same Lsn - if !known_timelineids.contains(&parent_id) { - let mut curr_lsn_branchfroms: HashMap> = - HashMap::new(); - - // inspect all branchpoints at the same lsn - while j < updates.len() && updates[j].lsn == curr_upd.lsn { - let lookahead_upd = &updates[j]; - j += 1; - - if let Command::BranchFrom(lookahead_parent_id) = lookahead_upd.command { - match lookahead_parent_id { - Some(lookahead_parent_id) - if !known_timelineids.contains(&lookahead_parent_id) => - { - // we have not processed ancestor yet - // store it for later - let es = - curr_lsn_branchfroms.entry(lookahead_parent_id).or_default(); - es.push((lookahead_upd.timeline_id, j)); - } - _ => { - // we have already processed ancestor - // process this BranchFrom Update normally - known_timelineids.insert(lookahead_upd.timeline_id); - sorted_updates.push(*lookahead_upd); - } - } - } - } - - // process BranchFroms in the tree order - // check that we don't have a cycle if somet entry is orphan - // (this should not happen, but better to be safe) - let mut processed_some_entry = true; - while processed_some_entry { - processed_some_entry = false; - - curr_lsn_branchfroms.retain(|parent_id, branchfroms| { - if known_timelineids.contains(parent_id) { - for (timeline_id, j) in branchfroms { - known_timelineids.insert(*timeline_id); - sorted_updates.push(updates[*j - 1]); - } - processed_some_entry = true; - false - } else { - true - } - }); - } - - if !curr_lsn_branchfroms.is_empty() { - // orphans are expected to be rare and transient between tenant reloads - // for example, an broken ancestor without the child branch being broken. - anyhow::bail!( - "orphan branch(es) detected in BranchFroms: {curr_lsn_branchfroms:?}" - ); - } - } - - assert!(j > i); - i = j; - } else { - // not a BranchFrom, keep the same order - sorted_updates.push(*curr_upd); - i += 1; - } - } - - Ok(sorted_updates) + /// Cutoff point calculated from the user-supplied 'max_retention_period' + #[serde_as(as = "Option")] + retention_param_cutoff: Option, } /// Gathers the inputs for the tenant sizing model. @@ -181,257 +123,257 @@ fn sort_updates_in_tree_order(updates: Vec) -> anyhow::Result, + max_retention_period: Option, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, ctx: &RequestContext, ) -> anyhow::Result { - // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to - // our advantage with `?` error handling. - let mut joinset = tokio::task::JoinSet::new(); - // refresh is needed to update gc related pitr_cutoff and horizon_cutoff tenant .refresh_gc_info(ctx) .await .context("Failed to refresh gc_info before gathering inputs")?; + // Collect information about all the timelines let timelines = tenant.list_timelines(); if timelines.is_empty() { // perhaps the tenant has just been created, and as such doesn't have any data yet return Ok(ModelInputs { - updates: vec![], - retention_period: 0, - timeline_inputs: HashMap::default(), + segments: vec![], + timeline_inputs: Vec::new(), }); } + // Build a map of branch points. + let mut branchpoints: HashMap> = HashMap::new(); + for timeline in timelines.iter() { + if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() { + branchpoints + .entry(ancestor_id) + .or_default() + .insert(timeline.get_ancestor_lsn()); + } + } + + // These become the final result. + let mut timeline_inputs = Vec::with_capacity(timelines.len()); + let mut segments: Vec = Vec::new(); + + // + // Build Segments representing each timeline. As we do that, also remember + // the branchpoints and branch startpoints in 'branchpoint_segments' and + // 'branchstart_segments' + // + + // BranchPoint segments of each timeline + // (timeline, branchpoint LSN) -> segment_id + let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new(); + + // timeline, Branchpoint seg id, (ancestor, ancestor LSN) + type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>); + let mut branchstart_segments: Vec = Vec::new(); + + for timeline in timelines.iter() { + let timeline_id = timeline.timeline_id; + let last_record_lsn = timeline.get_last_record_lsn(); + let ancestor_lsn = timeline.get_ancestor_lsn(); + + // there's a race between the update (holding tenant.gc_lock) and this read but it + // might not be an issue, because it's not for Timeline::gc + let gc_info = timeline.gc_info.read().unwrap(); + + // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a + // new gc run, which we have no control over. however differently from `Timeline::gc` + // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not + // actually removing files. + let mut next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff); + + // If the caller provided a shorter retention period, use that instead of the GC cutoff. + let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period { + let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period)); + if next_gc_cutoff < param_cutoff { + next_gc_cutoff = param_cutoff; + } + Some(param_cutoff) + } else { + None + }; + + // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we + // want to query any logical size before initdb_lsn. + let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn); + + // Build "interesting LSNs" on this timeline + let mut lsns: Vec<(Lsn, LsnKind)> = gc_info + .retain_lsns + .iter() + .filter(|&&lsn| lsn > ancestor_lsn) + .copied() + // this assumes there are no other retain_lsns than the branchpoints + .map(|lsn| (lsn, LsnKind::BranchPoint)) + .collect::>(); + + // Add branch points we collected earlier, just in case there were any that were + // not present in retain_lsns. We will remove any duplicates below later. + if let Some(this_branchpoints) = branchpoints.get(&timeline_id) { + lsns.extend( + this_branchpoints + .iter() + .map(|lsn| (*lsn, LsnKind::BranchPoint)), + ) + } + + // Add a point for the GC cutoff + let branch_start_needed = next_gc_cutoff <= branch_start_lsn; + if !branch_start_needed { + lsns.push((next_gc_cutoff, LsnKind::GcCutOff)); + } + + lsns.sort_unstable(); + lsns.dedup(); + + // + // Create Segments for the interesting points. + // + + // Timeline start point + let ancestor = timeline + .get_ancestor_timeline_id() + .map(|ancestor_id| (ancestor_id, ancestor_lsn)); + branchstart_segments.push((timeline_id, segments.len(), ancestor)); + segments.push(SegmentMeta { + segment: Segment { + parent: None, // filled in later + lsn: branch_start_lsn.0, + size: None, // filled in later + needed: branch_start_needed, + }, + timeline_id: timeline.timeline_id, + kind: LsnKind::BranchStart, + }); + + // GC cutoff point, and any branch points, i.e. points where + // other timelines branch off from this timeline. + let mut parent = segments.len() - 1; + for (lsn, kind) in lsns { + if kind == LsnKind::BranchPoint { + branchpoint_segments.insert((timeline_id, lsn), segments.len()); + } + segments.push(SegmentMeta { + segment: Segment { + parent: Some(parent), + lsn: lsn.0, + size: None, + needed: lsn > next_gc_cutoff, + }, + timeline_id: timeline.timeline_id, + kind, + }); + parent += 1; + } + + // Current end of the timeline + segments.push(SegmentMeta { + segment: Segment { + parent: Some(parent), + lsn: last_record_lsn.0, + size: None, // Filled in later, if necessary + needed: true, + }, + timeline_id: timeline.timeline_id, + kind: LsnKind::BranchEnd, + }); + + timeline_inputs.push(TimelineInputs { + timeline_id: timeline.timeline_id, + ancestor_id: timeline.get_ancestor_timeline_id(), + ancestor_lsn, + last_record: last_record_lsn, + // this is not used above, because it might not have updated recently enough + latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), + horizon_cutoff: gc_info.horizon_cutoff, + pitr_cutoff: gc_info.pitr_cutoff, + next_gc_cutoff, + retention_param_cutoff, + }); + } + + // We now have all segments from the timelines in 'segments'. The timelines + // haven't been linked to each other yet, though. Do that. + for (_timeline_id, seg_id, ancestor) in branchstart_segments { + // Look up the branch point + if let Some(ancestor) = ancestor { + let parent_id = *branchpoint_segments.get(&ancestor).unwrap(); + segments[seg_id].segment.parent = Some(parent_id); + } + } + + // We left the 'size' field empty in all of the Segments so far. + // Now find logical sizes for all of the points that might need or benefit from them. + fill_logical_sizes(&timelines, &mut segments, limit, logical_size_cache, ctx).await?; + + Ok(ModelInputs { + segments, + timeline_inputs, + }) +} + +/// Augment 'segments' with logical sizes +/// +/// this will probably conflict with on-demand downloaded layers, or at least force them all +/// to be downloaded +/// +async fn fill_logical_sizes( + timelines: &[Arc], + segments: &mut [SegmentMeta], + limit: &Arc, + logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, + ctx: &RequestContext, +) -> anyhow::Result<()> { + let timeline_hash: HashMap> = HashMap::from_iter( + timelines + .iter() + .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))), + ); + // record the used/inserted cache keys here, to remove extras not to start leaking // after initial run the cache should be quite stable, but live timelines will eventually // require new lsns to be inspected. - let mut needed_cache = HashSet::<(TimelineId, Lsn)>::new(); + let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option>::new(); - let mut updates = Vec::new(); + // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to + // our advantage with `?` error handling. + let mut joinset = tokio::task::JoinSet::new(); - // record the per timeline values useful to debug the model inputs, also used to track - // ancestor_lsn without keeping a hold of Timeline - let mut timeline_inputs = HashMap::with_capacity(timelines.len()); - - // used to determine the `retention_period` for the size model - let mut max_cutoff_distance = None; - - // mapping from (TimelineId, Lsn) => if this branch point has been handled already via - // GcInfo::retain_lsns or if it needs to have its logical_size calculated. - let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new(); - - for timeline in timelines { - if !timeline.is_active() { - anyhow::bail!( - "timeline {} is not active, cannot calculate tenant_size now", - timeline.timeline_id - ); + // For each point that would benefit from having a logical size available, + // spawn a Task to fetch it, unless we have it cached already. + for seg in segments.iter() { + if !seg.size_needed() { + continue; } - let last_record_lsn = timeline.get_last_record_lsn(); + let timeline_id = seg.timeline_id; + let lsn = Lsn(seg.segment.lsn); - let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = { - // there's a race between the update (holding tenant.gc_lock) and this read but it - // might not be an issue, because it's not for Timeline::gc - let gc_info = timeline.gc_info.read().unwrap(); - - // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a - // new gc run, which we have no control over. however differently from `Timeline::gc` - // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not - // actually removing files. - let next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff); - - // the minimum where we should find the next_gc_cutoff for our calculations. - // - // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we - // want to query any logical size before initdb_lsn. - let cutoff_minimum = cmp::max(timeline.get_ancestor_lsn(), timeline.initdb_lsn); - - let maybe_cutoff = if next_gc_cutoff > cutoff_minimum { - Some((next_gc_cutoff, LsnKind::GcCutOff)) - } else { - None - }; - - // this assumes there are no other lsns than the branchpoints - let lsns = gc_info - .retain_lsns - .iter() - .inspect(|&&lsn| { - trace!( - timeline_id=%timeline.timeline_id, - "retained lsn: {lsn:?}, is_before_ancestor_lsn={}", - lsn < timeline.get_ancestor_lsn() - ) - }) - .filter(|&&lsn| lsn > timeline.get_ancestor_lsn()) - .copied() - .map(|lsn| (lsn, LsnKind::BranchPoint)) - .chain(maybe_cutoff) - .collect::>(); - - ( - lsns, - gc_info.horizon_cutoff, - gc_info.pitr_cutoff, - next_gc_cutoff, - ) - }; - - // update this to have a retention_period later for the tenant_size_model - // tenant_size_model compares this to the last segments start_lsn - if let Some(cutoff_distance) = last_record_lsn.checked_sub(next_gc_cutoff) { - match max_cutoff_distance.as_mut() { - Some(max) => { - *max = std::cmp::max(*max, cutoff_distance); - } - _ => { - max_cutoff_distance = Some(cutoff_distance); - } - } - } - - // all timelines branch from something, because it might be impossible to pinpoint - // which is the tenant_size_model's "default" branch. - - let ancestor_lsn = timeline.get_ancestor_lsn(); - - updates.push(Update { - lsn: ancestor_lsn, - command: Command::BranchFrom(timeline.get_ancestor_timeline_id()), - timeline_id: timeline.timeline_id, - }); - - if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() { - // refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches - // which are over gc_horizon. for example, a "main" branch which never received any - // updates apart from initdb not have branch points recorded. - referenced_branch_froms - .entry((parent_timeline_id, timeline.get_ancestor_lsn())) - .or_default(); - } - - for (lsn, _kind) in &interesting_lsns { - // mark this visited so don't need to re-process this parent - *referenced_branch_froms - .entry((timeline.timeline_id, *lsn)) - .or_default() = true; - - if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) { - updates.push(Update { - lsn: *lsn, - timeline_id: timeline.timeline_id, - command: Command::Update(*size), - }); - - needed_cache.insert((timeline.timeline_id, *lsn)); - } else { - let timeline = Arc::clone(&timeline); + if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) { + let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned(); + if cached_size.is_none() { + let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap()); let parallel_size_calcs = Arc::clone(limit); let ctx = ctx.attached_child(); joinset.spawn(calculate_logical_size( parallel_size_calcs, timeline, - *lsn, + lsn, ctx, )); } - } - - timeline_inputs.insert( - timeline.timeline_id, - TimelineInputs { - ancestor_lsn, - last_record: last_record_lsn, - // this is not used above, because it might not have updated recently enough - latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), - horizon_cutoff, - pitr_cutoff, - next_gc_cutoff, - }, - ); - } - - // iterate over discovered branch points and make sure we are getting logical sizes at those - // points. - for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() { - if *handled { - continue; - } - - let timeline_id = *timeline_id; - let lsn = *lsn; - - match timeline_inputs.get(&timeline_id) { - Some(inputs) if inputs.ancestor_lsn == lsn => { - // we don't need an update at this branch point which is also point where - // timeline_id branch was branched from. - continue; - } - Some(_) => {} - None => { - // we should have this because we have iterated through all of the timelines - anyhow::bail!("missing timeline_input for {timeline_id}") - } - } - - if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) { - updates.push(Update { - lsn, - timeline_id, - command: Command::Update(*size), - }); - - needed_cache.insert((timeline_id, lsn)); - } else { - let timeline = tenant - .get_timeline(timeline_id, false) - .context("find referenced ancestor timeline")?; - let parallel_size_calcs = Arc::clone(limit); - joinset.spawn(calculate_logical_size( - parallel_size_calcs, - timeline.clone(), - lsn, - ctx.attached_child(), - )); - - if let Some(parent_id) = timeline.get_ancestor_timeline_id() { - // we should not find new ones because we iterated tenants all timelines - anyhow::ensure!( - timeline_inputs.contains_key(&parent_id), - "discovered new timeline {parent_id} (parent of {timeline_id})" - ); - } - }; - } - - // finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch - // point. this is needed by the model. - for (timeline_id, inputs) in timeline_inputs.iter() { - let lsn = inputs.last_record; - - if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) { - // this means that the (timeline_id, last_record_lsn) represents a branch point - // we do not want to add EndOfBranch updates for these points because it doesn't fit - // into the current tenant_size_model. - continue; - } - - if lsn > inputs.ancestor_lsn { - // all timelines also have an end point if they have made any progress - updates.push(Update { - lsn, - command: Command::EndOfBranch, - timeline_id: *timeline_id, - }); + e.insert(cached_size); } } + // Perform the size lookups let mut have_any_error = false; - while let Some(res) = joinset.join_next().await { // each of these come with Result, JoinError> // because of spawn + spawn_blocking @@ -460,19 +402,13 @@ pub(super) async fn gather_inputs( debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated"); logical_size_cache.insert((timeline.timeline_id, lsn), size); - needed_cache.insert((timeline.timeline_id, lsn)); - - updates.push(Update { - lsn, - timeline_id: timeline.timeline_id, - command: Command::Update(size), - }); + sizes_needed.insert((timeline.timeline_id, lsn), Some(size)); } } } // prune any keys not needed anymore; we record every used key and added key. - logical_size_cache.retain(|key, _| needed_cache.contains(key)); + logical_size_cache.retain(|key, _| sizes_needed.contains_key(key)); if have_any_error { // we cannot complete this round, because we are missing data. @@ -480,105 +416,47 @@ pub(super) async fn gather_inputs( anyhow::bail!("failed to calculate some logical_sizes"); } - // the data gathered to updates is per lsn, regardless of the branch, so we can use it to - // our advantage, not requiring a sorted container or graph walk. - // - // for branch points, which come as multiple updates at the same LSN, the Command::Update - // is needed before a branch is made out of that branch Command::BranchFrom. this is - // handled by the variant order in `Command`. - // - updates.sort_unstable(); - - // And another sort to handle Command::BranchFrom ordering - // in case when there are multiple branches at the same LSN. - let sorted_updates = sort_updates_in_tree_order(updates)?; - - let retention_period = match max_cutoff_distance { - Some(max) => max.0, - None => { - anyhow::bail!("the first branch should have a gc_cutoff after it's branch point at 0") + // Insert the looked up sizes to the Segments + for seg in segments.iter_mut() { + if !seg.size_needed() { + continue; } - }; - Ok(ModelInputs { - updates: sorted_updates, - retention_period, - timeline_inputs, - }) + let timeline_id = seg.timeline_id; + let lsn = Lsn(seg.segment.lsn); + + if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) { + seg.segment.size = Some(*size); + } else { + bail!("could not find size at {} in timeline {}", lsn, timeline_id); + } + } + Ok(()) } impl ModelInputs { + pub fn calculate_model(&self) -> anyhow::Result { + // Convert SegmentMetas into plain Segments + let storage = StorageModel { + segments: self + .segments + .iter() + .map(|seg| seg.segment.clone()) + .collect(), + }; + + Ok(storage) + } + + // calculate total project size pub fn calculate(&self) -> anyhow::Result { - // Option is used for "naming" the branches because it is assumed to be - // impossible to always determine the a one main branch. - let mut storage = tenant_size_model::Storage::>::new(None); + let storage = self.calculate_model()?; + let sizes = storage.calculate(); - for update in &self.updates { - let Update { - lsn, - command: op, - timeline_id, - } = update; - - let Lsn(now) = *lsn; - match op { - Command::Update(sz) => { - storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?; - } - Command::EndOfBranch => { - storage.insert_point(&Some(*timeline_id), "".into(), now, None)?; - } - Command::BranchFrom(parent) => { - // This branch command may fail if it cannot find a parent to branch from. - storage.branch(parent, Some(*timeline_id))?; - } - } - } - - Ok(storage.calculate(self.retention_period)?.total_children()) + Ok(sizes.total_size) } } -/// A point of interest in the tree of branches -#[serde_with::serde_as] -#[derive( - Debug, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize, -)] -struct Update { - #[serde_as(as = "serde_with::DisplayFromStr")] - lsn: utils::lsn::Lsn, - command: Command, - #[serde_as(as = "serde_with::DisplayFromStr")] - timeline_id: TimelineId, -} - -#[serde_with::serde_as] -#[derive(PartialOrd, PartialEq, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "snake_case")] -enum Command { - Update(u64), - BranchFrom(#[serde_as(as = "Option")] Option), - EndOfBranch, -} - -impl std::fmt::Debug for Command { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // custom one-line implementation makes it more enjoyable to read {:#?} avoiding 3 - // linebreaks - match self { - Self::Update(arg0) => write!(f, "Update({arg0})"), - Self::BranchFrom(arg0) => write!(f, "BranchFrom({arg0:?})"), - Self::EndOfBranch => write!(f, "EndOfBranch"), - } - } -} - -#[derive(Debug, Clone, Copy)] -enum LsnKind { - BranchPoint, - GcCutOff, -} - /// Newtype around the tuple that carries the timeline at lsn logical size calculation. struct TimelineAtLsnSizeResult( Arc, @@ -604,227 +482,230 @@ async fn calculate_logical_size( Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) } -#[test] -fn updates_sort() { - use std::str::FromStr; - use utils::id::TimelineId; - use utils::lsn::Lsn; - - let ids = [ - TimelineId::from_str("7ff1edab8182025f15ae33482edb590a").unwrap(), - TimelineId::from_str("b1719e044db05401a05a2ed588a3ad3f").unwrap(), - TimelineId::from_str("b68d6691c895ad0a70809470020929ef").unwrap(), - ]; - - // try through all permutations - let ids = [ - [&ids[0], &ids[1], &ids[2]], - [&ids[0], &ids[2], &ids[1]], - [&ids[1], &ids[0], &ids[2]], - [&ids[1], &ids[2], &ids[0]], - [&ids[2], &ids[0], &ids[1]], - [&ids[2], &ids[1], &ids[0]], - ]; - - for ids in ids { - // apply a fixture which uses a permutation of ids - let commands = [ - Update { - lsn: Lsn(0), - command: Command::BranchFrom(None), - timeline_id: *ids[0], - }, - Update { - lsn: Lsn::from_str("0/67E7618").unwrap(), - command: Command::Update(43696128), - timeline_id: *ids[0], - }, - Update { - lsn: Lsn::from_str("0/67E7618").unwrap(), - command: Command::BranchFrom(Some(*ids[0])), - timeline_id: *ids[1], - }, - Update { - lsn: Lsn::from_str("0/76BE4F0").unwrap(), - command: Command::Update(41844736), - timeline_id: *ids[1], - }, - Update { - lsn: Lsn::from_str("0/10E49380").unwrap(), - command: Command::Update(42164224), - timeline_id: *ids[0], - }, - Update { - lsn: Lsn::from_str("0/10E49380").unwrap(), - command: Command::BranchFrom(Some(*ids[0])), - timeline_id: *ids[2], - }, - Update { - lsn: Lsn::from_str("0/11D74910").unwrap(), - command: Command::Update(42172416), - timeline_id: *ids[2], - }, - Update { - lsn: Lsn::from_str("0/12051E98").unwrap(), - command: Command::Update(42196992), - timeline_id: *ids[0], - }, - ]; - - let mut sorted = commands; - - // these must sort in the same order, regardless of how the ids sort - // which is why the timeline_id is the last field - sorted.sort_unstable(); - - assert_eq!(commands, sorted, "{:#?} vs. {:#?}", commands, sorted); - } -} - #[test] fn verify_size_for_multiple_branches() { // this is generated from integration test test_tenant_size_with_multiple_branches, but this way // it has the stable lsn's // - // timelineinputs have been left out, because those explain the inputs, but don't participate - // in further size calculations. - let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#; - + // The timeline_inputs don't participate in the size calculation, and are here just to explain + // the inputs. + let doc = r#" +{ + "segments": [ + { + "segment": { + "parent": 9, + "lsn": 26033560, + "size": null, + "needed": false + }, + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 0, + "lsn": 35720400, + "size": 25206784, + "needed": false + }, + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 1, + "lsn": 35851472, + "size": null, + "needed": true + }, + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "kind": "BranchEnd" + }, + { + "segment": { + "parent": 7, + "lsn": 24566168, + "size": null, + "needed": false + }, + "timeline_id": "454626700469f0a9914949b9d018e876", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 3, + "lsn": 25261936, + "size": 26050560, + "needed": false + }, + "timeline_id": "454626700469f0a9914949b9d018e876", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 4, + "lsn": 25393008, + "size": null, + "needed": true + }, + "timeline_id": "454626700469f0a9914949b9d018e876", + "kind": "BranchEnd" + }, + { + "segment": { + "parent": null, + "lsn": 23694408, + "size": null, + "needed": false + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 6, + "lsn": 24566168, + "size": 25739264, + "needed": false + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchPoint" + }, + { + "segment": { + "parent": 7, + "lsn": 25902488, + "size": 26402816, + "needed": false + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 8, + "lsn": 26033560, + "size": 26468352, + "needed": true + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchPoint" + }, + { + "segment": { + "parent": 9, + "lsn": 26033560, + "size": null, + "needed": true + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchEnd" + } + ], + "timeline_inputs": [ + { + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "ancestor_lsn": "0/18D3D98", + "last_record": "0/2230CD0", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/2210CD0", + "pitr_cutoff": "0/2210CD0", + "next_gc_cutoff": "0/2210CD0", + "retention_param_cutoff": null + }, + { + "timeline_id": "454626700469f0a9914949b9d018e876", + "ancestor_lsn": "0/176D998", + "last_record": "0/1837770", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/1817770", + "pitr_cutoff": "0/1817770", + "next_gc_cutoff": "0/1817770", + "retention_param_cutoff": null + }, + { + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "ancestor_lsn": "0/0", + "last_record": "0/18D3D98", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/18B3D98", + "pitr_cutoff": "0/18B3D98", + "next_gc_cutoff": "0/18B3D98", + "retention_param_cutoff": null + } + ] +} +"#; let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); - assert_eq!(inputs.calculate().unwrap(), 36_409_872); + assert_eq!(inputs.calculate().unwrap(), 37_851_408); } #[test] -fn updates_sort_with_branches_at_same_lsn() { - use std::str::FromStr; - use Command::{BranchFrom, EndOfBranch}; - - macro_rules! lsn { - ($e:expr) => { - Lsn::from_str($e).unwrap() - }; +fn verify_size_for_one_branch() { + let doc = r#" +{ + "segments": [ + { + "segment": { + "parent": null, + "lsn": 0, + "size": null, + "needed": false + }, + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 0, + "lsn": 305547335776, + "size": 220054675456, + "needed": false + }, + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 1, + "lsn": 305614444640, + "size": null, + "needed": true + }, + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "kind": "BranchEnd" } + ], + "timeline_inputs": [ + { + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "ancestor_lsn": "0/0", + "last_record": "47/280A5860", + "latest_gc_cutoff": "47/240A5860", + "horizon_cutoff": "47/240A5860", + "pitr_cutoff": "47/240A5860", + "next_gc_cutoff": "47/240A5860", + "retention_param_cutoff": "0/0" + } + ] +}"#; - let ids = [ - TimelineId::from_str("00000000000000000000000000000000").unwrap(), - TimelineId::from_str("11111111111111111111111111111111").unwrap(), - TimelineId::from_str("22222222222222222222222222222222").unwrap(), - TimelineId::from_str("33333333333333333333333333333333").unwrap(), - TimelineId::from_str("44444444444444444444444444444444").unwrap(), - ]; + let model: ModelInputs = serde_json::from_str(doc).unwrap(); - // issue https://github.com/neondatabase/neon/issues/3179 - let commands = vec![ - Update { - lsn: lsn!("0/0"), - command: BranchFrom(None), - timeline_id: ids[0], - }, - Update { - lsn: lsn!("0/169AD58"), - command: Command::Update(25387008), - timeline_id: ids[0], - }, - // next three are wrongly sorted, because - // ids[1] is branched from before ids[1] exists - // and ids[2] is branched from before ids[2] exists - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[0])), - timeline_id: ids[2], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[2])), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CA85B8"), - command: Command::Update(28925952), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: Command::Update(29024256), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[4], - }, - Update { - lsn: lsn!("0/22DCE70"), - command: Command::Update(32546816), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/230CE70"), - command: EndOfBranch, - timeline_id: ids[3], - }, - ]; + let res = model.calculate_model().unwrap().calculate(); - let expected = vec![ - Update { - lsn: lsn!("0/0"), - command: BranchFrom(None), - timeline_id: ids[0], - }, - Update { - lsn: lsn!("0/169AD58"), - command: Command::Update(25387008), - timeline_id: ids[0], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[0])), - timeline_id: ids[2], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[2])), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/1CA85B8"), - command: Command::Update(28925952), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: Command::Update(29024256), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[4], - }, - Update { - lsn: lsn!("0/22DCE70"), - command: Command::Update(32546816), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/230CE70"), - command: EndOfBranch, - timeline_id: ids[3], - }, - ]; + println!("calculated synthetic size: {}", res.total_size); + println!("result: {:?}", serde_json::to_string(&res.segments)); - let sorted_commands = sort_updates_in_tree_order(commands).unwrap(); - - assert_eq!(sorted_commands, expected); + use utils::lsn::Lsn; + let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap(); + let last_lsn: Lsn = "47/280A5860".parse().unwrap(); + println!( + "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}", + u64::from(latest_gc_cutoff_lsn), + u64::from(last_lsn) + ); + assert_eq!(res.total_size, 220121784320); } diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c943bf0a27..21c6ede27e 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -27,8 +27,7 @@ use std::fs::OpenOptions; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::ops::{Deref, DerefMut}; -use std::os::fd::RawFd; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::process::Stdio; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b35252243e..0620ad8a35 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1217,7 +1217,7 @@ class PageserverHttpClient(requests.Session): """ Returns the tenant size, together with the model inputs as the second tuple item. """ - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/size") + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size") self.verbose_error(res) res = res.json() assert isinstance(res, dict) @@ -1228,6 +1228,16 @@ class PageserverHttpClient(requests.Session): assert type(inputs) is dict return (size, inputs) + def tenant_size_debug(self, tenant_id: TenantId) -> str: + """ + Returns the tenant size debug info, as an HTML string + """ + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size", + headers={"Accept": "text/html"}, + ) + return res.text + def timeline_list( self, tenant_id: TenantId, diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index bb3bca8782..8c2996f491 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -1,13 +1,13 @@ -from typing import Any, List, Tuple +from pathlib import Path +from typing import List, Tuple import pytest from fixtures.log_helper import log -from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn from fixtures.types import Lsn -def test_empty_tenant_size(neon_simple_env: NeonEnv): +def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path): env = neon_simple_env (tenant_id, _) = env.neon_cli.create_tenant() http_client = env.pageserver.http_client() @@ -18,6 +18,9 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): main_branch_name = "main" + branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0] + assert branch_name == main_branch_name + with env.postgres.create_start( main_branch_name, tenant_id=tenant_id, @@ -39,12 +42,44 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): size, inputs = http_client.tenant_size_and_modelinputs(tenant_id) assert size == initial_size, "tenant_size should not be affected by shutdown of compute" - expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"] - actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore - assert actual_commands == expected_commands + expected_inputs = { + "segments": [ + { + "segment": {"parent": None, "lsn": 23694408, "size": 25362432, "needed": True}, + "timeline_id": f"{main_timeline_id}", + "kind": "BranchStart", + }, + { + "segment": {"parent": 0, "lsn": 23694528, "size": None, "needed": True}, + "timeline_id": f"{main_timeline_id}", + "kind": "BranchEnd", + }, + ], + "timeline_inputs": [ + { + "timeline_id": f"{main_timeline_id}", + "ancestor_id": None, + "ancestor_lsn": "0/0", + "last_record": "0/1698CC0", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/0", + "pitr_cutoff": "0/0", + "next_gc_cutoff": "0/0", + "retention_param_cutoff": None, + } + ], + } + expected_inputs = mask_model_inputs(expected_inputs) + actual_inputs = mask_model_inputs(inputs) + + assert expected_inputs == actual_inputs + + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) -def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): +def test_branched_empty_timeline_size(neon_simple_env: NeonEnv, test_output_dir: Path): """ Issue found in production. Because the ancestor branch was under gc_horizon, the branchpoint was "dangling" and the computation could not be @@ -75,8 +110,12 @@ def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): assert size_after_branching > initial_size + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) -def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): + +def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv, test_output_dir: Path): """ More general version of test_branched_empty_timeline_size @@ -128,9 +167,13 @@ def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): size_after_writes = http_client.tenant_size(tenant_id) assert size_after_writes > initial_size + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + @pytest.mark.skip("This should work, but is left out because assumed covered by other tests") -def test_branch_point_within_horizon(neon_simple_env: NeonEnv): +def test_branch_point_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path): """ gc_horizon = 15 @@ -167,9 +210,13 @@ def test_branch_point_within_horizon(neon_simple_env: NeonEnv): assert size_before_branching < size_after + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + @pytest.mark.skip("This should work, but is left out because assumed covered by other tests") -def test_parent_within_horizon(neon_simple_env: NeonEnv): +def test_parent_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path): """ gc_horizon = 5 @@ -179,7 +226,7 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv): """ env = neon_simple_env - gc_horizon = 200_000 + gc_horizon = 5_000 (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) http_client = env.pageserver.http_client() @@ -212,9 +259,13 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv): assert size_before_branching < size_after + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + @pytest.mark.skip("This should work, but is left out because assumed covered by other tests") -def test_only_heads_within_horizon(neon_simple_env: NeonEnv): +def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path): """ gc_horizon = small @@ -253,8 +304,14 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv): latest_size = size_now + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) -def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): + +def test_single_branch_get_tenant_size_grows( + neon_env_builder: NeonEnvBuilder, test_output_dir: Path +): """ Operate on single branch reading the tenants size after each transaction. """ @@ -279,7 +336,20 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): collected_responses: List[Tuple[Lsn, int]] = [] + size_debug_file = open(test_output_dir / "size_debug.html", "w") + + def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int): + if current_lsn - initdb_lsn > gc_horizon: + assert ( + size >= prev + ), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size" + else: + assert ( + size > prev + ), "tenant_size should grow, because we continue to add WAL to initial snapshot size" + with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) with pg.cursor() as cur: cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)") @@ -297,13 +367,19 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) - size = http_client.tenant_size(tenant_id) + size, sizes = http_client.tenant_size_and_modelinputs(tenant_id) + + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) if len(collected_responses) > 0: prev = collected_responses[-1][1] if size == 0: assert prev == 0 else: + # branch start shouldn't be past gc_horizon yet + # thus the size should grow as we insert more data + assert current_lsn - initdb_lsn <= gc_horizon assert size > prev collected_responses.append((current_lsn, size)) @@ -323,9 +399,15 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) - size = http_client.tenant_size(tenant_id) + size, sizes = http_client.tenant_size_and_modelinputs(tenant_id) + + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + prev = collected_responses[-1][1] - assert size > prev, "tenant_size should grow with updates" + + check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev) + collected_responses.append((current_lsn, size)) while True: @@ -340,9 +422,9 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): size = http_client.tenant_size(tenant_id) prev = collected_responses[-1][1] - assert ( - size > prev - ), "even though rows have been deleted, the tenant_size should increase" + + check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev) + collected_responses.append((current_lsn, size)) with pg.cursor() as cur: @@ -352,7 +434,9 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): size = http_client.tenant_size(tenant_id) prev = collected_responses[-1][1] - assert size > prev, "dropping table grows tenant_size" + + check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev) + collected_responses.append((current_lsn, size)) # this isn't too many lines to forget for a while. observed while @@ -364,24 +448,17 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): env.pageserver.stop() env.pageserver.start() + size_debug_file.close() + size_after = http_client.tenant_size(tenant_id) prev = collected_responses[-1][1] assert size_after == prev, "size after restarting pageserver should not have changed" - ps_metrics = parse_metrics(http_client.get_metrics(), "pageserver") - tenant_metric_filter = { - "tenant_id": str(tenant_id), - } - tenant_size_metric = int( - ps_metrics.query_one("pageserver_tenant_synthetic_size", filter=tenant_metric_filter).value - ) - - assert tenant_size_metric == size_after, "API size value should be equal to metric size value" - - -def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder): +def test_get_tenant_size_with_multiple_branches( + neon_env_builder: NeonEnvBuilder, test_output_dir: Path +): """ Reported size goes up while branches or rows are being added, goes down after removing branches. """ @@ -481,6 +558,10 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder size_after = http_client.tenant_size(tenant_id) assert size_after == size_after_thinning_branch + size_debug_file_before = open(test_output_dir / "size_debug_before.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file_before.write(size_debug) + # teardown, delete branches, and the size should be going down http_client.timeline_delete(tenant_id, first_branch_timeline_id) @@ -493,3 +574,38 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder assert size_after_deleting_second < size_after_continuing_on_main assert size_after_deleting_second > size_after_first_branch + + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + + +# Helper for tests that compare timeline_inputs +# We don't want to compare the exact values, because they can be unstable +# and cause flaky tests. So replace the values with useful invariants. +def mask_model_inputs(x): + if isinstance(x, dict): + newx = {} + for k, v in x.items(): + if k == "size": + if v is None or v == 0: + # no change + newx[k] = v + elif v < 0: + newx[k] = "<0" + else: + newx[k] = ">0" + elif k.endswith("lsn") or k.endswith("cutoff") or k == "last_record": + if v is None or v == 0 or v == "0/0": + # no change + newx[k] = v + else: + newx[k] = "masked" + else: + newx[k] = mask_model_inputs(v) + return newx + elif isinstance(x, list): + newlist = [mask_model_inputs(v) for v in x] + return newlist + else: + return x