pageserver/secondary: deregister IO metrics (#11283)
## Problem

IO metrics for secondary locations do not get deregistered when the timeline is removed.

## Summary of changes

Stash the request context used for downloads in `SecondaryDetailTimeline`; these objects match the lifetime of the secondary timeline location well. When the timeline is removed, deregister its metrics too.

Closes https://github.com/neondatabase/neon/issues/11156
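For context, the deregistration pattern at the heart of this change looks roughly like the sketch below. It is written against the upstream `prometheus` and `once_cell` crates rather than the pageserver's `metrics` wrappers (so `IntGaugeVec` stands in for `UIntGaugeVec`), and the helper name `clear_timeline_io_metrics` is hypothetical: once a secondary timeline goes away, every label combination that was registered for it has to be removed explicitly, one per operation variant, or the stale series keeps being exported forever.

```rust
use once_cell::sync::Lazy;
use prometheus::{IntGaugeVec, register_int_gauge_vec};

// Per-operation, per-timeline IO gauge, keyed the same way as
// pageserver_io_operations_bytes_total in the diff below.
static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "pageserver_io_operations_bytes_total",
        "Total amount of bytes read/written in IO operations",
        &["operation", "tenant_id", "shard_id", "timeline_id"]
    )
    .expect("failed to register metric")
});

const OPERATION_VARIANTS: &[&str] = &["read", "write"];

/// Hypothetical helper mirroring what `clear_timeline_metrics` does for
/// `STORAGE_IO_SIZE`: drop every label combination belonging to one timeline.
fn clear_timeline_io_metrics(tenant_id: &str, shard_id: &str, timeline_id: &str) {
    for op in OPERATION_VARIANTS {
        // Ignore the error: the series may never have been created for this op.
        let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]);
    }
}
```

Matching this, the commit threads the tenant shard id into `SecondaryDetail::remove_timeline` and adds `drain_timelines`, so the same cleanup runs both when a single timeline is deleted and when the whole secondary location is shut down.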
@@ -1248,13 +1248,13 @@ pub(crate) static STORAGE_IO_TIME_METRIC: Lazy<StorageIoTime> = Lazy::new(Storag
 #[derive(Clone, Copy)]
 #[repr(usize)]
-enum StorageIoSizeOperation {
+pub(crate) enum StorageIoSizeOperation {
     Read,
     Write,
 }

 impl StorageIoSizeOperation {
-    const VARIANTS: &'static [&'static str] = &["read", "write"];
+    pub(crate) const VARIANTS: &'static [&'static str] = &["read", "write"];

     fn as_str(&self) -> &'static str {
         Self::VARIANTS[*self as usize]
@@ -1262,7 +1262,7 @@ impl StorageIoSizeOperation {
 }

 // Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1
-static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
+pub(crate) static STORAGE_IO_SIZE: Lazy<UIntGaugeVec> = Lazy::new(|| {
     register_uint_gauge_vec!(
         "pageserver_io_operations_bytes_total",
         "Total amount of bytes read/written in IO operations",
@@ -167,10 +167,17 @@ impl SecondaryTenant {

         self.validate_metrics();

+        // Metrics are subtracted from and/or removed eagerly.
+        // Deletions are done in the background via [`BackgroundPurges::spawn`].
         let tenant_id = self.tenant_shard_id.tenant_id.to_string();
         let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
         let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
         let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
+
+        self.detail
+            .lock()
+            .unwrap()
+            .drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
     }

     pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
@@ -4,6 +4,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime};

+use crate::metrics::{STORAGE_IO_SIZE, StorageIoSizeOperation};
 use camino::Utf8PathBuf;
 use chrono::format::{DelayedFormat, StrftimeItems};
 use futures::Future;
@@ -124,15 +125,53 @@ impl OnDiskState {
     }
 }

-#[derive(Debug, Clone, Default)]
 pub(super) struct SecondaryDetailTimeline {
     on_disk_layers: HashMap<LayerName, OnDiskState>,

     /// We remember when layers were evicted, to prevent re-downloading them.
     pub(super) evicted_at: HashMap<LayerName, SystemTime>,
+
+    ctx: RequestContext,
 }

+impl Clone for SecondaryDetailTimeline {
+    fn clone(&self) -> Self {
+        Self {
+            on_disk_layers: self.on_disk_layers.clone(),
+            evicted_at: self.evicted_at.clone(),
+            // This is a bit awkward. The downloader code operates on a snapshot
+            // of the secondary list to avoid locking it for extended periods of time.
+            // No particularly strong reason to chose [`RequestContext::detached_child`],
+            // but makes more sense than [`RequestContext::attached_child`].
+            ctx: self
+                .ctx
+                .detached_child(self.ctx.task_kind(), self.ctx.download_behavior()),
+        }
+    }
+}
+
+impl std::fmt::Debug for SecondaryDetailTimeline {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("SecondaryDetailTimeline")
+            .field("on_disk_layers", &self.on_disk_layers)
+            .field("evicted_at", &self.evicted_at)
+            .finish()
+    }
+}
+
 impl SecondaryDetailTimeline {
+    pub(super) fn empty(ctx: RequestContext) -> Self {
+        SecondaryDetailTimeline {
+            on_disk_layers: Default::default(),
+            evicted_at: Default::default(),
+            ctx,
+        }
+    }
+
+    pub(super) fn context(&self) -> &RequestContext {
+        &self.ctx
+    }
+
     pub(super) fn remove_layer(
         &mut self,
         name: &LayerName,
@@ -258,18 +297,50 @@ impl SecondaryDetail {

     pub(super) fn remove_timeline(
         &mut self,
+        tenant_shard_id: &TenantShardId,
         timeline_id: &TimelineId,
         resident_metric: &UIntGauge,
     ) {
         let removed = self.timelines.remove(timeline_id);
         if let Some(removed) = removed {
-            resident_metric.sub(
-                removed
-                    .on_disk_layers
-                    .values()
-                    .map(|l| l.metadata.file_size)
-                    .sum(),
-            );
+            Self::clear_timeline_metrics(tenant_shard_id, timeline_id, removed, resident_metric);
         }
     }

+    pub(super) fn drain_timelines(
+        &mut self,
+        tenant_shard_id: &TenantShardId,
+        resident_metric: &UIntGauge,
+    ) {
+        for (timeline_id, removed) in self.timelines.drain() {
+            Self::clear_timeline_metrics(tenant_shard_id, &timeline_id, removed, resident_metric);
+        }
+    }
+
+    fn clear_timeline_metrics(
+        tenant_shard_id: &TenantShardId,
+        timeline_id: &TimelineId,
+        detail: SecondaryDetailTimeline,
+        resident_metric: &UIntGauge,
+    ) {
+        resident_metric.sub(
+            detail
+                .on_disk_layers
+                .values()
+                .map(|l| l.metadata.file_size)
+                .sum(),
+        );
+
+        let shard_id = format!("{}", tenant_shard_id.shard_slug());
+        let tenant_id = tenant_shard_id.tenant_id.to_string();
+        let timeline_id = timeline_id.to_string();
+        for op in StorageIoSizeOperation::VARIANTS {
+            let _ = STORAGE_IO_SIZE.remove_label_values(&[
+                op,
+                tenant_id.as_str(),
+                shard_id.as_str(),
+                timeline_id.as_str(),
+            ]);
+        }
+    }
+
@@ -727,6 +798,7 @@ impl<'a> TenantDownloader<'a> {
                 last_heatmap,
                 timeline,
                 &self.secondary_state.resident_size_metric,
+                ctx,
             )
             .await;

@@ -774,7 +846,6 @@ impl<'a> TenantDownloader<'a> {

         // Download the layers in the heatmap
         for timeline in heatmap.timelines {
-            let ctx = &ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline.timeline_id);
             let timeline_state = timeline_states
                 .remove(&timeline.timeline_id)
                 .expect("Just populated above");
@@ -917,7 +988,11 @@ impl<'a> TenantDownloader<'a> {
         for delete_timeline in &delete_timelines {
             // We haven't removed from disk yet, but optimistically remove from in-memory state: if removal
             // from disk fails that will be a fatal error.
-            detail.remove_timeline(delete_timeline, &self.secondary_state.resident_size_metric);
+            detail.remove_timeline(
+                self.secondary_state.get_tenant_shard_id(),
+                delete_timeline,
+                &self.secondary_state.resident_size_metric,
+            );
         }
     }

@@ -1013,7 +1088,6 @@ impl<'a> TenantDownloader<'a> {
        timeline: HeatMapTimeline,
        timeline_state: SecondaryDetailTimeline,
        deadline: Instant,
-        ctx: &RequestContext,
    ) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
        // Accumulate updates to the state
        let mut touched = Vec::new();
@@ -1044,7 +1118,12 @@ impl<'a> TenantDownloader<'a> {
            }

            match self
-                .download_layer(tenant_shard_id, &timeline_id, layer, ctx)
+                .download_layer(
+                    tenant_shard_id,
+                    &timeline_id,
+                    layer,
+                    timeline_state.context(),
+                )
                .await
            {
                Ok(Some(layer)) => touched.push(layer),
@@ -1155,13 +1234,16 @@ impl<'a> TenantDownloader<'a> {
        tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.hot_layers().count());

        let (result, touched) = self
-            .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
+            .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline)
            .await;

        // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
        {
            let mut detail = self.secondary_state.detail.lock().unwrap();
-            let timeline_detail = detail.timelines.entry(timeline_id).or_default();
+            let timeline_detail = detail.timelines.entry(timeline_id).or_insert_with(|| {
+                let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline_id);
+                SecondaryDetailTimeline::empty(ctx)
+            });

            tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
            touched.into_iter().for_each(|t| {
@@ -1295,10 +1377,12 @@ async fn init_timeline_state(
    last_heatmap: Option<&HeatMapTimeline>,
    heatmap: &HeatMapTimeline,
    resident_metric: &UIntGauge,
+    ctx: &RequestContext,
 ) -> SecondaryDetailTimeline {
-    let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
-    let mut detail = SecondaryDetailTimeline::default();
+    let ctx = ctx.with_scope_secondary_timeline(tenant_shard_id, &heatmap.timeline_id);
+    let mut detail = SecondaryDetailTimeline::empty(ctx);

+    let timeline_path = conf.timeline_path(tenant_shard_id, &heatmap.timeline_id);
    let mut dir = match tokio::fs::read_dir(&timeline_path).await {
        Ok(d) => d,
        Err(e) => {