diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index cc444c479a..155b39ad24 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -53,6 +53,15 @@ pub enum StorageTimeOperation {
     CreateTenant,
 }
 
+pub static STORAGE_PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        "pageserver_storage_physical_size_sum",
+        "Physical size of different types of storage files",
+        &["type", "tenant_id", "timeline_id"],
+    )
+    .expect("failed to define a metric")
+});
+
 pub static STORAGE_TIME_SUM_PER_TIMELINE: Lazy<CounterVec> = Lazy::new(|| {
     register_counter_vec!(
         "pageserver_storage_operations_seconds_sum",
@@ -392,6 +401,8 @@ const STORAGE_IO_TIME_OPERATIONS: &[&str] = &[
 
 const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"];
 
+pub const STORAGE_PHYSICAL_SIZE_FILE_TYPE: &[&str] = &["image", "delta", "partial-image"];
+
 pub static STORAGE_IO_TIME: Lazy<HistogramVec> = Lazy::new(|| {
     register_histogram_vec!(
         "pageserver_io_operations_seconds",
@@ -906,6 +917,9 @@ impl Drop for TimelineMetrics {
         for op in SMGR_QUERY_TIME_OPERATIONS {
             let _ = SMGR_QUERY_TIME.remove_label_values(&[op, tenant_id, timeline_id]);
        }
+        for ty in STORAGE_PHYSICAL_SIZE_FILE_TYPE {
+            let _ = STORAGE_PHYSICAL_SIZE.remove_label_values(&[ty, tenant_id, timeline_id]);
+        }
     }
 }
diff --git a/pageserver/src/tenant/layer_cache.rs b/pageserver/src/tenant/layer_cache.rs
index 060cfa09d8..49479fe836 100644
--- a/pageserver/src/tenant/layer_cache.rs
+++ b/pageserver/src/tenant/layer_cache.rs
@@ -1,9 +1,11 @@
 use super::storage_layer::{PersistentLayer, PersistentLayerDesc, PersistentLayerKey, RemoteLayer};
 use super::Timeline;
+use crate::metrics::{STORAGE_PHYSICAL_SIZE, STORAGE_PHYSICAL_SIZE_FILE_TYPE};
 use crate::tenant::layer_map::{self, LayerMap};
 use anyhow::Result;
 use std::sync::{Mutex, Weak};
 use std::{collections::HashMap, sync::Arc};
+use utils::id::{TenantId, TimelineId};
 
 pub struct LayerCache {
     /// Layer removal lock.
@@ -21,6 +23,11 @@ pub struct LayerCache {
     #[allow(unused)]
     timeline: Weak<Timeline>,
 
+    pub tenant_id: TenantId,
+    pub timeline_id: TimelineId,
+    pub tenant_id_str: String,
+    pub timeline_id_str: String,
+
     mapping: Mutex<HashMap<PersistentLayerKey, Arc<dyn PersistentLayer>>>,
 }
 
@@ -33,11 +40,16 @@
 impl LayerCache {
     pub fn new(timeline: Weak<Timeline>) -> Self {
+        let timeline_arc = timeline.upgrade().expect("Timeline must be alive when creating LayerCache");
         Self {
             layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())),
             layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())),
             mapping: Mutex::new(HashMap::new()),
             timeline,
+            tenant_id: timeline_arc.tenant_id,
+            timeline_id: timeline_arc.timeline_id,
+            tenant_id_str: timeline_arc.tenant_id.to_string(),
+            timeline_id_str: timeline_arc.timeline_id.to_string(),
         }
     }
 
@@ -67,18 +79,21 @@
     /// Should only be called when initializing the timeline. Bypasses checks and the layer operation lock.
     pub fn remove_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
+        self.metrics_size_sub(&*layer);
         let mut guard = self.mapping.lock().unwrap();
         guard.remove(&layer.layer_desc().key());
     }
 
     /// Should only be called when initializing the timeline. Bypasses checks and the layer operation lock.
     pub fn populate_remote_when_init(&self, layer: Arc<RemoteLayer>) {
+        self.metrics_size_add(&*layer);
         let mut guard = self.mapping.lock().unwrap();
         guard.insert(layer.layer_desc().key(), layer);
     }
 
     /// Should only be called when initializing the timeline. Bypasses checks and the layer operation lock.
     pub fn populate_local_when_init(&self, layer: Arc<dyn PersistentLayer>) {
+        self.metrics_size_add(&*layer);
         let mut guard = self.mapping.lock().unwrap();
         guard.insert(layer.layer_desc().key(), layer);
     }
@@ -130,6 +145,7 @@
     /// Called within the write path: compaction and image layer creation create new layers.
     pub fn create_new_layer(&self, layer: Arc<dyn PersistentLayer>) {
+        self.metrics_size_add(&*layer);
         let mut guard = self.mapping.lock().unwrap();
         guard.insert(layer.layer_desc().key(), layer);
     }
@@ -137,7 +153,38 @@
     /// Called within the write path: GC and compaction remove layers and delete them from disk.
     /// The file deletion logic will move here later.
     pub fn delete_layer(&self, layer: Arc<dyn PersistentLayer>) {
+        self.metrics_size_sub(&*layer);
         let mut guard = self.mapping.lock().unwrap();
         guard.remove(&layer.layer_desc().key());
     }
+
+    fn metrics_size_add(&self, layer: &dyn PersistentLayer) {
+        STORAGE_PHYSICAL_SIZE
+            .with_label_values(&[
+                Self::get_layer_type(layer),
+                &self.tenant_id_str,
+                &self.timeline_id_str,
+            ])
+            .add(layer.file_size() as i64);
+    }
+
+    fn metrics_size_sub(&self, layer: &dyn PersistentLayer) {
+        STORAGE_PHYSICAL_SIZE
+            .with_label_values(&[
+                Self::get_layer_type(layer),
+                &self.tenant_id_str,
+                &self.timeline_id_str,
+            ])
+            .sub(layer.file_size() as i64);
+    }
+
+    fn get_layer_type(layer: &dyn PersistentLayer) -> &'static str {
+        if layer.is_delta() {
+            STORAGE_PHYSICAL_SIZE_FILE_TYPE[1] // "delta"
+        } else if layer.is_incremental() {
+            STORAGE_PHYSICAL_SIZE_FILE_TYPE[2] // "partial-image"
+        } else {
+            STORAGE_PHYSICAL_SIZE_FILE_TYPE[0] // "image"
+        }
+    }
 }
diff --git a/pageserver/src/tenant/layer_map.rs b/pageserver/src/tenant/layer_map.rs
index dab570352a..be299c223a 100644
--- a/pageserver/src/tenant/layer_map.rs
+++ b/pageserver/src/tenant/layer_map.rs
@@ -95,11 +95,59 @@ pub struct LayerMap {
     l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,
 
     /// All sorted runs. For tiered compaction.
-    pub sorted_runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
+    pub sorted_runs: SortedRuns,
+}
+
+#[derive(Default)]
+pub struct SortedRuns {
+    pub runs: Vec<(usize, Vec<Arc<PersistentLayerDesc>>)>,
     next_tier_id: usize,
 }
 
+impl SortedRuns {
+    /// Create a new sorted run and insert it at the top of the LSM tree.
+    pub fn create_new_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
+        let tier_id = self.next_tier_id();
+        self.runs.insert(0, (tier_id, layers));
+        tier_id
+    }
+
+    /// Create a new sorted run and insert it at the bottom of the LSM tree.
+    pub fn create_new_bottom_run(&mut self, layers: Vec<Arc<PersistentLayerDesc>>) -> usize {
+        let tier_id = self.next_tier_id();
+        self.runs.push((tier_id, layers));
+        tier_id
+    }
+
+    /// Compute the total file size of each tier, from top to bottom.
+    pub fn compute_tier_sizes(&self) -> Vec<(usize, u64)> {
+        self.runs
+            .iter()
+            .map(|(tier_id, layers)| (*tier_id, layers.iter().map(|layer| layer.file_size()).sum()))
+            .collect::<Vec<_>>()
+    }
+
+    /// Remove a sorted run from the LSM tree.
+    pub fn remove_run(&mut self, tier_id: usize) {
+        self.runs.retain(|(id, _)| *id != tier_id);
+    }
+
+    /// Insert a sorted run at the given position in the LSM tree.
+    pub fn insert_run_at(&mut self, idx: usize, layers: Vec<Arc<PersistentLayerDesc>>) {
+        let tier_id = self.next_tier_id();
+        self.runs.insert(idx, (tier_id, layers));
+    }
+
+    pub fn num_of_tiers(&self) -> usize {
+        self.runs.len()
+    }
+
+    pub fn next_tier_id(&mut self) -> usize {
+        let ret = self.next_tier_id;
+        self.next_tier_id += 1;
+        ret
+    }
+}
+
 /// The primary update API for the layer map.
 ///
 /// Batching historic layer insertions and removals is good for
@@ -127,16 +172,10 @@ impl BatchedUpdates<'_> {
     }
 
     /// Get a mutable reference to the current sorted runs.
-    pub fn sorted_runs(&mut self) -> &mut Vec<(usize, Vec<Arc<PersistentLayerDesc>>)> {
+    pub fn sorted_runs(&mut self) -> &mut SortedRuns {
         &mut self.layer_map.sorted_runs
     }
 
-    pub fn next_tier_id(&mut self) -> usize {
-        let ret = self.layer_map.next_tier_id;
-        self.layer_map.next_tier_id += 1;
-        ret
-    }
-
     ///
     /// Remove an on-disk layer from the map.
     ///
@@ -677,7 +716,7 @@ impl LayerMap {
         }
 
         println!("sorted_runs:");
-        for (lvl, (tier_id, layer)) in self.sorted_runs.iter().enumerate() {
+        for (lvl, (tier_id, layer)) in self.sorted_runs.runs.iter().enumerate() {
             println!("tier {}", tier_id);
             for layer in layer {
                 layer.dump(verbose, ctx)?;
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 87ae722a1c..ef4aa8d133 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -3081,6 +3081,9 @@ impl Timeline {
             LayerResidenceEventReason::LayerCreate,
         );
         batch_updates.insert_historic_new(l.layer_desc().clone());
+        batch_updates
+            .sorted_runs()
+            .create_new_run(vec![l.layer_desc().clone().into()]);
         self.lcache.create_new_layer(l);
 
         batch_updates.flush();
@@ -3340,9 +3343,8 @@ impl Timeline {
 
         // add this layer to the end of all sorted runs; this is only done when initializing with init_lsn
         // for now, and therefore the sorted runs are empty.
-        assert!(updates.sorted_runs().is_empty());
-        let tier_id = updates.next_tier_id();
-        updates.sorted_runs().push((tier_id, sorted_run));
+        assert_eq!(updates.sorted_runs().num_of_tiers(), 0);
+        updates.sorted_runs().create_new_bottom_run(sorted_run);
         updates.flush();
         drop_wlock(guard);
         timer.stop_and_record();
@@ -3740,98 +3742,8 @@ impl Timeline {
         target_file_size: u64,
         ctx: &RequestContext,
     ) -> Result<(), CompactionError> {
-        let CompactLevel0Phase1Result {
-            new_layers,
-            deltas_to_compact,
-        } = self
-            .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
-            .await?;
-
-        if new_layers.is_empty() && deltas_to_compact.is_empty() {
-            // If L0 does not need to be compacted, look into other layers
-            return self
-                .compact_tiered(layer_removal_cs, target_file_size, ctx)
-                .await;
-        }
-
-        // Before deleting any layers, we need to wait for their upload ops to finish.
-        // See storage_sync module level comment on consistency.
-        // Do it here because we don't want to hold self.layers.write() while waiting.
-        if let Some(remote_client) = &self.remote_client {
-            debug!("waiting for upload ops to complete");
-            remote_client
-                .wait_completion()
-                .await
-                .context("wait for layer upload ops to complete")?;
-        }
-
-        let mut guard = self.layers.write().await;
-        let (layers, _) = &mut *guard;
-        let mut updates = layers.batch_update();
-        let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
-
-        let tier_id = updates.next_tier_id();
-        updates.sorted_runs().insert(
-            0,
-            (
-                tier_id,
-                new_layers
-                    .iter()
-                    .map(|l| Arc::new(l.layer_desc().clone()))
-                    .collect(),
-            ),
-        );
-
-        for l in new_layers {
-            let new_delta_path = l.path();
-
-            let metadata = new_delta_path.metadata().with_context(|| {
-                format!(
-                    "read file metadata for new created layer {}",
-                    new_delta_path.display()
-                )
-            })?;
-
-            if let Some(remote_client) = &self.remote_client {
-                remote_client.schedule_layer_file_upload(
-                    &l.filename(),
-                    &LayerFileMetadata::new(metadata.len()),
-                )?;
-            }
-
-            // update the timeline's physical size
-            self.metrics
-                .resident_physical_size_gauge
-                .add(metadata.len());
-
-            new_layer_paths.insert(new_delta_path, LayerFileMetadata::new(metadata.len()));
-            let x: Arc<dyn PersistentLayer> = Arc::new(l);
-            x.access_stats().record_residence_event(
-                &updates,
-                LayerResidenceStatus::Resident,
-                LayerResidenceEventReason::LayerCreate,
-            );
-            updates.insert_historic_new(x.layer_desc().clone());
-            self.lcache.create_new_layer(x);
-        }
-
-        // Now that we have reshuffled the data to set of new delta layers, we can
-        // delete the old ones
-        let mut layer_names_to_delete = Vec::with_capacity(deltas_to_compact.len());
-        for l in deltas_to_compact {
-            layer_names_to_delete.push(l.filename());
-            self.delete_historic_layer_new(layer_removal_cs.clone(), l, &mut updates)?;
-        }
-
-        updates.flush();
-        drop_wlock(guard);
-
-        // Also schedule the deletions in remote storage
-        if let Some(remote_client) = &self.remote_client {
-            remote_client.schedule_layer_file_deletion(&layer_names_to_delete)?;
-        }
-
-        Ok(())
+        self.compact_tiered(layer_removal_cs, target_file_size, ctx)
+            .await
     }
 
     fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option<Vec<usize>> {
@@ -3877,30 +3789,23 @@ impl Timeline {
         let (layers, _) = &*guard;
 
         // Precondition: only compact if enough layers have accumulated.
-        let threshold = 3;
+        let threshold = 8;
         assert!(threshold >= 2);
 
         info!("getting tiered compaction task");
 
         layers.dump(false, ctx)?;
 
-        if layers.sorted_runs.len() < threshold {
+        if layers.sorted_runs.num_of_tiers() < threshold {
             info!(
-                level0_deltas = layers.sorted_runs.len(),
+                num_sorted_runs = layers.sorted_runs.num_of_tiers(),
                 threshold, "too few sorted runs to compact"
             );
             return Ok(None);
         }
 
         // Gather the files to compact in this iteration.
-
-        let tier_sizes: Vec<(usize, u64)> = layers
-            .sorted_runs
-            .iter()
-            .map(|(tier_id, layers)| {
-                (*tier_id, layers.iter().map(|layer| layer.file_size()).sum())
-            })
-            .collect::<Vec<_>>();
+        let tier_sizes: Vec<(usize, u64)> = layers.sorted_runs.compute_tier_sizes();
 
         let Some(tier_to_compact) = Self::get_compact_task(tier_sizes) else {
             return Ok(None);
@@ -3913,7 +3818,7 @@ impl Timeline {
         }
 
         let mut deltas_to_compact_layers = vec![];
-        for (tier_id, layers) in layers.sorted_runs.iter() {
+        for (tier_id, layers) in layers.sorted_runs.runs.iter() {
             if tier_to_compact.contains(tier_id) {
                 deltas_to_compact_layers.extend(layers.iter().cloned());
             }
@@ -4197,7 +4102,7 @@ impl Timeline {
         let mut new_tier_at_index = None;
         let mut layers_to_delete = vec![];
         let mut layer_names_to_delete = vec![];
-        for (tier_id, tier) in updates.sorted_runs() {
+        for (tier_id, tier) in &updates.sorted_runs().runs {
             if *tier_id == new_tier_at {
                 new_tier_at_index = Some(new_sorted_runs.len());
             }
@@ -4252,9 +4157,9 @@ impl Timeline {
             self.lcache.create_new_layer(l);
         }
 
-        let new_tier_id = updates.next_tier_id();
+        let new_tier_id = updates.sorted_runs().next_tier_id();
         new_sorted_runs.insert(new_tier_at_index, (new_tier_id, new_layer_descs));
-        *updates.sorted_runs() = new_sorted_runs;
+        updates.sorted_runs().runs = new_sorted_runs;
         updates.flush();
         drop_wlock(guard);
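
Aside (not part of the patch): a minimal standalone sketch of the SortedRuns bookkeeping introduced in layer_map.rs, with plain u64 file sizes standing in for Arc<PersistentLayerDesc>. It shows the invariants the patch relies on: new runs enter at index 0, tier ids grow monotonically, and compute_tier_sizes reports tiers top to bottom.

// Standalone sketch; u64 file sizes stand in for Arc<PersistentLayerDesc>.
#[derive(Default)]
struct SortedRuns {
    // (tier_id, per-layer file sizes); index 0 is the top of the LSM tree.
    runs: Vec<(usize, Vec<u64>)>,
    next_tier_id: usize,
}

impl SortedRuns {
    fn next_tier_id(&mut self) -> usize {
        let ret = self.next_tier_id;
        self.next_tier_id += 1;
        ret
    }

    /// Freshly written layers enter as a new run at the top.
    fn create_new_run(&mut self, layers: Vec<u64>) -> usize {
        let tier_id = self.next_tier_id();
        self.runs.insert(0, (tier_id, layers));
        tier_id
    }

    /// The initial run (e.g. at init_lsn) goes to the bottom.
    fn create_new_bottom_run(&mut self, layers: Vec<u64>) -> usize {
        let tier_id = self.next_tier_id();
        self.runs.push((tier_id, layers));
        tier_id
    }

    /// Per-tier totals, top to bottom; this is what feeds get_compact_task.
    fn compute_tier_sizes(&self) -> Vec<(usize, u64)> {
        self.runs
            .iter()
            .map(|(id, layers)| (*id, layers.iter().sum()))
            .collect()
    }
}

fn main() {
    let mut sorted_runs = SortedRuns::default();
    let bottom = sorted_runs.create_new_bottom_run(vec![100, 200]);
    let top = sorted_runs.create_new_run(vec![10]);
    // The newest run sits at index 0; tier ids only ever grow.
    assert_eq!(sorted_runs.compute_tier_sizes(), vec![(top, 10), (bottom, 300)]);
}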
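
A similar sketch of the STORAGE_PHYSICAL_SIZE gauge lifecycle: metrics_size_add on insert, metrics_size_sub on delete, and per-type remove_label_values when TimelineMetrics is dropped. It assumes the prometheus and once_cell crates as used by pageserver/src/metrics.rs; the metric name and label values below are made up.

use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

static PHYSICAL_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "storage_physical_size_sketch", // made-up name, not the real metric
        "Physical size of different types of storage files",
        &["type", "tenant_id", "timeline_id"]
    )
    .expect("failed to define a metric")
});

fn main() {
    let labels = ["delta", "tenant-a", "timeline-1"];
    // create_new_layer / populate_*_when_init -> metrics_size_add
    PHYSICAL_SIZE.with_label_values(&labels).add(4096);
    // delete_layer / remove_local_when_init -> metrics_size_sub
    PHYSICAL_SIZE.with_label_values(&labels).sub(4096);
    assert_eq!(PHYSICAL_SIZE.with_label_values(&labels).get(), 0);
    // On timeline drop, every (type, tenant, timeline) series is removed.
    for ty in ["image", "delta", "partial-image"] {
        let _ = PHYSICAL_SIZE.remove_label_values(&[ty, "tenant-a", "timeline-1"]);
    }
}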
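
Finally, the new compaction trigger in isolation: compact_level0 now delegates directly to compact_tiered, which returns early until at least `threshold` sorted runs have accumulated (raised from 3 to 8 here). A trivial sketch of that precondition, with the surrounding locking and tier-selection logic elided:

fn should_compact(num_of_tiers: usize) -> bool {
    let threshold: usize = 8; // was 3 before this patch
    assert!(threshold >= 2);
    num_of_tiers >= threshold
}

fn main() {
    assert!(!should_compact(3)); // would have triggered compaction before
    assert!(should_compact(8));
}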