diff --git a/libs/tenant_size_model/src/lib.rs b/libs/tenant_size_model/src/lib.rs index 8943d8cf15..92bec8aebe 100644 --- a/libs/tenant_size_model/src/lib.rs +++ b/libs/tenant_size_model/src/lib.rs @@ -2,7 +2,6 @@ use std::borrow::Cow; use std::collections::HashMap; use anyhow::Context; -use log::info; /// Pricing model or history size builder. /// diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index aa11985cbe..d94d013749 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -44,6 +44,129 @@ struct TimelineInputs { next_gc_cutoff: Lsn, } +// Adjust BracnchFrom sorting so that we always process ancestor +// before descendants. +// +// i.e. if we have following order +// Update { lsn: 0/0, command: BranchFrom(None), timeline_id: 1 }, +// Update { lsn: 0/169AD58, command: Update(25387008), timeline_id: 1 }, +// Update { lsn: 0/169AD58, command: BranchFrom(Some(2)), timeline_id: 3 }, +// Update { lsn: 0/169AD58, command: BranchFrom(Some(1)), timeline_id: 2 }, +// +// last two lines must be reordered to +// Update { lsn: 0/169AD58, command: BranchFrom(Some(1)), timeline_id: 2 }, +// Update { lsn: 0/169AD58, command: BranchFrom(Some(2)), timeline_id: 3 }, +// +fn sort_updates_in_tree_order(updates: Vec) -> anyhow::Result> { + let mut sorted_updates = Vec::with_capacity(updates.len()); + + info!("Sorting updates in tree order {:?}", updates); + + let mut known_timelineids = HashSet::new(); + let mut i = 0; + while i < updates.len() { + let curr_upd = &updates[i]; + + if let Command::BranchFrom(parent_id) = curr_upd.command { + // root timeline, branches from None + let parent_id = match parent_id { + Some(parent_id) if known_timelineids.contains(&parent_id) => { + // we have already processed ancestor + // process this BranchFrom Update normally + known_timelineids.insert(curr_upd.timeline_id); + sorted_updates.push(*curr_upd); + i += 1; + continue; + } + None => { + // root timeline, branches from None + known_timelineids.insert(curr_upd.timeline_id); + sorted_updates.push(*curr_upd); + i += 1; + continue; + } + Some(parent_id) => parent_id, + }; + + let mut j = i; + + // we have not processed ancestor yet. + // there is a chance that it is at the same Lsn + if !known_timelineids.contains(&parent_id) { + info!("Found possibly orphan branch {:?}", curr_upd); + + let mut curr_lsn_branchfroms: HashMap> = + HashMap::new(); + + // inspect all branchpoints at the same lsn + while j < updates.len() && updates[j].lsn == curr_upd.lsn { + let lookahead_upd = &updates[j]; + j += 1; + + if let Command::BranchFrom(lookahead_parent_id) = lookahead_upd.command { + match lookahead_parent_id { + Some(lookahead_parent_id) + if !known_timelineids.contains(&lookahead_parent_id) => + { + // we have not processed ancestor yet + // store it for later + curr_lsn_branchfroms + .entry(lookahead_parent_id) + .and_modify(|e| { + e.push((lookahead_upd.timeline_id, j)); + }) + .or_insert_with(|| vec![(lookahead_upd.timeline_id, j)]); + } + _ => { + // we have already processed ancestor + // process this BranchFrom Update normally + known_timelineids.insert(lookahead_upd.timeline_id); + sorted_updates.push(*lookahead_upd); + } + } + } + } + + // process BranchFroms in the tree order + // check that we don't have a cycle if somet entry is orphan + // (this should not happen, but better to be safe) + let mut processed_some_entry = true; + while processed_some_entry { + processed_some_entry = false; + + curr_lsn_branchfroms.retain(|parent_id, branchfroms| { + if known_timelineids.contains(parent_id) { + for (timeline_id, j) in branchfroms { + known_timelineids.insert(*timeline_id); + sorted_updates.push(updates[*j - 1]); + } + processed_some_entry = true; + false + } else { + true + } + }); + } + + if !curr_lsn_branchfroms.is_empty() { + anyhow::bail!("orphan branch detected in BranchFroms"); + } + } + + assert!(j > i); + i = j; + } else { + // not a BranchFrom, keep the same order + sorted_updates.push(*curr_upd); + i += 1; + } + } + + info!("Sorting updates in tree order done"); + + Ok(sorted_updates) +} + /// Gathers the inputs for the tenant sizing model. /// /// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which @@ -68,6 +191,8 @@ pub(super) async fn gather_inputs( // our advantage with `?` error handling. let mut joinset = tokio::task::JoinSet::new(); + info!("start gathering inputs for tenant {}", tenant.tenant_id); + let timelines = tenant .refresh_gc_info() .await @@ -267,7 +392,9 @@ pub(super) async fn gather_inputs( // for branch points, which come as multiple updates at the same LSN, the Command::Update // is needed before a branch is made out of that branch Command::BranchFrom. this is // handled by the variant order in `Command`. + // updates.sort_unstable(); + let sorted_updates = sort_updates_in_tree_order(updates)?; let retention_period = match max_cutoff_distance { Some(max) => max.0, @@ -276,8 +403,10 @@ pub(super) async fn gather_inputs( } }; + info!("done gathering inputs for tenant {}", tenant.tenant_id); + Ok(ModelInputs { - updates, + updates: sorted_updates, retention_period, timeline_inputs, }) @@ -295,6 +424,7 @@ impl ModelInputs { command: op, timeline_id, } = update; + let Lsn(now) = *lsn; match op { Command::Update(sz) => { @@ -304,7 +434,8 @@ impl ModelInputs { storage.insert_point(&Some(*timeline_id), "".into(), now, None); } Command::BranchFrom(parent) => { - storage.branch(parent, Some(*timeline_id)); + // This branch command may fail if it cannot find a parent to branch from. + storage.branch(parent, Some(*timeline_id))?; } } } @@ -372,6 +503,7 @@ async fn calculate_logical_size( let size_res = timeline .spawn_ondemand_logical_size_calculation(lsn) + .instrument(info_span!("spawn_ondemand_logical_size_calculation")) .await?; Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res)) }