diff --git a/Cargo.lock b/Cargo.lock index 67e54d3833..98c4dca09b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3809,6 +3809,8 @@ name = "tenant_size_model" version = "0.1.0" dependencies = [ "anyhow", + "serde", + "serde_json", "workspace_hack", ] diff --git a/libs/tenant_size_model/Cargo.toml b/libs/tenant_size_model/Cargo.toml index a5f0160f35..15e78932a8 100644 --- a/libs/tenant_size_model/Cargo.toml +++ b/libs/tenant_size_model/Cargo.toml @@ -7,5 +7,7 @@ license.workspace = true [dependencies] anyhow.workspace = true +serde.workspace = true +serde_json.workspace = true workspace_hack.workspace = true diff --git a/libs/tenant_size_model/src/calculation.rs b/libs/tenant_size_model/src/calculation.rs new file mode 100644 index 0000000000..093b053675 --- /dev/null +++ b/libs/tenant_size_model/src/calculation.rs @@ -0,0 +1,219 @@ +use crate::{SegmentMethod, SegmentSizeResult, SizeResult, StorageModel}; + +// +// *-g--*---D---> +// / +// / +// / *---b----*-B---> +// / / +// / / +// -----*--e---*-----f----* C +// E \ +// \ +// *--a---*---A--> +// +// If A and B need to be retained, is it cheaper to store +// snapshot at C+a+b, or snapshots at A and B ? +// +// If D also needs to be retained, which is cheaper: +// +// 1. E+g+e+f+a+b +// 2. D+C+a+b +// 3. D+A+B + +/// [`Segment`] which has had it's size calculated. +#[derive(Clone, Debug)] +struct SegmentSize { + method: SegmentMethod, + + // calculated size of this subtree, using this method + accum_size: u64, + + seg_id: usize, + children: Vec, +} + +struct SizeAlternatives { + // cheapest alternative if parent is available. + incremental: SegmentSize, + + // cheapest alternative if parent node is not available + non_incremental: Option, +} + +impl StorageModel { + pub fn calculate(&self) -> SizeResult { + // Build adjacency list. 'child_list' is indexed by segment id. Each entry + // contains a list of all child segments of the segment. + let mut roots: Vec = Vec::new(); + let mut child_list: Vec> = Vec::new(); + child_list.resize(self.segments.len(), Vec::new()); + + for (seg_id, seg) in self.segments.iter().enumerate() { + if let Some(parent_id) = seg.parent { + child_list[parent_id].push(seg_id); + } else { + roots.push(seg_id); + } + } + + let mut segment_results = Vec::new(); + segment_results.resize( + self.segments.len(), + SegmentSizeResult { + method: SegmentMethod::Skipped, + accum_size: 0, + }, + ); + + let mut total_size = 0; + for root in roots { + if let Some(selected) = self.size_here(root, &child_list).non_incremental { + StorageModel::fill_selected_sizes(&selected, &mut segment_results); + total_size += selected.accum_size; + } else { + // Couldn't find any way to get this root. Error? + } + } + + SizeResult { + total_size, + segments: segment_results, + } + } + + fn fill_selected_sizes(selected: &SegmentSize, result: &mut Vec) { + result[selected.seg_id] = SegmentSizeResult { + method: selected.method, + accum_size: selected.accum_size, + }; + // recurse to children + for child in selected.children.iter() { + StorageModel::fill_selected_sizes(child, result); + } + } + + // + // This is the core of the sizing calculation. + // + // This is a recursive function, that for each Segment calculates the best way + // to reach all the Segments that are marked as needed in this subtree, under two + // different conditions: + // a) when the parent of this segment is available (as a snaphot or through WAL), and + // b) when the parent of this segment is not available. + // + fn size_here(&self, seg_id: usize, child_list: &Vec>) -> SizeAlternatives { + let seg = &self.segments[seg_id]; + // First figure out the best way to get each child + let mut children = Vec::new(); + for child_id in &child_list[seg_id] { + children.push(self.size_here(*child_id, child_list)) + } + + // Method 1. If this node is not needed, we can skip it as long as we + // take snapshots later in each sub-tree + let snapshot_later = if !seg.needed { + let mut snapshot_later = SegmentSize { + seg_id, + method: SegmentMethod::Skipped, + accum_size: 0, + children: Vec::new(), + }; + + let mut possible = true; + for child in children.iter() { + if let Some(non_incremental) = &child.non_incremental { + snapshot_later.accum_size += non_incremental.accum_size; + snapshot_later.children.push(non_incremental.clone()) + } else { + possible = false; + break; + } + } + if possible { + Some(snapshot_later) + } else { + None + } + } else { + None + }; + + // Method 2. Get a snapshot here. This assumed to be possible, if the 'size' of + // this Segment was given. + let snapshot_here = if !seg.needed || seg.parent.is_none() { + if let Some(snapshot_size) = seg.size { + let mut snapshot_here = SegmentSize { + seg_id, + method: SegmentMethod::SnapshotHere, + accum_size: snapshot_size, + children: Vec::new(), + }; + for child in children.iter() { + snapshot_here.accum_size += child.incremental.accum_size; + snapshot_here.children.push(child.incremental.clone()) + } + Some(snapshot_here) + } else { + None + } + } else { + None + }; + + // Method 3. Use WAL to get here from parent + let wal_here = { + let mut wal_here = SegmentSize { + seg_id, + method: SegmentMethod::Wal, + accum_size: if let Some(parent_id) = seg.parent { + seg.lsn - self.segments[parent_id].lsn + } else { + 0 + }, + children: Vec::new(), + }; + for child in children { + wal_here.accum_size += child.incremental.accum_size; + wal_here.children.push(child.incremental) + } + wal_here + }; + + // If the parent is not available, what's the cheapest method involving + // a snapshot here or later? + let mut cheapest_non_incremental: Option = None; + if let Some(snapshot_here) = snapshot_here { + cheapest_non_incremental = Some(snapshot_here); + } + if let Some(snapshot_later) = snapshot_later { + // Use <=, to prefer skipping if the size is equal + if let Some(parent) = &cheapest_non_incremental { + if snapshot_later.accum_size <= parent.accum_size { + cheapest_non_incremental = Some(snapshot_later); + } + } else { + cheapest_non_incremental = Some(snapshot_later); + } + } + + // And what's the cheapest method, if the parent is available? + let cheapest_incremental = if let Some(cheapest_non_incremental) = &cheapest_non_incremental + { + // Is it cheaper to use a snapshot here or later, anyway? + // Use <, to prefer Wal over snapshot if the cost is the same + if wal_here.accum_size < cheapest_non_incremental.accum_size { + wal_here + } else { + cheapest_non_incremental.clone() + } + } else { + wal_here + }; + + SizeAlternatives { + incremental: cheapest_incremental, + non_incremental: cheapest_non_incremental, + } + } +} diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs index b156e1be9d..c151e3b42c 100644 --- a/libs/tenant_size_model/src/lib.rs +++ b/libs/tenant_size_model/src/lib.rs @@ -1,401 +1,70 @@ -use std::borrow::Cow; -use std::collections::HashMap; +//! Synthetic size calculation -use anyhow::Context; +mod calculation; +pub mod svg; -/// Pricing model or history size builder. +/// StorageModel is the input to the synthetic size calculation. It represents +/// a tree of timelines, with just the information that's needed for the +/// calculation. This doesn't track timeline names or where each timeline +/// begins and ends, for example. Instead, it consists of "points of interest" +/// on the timelines. A point of interest could be the timeline start or end point, +/// the oldest point on a timeline that needs to be retained because of PITR +/// cutoff, or snapshot points named by the user. For each such point, and the +/// edge connecting the points (implicit in Segment), we store information about +/// whether we need to be able to recover to the point, and if known, the logical +/// size at the point. /// -/// Maintains knowledge of the branches and their modifications. Generic over the branch name key -/// type. -pub struct Storage { - segments: Vec, - - /// Mapping from the branch name to the index of a segment describing it's latest state. - branches: HashMap, +/// The segments must form a well-formed tree, with no loops. +#[derive(serde::Serialize)] +pub struct StorageModel { + pub segments: Vec, } -/// Snapshot of a branch. -#[derive(Clone, Debug, Eq, PartialEq)] +/// Segment represents one point in the tree of branches, *and* the edge that leads +/// to it (if any). We don't need separate structs for points and edges, because each +/// point can have only one parent. +/// +/// When 'needed' is true, it means that we need to be able to reconstruct +/// any version between 'parent.lsn' and 'lsn'. If you want to represent that only +/// a single point is needed, create two Segments with the same lsn, and mark only +/// the child as needed. +/// +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub struct Segment { /// Previous segment index into ['Storage::segments`], if any. - parent: Option, + pub parent: Option, - /// Description of how did we get to this state. - /// - /// Mainly used in the original scenarios 1..=4 with insert, delete and update. Not used when - /// modifying a branch directly. - pub op: Cow<'static, str>, + /// LSN at this point + pub lsn: u64, - /// LSN before this state - start_lsn: u64, + /// Logical size at this node, if known. + pub size: Option, - /// LSN at this state - pub end_lsn: u64, - - /// Logical size before this state - start_size: u64, - - /// Logical size at this state. Can be None in the last Segment of a branch. - pub end_size: Option, - - /// Indices to [`Storage::segments`] - /// - /// FIXME: this could be an Option - children_after: Vec, - - /// Determined by `retention_period` given to [`Storage::calculate`] + /// If true, the segment from parent to this node is needed by `retention_period` pub needed: bool, } -// -// -// -// -// *-g--*---D---> -// / -// / -// / *---b----*-B---> -// / / -// / / -// -----*--e---*-----f----* C -// E \ -// \ -// *--a---*---A--> -// -// If A and B need to be retained, is it cheaper to store -// snapshot at C+a+b, or snapshots at A and B ? -// -// If D also needs to be retained, which is cheaper: -// -// 1. E+g+e+f+a+b -// 2. D+C+a+b -// 3. D+A+B +/// Result of synthetic size calculation. Returned by StorageModel::calculate() +pub struct SizeResult { + pub total_size: u64, -/// [`Segment`] which has had it's size calculated. -pub struct SegmentSize { - pub seg_id: usize, - - pub method: SegmentMethod, - - this_size: u64, - - pub children: Vec, + // This has same length as the StorageModel::segments vector in the input. + // Each entry in this array corresponds to the entry with same index in + // StorageModel::segments. + pub segments: Vec, } -impl SegmentSize { - fn total(&self) -> u64 { - self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total()) - } - - pub fn total_children(&self) -> u64 { - if self.method == SnapshotAfter { - self.this_size + self.children.iter().fold(0, |acc, x| acc + x.total()) - } else { - self.children.iter().fold(0, |acc, x| acc + x.total()) - } - } +#[derive(Clone, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct SegmentSizeResult { + pub method: SegmentMethod, + // calculated size of this subtree, using this method + pub accum_size: u64, } /// Different methods to retain history from a particular state -#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub enum SegmentMethod { - SnapshotAfter, - Wal, - WalNeeded, + SnapshotHere, // A logical snapshot is needed after this segment + Wal, // Keep WAL leading up to this node Skipped, } - -use SegmentMethod::*; - -impl Storage { - /// Creates a new storage with the given default branch name. - pub fn new(initial_branch: K) -> Storage { - let init_segment = Segment { - op: "".into(), - needed: false, - parent: None, - start_lsn: 0, - end_lsn: 0, - start_size: 0, - end_size: Some(0), - children_after: Vec::new(), - }; - - Storage { - segments: vec![init_segment], - branches: HashMap::from([(initial_branch, 0)]), - } - } - - /// Advances the branch with a new point, at given LSN. - pub fn insert_point( - &mut self, - branch: &Q, - op: Cow<'static, str>, - lsn: u64, - size: Option, - ) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; - let newseg_id = self.segments.len(); - let lastseg = &mut self.segments[lastseg_id]; - - assert!(lsn > lastseg.end_lsn); - - let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; - - let newseg = Segment { - op, - parent: Some(lastseg_id), - start_lsn: lastseg.end_lsn, - end_lsn: lsn, - start_size, - end_size: size, - children_after: Vec::new(), - needed: false, - }; - lastseg.children_after.push(newseg_id); - - self.segments.push(newseg); - *self.branches.get_mut(branch).expect("read already") = newseg_id; - - Ok(()) - } - - /// Advances the branch with the named operation, by the relative LSN and logical size bytes. - pub fn modify_branch( - &mut self, - branch: &Q, - op: Cow<'static, str>, - lsn_bytes: u64, - size_bytes: i64, - ) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; - let newseg_id = self.segments.len(); - let lastseg = &mut self.segments[lastseg_id]; - - let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; - - let newseg = Segment { - op, - parent: Some(lastseg_id), - start_lsn: lastseg.end_lsn, - end_lsn: lastseg.end_lsn + lsn_bytes, - start_size: last_end_size, - end_size: Some((last_end_size as i64 + size_bytes) as u64), - children_after: Vec::new(), - needed: false, - }; - lastseg.children_after.push(newseg_id); - - self.segments.push(newseg); - *self.branches.get_mut(branch).expect("read already") = newseg_id; - Ok(()) - } - - pub fn insert(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - self.modify_branch(branch, "insert".into(), bytes, bytes as i64) - } - - pub fn update(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - self.modify_branch(branch, "update".into(), bytes, 0i64) - } - - pub fn delete(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> - where - K: std::borrow::Borrow, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)) - } - - pub fn branch(&mut self, parent: &Q, name: K) -> anyhow::Result<()> - where - K: std::borrow::Borrow + std::fmt::Debug, - Q: std::hash::Hash + Eq + std::fmt::Debug, - { - // Find the right segment - let branchseg_id = *self.branches.get(parent).with_context(|| { - format!( - "should had found the parent {:?} by key. in branches {:?}", - parent, self.branches - ) - })?; - - let _branchseg = &mut self.segments[branchseg_id]; - - // Create branch name for it - self.branches.insert(name, branchseg_id); - Ok(()) - } - - pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result { - // Phase 1: Mark all the segments that need to be retained - for (_branch, &last_seg_id) in self.branches.iter() { - let last_seg = &self.segments[last_seg_id]; - let cutoff_lsn = last_seg.start_lsn.saturating_sub(retention_period); - let mut seg_id = last_seg_id; - loop { - let seg = &mut self.segments[seg_id]; - if seg.end_lsn < cutoff_lsn { - break; - } - seg.needed = true; - if let Some(prev_seg_id) = seg.parent { - seg_id = prev_seg_id; - } else { - break; - } - } - } - - // Phase 2: For each oldest segment in a chain that needs to be retained, - // calculate if we should store snapshot or WAL - self.size_from_snapshot_later(0) - } - - fn size_from_wal(&self, seg_id: usize) -> anyhow::Result { - let seg = &self.segments[seg_id]; - - let this_size = seg.end_lsn - seg.start_lsn; - - let mut children = Vec::new(); - - // try both ways - for &child_id in seg.children_after.iter() { - // try each child both ways - let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id)?; - - let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id)?; - if p1.total() < p2.total() { - p1 - } else { - p2 - } - } else { - p1 - }; - children.push(p); - } - Ok(SegmentSize { - seg_id, - method: if seg.needed { WalNeeded } else { Wal }, - this_size, - children, - }) - } - - fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result { - // If this is needed, then it's time to do the snapshot and continue - // with wal method. - let seg = &self.segments[seg_id]; - //eprintln!("snap: seg{}: {} needed: {}", seg_id, seg.children_after.len(), seg.needed); - if seg.needed { - let mut children = Vec::new(); - - for &child_id in seg.children_after.iter() { - // try each child both ways - let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id)?; - - let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id)?; - if p1.total() < p2.total() { - p1 - } else { - p2 - } - } else { - p1 - }; - children.push(p); - } - Ok(SegmentSize { - seg_id, - method: WalNeeded, - this_size: seg.start_size, - children, - }) - } else { - // If any of the direct children are "needed", need to be able to reconstruct here - let mut children_needed = false; - for &child in seg.children_after.iter() { - let seg = &self.segments[child]; - if seg.needed { - children_needed = true; - break; - } - } - - let method1 = if !children_needed { - let mut children = Vec::new(); - for child in seg.children_after.iter() { - children.push(self.size_from_snapshot_later(*child)?); - } - Some(SegmentSize { - seg_id, - method: Skipped, - this_size: 0, - children, - }) - } else { - None - }; - - // If this a junction, consider snapshotting here - let method2 = if children_needed || seg.children_after.len() >= 2 { - let mut children = Vec::new(); - for child in seg.children_after.iter() { - children.push(self.size_from_wal(*child)?); - } - let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") }; - Some(SegmentSize { - seg_id, - method: SnapshotAfter, - this_size, - children, - }) - } else { - None - }; - - Ok(match (method1, method2) { - (None, None) => anyhow::bail!( - "neither method was applicable: children_after={}, children_needed={}", - seg.children_after.len(), - children_needed - ), - (Some(method), None) => method, - (None, Some(method)) => method, - (Some(method1), Some(method2)) => { - if method1.total() < method2.total() { - method1 - } else { - method2 - } - } - }) - } - } - - pub fn into_segments(self) -> Vec { - self.segments - } -} diff --git a/libs/tenant_size_model/src/main.rs b/libs/tenant_size_model/src/main.rs deleted file mode 100644 index e32dd055f4..0000000000 --- a/libs/tenant_size_model/src/main.rs +++ /dev/null @@ -1,269 +0,0 @@ -//! Tenant size model testing ground. -//! -//! Has a number of scenarios and a `main` for invoking these by number, calculating the history -//! size, outputs graphviz graph. Makefile in directory shows how to use graphviz to turn scenarios -//! into pngs. - -use tenant_size_model::{Segment, SegmentSize, Storage}; - -// Main branch only. Some updates on it. -fn scenario_1() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -// Main branch only. Some updates on it. -fn scenario_2() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - // Branch - storage.branch("main", "child")?; - storage.update("child", 1_000)?; - - // More updates on parent - storage.update("main", 1_000)?; - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -// Like 2, but more updates on main -fn scenario_3() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - // Branch - storage.branch("main", "child")?; - storage.update("child", 1_000)?; - - // More updates on parent - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -// Diverged branches -fn scenario_4() -> anyhow::Result<(Vec, SegmentSize)> { - // Create main branch - let mut storage = Storage::new("main"); - - // Bulk load 5 GB of data to it - storage.insert("main", 5_000)?; - - // Stream of updates - for _ in 0..5 { - storage.update("main", 1_000)?; - } - - // Branch - storage.branch("main", "child")?; - storage.update("child", 1_000)?; - - // More updates on parent - for _ in 0..8 { - storage.update("main", 1_000)?; - } - - let size = storage.calculate(1000)?; - - Ok((storage.into_segments(), size)) -} - -fn scenario_5() -> anyhow::Result<(Vec, SegmentSize)> { - let mut storage = Storage::new("a"); - storage.insert("a", 5000)?; - storage.branch("a", "b")?; - storage.update("b", 4000)?; - storage.update("a", 2000)?; - storage.branch("a", "c")?; - storage.insert("c", 4000)?; - storage.insert("a", 2000)?; - - let size = storage.calculate(5000)?; - - Ok((storage.into_segments(), size)) -} - -fn scenario_6() -> anyhow::Result<(Vec, SegmentSize)> { - use std::borrow::Cow; - - const NO_OP: Cow<'static, str> = Cow::Borrowed(""); - - let branches = [ - Some(0x7ff1edab8182025f15ae33482edb590a_u128), - Some(0xb1719e044db05401a05a2ed588a3ad3f), - Some(0xb68d6691c895ad0a70809470020929ef), - ]; - - // compared to other scenarios, this one uses bytes instead of kB - - let mut storage = Storage::new(None); - - storage.branch(&None, branches[0])?; // at 0 - storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064 - storage.branch(&branches[0], branches[1])?; // at 108951064 - storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472 - storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424 - storage.branch(&branches[0], branches[2])?; // at 283415424 - storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616 - storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400 - - let size = storage.calculate(100_000)?; - - Ok((storage.into_segments(), size)) -} - -fn main() { - let args: Vec = std::env::args().collect(); - - let scenario = if args.len() < 2 { "1" } else { &args[1] }; - - let (segments, size) = match scenario { - "1" => scenario_1(), - "2" => scenario_2(), - "3" => scenario_3(), - "4" => scenario_4(), - "5" => scenario_5(), - "6" => scenario_6(), - other => { - eprintln!("invalid scenario {}", other); - std::process::exit(1); - } - } - .unwrap(); - - graphviz_tree(&segments, &size); -} - -fn graphviz_recurse(segments: &[Segment], node: &SegmentSize) { - use tenant_size_model::SegmentMethod::*; - - let seg_id = node.seg_id; - let seg = segments.get(seg_id).unwrap(); - let lsn = seg.end_lsn; - let size = seg.end_size.unwrap_or(0); - let method = node.method; - - println!(" {{"); - println!(" node [width=0.1 height=0.1 shape=oval]"); - - let tenant_size = node.total_children(); - - let penwidth = if seg.needed { 6 } else { 3 }; - let x = match method { - SnapshotAfter => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" style=filled penwidth={penwidth}"), - Wal => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"), - WalNeeded => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"black\" penwidth={penwidth}"), - Skipped => - format!("label=\"lsn: {lsn}\\nsize: {size}\\ntenant_size: {tenant_size}\" color=\"gray\" penwidth={penwidth}"), - }; - - println!(" \"seg{seg_id}\" [{x}]"); - println!(" }}"); - - // Recurse. Much of the data is actually on the edge - for child in node.children.iter() { - let child_id = child.seg_id; - graphviz_recurse(segments, child); - - let edge_color = match child.method { - SnapshotAfter => "gray", - Wal => "black", - WalNeeded => "black", - Skipped => "gray", - }; - - println!(" {{"); - println!(" edge [] "); - print!(" \"seg{seg_id}\" -> \"seg{child_id}\" ["); - print!("color={edge_color}"); - if child.method == WalNeeded { - print!(" penwidth=6"); - } - if child.method == Wal { - print!(" penwidth=3"); - } - - let next = segments.get(child_id).unwrap(); - - if next.op.is_empty() { - print!( - " label=\"{} / {}\"", - next.end_lsn - seg.end_lsn, - (next.end_size.unwrap_or(0) as i128 - seg.end_size.unwrap_or(0) as i128) - ); - } else { - print!(" label=\"{}: {}\"", next.op, next.end_lsn - seg.end_lsn); - } - println!("]"); - println!(" }}"); - } -} - -fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) { - println!("digraph G {{"); - println!(" fontname=\"Helvetica,Arial,sans-serif\""); - println!(" node [fontname=\"Helvetica,Arial,sans-serif\"]"); - println!(" edge [fontname=\"Helvetica,Arial,sans-serif\"]"); - println!(" graph [center=1 rankdir=LR]"); - println!(" edge [dir=none]"); - - graphviz_recurse(segments, tree); - - println!("}}"); -} - -#[test] -fn scenarios_return_same_size() { - type ScenarioFn = fn() -> anyhow::Result<(Vec, SegmentSize)>; - let truths: &[(u32, ScenarioFn, _)] = &[ - (line!(), scenario_1, 8000), - (line!(), scenario_2, 9000), - (line!(), scenario_3, 13000), - (line!(), scenario_4, 16000), - (line!(), scenario_5, 17000), - (line!(), scenario_6, 333_792_000), - ]; - - for (line, scenario, expected) in truths { - let (_, size) = scenario().unwrap(); - assert_eq!(*expected, size.total_children(), "scenario on line {line}"); - } -} diff --git a/libs/tenant_size_model/src/svg.rs b/libs/tenant_size_model/src/svg.rs new file mode 100644 index 0000000000..f26d3aa79d --- /dev/null +++ b/libs/tenant_size_model/src/svg.rs @@ -0,0 +1,193 @@ +use crate::{SegmentMethod, SegmentSizeResult, SizeResult, StorageModel}; +use std::fmt::Write; + +const SVG_WIDTH: f32 = 500.0; + +struct SvgDraw<'a> { + storage: &'a StorageModel, + branches: &'a [String], + seg_to_branch: &'a [usize], + sizes: &'a [SegmentSizeResult], + + // layout + xscale: f32, + min_lsn: u64, + seg_coordinates: Vec<(f32, f32)>, +} + +fn draw_legend(result: &mut String) -> anyhow::Result<()> { + writeln!( + result, + "" + )?; + writeln!(result, "logical snapshot")?; + writeln!( + result, + "" + )?; + writeln!( + result, + "WAL within retention period" + )?; + writeln!( + result, + "" + )?; + writeln!( + result, + "WAL retained to avoid copy" + )?; + writeln!( + result, + "" + )?; + writeln!(result, "WAL not retained")?; + Ok(()) +} + +pub fn draw_svg( + storage: &StorageModel, + branches: &[String], + seg_to_branch: &[usize], + sizes: &SizeResult, +) -> anyhow::Result { + let mut draw = SvgDraw { + storage, + branches, + seg_to_branch, + sizes: &sizes.segments, + + xscale: 0.0, + min_lsn: 0, + seg_coordinates: Vec::new(), + }; + + let mut result = String::new(); + + writeln!(result, "")?; + + draw.calculate_svg_layout(); + + // Draw the tree + for (seg_id, _seg) in storage.segments.iter().enumerate() { + draw.draw_seg_phase1(seg_id, &mut result)?; + } + + // Draw snapshots + for (seg_id, _seg) in storage.segments.iter().enumerate() { + draw.draw_seg_phase2(seg_id, &mut result)?; + } + + draw_legend(&mut result)?; + + write!(result, "")?; + + Ok(result) +} + +impl<'a> SvgDraw<'a> { + fn calculate_svg_layout(&mut self) { + // Find x scale + let segments = &self.storage.segments; + let min_lsn = segments.iter().map(|s| s.lsn).fold(u64::MAX, std::cmp::min); + let max_lsn = segments.iter().map(|s| s.lsn).fold(0, std::cmp::max); + + // Start with 1 pixel = 1 byte. Double the scale until it fits into the image + let mut xscale = 1.0; + while (max_lsn - min_lsn) as f32 / xscale > SVG_WIDTH { + xscale *= 2.0; + } + + // Layout the timelines on Y dimension. + // TODO + let mut y = 100.0; + let mut branch_y_coordinates = Vec::new(); + for _branch in self.branches { + branch_y_coordinates.push(y); + y += 40.0; + } + + // Calculate coordinates for each point + let seg_coordinates = std::iter::zip(segments, self.seg_to_branch) + .map(|(seg, branch_id)| { + let x = (seg.lsn - min_lsn) as f32 / xscale; + let y = branch_y_coordinates[*branch_id]; + (x, y) + }) + .collect(); + + self.xscale = xscale; + self.min_lsn = min_lsn; + self.seg_coordinates = seg_coordinates; + } + + /// Draws lines between points + fn draw_seg_phase1(&self, seg_id: usize, result: &mut String) -> anyhow::Result<()> { + let seg = &self.storage.segments[seg_id]; + + let wal_bytes = if let Some(parent_id) = seg.parent { + seg.lsn - self.storage.segments[parent_id].lsn + } else { + 0 + }; + + let style = match self.sizes[seg_id].method { + SegmentMethod::SnapshotHere => "stroke-width=\"1\" stroke=\"gray\"", + SegmentMethod::Wal if seg.needed && wal_bytes > 0 => { + "stroke-width=\"6\" stroke=\"black\"" + } + SegmentMethod::Wal => "stroke-width=\"3\" stroke=\"black\"", + SegmentMethod::Skipped => "stroke-width=\"1\" stroke=\"gray\"", + }; + if let Some(parent_id) = seg.parent { + let (x1, y1) = self.seg_coordinates[parent_id]; + let (x2, y2) = self.seg_coordinates[seg_id]; + + writeln!( + result, + "", + )?; + writeln!( + result, + " {wal_bytes} bytes of WAL (seg {seg_id})" + )?; + writeln!(result, "")?; + } else { + // draw a little dash to mark the starting point of this branch + let (x, y) = self.seg_coordinates[seg_id]; + let (x1, y1) = (x, y - 5.0); + let (x2, y2) = (x, y + 5.0); + + writeln!( + result, + "", + )?; + writeln!(result, " (seg {seg_id})")?; + writeln!(result, "")?; + } + + Ok(()) + } + + /// Draw circles where snapshots are taken + fn draw_seg_phase2(&self, seg_id: usize, result: &mut String) -> anyhow::Result<()> { + let seg = &self.storage.segments[seg_id]; + + // draw a snapshot point if it's needed + let (coord_x, coord_y) = self.seg_coordinates[seg_id]; + if self.sizes[seg_id].method == SegmentMethod::SnapshotHere { + writeln!( + result, + "", + )?; + writeln!( + result, + " logical size {}", + seg.size.unwrap() + )?; + write!(result, "")?; + } + + Ok(()) + } +} diff --git a/libs/tenant_size_model/tests/tests.rs b/libs/tenant_size_model/tests/tests.rs new file mode 100644 index 0000000000..7660d41c56 --- /dev/null +++ b/libs/tenant_size_model/tests/tests.rs @@ -0,0 +1,313 @@ +//! Tenant size model tests. + +use tenant_size_model::{Segment, SizeResult, StorageModel}; + +use std::collections::HashMap; + +struct ScenarioBuilder { + segments: Vec, + + /// Mapping from the branch name to the index of a segment describing its latest state. + branches: HashMap, +} + +impl ScenarioBuilder { + /// Creates a new storage with the given default branch name. + pub fn new(initial_branch: &str) -> ScenarioBuilder { + let init_segment = Segment { + parent: None, + lsn: 0, + size: Some(0), + needed: false, // determined later + }; + + ScenarioBuilder { + segments: vec![init_segment], + branches: HashMap::from([(initial_branch.into(), 0)]), + } + } + + /// Advances the branch with the named operation, by the relative LSN and logical size bytes. + pub fn modify_branch(&mut self, branch: &str, lsn_bytes: u64, size_bytes: i64) { + let lastseg_id = *self.branches.get(branch).unwrap(); + let newseg_id = self.segments.len(); + let lastseg = &mut self.segments[lastseg_id]; + + let newseg = Segment { + parent: Some(lastseg_id), + lsn: lastseg.lsn + lsn_bytes, + size: Some((lastseg.size.unwrap() as i64 + size_bytes) as u64), + needed: false, + }; + + self.segments.push(newseg); + *self.branches.get_mut(branch).expect("read already") = newseg_id; + } + + pub fn insert(&mut self, branch: &str, bytes: u64) { + self.modify_branch(branch, bytes, bytes as i64); + } + + pub fn update(&mut self, branch: &str, bytes: u64) { + self.modify_branch(branch, bytes, 0i64); + } + + pub fn _delete(&mut self, branch: &str, bytes: u64) { + self.modify_branch(branch, bytes, -(bytes as i64)); + } + + /// Panics if the parent branch cannot be found. + pub fn branch(&mut self, parent: &str, name: &str) { + // Find the right segment + let branchseg_id = *self + .branches + .get(parent) + .expect("should had found the parent by key"); + let _branchseg = &mut self.segments[branchseg_id]; + + // Create branch name for it + self.branches.insert(name.to_string(), branchseg_id); + } + + pub fn calculate(&mut self, retention_period: u64) -> (StorageModel, SizeResult) { + // Phase 1: Mark all the segments that need to be retained + for (_branch, &last_seg_id) in self.branches.iter() { + let last_seg = &self.segments[last_seg_id]; + let cutoff_lsn = last_seg.lsn.saturating_sub(retention_period); + let mut seg_id = last_seg_id; + loop { + let seg = &mut self.segments[seg_id]; + if seg.lsn <= cutoff_lsn { + break; + } + seg.needed = true; + if let Some(prev_seg_id) = seg.parent { + seg_id = prev_seg_id; + } else { + break; + } + } + } + + // Perform the calculation + let storage_model = StorageModel { + segments: self.segments.clone(), + }; + let size_result = storage_model.calculate(); + (storage_model, size_result) + } +} + +// Main branch only. Some updates on it. +#[test] +fn scenario_1() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Calculate the synthetic size with retention horizon 1000 + let (_model, result) = scenario.calculate(1000); + + // The end of the branch is at LSN 10000. Need to retain + // a logical snapshot at LSN 9000, plus the WAL between 9000-10000. + // The logical snapshot has size 5000. + assert_eq!(result.total_size, 5000 + 1000); +} + +// Main branch only. Some updates on it. +#[test] +fn scenario_2() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Branch + scenario.branch("main", "child"); + scenario.update("child", 1_000); + + // More updates on parent + scenario.update("main", 1_000); + + // + // The history looks like this now: + // + // 10000 11000 + // *----*----*--------------* main + // | + // | 11000 + // +-------------- child + // + // + // With retention horizon 1000, we need to retain logical snapshot + // at the branch point, size 5000, and the WAL from 10000-11000 on + // both branches. + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 5000 + 1000 + 1000); +} + +// Like 2, but more updates on main +#[test] +fn scenario_3() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Branch + scenario.branch("main", "child"); + scenario.update("child", 1_000); + + // More updates on parent + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // + // The history looks like this now: + // + // 10000 15000 + // *----*----*------------------------------------* main + // | + // | 11000 + // +-------------- child + // + // + // With retention horizon 1000, it's still cheapest to retain + // - snapshot at branch point (size 5000) + // - WAL on child between 10000-11000 + // - WAL on main between 10000-15000 + // + // This is in total 5000 + 1000 + 5000 + // + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 5000 + 1000 + 5000); +} + +// Diverged branches +#[test] +fn scenario_4() { + // Create main branch + let mut scenario = ScenarioBuilder::new("main"); + + // Bulk load 5 GB of data to it + scenario.insert("main", 5_000); + + // Stream of updates + for _ in 0..5 { + scenario.update("main", 1_000); + } + + // Branch + scenario.branch("main", "child"); + scenario.update("child", 1_000); + + // More updates on parent + for _ in 0..8 { + scenario.update("main", 1_000); + } + + // + // The history looks like this now: + // + // 10000 18000 + // *----*----*------------------------------------* main + // | + // | 11000 + // +-------------- child + // + // + // With retention horizon 1000, it's now cheapest to retain + // separate snapshots on both branches: + // - snapshot on main branch at LSN 17000 (size 5000) + // - WAL on main between 17000-18000 + // - snapshot on child branch at LSN 10000 (size 5000) + // - WAL on child between 10000-11000 + // + // This is in total 5000 + 1000 + 5000 + 1000 = 12000 + // + // (If we used the the method from the previous scenario, and + // kept only snapshot at the branch point, we'd need to keep + // all the WAL between 10000-18000 on the main branch, so + // the total size would be 5000 + 1000 + 8000 = 14000. The + // calculation always picks the cheapest alternative) + + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 5000 + 1000 + 5000 + 1000); +} + +#[test] +fn scenario_5() { + let mut scenario = ScenarioBuilder::new("a"); + scenario.insert("a", 5000); + scenario.branch("a", "b"); + scenario.update("b", 4000); + scenario.update("a", 2000); + scenario.branch("a", "c"); + scenario.insert("c", 4000); + scenario.insert("a", 2000); + + let (_model, result) = scenario.calculate(1000); + + assert_eq!(result.total_size, 17000); +} + +#[test] +fn scenario_6() { + let branches = [ + "7ff1edab8182025f15ae33482edb590a", + "b1719e044db05401a05a2ed588a3ad3f", + "0xb68d6691c895ad0a70809470020929ef", + ]; + + // compared to other scenarios, this one uses bytes instead of kB + + let mut scenario = ScenarioBuilder::new(""); + + scenario.branch("", branches[0]); // at 0 + scenario.modify_branch(branches[0], 108951064, 43696128); // at 108951064 + scenario.branch(branches[0], branches[1]); // at 108951064 + scenario.modify_branch(branches[1], 15560408, -1851392); // at 124511472 + scenario.modify_branch(branches[0], 174464360, -1531904); // at 283415424 + scenario.branch(branches[0], branches[2]); // at 283415424 + scenario.modify_branch(branches[2], 15906192, 8192); // at 299321616 + scenario.modify_branch(branches[0], 18909976, 32768); // at 302325400 + + let (model, result) = scenario.calculate(100_000); + + // FIXME: We previously calculated 333_792_000. But with this PR, we get + // a much lower number. At a quick look at the model output and the + // calculations here, the new result seems correct to me. + eprintln!( + " MODEL: {}", + serde_json::to_string(&model.segments).unwrap() + ); + eprintln!( + "RESULT: {}", + serde_json::to_string(&result.segments).unwrap() + ); + + assert_eq!(result.total_size, 136_236_928); +} diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index fc271fe83b..e68ceb2dc6 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -437,6 +437,13 @@ paths: type: boolean description: | When true, skip calculation and only provide the model inputs (for debugging). Defaults to false. + - name: retention_period + in: query + required: false + schema: + type: integer + description: | + Override the default retention period (in bytes) used for size calculation. get: description: | Calculate tenant's size, which is a mixture of WAL (bytes) and logical_size (bytes). diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 6a9232e097..7cd7e81fe1 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -7,6 +7,7 @@ use hyper::{Body, Request, Response, Uri}; use metrics::launch_timestamp::LaunchTimestamp; use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest; use remote_storage::GenericRemoteStorage; +use tenant_size_model::{SizeResult, StorageModel}; use tokio_util::sync::CancellationToken; use tracing::*; use utils::http::request::{get_request_param, must_get_query_param, parse_query_param}; @@ -20,6 +21,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp; use crate::task_mgr::TaskKind; use crate::tenant::config::TenantConfOpt; use crate::tenant::mgr::TenantMapInsertError; +use crate::tenant::size::ModelInputs; use crate::tenant::storage_layer::LayerAccessStatsReset; use crate::tenant::{PageReconstructError, Timeline}; use crate::{config::PageServerConf, tenant::mgr}; @@ -479,11 +481,19 @@ async fn tenant_status(request: Request) -> Result, ApiErro /// to debug any of the calculations. Requires `tenant_id` request parameter, supports /// `inputs_only=true|false` (default false) which supports debugging failure to calculate model /// values. +/// +/// 'retention_period' query parameter overrides the cutoff that is used to calculate the size +/// (only if it is shorter than the real cutoff). +/// +/// Note: we don't update the cached size and prometheus metric here. +/// The retention period might be different, and it's nice to have a method to just calculate it +/// without modifying anything anyway. async fn tenant_size_handler(request: Request) -> Result, ApiError> { let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?; check_permission(&request, Some(tenant_id))?; - let inputs_only: Option = parse_query_param(&request, "inputs_only")?; + let retention_period: Option = parse_query_param(&request, "retention_period")?; + let headers = request.headers(); let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let tenant = mgr::get_tenant(tenant_id, true) @@ -492,24 +502,29 @@ async fn tenant_size_handler(request: Request) -> Result, A // this can be long operation let inputs = tenant - .gather_size_inputs(&ctx) + .gather_size_inputs(retention_period, &ctx) .await .map_err(ApiError::InternalServerError)?; - let size = if !inputs_only.unwrap_or(false) { - Some( - tenant - .calc_and_update_cached_synthetic_size(&inputs) - .map_err(ApiError::InternalServerError)?, - ) - } else { - None - }; + let mut sizes = None; + if !inputs_only.unwrap_or(false) { + let storage_model = inputs + .calculate_model() + .map_err(ApiError::InternalServerError)?; + let size = storage_model.calculate(); - /// Private response type with the additional "unstable" `inputs` field. - /// - /// The type is described with `id` and `size` in the openapi_spec file, but the `inputs` is - /// intentionally left out. The type resides in the pageserver not to expose `ModelInputs`. + // If request header expects html, return html + if headers["Accept"] == "text/html" { + return synthetic_size_html_response(inputs, storage_model, size); + } + sizes = Some(size); + } else if headers["Accept"] == "text/html" { + return Err(ApiError::BadRequest(anyhow!( + "inputs_only parameter is incompatible with html output request" + ))); + } + + /// The type resides in the pageserver not to expose `ModelInputs`. #[serde_with::serde_as] #[derive(serde::Serialize)] struct TenantHistorySize { @@ -519,6 +534,9 @@ async fn tenant_size_handler(request: Request) -> Result, A /// /// Will be none if `?inputs_only=true` was given. size: Option, + /// Size of each segment used in the model. + /// Will be null if `?inputs_only=true` was given. + segment_sizes: Option>, inputs: crate::tenant::size::ModelInputs, } @@ -526,7 +544,8 @@ async fn tenant_size_handler(request: Request) -> Result, A StatusCode::OK, TenantHistorySize { id: tenant_id, - size, + size: sizes.as_ref().map(|x| x.total_size), + segment_sizes: sizes.map(|x| x.segments), inputs, }, ) @@ -591,6 +610,62 @@ async fn evict_timeline_layer_handler(request: Request) -> Result Result, ApiError> { + let mut timeline_ids: Vec = Vec::new(); + let mut timeline_map: HashMap = HashMap::new(); + for (index, ti) in inputs.timeline_inputs.iter().enumerate() { + timeline_map.insert(ti.timeline_id, index); + timeline_ids.push(ti.timeline_id.to_string()); + } + let seg_to_branch: Vec = inputs + .segments + .iter() + .map(|seg| *timeline_map.get(&seg.timeline_id).unwrap()) + .collect(); + + let svg = + tenant_size_model::svg::draw_svg(&storage_model, &timeline_ids, &seg_to_branch, &sizes) + .map_err(ApiError::InternalServerError)?; + + let mut response = String::new(); + + use std::fmt::Write; + write!(response, "\n\n").unwrap(); + write!(response, "
\n{svg}\n
").unwrap(); + writeln!(response, "Project size: {}", sizes.total_size).unwrap(); + writeln!(response, "
").unwrap();
+    writeln!(
+        response,
+        "{}",
+        serde_json::to_string_pretty(&inputs).unwrap()
+    )
+    .unwrap();
+    writeln!(
+        response,
+        "{}",
+        serde_json::to_string_pretty(&sizes.segments).unwrap()
+    )
+    .unwrap();
+    writeln!(response, "
").unwrap(); + write!(response, "\n\n").unwrap(); + + html_response(StatusCode::OK, response) +} + +pub fn html_response(status: StatusCode, data: String) -> Result, ApiError> { + let response = Response::builder() + .status(status) + .header(hyper::header::CONTENT_TYPE, "text/html") + .body(Body::from(data.as_bytes().to_vec())) + .map_err(|e| ApiError::InternalServerError(e.into()))?; + Ok(response) +} + // Helper function to standardize the error messages we produce on bad durations // // Intended to be used with anyhow's `with_context`, e.g.: @@ -1019,7 +1094,7 @@ pub fn make_router( .get("/v1/tenant", tenant_list_handler) .post("/v1/tenant", tenant_create_handler) .get("/v1/tenant/:tenant_id", tenant_status) - .get("/v1/tenant/:tenant_id/size", tenant_size_handler) + .get("/v1/tenant/:tenant_id/synthetic_size", tenant_size_handler) .put("/v1/tenant/config", update_tenant_config_handler) .get("/v1/tenant/:tenant_id/config", get_tenant_config_handler) .get("/v1/tenant/:tenant_id/timeline", timeline_list_handler) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 23210b98d5..9e9c98ad62 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2418,6 +2418,9 @@ impl Tenant { #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] pub async fn gather_size_inputs( &self, + // `max_retention_period` overrides the cutoff that is used to calculate the size + // (only if it is shorter than the real cutoff). + max_retention_period: Option, ctx: &RequestContext, ) -> anyhow::Result { let logical_sizes_at_once = self @@ -2425,32 +2428,41 @@ impl Tenant { .concurrent_tenant_size_logical_size_queries .inner(); - // TODO: Having a single mutex block concurrent reads is unfortunate, but since the queries - // are for testing/experimenting, we tolerate this. + // TODO: Having a single mutex block concurrent reads is not great for performance. + // + // But the only case where we need to run multiple of these at once is when we + // request a size for a tenant manually via API, while another background calculation + // is in progress (which is not a common case). // // See more for on the issue #2748 condenced out of the initial PR review. let mut shared_cache = self.cached_logical_sizes.lock().await; - size::gather_inputs(self, logical_sizes_at_once, &mut shared_cache, ctx).await + size::gather_inputs( + self, + logical_sizes_at_once, + max_retention_period, + &mut shared_cache, + ctx, + ) + .await } - /// Calculate synthetic tenant size + /// Calculate synthetic tenant size and cache the result. /// This is periodically called by background worker. /// result is cached in tenant struct #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] pub async fn calculate_synthetic_size(&self, ctx: &RequestContext) -> anyhow::Result { - let inputs = self.gather_size_inputs(ctx).await?; + let inputs = self.gather_size_inputs(None, ctx).await?; - self.calc_and_update_cached_synthetic_size(&inputs) - } - - /// Calculate synthetic size , cache it and set metric value - pub fn calc_and_update_cached_synthetic_size( - &self, - inputs: &size::ModelInputs, - ) -> anyhow::Result { let size = inputs.calculate()?; + self.set_cached_synthetic_size(size); + + Ok(size) + } + + /// Cache given synthetic size and update the metric value + pub fn set_cached_synthetic_size(&self, size: u64) { self.cached_synthetic_tenant_size .store(size, Ordering::Relaxed); @@ -2458,8 +2470,6 @@ impl Tenant { .get_metric_with_label_values(&[&self.tenant_id.to_string()]) .unwrap() .set(size); - - Ok(size) } pub fn get_cached_synthetic_size(&self) -> u64 { diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 2fed4f88b3..2c5efe283b 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -1,8 +1,9 @@ use std::cmp; +use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::Context; +use anyhow::{bail, Context}; use tokio::sync::oneshot::error::RecvError; use tokio::sync::Semaphore; @@ -10,35 +11,80 @@ use crate::context::RequestContext; use crate::pgdatadir_mapping::CalculateLogicalSizeError; use super::Tenant; +use crate::tenant::Timeline; use utils::id::TimelineId; use utils::lsn::Lsn; use tracing::*; +use tenant_size_model::{Segment, StorageModel}; + /// Inputs to the actual tenant sizing model /// /// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to /// be a transferrable format between execution environments and developer. +/// +/// This tracks more information than the actual StorageModel that calculation +/// needs. We will convert this into a StorageModel when it's time to perform +/// the calculation. +/// #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct ModelInputs { - updates: Vec, - retention_period: u64, + pub segments: Vec, + pub timeline_inputs: Vec, +} - /// Relevant lsns per timeline. - /// - /// This field is not required for deserialization purposes, which is mostly used in tests. The - /// LSNs explain the outcome (updates) but are not needed in size calculation. - #[serde_as(as = "HashMap")] - #[serde(default)] - timeline_inputs: HashMap, +/// A [`Segment`], with some extra information for display purposes +#[serde_with::serde_as] +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct SegmentMeta { + pub segment: Segment, + #[serde_as(as = "serde_with::DisplayFromStr")] + pub timeline_id: TimelineId, + pub kind: LsnKind, +} + +impl SegmentMeta { + fn size_needed(&self) -> bool { + match self.kind { + LsnKind::BranchStart => { + // If we don't have a later GcCutoff point on this branch, and + // no ancestor, calculate size for the branch start point. + self.segment.needed && self.segment.parent.is_none() + } + LsnKind::BranchPoint => true, + LsnKind::GcCutOff => true, + LsnKind::BranchEnd => false, + } + } +} + +#[derive( + Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize, +)] +pub enum LsnKind { + /// A timeline starting here + BranchStart, + /// A child timeline branches off from here + BranchPoint, + /// GC cutoff point + GcCutOff, + /// Last record LSN + BranchEnd, } /// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as /// part of [`ModelInputs`] from the HTTP api, explaining the inputs. #[serde_with::serde_as] #[derive(Debug, serde::Serialize, serde::Deserialize)] -struct TimelineInputs { +pub struct TimelineInputs { + #[serde_as(as = "serde_with::DisplayFromStr")] + pub timeline_id: TimelineId, + + #[serde_as(as = "Option")] + pub ancestor_id: Option, + #[serde_as(as = "serde_with::DisplayFromStr")] ancestor_lsn: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] @@ -49,118 +95,14 @@ struct TimelineInputs { horizon_cutoff: Lsn, #[serde_as(as = "serde_with::DisplayFromStr")] pitr_cutoff: Lsn, + + /// Cutoff point based on GC settings #[serde_as(as = "serde_with::DisplayFromStr")] next_gc_cutoff: Lsn, -} -// Adjust BranchFrom sorting so that we always process ancestor -// before descendants. This is needed to correctly calculate size of -// descendant timelines. -// -// Note that we may have multiple BranchFroms at the same LSN, so we -// need to sort them in the tree order. -// -// see updates_sort_with_branches_at_same_lsn test below -fn sort_updates_in_tree_order(updates: Vec) -> anyhow::Result> { - let mut sorted_updates = Vec::with_capacity(updates.len()); - let mut known_timelineids = HashSet::new(); - let mut i = 0; - while i < updates.len() { - let curr_upd = &updates[i]; - - if let Command::BranchFrom(parent_id) = curr_upd.command { - let parent_id = match parent_id { - Some(parent_id) if known_timelineids.contains(&parent_id) => { - // we have already processed ancestor - // process this BranchFrom Update normally - known_timelineids.insert(curr_upd.timeline_id); - sorted_updates.push(*curr_upd); - i += 1; - continue; - } - None => { - known_timelineids.insert(curr_upd.timeline_id); - sorted_updates.push(*curr_upd); - i += 1; - continue; - } - Some(parent_id) => parent_id, - }; - - let mut j = i; - - // we have not processed ancestor yet. - // there is a chance that it is at the same Lsn - if !known_timelineids.contains(&parent_id) { - let mut curr_lsn_branchfroms: HashMap> = - HashMap::new(); - - // inspect all branchpoints at the same lsn - while j < updates.len() && updates[j].lsn == curr_upd.lsn { - let lookahead_upd = &updates[j]; - j += 1; - - if let Command::BranchFrom(lookahead_parent_id) = lookahead_upd.command { - match lookahead_parent_id { - Some(lookahead_parent_id) - if !known_timelineids.contains(&lookahead_parent_id) => - { - // we have not processed ancestor yet - // store it for later - let es = - curr_lsn_branchfroms.entry(lookahead_parent_id).or_default(); - es.push((lookahead_upd.timeline_id, j)); - } - _ => { - // we have already processed ancestor - // process this BranchFrom Update normally - known_timelineids.insert(lookahead_upd.timeline_id); - sorted_updates.push(*lookahead_upd); - } - } - } - } - - // process BranchFroms in the tree order - // check that we don't have a cycle if somet entry is orphan - // (this should not happen, but better to be safe) - let mut processed_some_entry = true; - while processed_some_entry { - processed_some_entry = false; - - curr_lsn_branchfroms.retain(|parent_id, branchfroms| { - if known_timelineids.contains(parent_id) { - for (timeline_id, j) in branchfroms { - known_timelineids.insert(*timeline_id); - sorted_updates.push(updates[*j - 1]); - } - processed_some_entry = true; - false - } else { - true - } - }); - } - - if !curr_lsn_branchfroms.is_empty() { - // orphans are expected to be rare and transient between tenant reloads - // for example, an broken ancestor without the child branch being broken. - anyhow::bail!( - "orphan branch(es) detected in BranchFroms: {curr_lsn_branchfroms:?}" - ); - } - } - - assert!(j > i); - i = j; - } else { - // not a BranchFrom, keep the same order - sorted_updates.push(*curr_upd); - i += 1; - } - } - - Ok(sorted_updates) + /// Cutoff point calculated from the user-supplied 'max_retention_period' + #[serde_as(as = "Option")] + retention_param_cutoff: Option, } /// Gathers the inputs for the tenant sizing model. @@ -181,257 +123,257 @@ fn sort_updates_in_tree_order(updates: Vec) -> anyhow::Result, + max_retention_period: Option, logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, ctx: &RequestContext, ) -> anyhow::Result { - // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to - // our advantage with `?` error handling. - let mut joinset = tokio::task::JoinSet::new(); - // refresh is needed to update gc related pitr_cutoff and horizon_cutoff tenant .refresh_gc_info(ctx) .await .context("Failed to refresh gc_info before gathering inputs")?; + // Collect information about all the timelines let timelines = tenant.list_timelines(); if timelines.is_empty() { // perhaps the tenant has just been created, and as such doesn't have any data yet return Ok(ModelInputs { - updates: vec![], - retention_period: 0, - timeline_inputs: HashMap::default(), + segments: vec![], + timeline_inputs: Vec::new(), }); } + // Build a map of branch points. + let mut branchpoints: HashMap> = HashMap::new(); + for timeline in timelines.iter() { + if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() { + branchpoints + .entry(ancestor_id) + .or_default() + .insert(timeline.get_ancestor_lsn()); + } + } + + // These become the final result. + let mut timeline_inputs = Vec::with_capacity(timelines.len()); + let mut segments: Vec = Vec::new(); + + // + // Build Segments representing each timeline. As we do that, also remember + // the branchpoints and branch startpoints in 'branchpoint_segments' and + // 'branchstart_segments' + // + + // BranchPoint segments of each timeline + // (timeline, branchpoint LSN) -> segment_id + let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new(); + + // timeline, Branchpoint seg id, (ancestor, ancestor LSN) + type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>); + let mut branchstart_segments: Vec = Vec::new(); + + for timeline in timelines.iter() { + let timeline_id = timeline.timeline_id; + let last_record_lsn = timeline.get_last_record_lsn(); + let ancestor_lsn = timeline.get_ancestor_lsn(); + + // there's a race between the update (holding tenant.gc_lock) and this read but it + // might not be an issue, because it's not for Timeline::gc + let gc_info = timeline.gc_info.read().unwrap(); + + // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a + // new gc run, which we have no control over. however differently from `Timeline::gc` + // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not + // actually removing files. + let mut next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff); + + // If the caller provided a shorter retention period, use that instead of the GC cutoff. + let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period { + let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period)); + if next_gc_cutoff < param_cutoff { + next_gc_cutoff = param_cutoff; + } + Some(param_cutoff) + } else { + None + }; + + // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we + // want to query any logical size before initdb_lsn. + let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn); + + // Build "interesting LSNs" on this timeline + let mut lsns: Vec<(Lsn, LsnKind)> = gc_info + .retain_lsns + .iter() + .filter(|&&lsn| lsn > ancestor_lsn) + .copied() + // this assumes there are no other retain_lsns than the branchpoints + .map(|lsn| (lsn, LsnKind::BranchPoint)) + .collect::>(); + + // Add branch points we collected earlier, just in case there were any that were + // not present in retain_lsns. We will remove any duplicates below later. + if let Some(this_branchpoints) = branchpoints.get(&timeline_id) { + lsns.extend( + this_branchpoints + .iter() + .map(|lsn| (*lsn, LsnKind::BranchPoint)), + ) + } + + // Add a point for the GC cutoff + let branch_start_needed = next_gc_cutoff <= branch_start_lsn; + if !branch_start_needed { + lsns.push((next_gc_cutoff, LsnKind::GcCutOff)); + } + + lsns.sort_unstable(); + lsns.dedup(); + + // + // Create Segments for the interesting points. + // + + // Timeline start point + let ancestor = timeline + .get_ancestor_timeline_id() + .map(|ancestor_id| (ancestor_id, ancestor_lsn)); + branchstart_segments.push((timeline_id, segments.len(), ancestor)); + segments.push(SegmentMeta { + segment: Segment { + parent: None, // filled in later + lsn: branch_start_lsn.0, + size: None, // filled in later + needed: branch_start_needed, + }, + timeline_id: timeline.timeline_id, + kind: LsnKind::BranchStart, + }); + + // GC cutoff point, and any branch points, i.e. points where + // other timelines branch off from this timeline. + let mut parent = segments.len() - 1; + for (lsn, kind) in lsns { + if kind == LsnKind::BranchPoint { + branchpoint_segments.insert((timeline_id, lsn), segments.len()); + } + segments.push(SegmentMeta { + segment: Segment { + parent: Some(parent), + lsn: lsn.0, + size: None, + needed: lsn > next_gc_cutoff, + }, + timeline_id: timeline.timeline_id, + kind, + }); + parent += 1; + } + + // Current end of the timeline + segments.push(SegmentMeta { + segment: Segment { + parent: Some(parent), + lsn: last_record_lsn.0, + size: None, // Filled in later, if necessary + needed: true, + }, + timeline_id: timeline.timeline_id, + kind: LsnKind::BranchEnd, + }); + + timeline_inputs.push(TimelineInputs { + timeline_id: timeline.timeline_id, + ancestor_id: timeline.get_ancestor_timeline_id(), + ancestor_lsn, + last_record: last_record_lsn, + // this is not used above, because it might not have updated recently enough + latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), + horizon_cutoff: gc_info.horizon_cutoff, + pitr_cutoff: gc_info.pitr_cutoff, + next_gc_cutoff, + retention_param_cutoff, + }); + } + + // We now have all segments from the timelines in 'segments'. The timelines + // haven't been linked to each other yet, though. Do that. + for (_timeline_id, seg_id, ancestor) in branchstart_segments { + // Look up the branch point + if let Some(ancestor) = ancestor { + let parent_id = *branchpoint_segments.get(&ancestor).unwrap(); + segments[seg_id].segment.parent = Some(parent_id); + } + } + + // We left the 'size' field empty in all of the Segments so far. + // Now find logical sizes for all of the points that might need or benefit from them. + fill_logical_sizes(&timelines, &mut segments, limit, logical_size_cache, ctx).await?; + + Ok(ModelInputs { + segments, + timeline_inputs, + }) +} + +/// Augment 'segments' with logical sizes +/// +/// this will probably conflict with on-demand downloaded layers, or at least force them all +/// to be downloaded +/// +async fn fill_logical_sizes( + timelines: &[Arc], + segments: &mut [SegmentMeta], + limit: &Arc, + logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, + ctx: &RequestContext, +) -> anyhow::Result<()> { + let timeline_hash: HashMap> = HashMap::from_iter( + timelines + .iter() + .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))), + ); + // record the used/inserted cache keys here, to remove extras not to start leaking // after initial run the cache should be quite stable, but live timelines will eventually // require new lsns to be inspected. - let mut needed_cache = HashSet::<(TimelineId, Lsn)>::new(); + let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option>::new(); - let mut updates = Vec::new(); + // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to + // our advantage with `?` error handling. + let mut joinset = tokio::task::JoinSet::new(); - // record the per timeline values useful to debug the model inputs, also used to track - // ancestor_lsn without keeping a hold of Timeline - let mut timeline_inputs = HashMap::with_capacity(timelines.len()); - - // used to determine the `retention_period` for the size model - let mut max_cutoff_distance = None; - - // mapping from (TimelineId, Lsn) => if this branch point has been handled already via - // GcInfo::retain_lsns or if it needs to have its logical_size calculated. - let mut referenced_branch_froms = HashMap::<(TimelineId, Lsn), bool>::new(); - - for timeline in timelines { - if !timeline.is_active() { - anyhow::bail!( - "timeline {} is not active, cannot calculate tenant_size now", - timeline.timeline_id - ); + // For each point that would benefit from having a logical size available, + // spawn a Task to fetch it, unless we have it cached already. + for seg in segments.iter() { + if !seg.size_needed() { + continue; } - let last_record_lsn = timeline.get_last_record_lsn(); + let timeline_id = seg.timeline_id; + let lsn = Lsn(seg.segment.lsn); - let (interesting_lsns, horizon_cutoff, pitr_cutoff, next_gc_cutoff) = { - // there's a race between the update (holding tenant.gc_lock) and this read but it - // might not be an issue, because it's not for Timeline::gc - let gc_info = timeline.gc_info.read().unwrap(); - - // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a - // new gc run, which we have no control over. however differently from `Timeline::gc` - // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not - // actually removing files. - let next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff); - - // the minimum where we should find the next_gc_cutoff for our calculations. - // - // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we - // want to query any logical size before initdb_lsn. - let cutoff_minimum = cmp::max(timeline.get_ancestor_lsn(), timeline.initdb_lsn); - - let maybe_cutoff = if next_gc_cutoff > cutoff_minimum { - Some((next_gc_cutoff, LsnKind::GcCutOff)) - } else { - None - }; - - // this assumes there are no other lsns than the branchpoints - let lsns = gc_info - .retain_lsns - .iter() - .inspect(|&&lsn| { - trace!( - timeline_id=%timeline.timeline_id, - "retained lsn: {lsn:?}, is_before_ancestor_lsn={}", - lsn < timeline.get_ancestor_lsn() - ) - }) - .filter(|&&lsn| lsn > timeline.get_ancestor_lsn()) - .copied() - .map(|lsn| (lsn, LsnKind::BranchPoint)) - .chain(maybe_cutoff) - .collect::>(); - - ( - lsns, - gc_info.horizon_cutoff, - gc_info.pitr_cutoff, - next_gc_cutoff, - ) - }; - - // update this to have a retention_period later for the tenant_size_model - // tenant_size_model compares this to the last segments start_lsn - if let Some(cutoff_distance) = last_record_lsn.checked_sub(next_gc_cutoff) { - match max_cutoff_distance.as_mut() { - Some(max) => { - *max = std::cmp::max(*max, cutoff_distance); - } - _ => { - max_cutoff_distance = Some(cutoff_distance); - } - } - } - - // all timelines branch from something, because it might be impossible to pinpoint - // which is the tenant_size_model's "default" branch. - - let ancestor_lsn = timeline.get_ancestor_lsn(); - - updates.push(Update { - lsn: ancestor_lsn, - command: Command::BranchFrom(timeline.get_ancestor_timeline_id()), - timeline_id: timeline.timeline_id, - }); - - if let Some(parent_timeline_id) = timeline.get_ancestor_timeline_id() { - // refresh_gc_info will update branchpoints and pitr_cutoff but only do it for branches - // which are over gc_horizon. for example, a "main" branch which never received any - // updates apart from initdb not have branch points recorded. - referenced_branch_froms - .entry((parent_timeline_id, timeline.get_ancestor_lsn())) - .or_default(); - } - - for (lsn, _kind) in &interesting_lsns { - // mark this visited so don't need to re-process this parent - *referenced_branch_froms - .entry((timeline.timeline_id, *lsn)) - .or_default() = true; - - if let Some(size) = logical_size_cache.get(&(timeline.timeline_id, *lsn)) { - updates.push(Update { - lsn: *lsn, - timeline_id: timeline.timeline_id, - command: Command::Update(*size), - }); - - needed_cache.insert((timeline.timeline_id, *lsn)); - } else { - let timeline = Arc::clone(&timeline); + if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) { + let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned(); + if cached_size.is_none() { + let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap()); let parallel_size_calcs = Arc::clone(limit); let ctx = ctx.attached_child(); joinset.spawn(calculate_logical_size( parallel_size_calcs, timeline, - *lsn, + lsn, ctx, )); } - } - - timeline_inputs.insert( - timeline.timeline_id, - TimelineInputs { - ancestor_lsn, - last_record: last_record_lsn, - // this is not used above, because it might not have updated recently enough - latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(), - horizon_cutoff, - pitr_cutoff, - next_gc_cutoff, - }, - ); - } - - // iterate over discovered branch points and make sure we are getting logical sizes at those - // points. - for ((timeline_id, lsn), handled) in referenced_branch_froms.iter() { - if *handled { - continue; - } - - let timeline_id = *timeline_id; - let lsn = *lsn; - - match timeline_inputs.get(&timeline_id) { - Some(inputs) if inputs.ancestor_lsn == lsn => { - // we don't need an update at this branch point which is also point where - // timeline_id branch was branched from. - continue; - } - Some(_) => {} - None => { - // we should have this because we have iterated through all of the timelines - anyhow::bail!("missing timeline_input for {timeline_id}") - } - } - - if let Some(size) = logical_size_cache.get(&(timeline_id, lsn)) { - updates.push(Update { - lsn, - timeline_id, - command: Command::Update(*size), - }); - - needed_cache.insert((timeline_id, lsn)); - } else { - let timeline = tenant - .get_timeline(timeline_id, false) - .context("find referenced ancestor timeline")?; - let parallel_size_calcs = Arc::clone(limit); - joinset.spawn(calculate_logical_size( - parallel_size_calcs, - timeline.clone(), - lsn, - ctx.attached_child(), - )); - - if let Some(parent_id) = timeline.get_ancestor_timeline_id() { - // we should not find new ones because we iterated tenants all timelines - anyhow::ensure!( - timeline_inputs.contains_key(&parent_id), - "discovered new timeline {parent_id} (parent of {timeline_id})" - ); - } - }; - } - - // finally add in EndOfBranch for all timelines where their last_record_lsn is not a branch - // point. this is needed by the model. - for (timeline_id, inputs) in timeline_inputs.iter() { - let lsn = inputs.last_record; - - if referenced_branch_froms.contains_key(&(*timeline_id, lsn)) { - // this means that the (timeline_id, last_record_lsn) represents a branch point - // we do not want to add EndOfBranch updates for these points because it doesn't fit - // into the current tenant_size_model. - continue; - } - - if lsn > inputs.ancestor_lsn { - // all timelines also have an end point if they have made any progress - updates.push(Update { - lsn, - command: Command::EndOfBranch, - timeline_id: *timeline_id, - }); + e.insert(cached_size); } } + // Perform the size lookups let mut have_any_error = false; - while let Some(res) = joinset.join_next().await { // each of these come with Result, JoinError> // because of spawn + spawn_blocking @@ -460,19 +402,13 @@ pub(super) async fn gather_inputs( debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated"); logical_size_cache.insert((timeline.timeline_id, lsn), size); - needed_cache.insert((timeline.timeline_id, lsn)); - - updates.push(Update { - lsn, - timeline_id: timeline.timeline_id, - command: Command::Update(size), - }); + sizes_needed.insert((timeline.timeline_id, lsn), Some(size)); } } } // prune any keys not needed anymore; we record every used key and added key. - logical_size_cache.retain(|key, _| needed_cache.contains(key)); + logical_size_cache.retain(|key, _| sizes_needed.contains_key(key)); if have_any_error { // we cannot complete this round, because we are missing data. @@ -480,105 +416,47 @@ pub(super) async fn gather_inputs( anyhow::bail!("failed to calculate some logical_sizes"); } - // the data gathered to updates is per lsn, regardless of the branch, so we can use it to - // our advantage, not requiring a sorted container or graph walk. - // - // for branch points, which come as multiple updates at the same LSN, the Command::Update - // is needed before a branch is made out of that branch Command::BranchFrom. this is - // handled by the variant order in `Command`. - // - updates.sort_unstable(); - - // And another sort to handle Command::BranchFrom ordering - // in case when there are multiple branches at the same LSN. - let sorted_updates = sort_updates_in_tree_order(updates)?; - - let retention_period = match max_cutoff_distance { - Some(max) => max.0, - None => { - anyhow::bail!("the first branch should have a gc_cutoff after it's branch point at 0") + // Insert the looked up sizes to the Segments + for seg in segments.iter_mut() { + if !seg.size_needed() { + continue; } - }; - Ok(ModelInputs { - updates: sorted_updates, - retention_period, - timeline_inputs, - }) + let timeline_id = seg.timeline_id; + let lsn = Lsn(seg.segment.lsn); + + if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) { + seg.segment.size = Some(*size); + } else { + bail!("could not find size at {} in timeline {}", lsn, timeline_id); + } + } + Ok(()) } impl ModelInputs { + pub fn calculate_model(&self) -> anyhow::Result { + // Convert SegmentMetas into plain Segments + let storage = StorageModel { + segments: self + .segments + .iter() + .map(|seg| seg.segment.clone()) + .collect(), + }; + + Ok(storage) + } + + // calculate total project size pub fn calculate(&self) -> anyhow::Result { - // Option is used for "naming" the branches because it is assumed to be - // impossible to always determine the a one main branch. - let mut storage = tenant_size_model::Storage::>::new(None); + let storage = self.calculate_model()?; + let sizes = storage.calculate(); - for update in &self.updates { - let Update { - lsn, - command: op, - timeline_id, - } = update; - - let Lsn(now) = *lsn; - match op { - Command::Update(sz) => { - storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?; - } - Command::EndOfBranch => { - storage.insert_point(&Some(*timeline_id), "".into(), now, None)?; - } - Command::BranchFrom(parent) => { - // This branch command may fail if it cannot find a parent to branch from. - storage.branch(parent, Some(*timeline_id))?; - } - } - } - - Ok(storage.calculate(self.retention_period)?.total_children()) + Ok(sizes.total_size) } } -/// A point of interest in the tree of branches -#[serde_with::serde_as] -#[derive( - Debug, PartialEq, PartialOrd, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize, -)] -struct Update { - #[serde_as(as = "serde_with::DisplayFromStr")] - lsn: utils::lsn::Lsn, - command: Command, - #[serde_as(as = "serde_with::DisplayFromStr")] - timeline_id: TimelineId, -} - -#[serde_with::serde_as] -#[derive(PartialOrd, PartialEq, Eq, Ord, Clone, Copy, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "snake_case")] -enum Command { - Update(u64), - BranchFrom(#[serde_as(as = "Option")] Option), - EndOfBranch, -} - -impl std::fmt::Debug for Command { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - // custom one-line implementation makes it more enjoyable to read {:#?} avoiding 3 - // linebreaks - match self { - Self::Update(arg0) => write!(f, "Update({arg0})"), - Self::BranchFrom(arg0) => write!(f, "BranchFrom({arg0:?})"), - Self::EndOfBranch => write!(f, "EndOfBranch"), - } - } -} - -#[derive(Debug, Clone, Copy)] -enum LsnKind { - BranchPoint, - GcCutOff, -} - /// Newtype around the tuple that carries the timeline at lsn logical size calculation. struct TimelineAtLsnSizeResult( Arc, @@ -604,227 +482,230 @@ async fn calculate_logical_size( Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) } -#[test] -fn updates_sort() { - use std::str::FromStr; - use utils::id::TimelineId; - use utils::lsn::Lsn; - - let ids = [ - TimelineId::from_str("7ff1edab8182025f15ae33482edb590a").unwrap(), - TimelineId::from_str("b1719e044db05401a05a2ed588a3ad3f").unwrap(), - TimelineId::from_str("b68d6691c895ad0a70809470020929ef").unwrap(), - ]; - - // try through all permutations - let ids = [ - [&ids[0], &ids[1], &ids[2]], - [&ids[0], &ids[2], &ids[1]], - [&ids[1], &ids[0], &ids[2]], - [&ids[1], &ids[2], &ids[0]], - [&ids[2], &ids[0], &ids[1]], - [&ids[2], &ids[1], &ids[0]], - ]; - - for ids in ids { - // apply a fixture which uses a permutation of ids - let commands = [ - Update { - lsn: Lsn(0), - command: Command::BranchFrom(None), - timeline_id: *ids[0], - }, - Update { - lsn: Lsn::from_str("0/67E7618").unwrap(), - command: Command::Update(43696128), - timeline_id: *ids[0], - }, - Update { - lsn: Lsn::from_str("0/67E7618").unwrap(), - command: Command::BranchFrom(Some(*ids[0])), - timeline_id: *ids[1], - }, - Update { - lsn: Lsn::from_str("0/76BE4F0").unwrap(), - command: Command::Update(41844736), - timeline_id: *ids[1], - }, - Update { - lsn: Lsn::from_str("0/10E49380").unwrap(), - command: Command::Update(42164224), - timeline_id: *ids[0], - }, - Update { - lsn: Lsn::from_str("0/10E49380").unwrap(), - command: Command::BranchFrom(Some(*ids[0])), - timeline_id: *ids[2], - }, - Update { - lsn: Lsn::from_str("0/11D74910").unwrap(), - command: Command::Update(42172416), - timeline_id: *ids[2], - }, - Update { - lsn: Lsn::from_str("0/12051E98").unwrap(), - command: Command::Update(42196992), - timeline_id: *ids[0], - }, - ]; - - let mut sorted = commands; - - // these must sort in the same order, regardless of how the ids sort - // which is why the timeline_id is the last field - sorted.sort_unstable(); - - assert_eq!(commands, sorted, "{:#?} vs. {:#?}", commands, sorted); - } -} - #[test] fn verify_size_for_multiple_branches() { // this is generated from integration test test_tenant_size_with_multiple_branches, but this way // it has the stable lsn's // - // timelineinputs have been left out, because those explain the inputs, but don't participate - // in further size calculations. - let doc = r#"{"updates":[{"lsn":"0/0","command":{"branch_from":null},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"update":25763840},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/176FA40","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/1819818","command":{"update":26075136},"timeline_id":"10b532a550540bc15385eac4edde416a"},{"lsn":"0/18B5E40","command":{"update":26427392},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"update":26492928},"timeline_id":"cd9d9409c216e64bf580904facedb01b"},{"lsn":"0/18D3DF0","command":{"branch_from":"cd9d9409c216e64bf580904facedb01b"},"timeline_id":"230fc9d756f7363574c0d66533564dcc"},{"lsn":"0/220F438","command":{"update":25239552},"timeline_id":"230fc9d756f7363574c0d66533564dcc"}],"retention_period":131072}"#; - + // The timeline_inputs don't participate in the size calculation, and are here just to explain + // the inputs. + let doc = r#" +{ + "segments": [ + { + "segment": { + "parent": 9, + "lsn": 26033560, + "size": null, + "needed": false + }, + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 0, + "lsn": 35720400, + "size": 25206784, + "needed": false + }, + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 1, + "lsn": 35851472, + "size": null, + "needed": true + }, + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "kind": "BranchEnd" + }, + { + "segment": { + "parent": 7, + "lsn": 24566168, + "size": null, + "needed": false + }, + "timeline_id": "454626700469f0a9914949b9d018e876", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 3, + "lsn": 25261936, + "size": 26050560, + "needed": false + }, + "timeline_id": "454626700469f0a9914949b9d018e876", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 4, + "lsn": 25393008, + "size": null, + "needed": true + }, + "timeline_id": "454626700469f0a9914949b9d018e876", + "kind": "BranchEnd" + }, + { + "segment": { + "parent": null, + "lsn": 23694408, + "size": null, + "needed": false + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 6, + "lsn": 24566168, + "size": 25739264, + "needed": false + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchPoint" + }, + { + "segment": { + "parent": 7, + "lsn": 25902488, + "size": 26402816, + "needed": false + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 8, + "lsn": 26033560, + "size": 26468352, + "needed": true + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchPoint" + }, + { + "segment": { + "parent": 9, + "lsn": 26033560, + "size": null, + "needed": true + }, + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "kind": "BranchEnd" + } + ], + "timeline_inputs": [ + { + "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce", + "ancestor_lsn": "0/18D3D98", + "last_record": "0/2230CD0", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/2210CD0", + "pitr_cutoff": "0/2210CD0", + "next_gc_cutoff": "0/2210CD0", + "retention_param_cutoff": null + }, + { + "timeline_id": "454626700469f0a9914949b9d018e876", + "ancestor_lsn": "0/176D998", + "last_record": "0/1837770", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/1817770", + "pitr_cutoff": "0/1817770", + "next_gc_cutoff": "0/1817770", + "retention_param_cutoff": null + }, + { + "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f", + "ancestor_lsn": "0/0", + "last_record": "0/18D3D98", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/18B3D98", + "pitr_cutoff": "0/18B3D98", + "next_gc_cutoff": "0/18B3D98", + "retention_param_cutoff": null + } + ] +} +"#; let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); - assert_eq!(inputs.calculate().unwrap(), 36_409_872); + assert_eq!(inputs.calculate().unwrap(), 37_851_408); } #[test] -fn updates_sort_with_branches_at_same_lsn() { - use std::str::FromStr; - use Command::{BranchFrom, EndOfBranch}; - - macro_rules! lsn { - ($e:expr) => { - Lsn::from_str($e).unwrap() - }; +fn verify_size_for_one_branch() { + let doc = r#" +{ + "segments": [ + { + "segment": { + "parent": null, + "lsn": 0, + "size": null, + "needed": false + }, + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "kind": "BranchStart" + }, + { + "segment": { + "parent": 0, + "lsn": 305547335776, + "size": 220054675456, + "needed": false + }, + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "kind": "GcCutOff" + }, + { + "segment": { + "parent": 1, + "lsn": 305614444640, + "size": null, + "needed": true + }, + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "kind": "BranchEnd" } + ], + "timeline_inputs": [ + { + "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd", + "ancestor_lsn": "0/0", + "last_record": "47/280A5860", + "latest_gc_cutoff": "47/240A5860", + "horizon_cutoff": "47/240A5860", + "pitr_cutoff": "47/240A5860", + "next_gc_cutoff": "47/240A5860", + "retention_param_cutoff": "0/0" + } + ] +}"#; - let ids = [ - TimelineId::from_str("00000000000000000000000000000000").unwrap(), - TimelineId::from_str("11111111111111111111111111111111").unwrap(), - TimelineId::from_str("22222222222222222222222222222222").unwrap(), - TimelineId::from_str("33333333333333333333333333333333").unwrap(), - TimelineId::from_str("44444444444444444444444444444444").unwrap(), - ]; + let model: ModelInputs = serde_json::from_str(doc).unwrap(); - // issue https://github.com/neondatabase/neon/issues/3179 - let commands = vec![ - Update { - lsn: lsn!("0/0"), - command: BranchFrom(None), - timeline_id: ids[0], - }, - Update { - lsn: lsn!("0/169AD58"), - command: Command::Update(25387008), - timeline_id: ids[0], - }, - // next three are wrongly sorted, because - // ids[1] is branched from before ids[1] exists - // and ids[2] is branched from before ids[2] exists - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[0])), - timeline_id: ids[2], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[2])), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CA85B8"), - command: Command::Update(28925952), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: Command::Update(29024256), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[4], - }, - Update { - lsn: lsn!("0/22DCE70"), - command: Command::Update(32546816), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/230CE70"), - command: EndOfBranch, - timeline_id: ids[3], - }, - ]; + let res = model.calculate_model().unwrap().calculate(); - let expected = vec![ - Update { - lsn: lsn!("0/0"), - command: BranchFrom(None), - timeline_id: ids[0], - }, - Update { - lsn: lsn!("0/169AD58"), - command: Command::Update(25387008), - timeline_id: ids[0], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[0])), - timeline_id: ids[2], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[2])), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/169AD58"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/1CA85B8"), - command: Command::Update(28925952), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: Command::Update(29024256), - timeline_id: ids[1], - }, - Update { - lsn: lsn!("0/1CD85B8"), - command: BranchFrom(Some(ids[1])), - timeline_id: ids[4], - }, - Update { - lsn: lsn!("0/22DCE70"), - command: Command::Update(32546816), - timeline_id: ids[3], - }, - Update { - lsn: lsn!("0/230CE70"), - command: EndOfBranch, - timeline_id: ids[3], - }, - ]; + println!("calculated synthetic size: {}", res.total_size); + println!("result: {:?}", serde_json::to_string(&res.segments)); - let sorted_commands = sort_updates_in_tree_order(commands).unwrap(); - - assert_eq!(sorted_commands, expected); + use utils::lsn::Lsn; + let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap(); + let last_lsn: Lsn = "47/280A5860".parse().unwrap(); + println!( + "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}", + u64::from(latest_gc_cutoff_lsn), + u64::from(last_lsn) + ); + assert_eq!(res.total_size, 220121784320); } diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index c943bf0a27..21c6ede27e 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -27,8 +27,7 @@ use std::fs::OpenOptions; use std::io::prelude::*; use std::io::{Error, ErrorKind}; use std::ops::{Deref, DerefMut}; -use std::os::fd::RawFd; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::prelude::CommandExt; use std::path::PathBuf; use std::process::Stdio; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b35252243e..0620ad8a35 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1217,7 +1217,7 @@ class PageserverHttpClient(requests.Session): """ Returns the tenant size, together with the model inputs as the second tuple item. """ - res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/size") + res = self.get(f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size") self.verbose_error(res) res = res.json() assert isinstance(res, dict) @@ -1228,6 +1228,16 @@ class PageserverHttpClient(requests.Session): assert type(inputs) is dict return (size, inputs) + def tenant_size_debug(self, tenant_id: TenantId) -> str: + """ + Returns the tenant size debug info, as an HTML string + """ + res = self.get( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/synthetic_size", + headers={"Accept": "text/html"}, + ) + return res.text + def timeline_list( self, tenant_id: TenantId, diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index bb3bca8782..8c2996f491 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -1,13 +1,13 @@ -from typing import Any, List, Tuple +from pathlib import Path +from typing import List, Tuple import pytest from fixtures.log_helper import log -from fixtures.metrics import parse_metrics from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn from fixtures.types import Lsn -def test_empty_tenant_size(neon_simple_env: NeonEnv): +def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path): env = neon_simple_env (tenant_id, _) = env.neon_cli.create_tenant() http_client = env.pageserver.http_client() @@ -18,6 +18,9 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): main_branch_name = "main" + branch_name, main_timeline_id = env.neon_cli.list_timelines(tenant_id)[0] + assert branch_name == main_branch_name + with env.postgres.create_start( main_branch_name, tenant_id=tenant_id, @@ -39,12 +42,44 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv): size, inputs = http_client.tenant_size_and_modelinputs(tenant_id) assert size == initial_size, "tenant_size should not be affected by shutdown of compute" - expected_commands: List[Any] = [{"branch_from": None}, "end_of_branch"] - actual_commands: List[Any] = list(map(lambda x: x["command"], inputs["updates"])) # type: ignore - assert actual_commands == expected_commands + expected_inputs = { + "segments": [ + { + "segment": {"parent": None, "lsn": 23694408, "size": 25362432, "needed": True}, + "timeline_id": f"{main_timeline_id}", + "kind": "BranchStart", + }, + { + "segment": {"parent": 0, "lsn": 23694528, "size": None, "needed": True}, + "timeline_id": f"{main_timeline_id}", + "kind": "BranchEnd", + }, + ], + "timeline_inputs": [ + { + "timeline_id": f"{main_timeline_id}", + "ancestor_id": None, + "ancestor_lsn": "0/0", + "last_record": "0/1698CC0", + "latest_gc_cutoff": "0/1698C48", + "horizon_cutoff": "0/0", + "pitr_cutoff": "0/0", + "next_gc_cutoff": "0/0", + "retention_param_cutoff": None, + } + ], + } + expected_inputs = mask_model_inputs(expected_inputs) + actual_inputs = mask_model_inputs(inputs) + + assert expected_inputs == actual_inputs + + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) -def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): +def test_branched_empty_timeline_size(neon_simple_env: NeonEnv, test_output_dir: Path): """ Issue found in production. Because the ancestor branch was under gc_horizon, the branchpoint was "dangling" and the computation could not be @@ -75,8 +110,12 @@ def test_branched_empty_timeline_size(neon_simple_env: NeonEnv): assert size_after_branching > initial_size + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) -def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): + +def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv, test_output_dir: Path): """ More general version of test_branched_empty_timeline_size @@ -128,9 +167,13 @@ def test_branched_from_many_empty_parents_size(neon_simple_env: NeonEnv): size_after_writes = http_client.tenant_size(tenant_id) assert size_after_writes > initial_size + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + @pytest.mark.skip("This should work, but is left out because assumed covered by other tests") -def test_branch_point_within_horizon(neon_simple_env: NeonEnv): +def test_branch_point_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path): """ gc_horizon = 15 @@ -167,9 +210,13 @@ def test_branch_point_within_horizon(neon_simple_env: NeonEnv): assert size_before_branching < size_after + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + @pytest.mark.skip("This should work, but is left out because assumed covered by other tests") -def test_parent_within_horizon(neon_simple_env: NeonEnv): +def test_parent_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path): """ gc_horizon = 5 @@ -179,7 +226,7 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv): """ env = neon_simple_env - gc_horizon = 200_000 + gc_horizon = 5_000 (tenant_id, main_id) = env.neon_cli.create_tenant(conf={"gc_horizon": str(gc_horizon)}) http_client = env.pageserver.http_client() @@ -212,9 +259,13 @@ def test_parent_within_horizon(neon_simple_env: NeonEnv): assert size_before_branching < size_after + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + @pytest.mark.skip("This should work, but is left out because assumed covered by other tests") -def test_only_heads_within_horizon(neon_simple_env: NeonEnv): +def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Path): """ gc_horizon = small @@ -253,8 +304,14 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv): latest_size = size_now + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) -def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): + +def test_single_branch_get_tenant_size_grows( + neon_env_builder: NeonEnvBuilder, test_output_dir: Path +): """ Operate on single branch reading the tenants size after each transaction. """ @@ -279,7 +336,20 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): collected_responses: List[Tuple[Lsn, int]] = [] + size_debug_file = open(test_output_dir / "size_debug.html", "w") + + def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int): + if current_lsn - initdb_lsn > gc_horizon: + assert ( + size >= prev + ), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size" + else: + assert ( + size > prev + ), "tenant_size should grow, because we continue to add WAL to initial snapshot size" + with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg: + initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) with pg.cursor() as cur: cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)") @@ -297,13 +367,19 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) - size = http_client.tenant_size(tenant_id) + size, sizes = http_client.tenant_size_and_modelinputs(tenant_id) + + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) if len(collected_responses) > 0: prev = collected_responses[-1][1] if size == 0: assert prev == 0 else: + # branch start shouldn't be past gc_horizon yet + # thus the size should grow as we insert more data + assert current_lsn - initdb_lsn <= gc_horizon assert size > prev collected_responses.append((current_lsn, size)) @@ -323,9 +399,15 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id) - size = http_client.tenant_size(tenant_id) + size, sizes = http_client.tenant_size_and_modelinputs(tenant_id) + + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + prev = collected_responses[-1][1] - assert size > prev, "tenant_size should grow with updates" + + check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev) + collected_responses.append((current_lsn, size)) while True: @@ -340,9 +422,9 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): size = http_client.tenant_size(tenant_id) prev = collected_responses[-1][1] - assert ( - size > prev - ), "even though rows have been deleted, the tenant_size should increase" + + check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev) + collected_responses.append((current_lsn, size)) with pg.cursor() as cur: @@ -352,7 +434,9 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): size = http_client.tenant_size(tenant_id) prev = collected_responses[-1][1] - assert size > prev, "dropping table grows tenant_size" + + check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev) + collected_responses.append((current_lsn, size)) # this isn't too many lines to forget for a while. observed while @@ -364,24 +448,17 @@ def test_single_branch_get_tenant_size_grows(neon_env_builder: NeonEnvBuilder): env.pageserver.stop() env.pageserver.start() + size_debug_file.close() + size_after = http_client.tenant_size(tenant_id) prev = collected_responses[-1][1] assert size_after == prev, "size after restarting pageserver should not have changed" - ps_metrics = parse_metrics(http_client.get_metrics(), "pageserver") - tenant_metric_filter = { - "tenant_id": str(tenant_id), - } - tenant_size_metric = int( - ps_metrics.query_one("pageserver_tenant_synthetic_size", filter=tenant_metric_filter).value - ) - - assert tenant_size_metric == size_after, "API size value should be equal to metric size value" - - -def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder): +def test_get_tenant_size_with_multiple_branches( + neon_env_builder: NeonEnvBuilder, test_output_dir: Path +): """ Reported size goes up while branches or rows are being added, goes down after removing branches. """ @@ -481,6 +558,10 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder size_after = http_client.tenant_size(tenant_id) assert size_after == size_after_thinning_branch + size_debug_file_before = open(test_output_dir / "size_debug_before.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file_before.write(size_debug) + # teardown, delete branches, and the size should be going down http_client.timeline_delete(tenant_id, first_branch_timeline_id) @@ -493,3 +574,38 @@ def test_get_tenant_size_with_multiple_branches(neon_env_builder: NeonEnvBuilder assert size_after_deleting_second < size_after_continuing_on_main assert size_after_deleting_second > size_after_first_branch + + size_debug_file = open(test_output_dir / "size_debug.html", "w") + size_debug = http_client.tenant_size_debug(tenant_id) + size_debug_file.write(size_debug) + + +# Helper for tests that compare timeline_inputs +# We don't want to compare the exact values, because they can be unstable +# and cause flaky tests. So replace the values with useful invariants. +def mask_model_inputs(x): + if isinstance(x, dict): + newx = {} + for k, v in x.items(): + if k == "size": + if v is None or v == 0: + # no change + newx[k] = v + elif v < 0: + newx[k] = "<0" + else: + newx[k] = ">0" + elif k.endswith("lsn") or k.endswith("cutoff") or k == "last_record": + if v is None or v == 0 or v == "0/0": + # no change + newx[k] = v + else: + newx[k] = "masked" + else: + newx[k] = mask_model_inputs(v) + return newx + elif isinstance(x, list): + newlist = [mask_model_inputs(v) for v in x] + return newlist + else: + return x