Fix logical size calculation:

sort updates in topological order so that the parent timeline always preceeds its children.
    fixes #3179
This commit is contained in:
Anastasia Lubennikova
2023-01-11 21:31:07 +02:00
parent 0675859bb0
commit 148e020fb9
2 changed files with 134 additions and 3 deletions

View File

@@ -2,7 +2,6 @@ use std::borrow::Cow;
use std::collections::HashMap;
use anyhow::Context;
use log::info;
/// Pricing model or history size builder.
///

View File

@@ -44,6 +44,129 @@ struct TimelineInputs {
next_gc_cutoff: Lsn,
}
// Adjust BracnchFrom sorting so that we always process ancestor
// before descendants.
//
// i.e. if we have following order
// Update { lsn: 0/0, command: BranchFrom(None), timeline_id: 1 },
// Update { lsn: 0/169AD58, command: Update(25387008), timeline_id: 1 },
// Update { lsn: 0/169AD58, command: BranchFrom(Some(2)), timeline_id: 3 },
// Update { lsn: 0/169AD58, command: BranchFrom(Some(1)), timeline_id: 2 },
//
// last two lines must be reordered to
// Update { lsn: 0/169AD58, command: BranchFrom(Some(1)), timeline_id: 2 },
// Update { lsn: 0/169AD58, command: BranchFrom(Some(2)), timeline_id: 3 },
//
fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update>> {
let mut sorted_updates = Vec::with_capacity(updates.len());
info!("Sorting updates in tree order {:?}", updates);
let mut known_timelineids = HashSet::new();
let mut i = 0;
while i < updates.len() {
let curr_upd = &updates[i];
if let Command::BranchFrom(parent_id) = curr_upd.command {
// root timeline, branches from None
let parent_id = match parent_id {
Some(parent_id) if known_timelineids.contains(&parent_id) => {
// we have already processed ancestor
// process this BranchFrom Update normally
known_timelineids.insert(curr_upd.timeline_id);
sorted_updates.push(*curr_upd);
i += 1;
continue;
}
None => {
// root timeline, branches from None
known_timelineids.insert(curr_upd.timeline_id);
sorted_updates.push(*curr_upd);
i += 1;
continue;
}
Some(parent_id) => parent_id,
};
let mut j = i;
// we have not processed ancestor yet.
// there is a chance that it is at the same Lsn
if !known_timelineids.contains(&parent_id) {
info!("Found possibly orphan branch {:?}", curr_upd);
let mut curr_lsn_branchfroms: HashMap<TimelineId, Vec<(TimelineId, usize)>> =
HashMap::new();
// inspect all branchpoints at the same lsn
while j < updates.len() && updates[j].lsn == curr_upd.lsn {
let lookahead_upd = &updates[j];
j += 1;
if let Command::BranchFrom(lookahead_parent_id) = lookahead_upd.command {
match lookahead_parent_id {
Some(lookahead_parent_id)
if !known_timelineids.contains(&lookahead_parent_id) =>
{
// we have not processed ancestor yet
// store it for later
curr_lsn_branchfroms
.entry(lookahead_parent_id)
.and_modify(|e| {
e.push((lookahead_upd.timeline_id, j));
})
.or_insert_with(|| vec![(lookahead_upd.timeline_id, j)]);
}
_ => {
// we have already processed ancestor
// process this BranchFrom Update normally
known_timelineids.insert(lookahead_upd.timeline_id);
sorted_updates.push(*lookahead_upd);
}
}
}
}
// process BranchFroms in the tree order
// check that we don't have a cycle if somet entry is orphan
// (this should not happen, but better to be safe)
let mut processed_some_entry = true;
while processed_some_entry {
processed_some_entry = false;
curr_lsn_branchfroms.retain(|parent_id, branchfroms| {
if known_timelineids.contains(parent_id) {
for (timeline_id, j) in branchfroms {
known_timelineids.insert(*timeline_id);
sorted_updates.push(updates[*j - 1]);
}
processed_some_entry = true;
false
} else {
true
}
});
}
if !curr_lsn_branchfroms.is_empty() {
anyhow::bail!("orphan branch detected in BranchFroms");
}
}
assert!(j > i);
i = j;
} else {
// not a BranchFrom, keep the same order
sorted_updates.push(*curr_upd);
i += 1;
}
}
info!("Sorting updates in tree order done");
Ok(sorted_updates)
}
/// Gathers the inputs for the tenant sizing model.
///
/// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
@@ -68,6 +191,8 @@ pub(super) async fn gather_inputs(
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
info!("start gathering inputs for tenant {}", tenant.tenant_id);
let timelines = tenant
.refresh_gc_info()
.await
@@ -267,7 +392,9 @@ pub(super) async fn gather_inputs(
// for branch points, which come as multiple updates at the same LSN, the Command::Update
// is needed before a branch is made out of that branch Command::BranchFrom. this is
// handled by the variant order in `Command`.
//
updates.sort_unstable();
let sorted_updates = sort_updates_in_tree_order(updates)?;
let retention_period = match max_cutoff_distance {
Some(max) => max.0,
@@ -276,8 +403,10 @@ pub(super) async fn gather_inputs(
}
};
info!("done gathering inputs for tenant {}", tenant.tenant_id);
Ok(ModelInputs {
updates,
updates: sorted_updates,
retention_period,
timeline_inputs,
})
@@ -295,6 +424,7 @@ impl ModelInputs {
command: op,
timeline_id,
} = update;
let Lsn(now) = *lsn;
match op {
Command::Update(sz) => {
@@ -304,7 +434,8 @@ impl ModelInputs {
storage.insert_point(&Some(*timeline_id), "".into(), now, None);
}
Command::BranchFrom(parent) => {
storage.branch(parent, Some(*timeline_id));
// This branch command may fail if it cannot find a parent to branch from.
storage.branch(parent, Some(*timeline_id))?;
}
}
}
@@ -372,6 +503,7 @@ async fn calculate_logical_size(
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}