split out timeline metrics, track layer map loading and size calculation

This commit is contained in:
Dmitry Rodionov
2022-08-15 23:16:35 +03:00
committed by Dmitry Rodionov
parent 648e8bbefe
commit b21f7382cc

View File

@@ -4,6 +4,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use fail::fail_point;
use itertools::Itertools;
use metrics::core::{AtomicU64, GenericCounter};
use once_cell::sync::Lazy;
use tracing::*;
@@ -223,6 +224,70 @@ impl From<LayeredTimelineEntry> for RepositoryTimeline<LayeredTimeline> {
}
}
/// Handles to the metrics of a single timeline.
///
/// Every handle is looked up once, in `TimelineMetrics::new`, using the tenant id and
/// timeline id as label values, and is stored on `LayeredTimeline` in its `metrics` field.
struct TimelineMetrics {
/// `RECONSTRUCT_TIME` histogram; observes the duration of `reconstruct_value` calls.
pub reconstruct_time_histo: Histogram,
/// `MATERIALIZED_PAGE_CACHE_HIT` counter; bumped by one when reconstruct-data collection
/// stops at an earlier cached page image.
pub materialized_page_cache_hit_counter: GenericCounter<AtomicU64>,
/// `STORAGE_TIME` histogram for the "layer flush" operation.
pub flush_time_histo: Histogram,
/// `STORAGE_TIME` histogram for the "compact" operation; times `compact_level0`.
pub compact_time_histo: Histogram,
/// `STORAGE_TIME` histogram for the "create images" operation.
pub create_images_time_histo: Histogram,
/// `STORAGE_TIME` histogram for the "init logical size" operation; times the
/// non-incremental logical size calculation.
pub init_logical_size_histo: Histogram,
/// `STORAGE_TIME` histogram for the "load layer map" operation.
pub load_layer_map_histo: Histogram,
/// `LAST_RECORD_LSN` gauge; set to the new LSN (cast to `i64`) in `finish_write`.
pub last_record_gauge: IntGauge,
/// `WAIT_LSN_TIME` histogram; observes how long `wait_lsn` spends waiting on
/// `last_record_lsn`.
pub wait_lsn_time_histo: Histogram,
/// `CURRENT_PHYSICAL_SIZE` gauge; set to the total once the layer map is loaded, then
/// adjusted by each layer file's length as layers are created or removed. Its value is
/// what `get_physical_size` returns.
pub current_physical_size_gauge: UIntGauge,
}
impl TimelineMetrics {
    /// Looks up all per-timeline metric handles for the given tenant and timeline.
    ///
    /// Both ids are converted to strings once and reused as label values for every
    /// lookup. The `STORAGE_TIME` histograms take an additional leading label that
    /// names the timed operation.
    ///
    /// # Panics
    ///
    /// Panics if any of the lookups fails, since each result is unwrapped.
    fn new(tenant_id: &ZTenantId, timeline_id: &ZTimelineId) -> Self {
        let tenant_label = tenant_id.to_string();
        let timeline_label = timeline_id.to_string();
        let (tenant, timeline) = (tenant_label.as_str(), timeline_label.as_str());

        // Label set shared by every metric that is keyed by tenant and timeline only.
        let per_timeline = [tenant, timeline];

        // All storage operations share one histogram vector and differ only in the
        // operation label, so a single lookup closure covers them.
        let storage_time = |operation: &str| {
            STORAGE_TIME
                .get_metric_with_label_values(&[operation, tenant, timeline])
                .unwrap()
        };

        // Field initializers run top to bottom, which keeps the lookups in the same
        // order as before.
        Self {
            reconstruct_time_histo: RECONSTRUCT_TIME
                .get_metric_with_label_values(&per_timeline)
                .unwrap(),
            materialized_page_cache_hit_counter: MATERIALIZED_PAGE_CACHE_HIT
                .get_metric_with_label_values(&per_timeline)
                .unwrap(),
            flush_time_histo: storage_time("layer flush"),
            compact_time_histo: storage_time("compact"),
            create_images_time_histo: storage_time("create images"),
            init_logical_size_histo: storage_time("init logical size"),
            load_layer_map_histo: storage_time("load layer map"),
            last_record_gauge: LAST_RECORD_LSN
                .get_metric_with_label_values(&per_timeline)
                .unwrap(),
            wait_lsn_time_histo: WAIT_LSN_TIME
                .get_metric_with_label_values(&per_timeline)
                .unwrap(),
            current_physical_size_gauge: CURRENT_PHYSICAL_SIZE
                .get_metric_with_label_values(&per_timeline)
                .unwrap(),
        }
    }
}
pub struct LayeredTimeline {
conf: &'static PageServerConf,
tenant_conf: Arc<RwLock<TenantConfOpt>>,
@@ -269,14 +334,7 @@ pub struct LayeredTimeline {
ancestor_lsn: Lsn,
// Metrics
reconstruct_time_histo: Histogram,
materialized_page_cache_hit_counter: IntCounter,
flush_time_histo: Histogram,
compact_time_histo: Histogram,
create_images_time_histo: Histogram,
last_record_gauge: IntGauge,
wait_lsn_time_histo: Histogram,
current_physical_size_gauge: UIntGauge,
metrics: TimelineMetrics,
/// If `true`, will backup its files that appear after each checkpointing to the remote storage.
upload_layers: AtomicBool,
@@ -426,7 +484,7 @@ impl Timeline for LayeredTimeline {
"wait_lsn called by WAL receiver thread"
);
self.wait_lsn_time_histo.observe_closure_duration(
self.metrics.wait_lsn_time_histo.observe_closure_duration(
|| self.last_record_lsn
.wait_for_timeout(lsn, self.conf.wait_lsn_timeout)
.with_context(|| {
@@ -468,7 +526,8 @@ impl Timeline for LayeredTimeline {
self.get_reconstruct_data(key, lsn, &mut reconstruct_state)?;
self.reconstruct_time_histo
self.metrics
.reconstruct_time_histo
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
}
@@ -530,7 +589,7 @@ impl Timeline for LayeredTimeline {
}
fn get_physical_size(&self) -> u64 {
self.current_physical_size_gauge.get()
self.metrics.current_physical_size_gauge.get()
}
fn get_physical_size_non_incremental(&self) -> anyhow::Result<u64> {
@@ -604,43 +663,6 @@ impl LayeredTimeline {
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
) -> LayeredTimeline {
let reconstruct_time_histo = RECONSTRUCT_TIME
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let flush_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"layer flush",
&tenant_id.to_string(),
&timeline_id.to_string(),
])
.unwrap();
let compact_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"compact",
&tenant_id.to_string(),
&timeline_id.to_string(),
])
.unwrap();
let create_images_time_histo = STORAGE_TIME
.get_metric_with_label_values(&[
"create images",
&tenant_id.to_string(),
&timeline_id.to_string(),
])
.unwrap();
let last_record_gauge = LAST_RECORD_LSN
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let wait_lsn_time_histo = WAIT_LSN_TIME
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let current_physical_size_gauge = CURRENT_PHYSICAL_SIZE
.get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()])
.unwrap();
let mut result = LayeredTimeline {
conf,
tenant_conf,
@@ -663,14 +685,7 @@ impl LayeredTimeline {
ancestor_timeline: ancestor,
ancestor_lsn: metadata.ancestor_lsn(),
reconstruct_time_histo,
materialized_page_cache_hit_counter,
flush_time_histo,
compact_time_histo,
create_images_time_histo,
last_record_gauge,
wait_lsn_time_histo,
current_physical_size_gauge,
metrics: TimelineMetrics::new(&tenant_id, &timeline_id),
upload_layers: AtomicBool::new(upload_layers),
@@ -706,6 +721,8 @@ impl LayeredTimeline {
let mut layers = self.layers.write().unwrap();
let mut num_layers = 0;
let timer = self.metrics.load_layer_map_histo.start_timer();
// Scan timeline directory and create ImageFileName and DeltaFilename
// structs representing all files on disk
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
@@ -777,7 +794,11 @@ impl LayeredTimeline {
"loaded layer map with {} layers at {}, total physical size: {}",
num_layers, disk_consistent_lsn, total_physical_size
);
self.current_physical_size_gauge.set(total_physical_size);
self.metrics
.current_physical_size_gauge
.set(total_physical_size);
timer.stop_and_record();
Ok(())
}
@@ -808,12 +829,16 @@ impl LayeredTimeline {
}
}
let timer = self.metrics.init_logical_size_histo.start_timer();
// Have to calculate it the hard way
let last_lsn = self.get_last_record_lsn();
let logical_size = self.get_current_logical_size_non_incremental(last_lsn)?;
self.current_logical_size
.store(logical_size as isize, AtomicOrdering::SeqCst);
debug!("calculated logical size the hard way: {}", logical_size);
timer.stop_and_record();
Ok(())
}
@@ -878,7 +903,7 @@ impl LayeredTimeline {
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
self.materialized_page_cache_hit_counter.inc_by(1);
self.metrics.materialized_page_cache_hit_counter.inc_by(1);
return Ok(());
}
if prev_lsn <= cont_lsn {
@@ -1074,7 +1099,7 @@ impl LayeredTimeline {
fn finish_write(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());
self.last_record_gauge.set(new_lsn.0 as i64);
self.metrics.last_record_gauge.set(new_lsn.0 as i64);
self.last_record_lsn.advance(new_lsn);
}
@@ -1178,7 +1203,7 @@ impl LayeredTimeline {
}
};
let timer = self.flush_time_histo.start_timer();
let timer = self.metrics.flush_time_histo.start_timer();
loop {
let layers = self.layers.read().unwrap();
@@ -1349,7 +1374,7 @@ impl LayeredTimeline {
// update the timeline's physical size
let sz = new_delta_path.metadata()?.len();
self.current_physical_size_gauge.add(sz);
self.metrics.current_physical_size_gauge.add(sz);
// update metrics
NUM_PERSISTENT_FILES_CREATED.inc_by(1);
PERSISTENT_BYTES_WRITTEN.inc_by(sz);
@@ -1418,7 +1443,7 @@ impl LayeredTimeline {
}
// 3. Compact
let timer = self.compact_time_histo.start_timer();
let timer = self.metrics.compact_time_histo.start_timer();
self.compact_level0(target_file_size)?;
timer.stop_and_record();
}
@@ -1494,7 +1519,7 @@ impl LayeredTimeline {
lsn: Lsn,
force: bool,
) -> Result<HashSet<PathBuf>> {
let timer = self.create_images_time_histo.start_timer();
let timer = self.metrics.create_images_time_histo.start_timer();
let mut image_layers: Vec<ImageLayer> = Vec::new();
let mut layer_paths_to_upload = HashSet::new();
for partition in partitioning.parts.iter() {
@@ -1538,7 +1563,8 @@ impl LayeredTimeline {
let mut layers = self.layers.write().unwrap();
for l in image_layers {
self.current_physical_size_gauge
self.metrics
.current_physical_size_gauge
.add(l.path().metadata()?.len());
layers.insert_historic(Arc::new(l));
}
@@ -1788,7 +1814,8 @@ impl LayeredTimeline {
let new_delta_path = l.path();
// update the timeline's physical size
self.current_physical_size_gauge
self.metrics
.current_physical_size_gauge
.add(new_delta_path.metadata()?.len());
new_layer_paths.insert(new_delta_path);
@@ -1801,7 +1828,9 @@ impl LayeredTimeline {
drop(all_keys_iter);
for l in deltas_to_compact {
if let Some(path) = l.local_path() {
self.current_physical_size_gauge.sub(path.metadata()?.len());
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_do_delete.insert(path);
}
l.delete()?;
@@ -2058,7 +2087,9 @@ impl LayeredTimeline {
let mut layer_paths_to_delete = HashSet::with_capacity(layers_to_remove.len());
for doomed_layer in layers_to_remove {
if let Some(path) = doomed_layer.local_path() {
self.current_physical_size_gauge.sub(path.metadata()?.len());
self.metrics
.current_physical_size_gauge
.sub(path.metadata()?.len());
layer_paths_to_delete.insert(path);
}
doomed_layer.delete()?;