Files
neon/pageserver/src/tenant/size.rs
Joonas Koivunen ed9a114bde fix: find gc cutoff points without holding Tenant::gc_cs (#7585)
The current implementation of finding timeline gc cutoff Lsn(s) is done
while holding `Tenant::gc_cs`. In recent incidents long create branch
times were caused by holding the `Tenant::gc_cs` over extremely long
`Timeline::find_lsn_by_timestamp`. The fix is to find the GC cutoff
values before taking the `Tenant::gc_cs` lock. This change is safe to do
because the GC cutoff values and the branch points have no dependencies
on each other. In the case of `Timeline::find_gc_cutoff` taking a long
time with this change, we should no longer see `Tenant::gc_cs`
interfering with branch creation.

Additionally, the `Tenant::refresh_gc_info` is now tolerant of timeline
deletions (or any other failures to find the pitr_cutoff). This helps
with the synthetic size calculation being constantly completed instead
of having a break for a timely timeline deletion.

Fixes: #7560
Fixes: #7587
2024-05-03 14:57:26 +03:00

729 lines
23 KiB
Rust

use std::cmp;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use anyhow::{bail, Context};
use tokio::sync::oneshot::error::RecvError;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use super::{LogicalSizeCalculationCause, Tenant};
use crate::tenant::Timeline;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use tracing::*;
use tenant_size_model::{Segment, StorageModel};
/// Inputs to the actual tenant sizing model
///
/// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to
/// be a transferrable format between execution environments and developer.
///
/// This tracks more information than the actual StorageModel that calculation
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
pub segments: Vec<SegmentMeta>,
pub timeline_inputs: Vec<TimelineInputs>,
}
/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
pub segment: Segment,
pub timeline_id: TimelineId,
pub kind: LsnKind,
}
impl SegmentMeta {
fn size_needed(&self) -> bool {
match self.kind {
LsnKind::BranchStart => {
// If we don't have a later GcCutoff point on this branch, and
// no ancestor, calculate size for the branch start point.
self.segment.needed && self.segment.parent.is_none()
}
LsnKind::BranchPoint => true,
LsnKind::GcCutOff => true,
LsnKind::BranchEnd => false,
}
}
}
#[derive(
Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub enum LsnKind {
/// A timeline starting here
BranchStart,
/// A child timeline branches off from here
BranchPoint,
/// GC cutoff point
GcCutOff,
/// Last record LSN
BranchEnd,
}
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
/// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
pub timeline_id: TimelineId,
pub ancestor_id: Option<TimelineId>,
ancestor_lsn: Lsn,
last_record: Lsn,
latest_gc_cutoff: Lsn,
horizon_cutoff: Lsn,
pitr_cutoff: Lsn,
/// Cutoff point based on GC settings
next_gc_cutoff: Lsn,
/// Cutoff point calculated from the user-supplied 'max_retention_period'
retention_param_cutoff: Option<Lsn>,
}
/// Gathers the inputs for the tenant sizing model.
///
/// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
/// is updated on-demand, during the start of this calculation and separate from the
/// [`TimelineInputs::latest_gc_cutoff`].
///
/// For timelines in general:
///
/// ```text
/// 0-----|---------|----|------------| · · · · · |·> lsn
/// initdb_lsn branchpoints* next_gc_cutoff latest
/// ```
///
/// Until gc_horizon_cutoff > `Timeline::last_record_lsn` for any of the tenant's timelines, the
/// tenant size will be zero.
pub(super) async fn gather_inputs(
tenant: &Tenant,
limit: &Arc<Semaphore>,
max_retention_period: Option<u64>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<ModelInputs> {
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
tenant
.refresh_gc_info(cancel, ctx)
.await
.context("Failed to refresh gc_info before gathering inputs")?;
// Collect information about all the timelines
let mut timelines = tenant.list_timelines();
if timelines.is_empty() {
// perhaps the tenant has just been created, and as such doesn't have any data yet
return Ok(ModelInputs {
segments: vec![],
timeline_inputs: Vec::new(),
});
}
// Filter out timelines that are not active
//
// There may be a race when a timeline is dropped,
// but it is unlikely to cause any issues. In the worst case,
// the calculation will error out.
timelines.retain(|t| t.is_active());
// Build a map of branch points.
let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
for timeline in timelines.iter() {
if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
branchpoints
.entry(ancestor_id)
.or_default()
.insert(timeline.get_ancestor_lsn());
}
}
// These become the final result.
let mut timeline_inputs = Vec::with_capacity(timelines.len());
let mut segments: Vec<SegmentMeta> = Vec::new();
//
// Build Segments representing each timeline. As we do that, also remember
// the branchpoints and branch startpoints in 'branchpoint_segments' and
// 'branchstart_segments'
//
// BranchPoint segments of each timeline
// (timeline, branchpoint LSN) -> segment_id
let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
// timeline, Branchpoint seg id, (ancestor, ancestor LSN)
type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
for timeline in timelines.iter() {
let timeline_id = timeline.timeline_id;
let last_record_lsn = timeline.get_last_record_lsn();
let ancestor_lsn = timeline.get_ancestor_lsn();
// there's a race between the update (holding tenant.gc_lock) and this read but it
// might not be an issue, because it's not for Timeline::gc
let gc_info = timeline.gc_info.read().unwrap();
// similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
// new gc run, which we have no control over. however differently from `Timeline::gc`
// we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
// actually removing files.
//
// We only consider [`GcInfo::pitr_cutoff`], and not [`GcInfo::horizon_cutoff`], because from
// a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
// than a space bound (horizon cutoff). This means that if someone drops a database and waits for their
// PITR interval, they will see synthetic size decrease, even if we are still storing data inside
// horizon_cutoff.
let pitr_cutoff = gc_info.cutoffs.pitr;
let horizon_cutoff = gc_info.cutoffs.horizon;
let mut next_gc_cutoff = pitr_cutoff;
// If the caller provided a shorter retention period, use that instead of the GC cutoff.
let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
if next_gc_cutoff < param_cutoff {
next_gc_cutoff = param_cutoff;
}
Some(param_cutoff)
} else {
None
};
// next_gc_cutoff in parent branch are not of interest (right now at least), nor do we
// want to query any logical size before initdb_lsn.
let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
// Build "interesting LSNs" on this timeline
let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
.retain_lsns
.iter()
.filter(|&&lsn| lsn > ancestor_lsn)
.copied()
// this assumes there are no other retain_lsns than the branchpoints
.map(|lsn| (lsn, LsnKind::BranchPoint))
.collect::<Vec<_>>();
drop(gc_info);
// Add branch points we collected earlier, just in case there were any that were
// not present in retain_lsns. We will remove any duplicates below later.
if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
lsns.extend(
this_branchpoints
.iter()
.map(|lsn| (*lsn, LsnKind::BranchPoint)),
)
}
// Add a point for the GC cutoff
let branch_start_needed = next_gc_cutoff <= branch_start_lsn;
if !branch_start_needed {
lsns.push((next_gc_cutoff, LsnKind::GcCutOff));
}
lsns.sort_unstable();
lsns.dedup();
//
// Create Segments for the interesting points.
//
// Timeline start point
let ancestor = timeline
.get_ancestor_timeline_id()
.map(|ancestor_id| (ancestor_id, ancestor_lsn));
branchstart_segments.push((timeline_id, segments.len(), ancestor));
segments.push(SegmentMeta {
segment: Segment {
parent: None, // filled in later
lsn: branch_start_lsn.0,
size: None, // filled in later
needed: branch_start_needed,
},
timeline_id: timeline.timeline_id,
kind: LsnKind::BranchStart,
});
// GC cutoff point, and any branch points, i.e. points where
// other timelines branch off from this timeline.
let mut parent = segments.len() - 1;
for (lsn, kind) in lsns {
if kind == LsnKind::BranchPoint {
branchpoint_segments.insert((timeline_id, lsn), segments.len());
}
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
lsn: lsn.0,
size: None,
needed: lsn > next_gc_cutoff,
},
timeline_id: timeline.timeline_id,
kind,
});
parent += 1;
}
// Current end of the timeline
segments.push(SegmentMeta {
segment: Segment {
parent: Some(parent),
lsn: last_record_lsn.0,
size: None, // Filled in later, if necessary
needed: true,
},
timeline_id: timeline.timeline_id,
kind: LsnKind::BranchEnd,
});
timeline_inputs.push(TimelineInputs {
timeline_id: timeline.timeline_id,
ancestor_id: timeline.get_ancestor_timeline_id(),
ancestor_lsn,
last_record: last_record_lsn,
// this is not used above, because it might not have updated recently enough
latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
horizon_cutoff,
pitr_cutoff,
next_gc_cutoff,
retention_param_cutoff,
});
}
// We now have all segments from the timelines in 'segments'. The timelines
// haven't been linked to each other yet, though. Do that.
for (_timeline_id, seg_id, ancestor) in branchstart_segments {
// Look up the branch point
if let Some(ancestor) = ancestor {
let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
segments[seg_id].segment.parent = Some(parent_id);
}
}
// We left the 'size' field empty in all of the Segments so far.
// Now find logical sizes for all of the points that might need or benefit from them.
fill_logical_sizes(
&timelines,
&mut segments,
limit,
logical_size_cache,
cause,
ctx,
)
.await?;
Ok(ModelInputs {
segments,
timeline_inputs,
})
}
/// Augment 'segments' with logical sizes
///
/// this will probably conflict with on-demand downloaded layers, or at least force them all
/// to be downloaded
///
async fn fill_logical_sizes(
timelines: &[Arc<Timeline>],
segments: &mut [SegmentMeta],
limit: &Arc<Semaphore>,
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
cause: LogicalSizeCalculationCause,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
timelines
.iter()
.map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
);
// record the used/inserted cache keys here, to remove extras not to start leaking
// after initial run the cache should be quite stable, but live timelines will eventually
// require new lsns to be inspected.
let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
// with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
// our advantage with `?` error handling.
let mut joinset = tokio::task::JoinSet::new();
// For each point that would benefit from having a logical size available,
// spawn a Task to fetch it, unless we have it cached already.
for seg in segments.iter() {
if !seg.size_needed() {
continue;
}
let timeline_id = seg.timeline_id;
let lsn = Lsn(seg.segment.lsn);
if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
if cached_size.is_none() {
let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
let parallel_size_calcs = Arc::clone(limit);
let ctx = ctx.attached_child();
joinset.spawn(
calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
.in_current_span(),
);
}
e.insert(cached_size);
}
}
// Perform the size lookups
let mut have_any_error = false;
while let Some(res) = joinset.join_next().await {
// each of these come with Result<anyhow::Result<_>, JoinError>
// because of spawn + spawn_blocking
match res {
Err(join_error) if join_error.is_cancelled() => {
unreachable!("we are not cancelling any of the futures, nor should be");
}
Err(join_error) => {
// cannot really do anything, as this panic is likely a bug
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
have_any_error = true;
}
Ok(Err(recv_result_error)) => {
// cannot really do anything, as this panic is likely a bug
error!("failed to receive logical size query result: {recv_result_error:#}");
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
warn!(
timeline_id=%timeline.timeline_id,
"failed to calculate logical size at {lsn}: {error:#}"
);
}
have_any_error = true;
}
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
logical_size_cache.insert((timeline.timeline_id, lsn), size);
sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
}
}
}
// prune any keys not needed anymore; we record every used key and added key.
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
if have_any_error {
// we cannot complete this round, because we are missing data.
// we have however cached all we were able to request calculation on.
anyhow::bail!("failed to calculate some logical_sizes");
}
// Insert the looked up sizes to the Segments
for seg in segments.iter_mut() {
if !seg.size_needed() {
continue;
}
let timeline_id = seg.timeline_id;
let lsn = Lsn(seg.segment.lsn);
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
seg.segment.size = Some(*size);
} else {
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
}
}
Ok(())
}
impl ModelInputs {
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
// Convert SegmentMetas into plain Segments
let storage = StorageModel {
segments: self
.segments
.iter()
.map(|seg| seg.segment.clone())
.collect(),
};
Ok(storage)
}
// calculate total project size
pub fn calculate(&self) -> anyhow::Result<u64> {
let storage = self.calculate_model()?;
let sizes = storage.calculate();
Ok(sizes.total_size)
}
}
/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
struct TimelineAtLsnSizeResult(
Arc<crate::tenant::Timeline>,
utils::lsn::Lsn,
Result<u64, CalculateLogicalSizeError>,
);
#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
async fn calculate_logical_size(
limit: Arc<tokio::sync::Semaphore>,
timeline: Arc<crate::tenant::Timeline>,
lsn: utils::lsn::Lsn,
cause: LogicalSizeCalculationCause,
ctx: RequestContext,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
let _permit = tokio::sync::Semaphore::acquire_owned(limit)
.await
.expect("global semaphore should not had been closed");
let size_res = timeline
.spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
.instrument(info_span!("spawn_ondemand_logical_size_calculation"))
.await?;
Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}
#[test]
fn verify_size_for_multiple_branches() {
// this is generated from integration test test_tenant_size_with_multiple_branches, but this way
// it has the stable lsn's
//
// The timeline_inputs don't participate in the size calculation, and are here just to explain
// the inputs.
let doc = r#"
{
"segments": [
{
"segment": {
"parent": 9,
"lsn": 26033560,
"size": null,
"needed": false
},
"timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
"kind": "BranchStart"
},
{
"segment": {
"parent": 0,
"lsn": 35720400,
"size": 25206784,
"needed": false
},
"timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
"kind": "GcCutOff"
},
{
"segment": {
"parent": 1,
"lsn": 35851472,
"size": null,
"needed": true
},
"timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
"kind": "BranchEnd"
},
{
"segment": {
"parent": 7,
"lsn": 24566168,
"size": null,
"needed": false
},
"timeline_id": "454626700469f0a9914949b9d018e876",
"kind": "BranchStart"
},
{
"segment": {
"parent": 3,
"lsn": 25261936,
"size": 26050560,
"needed": false
},
"timeline_id": "454626700469f0a9914949b9d018e876",
"kind": "GcCutOff"
},
{
"segment": {
"parent": 4,
"lsn": 25393008,
"size": null,
"needed": true
},
"timeline_id": "454626700469f0a9914949b9d018e876",
"kind": "BranchEnd"
},
{
"segment": {
"parent": null,
"lsn": 23694408,
"size": null,
"needed": false
},
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
"kind": "BranchStart"
},
{
"segment": {
"parent": 6,
"lsn": 24566168,
"size": 25739264,
"needed": false
},
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
"kind": "BranchPoint"
},
{
"segment": {
"parent": 7,
"lsn": 25902488,
"size": 26402816,
"needed": false
},
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
"kind": "GcCutOff"
},
{
"segment": {
"parent": 8,
"lsn": 26033560,
"size": 26468352,
"needed": true
},
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
"kind": "BranchPoint"
},
{
"segment": {
"parent": 9,
"lsn": 26033560,
"size": null,
"needed": true
},
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
"kind": "BranchEnd"
}
],
"timeline_inputs": [
{
"timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
"ancestor_lsn": "0/18D3D98",
"last_record": "0/2230CD0",
"latest_gc_cutoff": "0/1698C48",
"horizon_cutoff": "0/2210CD0",
"pitr_cutoff": "0/2210CD0",
"next_gc_cutoff": "0/2210CD0",
"retention_param_cutoff": null
},
{
"timeline_id": "454626700469f0a9914949b9d018e876",
"ancestor_lsn": "0/176D998",
"last_record": "0/1837770",
"latest_gc_cutoff": "0/1698C48",
"horizon_cutoff": "0/1817770",
"pitr_cutoff": "0/1817770",
"next_gc_cutoff": "0/1817770",
"retention_param_cutoff": null
},
{
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
"ancestor_lsn": "0/0",
"last_record": "0/18D3D98",
"latest_gc_cutoff": "0/1698C48",
"horizon_cutoff": "0/18B3D98",
"pitr_cutoff": "0/18B3D98",
"next_gc_cutoff": "0/18B3D98",
"retention_param_cutoff": null
}
]
}
"#;
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
assert_eq!(inputs.calculate().unwrap(), 37_851_408);
}
#[test]
fn verify_size_for_one_branch() {
let doc = r#"
{
"segments": [
{
"segment": {
"parent": null,
"lsn": 0,
"size": null,
"needed": false
},
"timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
"kind": "BranchStart"
},
{
"segment": {
"parent": 0,
"lsn": 305547335776,
"size": 220054675456,
"needed": false
},
"timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
"kind": "GcCutOff"
},
{
"segment": {
"parent": 1,
"lsn": 305614444640,
"size": null,
"needed": true
},
"timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
"kind": "BranchEnd"
}
],
"timeline_inputs": [
{
"timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
"ancestor_lsn": "0/0",
"last_record": "47/280A5860",
"latest_gc_cutoff": "47/240A5860",
"horizon_cutoff": "47/240A5860",
"pitr_cutoff": "47/240A5860",
"next_gc_cutoff": "47/240A5860",
"retention_param_cutoff": "0/0"
}
]
}"#;
let model: ModelInputs = serde_json::from_str(doc).unwrap();
let res = model.calculate_model().unwrap().calculate();
println!("calculated synthetic size: {}", res.total_size);
println!("result: {:?}", serde_json::to_string(&res.segments));
use utils::lsn::Lsn;
let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
let last_lsn: Lsn = "47/280A5860".parse().unwrap();
println!(
"latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
u64::from(latest_gc_cutoff_lsn),
u64::from(last_lsn)
);
assert_eq!(res.total_size, 220121784320);
}