From cf2e695f497d2a4417622d2d396c8850f639c6a1 Mon Sep 17 00:00:00 2001 From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com> Date: Wed, 16 Apr 2025 14:51:48 -0400 Subject: [PATCH] feat(pageserver): gc-compaction meta statistics (#11601) ## Problem We currently only have gc-compaction statistics for each single sub-compaction job. ## Summary of changes Add meta statistics across all sub-compaction jobs scheduled. Signed-off-by: Alex Chi Z --- pageserver/src/tenant/timeline/compaction.rs | 121 ++++++++++++++++++- 1 file changed, 117 insertions(+), 4 deletions(-) diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 92b24a73c9..ff85a33055 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -7,7 +7,7 @@ use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::{Duration, Instant, SystemTime}; use super::layer_manager::LayerManager; use super::{ @@ -77,7 +77,7 @@ const COMPACTION_DELTA_THRESHOLD: usize = 5; /// shard split, which gets expensive for large tenants. const ANCESTOR_COMPACTION_REWRITE_THRESHOLD: f64 = 0.3; -#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +#[derive(Default, Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)] pub struct GcCompactionJobId(pub usize); impl std::fmt::Display for GcCompactionJobId { @@ -105,6 +105,43 @@ pub enum GcCompactionQueueItem { Notify(GcCompactionJobId, Option), } +/// Statistics for gc-compaction meta jobs, which contains several sub compaction jobs. +#[derive(Debug, Clone, Serialize, Default)] +pub struct GcCompactionMetaStatistics { + /// The total number of sub compaction jobs. + pub total_sub_compaction_jobs: usize, + /// The total number of sub compaction jobs that failed. + pub failed_sub_compaction_jobs: usize, + /// The total number of sub compaction jobs that succeeded. + pub succeeded_sub_compaction_jobs: usize, + /// The layer size before compaction. + pub before_compaction_layer_size: u64, + /// The layer size after compaction. + pub after_compaction_layer_size: u64, + /// The start time of the meta job. + pub start_time: Option, + /// The end time of the meta job. + pub end_time: Option, + /// The duration of the meta job. + pub duration_secs: f64, + /// The id of the meta job. + pub meta_job_id: GcCompactionJobId, + /// The LSN below which the layers are compacted, used to compute the statistics. + pub below_lsn: Lsn, +} + +impl GcCompactionMetaStatistics { + fn finalize(&mut self) { + let end_time = SystemTime::now(); + if let Some(start_time) = self.start_time { + if let Ok(duration) = end_time.duration_since(start_time) { + self.duration_secs = duration.as_secs_f64(); + } + } + self.end_time = Some(end_time); + } +} + impl GcCompactionQueueItem { pub fn into_compact_info_resp( self, @@ -142,6 +179,7 @@ struct GcCompactionQueueInner { queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>, guards: HashMap, last_id: GcCompactionJobId, + meta_statistics: Option, } impl GcCompactionQueueInner { @@ -173,6 +211,7 @@ impl GcCompactionQueue { queued: VecDeque::new(), guards: HashMap::new(), last_id: GcCompactionJobId(0), + meta_statistics: None, }), consumer_lock: tokio::sync::Mutex::new(()), } @@ -357,6 +396,23 @@ impl GcCompactionQueue { Ok(()) } + async fn collect_layer_below_lsn( + &self, + timeline: &Arc, + lsn: Lsn, + ) -> Result { + let guard = timeline.layers.read().await; + let layer_map = guard.layer_map()?; + let layers = layer_map.iter_historic_layers().collect_vec(); + let mut size = 0; + for layer in layers { + if layer.lsn_range.start <= lsn { + size += layer.file_size(); + } + } + Ok(size) + } + /// Notify the caller the job has finished and unblock GC. fn notify_and_unblock(&self, id: GcCompactionJobId) { info!("compaction job id={} finished", id); @@ -366,6 +422,16 @@ impl GcCompactionQueue { let _ = tx.send(()); } } + if let Some(ref meta_statistics) = guard.meta_statistics { + if meta_statistics.meta_job_id == id { + if let Ok(stats) = serde_json::to_string(&meta_statistics) { + info!( + "gc-compaction meta statistics for job id = {}: {}", + id, stats + ); + } + } + } } fn clear_running_job(&self) { @@ -405,7 +471,11 @@ impl GcCompactionQueue { let mut pending_tasks = Vec::new(); // gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate. // And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN. - let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max(); + let expected_l2_lsn = jobs + .iter() + .map(|job| job.compact_lsn_range.end) + .max() + .unwrap(); for job in jobs { // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions` // until we do further refactors to allow directly call `compact_with_gc`. @@ -430,9 +500,13 @@ impl GcCompactionQueue { if !auto { pending_tasks.push(GcCompactionQueueItem::Notify(id, None)); } else { - pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn)); + pending_tasks.push(GcCompactionQueueItem::Notify(id, Some(expected_l2_lsn))); } + let layer_size = self + .collect_layer_below_lsn(timeline, expected_l2_lsn) + .await?; + { let mut guard = self.inner.lock().unwrap(); let mut tasks = Vec::new(); @@ -444,7 +518,16 @@ impl GcCompactionQueue { for item in tasks { guard.queued.push_front(item); } + guard.meta_statistics = Some(GcCompactionMetaStatistics { + meta_job_id: id, + start_time: Some(SystemTime::now()), + before_compaction_layer_size: layer_size, + below_lsn: expected_l2_lsn, + total_sub_compaction_jobs: jobs_len, + ..Default::default() + }); } + info!( "scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len @@ -573,6 +656,10 @@ impl GcCompactionQueue { Err(err) => { warn!(%err, "failed to run gc-compaction subcompaction job"); self.clear_running_job(); + let mut guard = self.inner.lock().unwrap(); + if let Some(ref mut meta_statistics) = guard.meta_statistics { + meta_statistics.failed_sub_compaction_jobs += 1; + } return Err(err); } }; @@ -582,8 +669,34 @@ impl GcCompactionQueue { // we need to clean things up before returning from the function. yield_for_l0 = true; } + { + let mut guard = self.inner.lock().unwrap(); + if let Some(ref mut meta_statistics) = guard.meta_statistics { + meta_statistics.succeeded_sub_compaction_jobs += 1; + } + } } GcCompactionQueueItem::Notify(id, l2_lsn) => { + let below_lsn = { + let mut guard = self.inner.lock().unwrap(); + if let Some(ref mut meta_statistics) = guard.meta_statistics { + meta_statistics.below_lsn + } else { + Lsn::INVALID + } + }; + let layer_size = if below_lsn != Lsn::INVALID { + self.collect_layer_below_lsn(timeline, below_lsn).await? + } else { + 0 + }; + { + let mut guard = self.inner.lock().unwrap(); + if let Some(ref mut meta_statistics) = guard.meta_statistics { + meta_statistics.after_compaction_layer_size = layer_size; + meta_statistics.finalize(); + } + } self.notify_and_unblock(id); if let Some(l2_lsn) = l2_lsn { let current_l2_lsn = timeline