diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs index 92bec8aebe..b156e1be9d 100644 --- a/libs/tenant_size_model/src/lib.rs +++ b/libs/tenant_size_model/src/lib.rs @@ -134,22 +134,25 @@ impl Storage { op: Cow<'static, str>, lsn: u64, size: Option, - ) where + ) -> anyhow::Result<()> + where K: std::borrow::Borrow, Q: std::hash::Hash + Eq + std::fmt::Debug, { - let lastseg_id = *self.branches.get(branch).unwrap(); + let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; let newseg_id = self.segments.len(); let lastseg = &mut self.segments[lastseg_id]; assert!(lsn > lastseg.end_lsn); + let Some(start_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; + let newseg = Segment { op, parent: Some(lastseg_id), start_lsn: lastseg.end_lsn, end_lsn: lsn, - start_size: lastseg.end_size.unwrap(), + start_size, end_size: size, children_after: Vec::new(), needed: false, @@ -158,6 +161,8 @@ impl Storage { self.segments.push(newseg); *self.branches.get_mut(branch).expect("read already") = newseg_id; + + Ok(()) } /// Advances the branch with the named operation, by the relative LSN and logical size bytes. @@ -167,21 +172,24 @@ impl Storage { op: Cow<'static, str>, lsn_bytes: u64, size_bytes: i64, - ) where + ) -> anyhow::Result<()> + where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - let lastseg_id = *self.branches.get(branch).unwrap(); + let Some(lastseg_id) = self.branches.get(branch).copied() else { anyhow::bail!("branch not found: {branch:?}") }; let newseg_id = self.segments.len(); let lastseg = &mut self.segments[lastseg_id]; + let Some(last_end_size) = lastseg.end_size else { anyhow::bail!("no end_size on latest segment for {branch:?}") }; + let newseg = Segment { op, parent: Some(lastseg_id), start_lsn: lastseg.end_lsn, end_lsn: lastseg.end_lsn + lsn_bytes, - start_size: lastseg.end_size.unwrap(), - end_size: Some((lastseg.end_size.unwrap() as i64 + size_bytes) as u64), + start_size: last_end_size, + end_size: Some((last_end_size as i64 + size_bytes) as u64), children_after: Vec::new(), needed: false, }; @@ -189,33 +197,33 @@ impl Storage { self.segments.push(newseg); *self.branches.get_mut(branch).expect("read already") = newseg_id; + Ok(()) } - pub fn insert(&mut self, branch: &Q, bytes: u64) + pub fn insert(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "insert".into(), bytes, bytes as i64); + self.modify_branch(branch, "insert".into(), bytes, bytes as i64) } - pub fn update(&mut self, branch: &Q, bytes: u64) + pub fn update(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "update".into(), bytes, 0i64); + self.modify_branch(branch, "update".into(), bytes, 0i64) } - pub fn delete(&mut self, branch: &Q, bytes: u64) + pub fn delete(&mut self, branch: &Q, bytes: u64) -> anyhow::Result<()> where K: std::borrow::Borrow, - Q: std::hash::Hash + Eq, + Q: std::hash::Hash + Eq + std::fmt::Debug, { - self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)); + self.modify_branch(branch, "delete".into(), bytes, -(bytes as i64)) } - /// Panics if the parent branch cannot be found. pub fn branch(&mut self, parent: &Q, name: K) -> anyhow::Result<()> where K: std::borrow::Borrow + std::fmt::Debug, @@ -236,7 +244,7 @@ impl Storage { Ok(()) } - pub fn calculate(&mut self, retention_period: u64) -> SegmentSize { + pub fn calculate(&mut self, retention_period: u64) -> anyhow::Result { // Phase 1: Mark all the segments that need to be retained for (_branch, &last_seg_id) in self.branches.iter() { let last_seg = &self.segments[last_seg_id]; @@ -261,7 +269,7 @@ impl Storage { self.size_from_snapshot_later(0) } - fn size_from_wal(&self, seg_id: usize) -> SegmentSize { + fn size_from_wal(&self, seg_id: usize) -> anyhow::Result { let seg = &self.segments[seg_id]; let this_size = seg.end_lsn - seg.start_lsn; @@ -272,10 +280,10 @@ impl Storage { for &child_id in seg.children_after.iter() { // try each child both ways let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id); + let p1 = self.size_from_wal(child_id)?; let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id); + let p2 = self.size_from_snapshot_later(child_id)?; if p1.total() < p2.total() { p1 } else { @@ -286,15 +294,15 @@ impl Storage { }; children.push(p); } - SegmentSize { + Ok(SegmentSize { seg_id, method: if seg.needed { WalNeeded } else { Wal }, this_size, children, - } + }) } - fn size_from_snapshot_later(&self, seg_id: usize) -> SegmentSize { + fn size_from_snapshot_later(&self, seg_id: usize) -> anyhow::Result { // If this is needed, then it's time to do the snapshot and continue // with wal method. let seg = &self.segments[seg_id]; @@ -305,10 +313,10 @@ impl Storage { for &child_id in seg.children_after.iter() { // try each child both ways let child = &self.segments[child_id]; - let p1 = self.size_from_wal(child_id); + let p1 = self.size_from_wal(child_id)?; let p = if !child.needed { - let p2 = self.size_from_snapshot_later(child_id); + let p2 = self.size_from_snapshot_later(child_id)?; if p1.total() < p2.total() { p1 } else { @@ -319,12 +327,12 @@ impl Storage { }; children.push(p); } - SegmentSize { + Ok(SegmentSize { seg_id, method: WalNeeded, this_size: seg.start_size, children, - } + }) } else { // If any of the direct children are "needed", need to be able to reconstruct here let mut children_needed = false; @@ -339,7 +347,7 @@ impl Storage { let method1 = if !children_needed { let mut children = Vec::new(); for child in seg.children_after.iter() { - children.push(self.size_from_snapshot_later(*child)); + children.push(self.size_from_snapshot_later(*child)?); } Some(SegmentSize { seg_id, @@ -355,20 +363,25 @@ impl Storage { let method2 = if children_needed || seg.children_after.len() >= 2 { let mut children = Vec::new(); for child in seg.children_after.iter() { - children.push(self.size_from_wal(*child)); + children.push(self.size_from_wal(*child)?); } + let Some(this_size) = seg.end_size else { anyhow::bail!("no end_size at junction {seg_id}") }; Some(SegmentSize { seg_id, method: SnapshotAfter, - this_size: seg.end_size.unwrap(), + this_size, children, }) } else { None }; - match (method1, method2) { - (None, None) => panic!(), + Ok(match (method1, method2) { + (None, None) => anyhow::bail!( + "neither method was applicable: children_after={}, children_needed={}", + seg.children_after.len(), + children_needed + ), (Some(method), None) => method, (None, Some(method)) => method, (Some(method1), Some(method2)) => { @@ -378,7 +391,7 @@ impl Storage { method2 } } - } + }) } } diff --git a/libs/tenant_size_model/src/main.rs b/libs/tenant_size_model/src/main.rs index 9378a98a09..e32dd055f4 100644 --- a/libs/tenant_size_model/src/main.rs +++ b/libs/tenant_size_model/src/main.rs @@ -7,118 +7,118 @@ use tenant_size_model::{Segment, SegmentSize, Storage}; // Main branch only. Some updates on it. -fn scenario_1() -> (Vec, SegmentSize) { +fn scenario_1() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Main branch only. Some updates on it. -fn scenario_2() -> (Vec, SegmentSize) { +fn scenario_2() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent - storage.update("main", 1_000); + storage.update("main", 1_000)?; - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Like 2, but more updates on main -fn scenario_3() -> (Vec, SegmentSize) { +fn scenario_3() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } // Diverged branches -fn scenario_4() -> (Vec, SegmentSize) { +fn scenario_4() -> anyhow::Result<(Vec, SegmentSize)> { // Create main branch let mut storage = Storage::new("main"); // Bulk load 5 GB of data to it - storage.insert("main", 5_000); + storage.insert("main", 5_000)?; // Stream of updates for _ in 0..5 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } // Branch - storage.branch("main", "child").unwrap(); - storage.update("child", 1_000); + storage.branch("main", "child")?; + storage.update("child", 1_000)?; // More updates on parent for _ in 0..8 { - storage.update("main", 1_000); + storage.update("main", 1_000)?; } - let size = storage.calculate(1000); + let size = storage.calculate(1000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } -fn scenario_5() -> (Vec, SegmentSize) { +fn scenario_5() -> anyhow::Result<(Vec, SegmentSize)> { let mut storage = Storage::new("a"); - storage.insert("a", 5000); - storage.branch("a", "b").unwrap(); - storage.update("b", 4000); - storage.update("a", 2000); - storage.branch("a", "c").unwrap(); - storage.insert("c", 4000); - storage.insert("a", 2000); + storage.insert("a", 5000)?; + storage.branch("a", "b")?; + storage.update("b", 4000)?; + storage.update("a", 2000)?; + storage.branch("a", "c")?; + storage.insert("c", 4000)?; + storage.insert("a", 2000)?; - let size = storage.calculate(5000); + let size = storage.calculate(5000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } -fn scenario_6() -> (Vec, SegmentSize) { +fn scenario_6() -> anyhow::Result<(Vec, SegmentSize)> { use std::borrow::Cow; const NO_OP: Cow<'static, str> = Cow::Borrowed(""); @@ -133,18 +133,18 @@ fn scenario_6() -> (Vec, SegmentSize) { let mut storage = Storage::new(None); - storage.branch(&None, branches[0]).unwrap(); // at 0 - storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128); // at 108951064 - storage.branch(&branches[0], branches[1]).unwrap(); // at 108951064 - storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392); // at 124511472 - storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904); // at 283415424 - storage.branch(&branches[0], branches[2]).unwrap(); // at 283415424 - storage.modify_branch(&branches[2], NO_OP, 15906192, 8192); // at 299321616 - storage.modify_branch(&branches[0], NO_OP, 18909976, 32768); // at 302325400 + storage.branch(&None, branches[0])?; // at 0 + storage.modify_branch(&branches[0], NO_OP, 108951064, 43696128)?; // at 108951064 + storage.branch(&branches[0], branches[1])?; // at 108951064 + storage.modify_branch(&branches[1], NO_OP, 15560408, -1851392)?; // at 124511472 + storage.modify_branch(&branches[0], NO_OP, 174464360, -1531904)?; // at 283415424 + storage.branch(&branches[0], branches[2])?; // at 283415424 + storage.modify_branch(&branches[2], NO_OP, 15906192, 8192)?; // at 299321616 + storage.modify_branch(&branches[0], NO_OP, 18909976, 32768)?; // at 302325400 - let size = storage.calculate(100_000); + let size = storage.calculate(100_000)?; - (storage.into_segments(), size) + Ok((storage.into_segments(), size)) } fn main() { @@ -163,7 +163,8 @@ fn main() { eprintln!("invalid scenario {}", other); std::process::exit(1); } - }; + } + .unwrap(); graphviz_tree(&segments, &size); } @@ -251,7 +252,7 @@ fn graphviz_tree(segments: &[Segment], tree: &SegmentSize) { #[test] fn scenarios_return_same_size() { - type ScenarioFn = fn() -> (Vec, SegmentSize); + type ScenarioFn = fn() -> anyhow::Result<(Vec, SegmentSize)>; let truths: &[(u32, ScenarioFn, _)] = &[ (line!(), scenario_1, 8000), (line!(), scenario_2, 9000), @@ -262,7 +263,7 @@ fn scenarios_return_same_size() { ]; for (line, scenario, expected) in truths { - let (_, size) = scenario(); + let (_, size) = scenario().unwrap(); assert_eq!(*expected, size.total_children(), "scenario on line {line}"); } } diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 2181d6d4dc..61cb32fc76 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -194,6 +194,15 @@ pub(super) async fn gather_inputs( let timelines = tenant.list_timelines(); + if timelines.is_empty() { + // perhaps the tenant has just been created, and as such doesn't have any data yet + return Ok(ModelInputs { + updates: vec![], + retention_period: 0, + timeline_inputs: HashMap::default(), + }); + } + // record the used/inserted cache keys here, to remove extras not to start leaking // after initial run the cache should be quite stable, but live timelines will eventually // require new lsns to be inspected. @@ -505,10 +514,10 @@ impl ModelInputs { let Lsn(now) = *lsn; match op { Command::Update(sz) => { - storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz)); + storage.insert_point(&Some(*timeline_id), "".into(), now, Some(*sz))?; } Command::EndOfBranch => { - storage.insert_point(&Some(*timeline_id), "".into(), now, None); + storage.insert_point(&Some(*timeline_id), "".into(), now, None)?; } Command::BranchFrom(parent) => { // This branch command may fail if it cannot find a parent to branch from. @@ -517,7 +526,7 @@ impl ModelInputs { } } - Ok(storage.calculate(self.retention_period).total_children()) + Ok(storage.calculate(self.retention_period)?.total_children()) } }