review code cleanup:

- handle errors in calculate_synthetic_size_worker. Don't exit the bgworker if one tenant failed.

- add cached_synthetic_tenant_size to cache values calculated by the bgworker

- code cleanup: remove unneeded info! messages, clean comments

- handle collect_metrics_task() error. Don't exit collect_metrics worker if one task failed.

 - add unit test to cover case when we have multiple branches at the same lsn
This commit is contained in:
Anastasia Lubennikova
2023-01-12 19:08:42 +02:00
parent 148e020fb9
commit 26f39c03f2
5 changed files with 180 additions and 50 deletions

1
Cargo.lock generated
View File

@@ -3616,7 +3616,6 @@ name = "tenant_size_model"
version = "0.1.0"
dependencies = [
"anyhow",
"log",
"workspace_hack",
]

View File

@@ -7,5 +7,4 @@ license = "Apache-2.0"
[dependencies]
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
log = { version = "0.4", features = ["std", "serde"] }
anyhow = "1.0.68"

View File

@@ -179,7 +179,10 @@ pub async fn collect_metrics(
return Ok(());
},
_ = ticker.tick() => {
collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await?;
if let Err(err) = collect_metrics_task(&client, &mut cached_metrics, metric_collection_endpoint, node_id).await
{
error!("metrics collection failed: {err:?}");
}
}
}
}
@@ -271,9 +274,9 @@ pub async fn collect_metrics_task(
tenant_remote_size,
));
// TODO add SyntheticStorageSize metric
let tenant_synthetic_size = tenant.calculate_synthetic_size().await?;
info!("tenant_synthetic_size: {}", tenant_synthetic_size);
// Note that this metric is calculated in a separate bgworker
// Here we only use cached value, which may lag behind the real latest one
let tenant_synthetic_size = tenant.get_cached_synthetic_size();
current_metrics.push((
ConsumptionMetricsKey {
tenant_id,
@@ -291,7 +294,7 @@ pub async fn collect_metrics_task(
});
if current_metrics.is_empty() {
info!("no new metrics to send");
trace!("no new metrics to send");
return Ok(());
}
@@ -364,7 +367,6 @@ pub async fn calculate_synthetic_size_worker(
loop {
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
info!("calculate_synthetic_size_worker received cancellation request");
return Ok(());
},
_ = ticker.tick() => {
@@ -377,9 +379,17 @@ pub async fn calculate_synthetic_size_worker(
continue;
}
let tenant = mgr::get_tenant(tenant_id, true).await?;
info!("spawn calculate_synthetic_size for tenant {}", tenant_id);
tenant.calculate_synthetic_size().await?;
match mgr::get_tenant(tenant_id, true).await
{
Ok(tenant) => {
if let Err(e) = tenant.calculate_synthetic_size().await {
error!("failed to calculate synthetic size for tenant {}: {}", tenant_id, e);
}
},
Err(err) => {
error!("tenant {} is not found: {err:?}", tenant_id);
}
}
}
}

View File

@@ -38,6 +38,8 @@ use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::MutexGuard;
use std::sync::{Mutex, RwLock};
@@ -139,6 +141,7 @@ pub struct Tenant {
/// Cached logical sizes updated updated on each [`Tenant::gather_size_inputs`].
cached_logical_sizes: tokio::sync::Mutex<HashMap<(TimelineId, Lsn), u64>>,
cached_synthetic_tenant_size: Arc<AtomicU64>,
}
/// A timeline with some of its files on disk, being initialized.
@@ -1722,6 +1725,7 @@ impl Tenant {
remote_storage,
state,
cached_logical_sizes: tokio::sync::Mutex::new(HashMap::new()),
cached_synthetic_tenant_size: Arc::new(AtomicU64::new(0)),
}
}
@@ -2361,23 +2365,22 @@ impl Tenant {
}
/// Calculate synthetic tenant size
/// This is periodically called by background worker
///
/// This is periodically called by background worker.
/// result is cached in tenant struct
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
pub async fn calculate_synthetic_size(&self) -> anyhow::Result<u64> {
let inputs = self.gather_size_inputs().await?;
let size = inputs
.calculate()
.unwrap_or_else(|e| panic!("err {}, inputs {:?}", e, inputs)); // FIXME this panic is debug only.
let size = inputs.calculate()?;
info!(
"calculate_synthetic_size for tenant {} size: {}",
self.tenant_id, size,
);
self.cached_synthetic_tenant_size
.store(size, Ordering::Relaxed);
Ok(size)
}
pub fn get_cached_synthetic_size(&self) -> u64 {
self.cached_synthetic_tenant_size.load(Ordering::Relaxed)
}
}
fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {

View File

@@ -44,24 +44,17 @@ struct TimelineInputs {
next_gc_cutoff: Lsn,
}
// Adjust BracnchFrom sorting so that we always process ancestor
// before descendants.
// Adjust BranchFrom sorting so that we always process ancestor
// before descendants. This is needed to correctly calculate size of
// descendant timelines.
//
// i.e. if we have following order
// Update { lsn: 0/0, command: BranchFrom(None), timeline_id: 1 },
// Update { lsn: 0/169AD58, command: Update(25387008), timeline_id: 1 },
// Update { lsn: 0/169AD58, command: BranchFrom(Some(2)), timeline_id: 3 },
// Update { lsn: 0/169AD58, command: BranchFrom(Some(1)), timeline_id: 2 },
//
// last two lines must be reordered to
// Update { lsn: 0/169AD58, command: BranchFrom(Some(1)), timeline_id: 2 },
// Update { lsn: 0/169AD58, command: BranchFrom(Some(2)), timeline_id: 3 },
// Note that we may have multiple BranchFroms at the same LSN, so we
// need to sort them in the tree order.
//
// see updates_sort_with_branches_at_same_lsn test below
fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update>> {
let mut sorted_updates = Vec::with_capacity(updates.len());
info!("Sorting updates in tree order {:?}", updates);
let mut known_timelineids = HashSet::new();
let mut i = 0;
while i < updates.len() {
@@ -79,7 +72,6 @@ fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update
continue;
}
None => {
// root timeline, branches from None
known_timelineids.insert(curr_upd.timeline_id);
sorted_updates.push(*curr_upd);
i += 1;
@@ -93,8 +85,6 @@ fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update
// we have not processed ancestor yet.
// there is a chance that it is at the same Lsn
if !known_timelineids.contains(&parent_id) {
info!("Found possibly orphan branch {:?}", curr_upd);
let mut curr_lsn_branchfroms: HashMap<TimelineId, Vec<(TimelineId, usize)>> =
HashMap::new();
@@ -110,12 +100,9 @@ fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update
{
// we have not processed ancestor yet
// store it for later
curr_lsn_branchfroms
.entry(lookahead_parent_id)
.and_modify(|e| {
e.push((lookahead_upd.timeline_id, j));
})
.or_insert_with(|| vec![(lookahead_upd.timeline_id, j)]);
let es =
curr_lsn_branchfroms.entry(lookahead_parent_id).or_default();
es.push((lookahead_upd.timeline_id, j));
}
_ => {
// we have already processed ancestor
@@ -149,7 +136,11 @@ fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update
}
if !curr_lsn_branchfroms.is_empty() {
anyhow::bail!("orphan branch detected in BranchFroms");
// orphans are expected to be rare and transient between tenant reloads
// for example, an broken ancestor without the child branch being broken.
anyhow::bail!(
"orphan branch(es) detected in BranchFroms: {curr_lsn_branchfroms:?}"
);
}
}
@@ -162,8 +153,6 @@ fn sort_updates_in_tree_order(updates: Vec<Update>) -> anyhow::Result<Vec<Update
}
}
info!("Sorting updates in tree order done");
Ok(sorted_updates)
}
@@ -191,8 +180,6 @@ pub(super) async fn gather_inputs(
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
info!("start gathering inputs for tenant {}", tenant.tenant_id);
let timelines = tenant
.refresh_gc_info()
.await
@@ -403,8 +390,6 @@ pub(super) async fn gather_inputs(
}
};
info!("done gathering inputs for tenant {}", tenant.tenant_id);
Ok(ModelInputs {
updates: sorted_updates,
retention_period,
@@ -595,3 +580,137 @@ fn verify_size_for_multiple_branches() {
assert_eq!(inputs.calculate().unwrap(), 36_409_872);
}
#[test]
fn updates_sort_with_branches_at_same_lsn() {
use std::str::FromStr;
use Command::{BranchFrom, EndOfBranch};
macro_rules! lsn {
($e:expr) => {
Lsn::from_str($e).unwrap()
};
}
let ids = [
TimelineId::from_str("00000000000000000000000000000000").unwrap(),
TimelineId::from_str("11111111111111111111111111111111").unwrap(),
TimelineId::from_str("22222222222222222222222222222222").unwrap(),
TimelineId::from_str("33333333333333333333333333333333").unwrap(),
TimelineId::from_str("44444444444444444444444444444444").unwrap(),
];
// issue https://github.com/neondatabase/neon/issues/3179
let commands = vec![
Update {
lsn: lsn!("0/0"),
command: BranchFrom(None),
timeline_id: ids[0],
},
Update {
lsn: lsn!("0/169AD58"),
command: Command::Update(25387008),
timeline_id: ids[0],
},
// next three are wrongly sorted, because
// ids[1] is branched from before ids[1] exists
// and ids[2] is branched from before ids[2] exists
Update {
lsn: lsn!("0/169AD58"),
command: BranchFrom(Some(ids[1])),
timeline_id: ids[3],
},
Update {
lsn: lsn!("0/169AD58"),
command: BranchFrom(Some(ids[0])),
timeline_id: ids[2],
},
Update {
lsn: lsn!("0/169AD58"),
command: BranchFrom(Some(ids[2])),
timeline_id: ids[1],
},
Update {
lsn: lsn!("0/1CA85B8"),
command: Command::Update(28925952),
timeline_id: ids[1],
},
Update {
lsn: lsn!("0/1CD85B8"),
command: Command::Update(29024256),
timeline_id: ids[1],
},
Update {
lsn: lsn!("0/1CD85B8"),
command: BranchFrom(Some(ids[1])),
timeline_id: ids[4],
},
Update {
lsn: lsn!("0/22DCE70"),
command: Command::Update(32546816),
timeline_id: ids[3],
},
Update {
lsn: lsn!("0/230CE70"),
command: EndOfBranch,
timeline_id: ids[3],
},
];
let expected = vec![
Update {
lsn: lsn!("0/0"),
command: BranchFrom(None),
timeline_id: ids[0],
},
Update {
lsn: lsn!("0/169AD58"),
command: Command::Update(25387008),
timeline_id: ids[0],
},
Update {
lsn: lsn!("0/169AD58"),
command: BranchFrom(Some(ids[0])),
timeline_id: ids[2],
},
Update {
lsn: lsn!("0/169AD58"),
command: BranchFrom(Some(ids[2])),
timeline_id: ids[1],
},
Update {
lsn: lsn!("0/169AD58"),
command: BranchFrom(Some(ids[1])),
timeline_id: ids[3],
},
Update {
lsn: lsn!("0/1CA85B8"),
command: Command::Update(28925952),
timeline_id: ids[1],
},
Update {
lsn: lsn!("0/1CD85B8"),
command: Command::Update(29024256),
timeline_id: ids[1],
},
Update {
lsn: lsn!("0/1CD85B8"),
command: BranchFrom(Some(ids[1])),
timeline_id: ids[4],
},
Update {
lsn: lsn!("0/22DCE70"),
command: Command::Update(32546816),
timeline_id: ids[3],
},
Update {
lsn: lsn!("0/230CE70"),
command: EndOfBranch,
timeline_id: ids[3],
},
];
let sorted_commands = sort_updates_in_tree_order(commands).unwrap();
assert_eq!(sorted_commands, expected);
}