diff --git a/pageserver/src/layered_repository/timeline.rs b/pageserver/src/layered_repository/timeline.rs index 2d396024a0..e27619cc83 100644 --- a/pageserver/src/layered_repository/timeline.rs +++ b/pageserver/src/layered_repository/timeline.rs @@ -4,6 +4,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result}; use bytes::Bytes; use fail::fail_point; use itertools::Itertools; +use metrics::core::{AtomicU64, GenericCounter}; use once_cell::sync::Lazy; use tracing::*; @@ -223,6 +224,70 @@ impl From for RepositoryTimeline { } } +struct TimelineMetrics { + pub reconstruct_time_histo: Histogram, + pub materialized_page_cache_hit_counter: GenericCounter, + pub flush_time_histo: Histogram, + pub compact_time_histo: Histogram, + pub create_images_time_histo: Histogram, + pub init_logical_size_histo: Histogram, + pub load_layer_map_histo: Histogram, + pub last_record_gauge: IntGauge, + pub wait_lsn_time_histo: Histogram, + pub current_physical_size_gauge: UIntGauge, +} + +impl TimelineMetrics { + fn new(tenant_id: &ZTenantId, timeline_id: &ZTimelineId) -> Self { + let tenant_id = tenant_id.to_string(); + let timeline_id = timeline_id.to_string(); + + let reconstruct_time_histo = RECONSTRUCT_TIME + .get_metric_with_label_values(&[&tenant_id, &timeline_id]) + .unwrap(); + let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT + .get_metric_with_label_values(&[&tenant_id, &timeline_id]) + .unwrap(); + let flush_time_histo = STORAGE_TIME + .get_metric_with_label_values(&["layer flush", &tenant_id, &timeline_id]) + .unwrap(); + let compact_time_histo = STORAGE_TIME + .get_metric_with_label_values(&["compact", &tenant_id, &timeline_id]) + .unwrap(); + let create_images_time_histo = STORAGE_TIME + .get_metric_with_label_values(&["create images", &tenant_id, &timeline_id]) + .unwrap(); + let init_logical_size_histo = STORAGE_TIME + .get_metric_with_label_values(&["init logical size", &tenant_id, &timeline_id]) + .unwrap(); + let load_layer_map_histo = STORAGE_TIME + .get_metric_with_label_values(&["load layer map", &tenant_id, &timeline_id]) + .unwrap(); + let last_record_gauge = LAST_RECORD_LSN + .get_metric_with_label_values(&[&tenant_id, &timeline_id]) + .unwrap(); + let wait_lsn_time_histo = WAIT_LSN_TIME + .get_metric_with_label_values(&[&tenant_id, &timeline_id]) + .unwrap(); + let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE + .get_metric_with_label_values(&[&tenant_id, &timeline_id]) + .unwrap(); + + TimelineMetrics { + reconstruct_time_histo, + materialized_page_cache_hit_counter, + flush_time_histo, + compact_time_histo, + create_images_time_histo, + init_logical_size_histo, + load_layer_map_histo, + last_record_gauge, + wait_lsn_time_histo, + current_physical_size_gauge, + } + } +} + pub struct LayeredTimeline { conf: &'static PageServerConf, tenant_conf: Arc>, @@ -269,14 +334,7 @@ pub struct LayeredTimeline { ancestor_lsn: Lsn, // Metrics - reconstruct_time_histo: Histogram, - materialized_page_cache_hit_counter: IntCounter, - flush_time_histo: Histogram, - compact_time_histo: Histogram, - create_images_time_histo: Histogram, - last_record_gauge: IntGauge, - wait_lsn_time_histo: Histogram, - current_physical_size_gauge: UIntGauge, + metrics: TimelineMetrics, /// If `true`, will backup its files that appear after each checkpointing to the remote storage. upload_layers: AtomicBool, @@ -426,7 +484,7 @@ impl Timeline for LayeredTimeline { "wait_lsn called by WAL receiver thread" ); - self.wait_lsn_time_histo.observe_closure_duration( + self.metrics.wait_lsn_time_histo.observe_closure_duration( || self.last_record_lsn .wait_for_timeout(lsn, self.conf.wait_lsn_timeout) .with_context(|| { @@ -468,7 +526,8 @@ impl Timeline for LayeredTimeline { self.get_reconstruct_data(key, lsn, &mut reconstruct_state)?; - self.reconstruct_time_histo + self.metrics + .reconstruct_time_histo .observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state)) } @@ -530,7 +589,7 @@ impl Timeline for LayeredTimeline { } fn get_physical_size(&self) -> u64 { - self.current_physical_size_gauge.get() + self.metrics.current_physical_size_gauge.get() } fn get_physical_size_non_incremental(&self) -> anyhow::Result { @@ -604,43 +663,6 @@ impl LayeredTimeline { walredo_mgr: Arc, upload_layers: bool, ) -> LayeredTimeline { - let reconstruct_time_histo = RECONSTRUCT_TIME - .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) - .unwrap(); - let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT - .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) - .unwrap(); - let flush_time_histo = STORAGE_TIME - .get_metric_with_label_values(&[ - "layer flush", - &tenant_id.to_string(), - &timeline_id.to_string(), - ]) - .unwrap(); - let compact_time_histo = STORAGE_TIME - .get_metric_with_label_values(&[ - "compact", - &tenant_id.to_string(), - &timeline_id.to_string(), - ]) - .unwrap(); - let create_images_time_histo = STORAGE_TIME - .get_metric_with_label_values(&[ - "create images", - &tenant_id.to_string(), - &timeline_id.to_string(), - ]) - .unwrap(); - let last_record_gauge = LAST_RECORD_LSN - .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) - .unwrap(); - let wait_lsn_time_histo = WAIT_LSN_TIME - .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) - .unwrap(); - let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE - .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) - .unwrap(); - let mut result = LayeredTimeline { conf, tenant_conf, @@ -663,14 +685,7 @@ impl LayeredTimeline { ancestor_timeline: ancestor, ancestor_lsn: metadata.ancestor_lsn(), - reconstruct_time_histo, - materialized_page_cache_hit_counter, - flush_time_histo, - compact_time_histo, - create_images_time_histo, - last_record_gauge, - wait_lsn_time_histo, - current_physical_size_gauge, + metrics: TimelineMetrics::new(&tenant_id, &timeline_id), upload_layers: AtomicBool::new(upload_layers), @@ -706,6 +721,8 @@ impl LayeredTimeline { let mut layers = self.layers.write().unwrap(); let mut num_layers = 0; + let timer = self.metrics.load_layer_map_histo.start_timer(); + // Scan timeline directory and create ImageFileName and DeltaFilename // structs representing all files on disk let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); @@ -777,7 +794,11 @@ impl LayeredTimeline { "loaded layer map with {} layers at {}, total physical size: {}", num_layers, disk_consistent_lsn, total_physical_size ); - self.current_physical_size_gauge.set(total_physical_size); + self.metrics + .current_physical_size_gauge + .set(total_physical_size); + + timer.stop_and_record(); Ok(()) } @@ -808,12 +829,16 @@ impl LayeredTimeline { } } + let timer = self.metrics.init_logical_size_histo.start_timer(); + // Have to calculate it the hard way let last_lsn = self.get_last_record_lsn(); let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?; self.current_logical_size .store(logical_size as isize, AtomicOrdering::SeqCst); debug!("calculated logical size the hard way: {}", logical_size); + + timer.stop_and_record(); Ok(()) } @@ -878,7 +903,7 @@ impl LayeredTimeline { ValueReconstructResult::Continue => { // If we reached an earlier cached page image, we're done. if cont_lsn == cached_lsn + 1 { - self.materialized_page_cache_hit_counter.inc_by(1); + self.metrics.materialized_page_cache_hit_counter.inc_by(1); return Ok(()); } if prev_lsn <= cont_lsn { @@ -1074,7 +1099,7 @@ impl LayeredTimeline { fn finish_write(&self, new_lsn: Lsn) { assert!(new_lsn.is_aligned()); - self.last_record_gauge.set(new_lsn.0 as i64); + self.metrics.last_record_gauge.set(new_lsn.0 as i64); self.last_record_lsn.advance(new_lsn); } @@ -1178,7 +1203,7 @@ impl LayeredTimeline { } }; - let timer = self.flush_time_histo.start_timer(); + let timer = self.metrics.flush_time_histo.start_timer(); loop { let layers = self.layers.read().unwrap(); @@ -1349,7 +1374,7 @@ impl LayeredTimeline { // update the timeline's physical size let sz = new_delta_path.metadata()?.len(); - self.current_physical_size_gauge.add(sz); + self.metrics.current_physical_size_gauge.add(sz); // update metrics NUM_PERSISTENT_FILES_CREATED.inc_by(1); PERSISTENT_BYTES_WRITTEN.inc_by(sz); @@ -1418,7 +1443,7 @@ impl LayeredTimeline { } // 3. Compact - let timer = self.compact_time_histo.start_timer(); + let timer = self.metrics.compact_time_histo.start_timer(); self.compact_level0(target_file_size)?; timer.stop_and_record(); } @@ -1494,7 +1519,7 @@ impl LayeredTimeline { lsn: Lsn, force: bool, ) -> Result> { - let timer = self.create_images_time_histo.start_timer(); + let timer = self.metrics.create_images_time_histo.start_timer(); let mut image_layers: Vec = Vec::new(); let mut layer_paths_to_upload = HashSet::new(); for partition in partitioning.parts.iter() { @@ -1538,7 +1563,8 @@ impl LayeredTimeline { let mut layers = self.layers.write().unwrap(); for l in image_layers { - self.current_physical_size_gauge + self.metrics + .current_physical_size_gauge .add(l.path().metadata()?.len()); layers.insert_historic(Arc::new(l)); } @@ -1788,7 +1814,8 @@ impl LayeredTimeline { let new_delta_path = l.path(); // update the timeline's physical size - self.current_physical_size_gauge + self.metrics + .current_physical_size_gauge .add(new_delta_path.metadata()?.len()); new_layer_paths.insert(new_delta_path); @@ -1801,7 +1828,9 @@ impl LayeredTimeline { drop(all_keys_iter); for l in deltas_to_compact { if let Some(path) = l.local_path() { - self.current_physical_size_gauge.sub(path.metadata()?.len()); + self.metrics + .current_physical_size_gauge + .sub(path.metadata()?.len()); layer_paths_do_delete.insert(path); } l.delete()?; @@ -2058,7 +2087,9 @@ impl LayeredTimeline { let mut layer_paths_to_delete = HashSet::with_capacity(layers_to_remove.len()); for doomed_layer in layers_to_remove { if let Some(path) = doomed_layer.local_path() { - self.current_physical_size_gauge.sub(path.metadata()?.len()); + self.metrics + .current_physical_size_gauge + .sub(path.metadata()?.len()); layer_paths_to_delete.insert(path); } doomed_layer.delete()?;