diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index c075315683..cf60a1a404 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -385,6 +385,26 @@ static REMOTE_TIMELINE_CLIENT_CALLS_STARTED_HIST: Lazy = Lazy::new .expect("failed to define a metric") }); +static REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_remote_timeline_client_bytes_started", + "Incremented by the number of bytes associated with a remote timeline client operation. \ + The increment happens when the operation is scheduled.", + &["tenant_id", "timeline_id", "file_kind", "op_kind"], + ) + .expect("failed to define a metric") +}); + +static REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "pageserver_remote_timeline_client_bytes_finished", + "Incremented by the number of bytes associated with a remote timeline client operation. \ + The increment happens when the operation finishes (regardless of success/failure/shutdown).", + &["tenant_id", "timeline_id", "file_kind", "op_kind"], + ) + .expect("failed to define a metric") +}); + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum RemoteOpKind { Upload, @@ -739,6 +759,8 @@ pub struct RemoteTimelineClientMetrics { remote_operation_time: Mutex>, calls_unfinished_gauge: Mutex>, calls_started_hist: Mutex>, + bytes_started_counter: Mutex>, + bytes_finished_counter: Mutex>, } impl RemoteTimelineClientMetrics { @@ -749,6 +771,8 @@ impl RemoteTimelineClientMetrics { remote_operation_time: Mutex::new(HashMap::default()), calls_unfinished_gauge: Mutex::new(HashMap::default()), calls_started_hist: Mutex::new(HashMap::default()), + bytes_started_counter: Mutex::new(HashMap::default()), + bytes_finished_counter: Mutex::new(HashMap::default()), remote_physical_size_gauge: Mutex::new(None), } } @@ -787,6 +811,7 @@ impl RemoteTimelineClientMetrics { }); metric.clone() } + fn calls_unfinished_gauge( 
&self, file_kind: &RemoteOpFileKind, @@ -828,32 +853,125 @@ impl RemoteTimelineClientMetrics { }); metric.clone() } + + fn bytes_started_counter( + &self, + file_kind: &RemoteOpFileKind, + op_kind: &RemoteOpKind, + ) -> IntCounter { + // XXX would be nice to have an upgradable RwLock + let mut guard = self.bytes_started_counter.lock().unwrap(); + let key = (file_kind.as_str(), op_kind.as_str()); + let metric = guard.entry(key).or_insert_with(move || { + REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER + .get_metric_with_label_values(&[ + &self.tenant_id.to_string(), + &self.timeline_id.to_string(), + key.0, + key.1, + ]) + .unwrap() + }); + metric.clone() + } + + fn bytes_finished_counter( + &self, + file_kind: &RemoteOpFileKind, + op_kind: &RemoteOpKind, + ) -> IntCounter { + // XXX would be nice to have an upgradable RwLock + let mut guard = self.bytes_finished_counter.lock().unwrap(); + let key = (file_kind.as_str(), op_kind.as_str()); + let metric = guard.entry(key).or_insert_with(move || { + REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER + .get_metric_with_label_values(&[ + &self.tenant_id.to_string(), + &self.timeline_id.to_string(), + key.0, + key.1, + ]) + .unwrap() + }); + metric.clone() + } +} + +#[cfg(test)] +impl RemoteTimelineClientMetrics { + pub fn get_bytes_started_counter_value( + &self, + file_kind: &RemoteOpFileKind, + op_kind: &RemoteOpKind, + ) -> Option { + let guard = self.bytes_started_counter.lock().unwrap(); + let key = (file_kind.as_str(), op_kind.as_str()); + guard.get(&key).map(|counter| counter.get()) + } + + pub fn get_bytes_finished_counter_value( + &self, + file_kind: &RemoteOpFileKind, + op_kind: &RemoteOpKind, + ) -> Option { + let guard = self.bytes_finished_counter.lock().unwrap(); + let key = (file_kind.as_str(), op_kind.as_str()); + guard.get(&key).map(|counter| counter.get()) + } } /// See [`RemoteTimelineClientMetrics::call_begin`]. 
#[must_use] -pub(crate) struct RemoteTimelineClientCallMetricGuard(Option); +pub(crate) struct RemoteTimelineClientCallMetricGuard { + /// Decremented on drop. + calls_unfinished_metric: Option, + /// If Some(), this references the bytes_finished metric, and we increment it by the given `u64` on drop. + bytes_finished: Option<(IntCounter, u64)>, +} impl RemoteTimelineClientCallMetricGuard { - /// Consume this guard object without decrementing the metric. - /// The caller vouches to do this manually, so that the prior increment of the gauge will cancel out. + /// Consume this guard object without performing the metric updates it would do on `drop()`. + /// The caller vouches to do the metric updates manually. pub fn will_decrement_manually(mut self) { - self.0 = None; // prevent drop() from decrementing + let RemoteTimelineClientCallMetricGuard { + calls_unfinished_metric, + bytes_finished, + } = &mut self; + calls_unfinished_metric.take(); + bytes_finished.take(); } } impl Drop for RemoteTimelineClientCallMetricGuard { fn drop(&mut self) { - if let RemoteTimelineClientCallMetricGuard(Some(guard)) = self { + let RemoteTimelineClientCallMetricGuard { + calls_unfinished_metric, + bytes_finished, + } = self; + if let Some(guard) = calls_unfinished_metric.take() { guard.dec(); } + if let Some((bytes_finished_metric, value)) = bytes_finished { + bytes_finished_metric.inc_by(*value); + } } } +/// The enum variants communicate to the [`RemoteTimelineClientMetrics`] whether to +/// track the byte size of this call in applicable metric(s). +pub(crate) enum RemoteTimelineClientMetricsCallTrackSize { + /// Do not account for this call's byte size in any metrics. + /// The `reason` field is there to make the call sites self-documenting + /// about why they don't need the metric. + DontTrackSize { reason: &'static str }, + /// Track the byte size of the call in applicable metric(s). 
+ Bytes(u64), +} + impl RemoteTimelineClientMetrics { - /// Increment the metrics that track ongoing calls to the remote timeline client instance. + /// Update the metrics that change when a call to the remote timeline client instance starts. /// - /// Drop the returned guard object once the operation is finished to decrement the values. + /// Drop the returned guard object once the operation is finished to update the corresponding metrics that track completions. /// Or, use [`RemoteTimelineClientCallMetricGuard::will_decrement_manually`] and [`call_end`] if that /// is more suitable. /// Never do both. @@ -861,24 +979,51 @@ impl RemoteTimelineClientMetrics { &self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind, + size: RemoteTimelineClientMetricsCallTrackSize, ) -> RemoteTimelineClientCallMetricGuard { - let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind); + let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind); self.calls_started_hist(file_kind, op_kind) - .observe(unfinished_metric.get() as f64); - unfinished_metric.inc(); - RemoteTimelineClientCallMetricGuard(Some(unfinished_metric)) + .observe(calls_unfinished_metric.get() as f64); + calls_unfinished_metric.inc(); // NB: inc after the histogram, see comment on underlying metric + + let bytes_finished = match size { + RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => { + // nothing to do + None + } + RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => { + self.bytes_started_counter(file_kind, op_kind).inc_by(size); + let finished_counter = self.bytes_finished_counter(file_kind, op_kind); + Some((finished_counter, size)) + } + }; + RemoteTimelineClientCallMetricGuard { + calls_unfinished_metric: Some(calls_unfinished_metric), + bytes_finished, + } } - /// Manually decrement the metric instead of using the guard object. + /// Manually update the metrics that track completions, instead of using the guard object.
/// Using the guard object is generally preferable. /// See [`call_begin`] for more context. - pub(crate) fn call_end(&self, file_kind: &RemoteOpFileKind, op_kind: &RemoteOpKind) { - let unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind); + pub(crate) fn call_end( + &self, + file_kind: &RemoteOpFileKind, + op_kind: &RemoteOpKind, + size: RemoteTimelineClientMetricsCallTrackSize, + ) { + let calls_unfinished_metric = self.calls_unfinished_gauge(file_kind, op_kind); debug_assert!( - unfinished_metric.get() > 0, + calls_unfinished_metric.get() > 0, "begin and end should cancel out" ); - unfinished_metric.dec(); + calls_unfinished_metric.dec(); + match size { + RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { reason: _reason } => {} + RemoteTimelineClientMetricsCallTrackSize::Bytes(size) => { + self.bytes_finished_counter(file_kind, op_kind).inc_by(size); + } + } } } @@ -891,6 +1036,8 @@ impl Drop for RemoteTimelineClientMetrics { remote_operation_time, calls_unfinished_gauge, calls_started_hist, + bytes_started_counter, + bytes_finished_counter, } = self; for ((a, b, c), _) in remote_operation_time.get_mut().unwrap().drain() { let _ = REMOTE_OPERATION_TIME.remove_label_values(&[tenant_id, timeline_id, a, b, c]); @@ -911,6 +1058,22 @@ impl Drop for RemoteTimelineClientMetrics { b, ]); } + for ((a, b), _) in bytes_started_counter.get_mut().unwrap().drain() { + let _ = REMOTE_TIMELINE_CLIENT_BYTES_STARTED_COUNTER.remove_label_values(&[ + tenant_id, + timeline_id, + a, + b, + ]); + } + for ((a, b), _) in bytes_finished_counter.get_mut().unwrap().drain() { + let _ = REMOTE_TIMELINE_CLIENT_BYTES_FINISHED_COUNTER.remove_label_values(&[ + tenant_id, + timeline_id, + a, + b, + ]); + } { let _ = remote_physical_size_gauge; // use to avoid 'unused' warning in desctructuring above let _ = REMOTE_PHYSICAL_SIZE.remove_label_values(&[tenant_id, timeline_id]); diff --git a/pageserver/src/tenant/remote_timeline_client.rs 
b/pageserver/src/tenant/remote_timeline_client.rs index 28c4943dbd..c42824a8b5 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -219,7 +219,8 @@ use utils::lsn::Lsn; use crate::metrics::{ MeasureRemoteOp, RemoteOpFileKind, RemoteOpKind, RemoteTimelineClientMetrics, - REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS, + RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES, + REMOTE_ONDEMAND_DOWNLOADED_LAYERS, }; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::{ @@ -367,9 +368,13 @@ impl RemoteTimelineClient { /// Download index file pub async fn download_index_file(&self) -> Result { - let _unfinished_gauge_guard = self - .metrics - .call_begin(&RemoteOpFileKind::Index, &RemoteOpKind::Download); + let _unfinished_gauge_guard = self.metrics.call_begin( + &RemoteOpFileKind::Index, + &RemoteOpKind::Download, + crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { + reason: "no need for a downloads gauge", + }, + ); download::download_index_part( self.conf, @@ -398,9 +403,13 @@ impl RemoteTimelineClient { layer_metadata: &LayerFileMetadata, ) -> anyhow::Result { let downloaded_size = { - let _unfinished_gauge_guard = self - .metrics - .call_begin(&RemoteOpFileKind::Layer, &RemoteOpKind::Download); + let _unfinished_gauge_guard = self.metrics.call_begin( + &RemoteOpFileKind::Layer, + &RemoteOpKind::Download, + crate::metrics::RemoteTimelineClientMetricsCallTrackSize::DontTrackSize { + reason: "no need for a downloads gauge", + }, + ); download::download_layer_file( self.conf, &self.storage_impl, @@ -886,11 +895,32 @@ impl RemoteTimelineClient { fn calls_unfinished_metric_impl( &self, op: &UploadOp, - ) -> Option<(RemoteOpFileKind, RemoteOpKind)> { + ) -> Option<( + RemoteOpFileKind, + RemoteOpKind, + RemoteTimelineClientMetricsCallTrackSize, + )> { + use RemoteTimelineClientMetricsCallTrackSize::DontTrackSize; 
let res = match op { - UploadOp::UploadLayer(_, _) => (RemoteOpFileKind::Layer, RemoteOpKind::Upload), - UploadOp::UploadMetadata(_, _) => (RemoteOpFileKind::Index, RemoteOpKind::Upload), - UploadOp::Delete(file_kind, _) => (*file_kind, RemoteOpKind::Delete), + UploadOp::UploadLayer(_, m) => ( + RemoteOpFileKind::Layer, + RemoteOpKind::Upload, + RemoteTimelineClientMetricsCallTrackSize::Bytes(m.file_size()), + ), + UploadOp::UploadMetadata(_, _) => ( + RemoteOpFileKind::Index, + RemoteOpKind::Upload, + DontTrackSize { + reason: "metadata uploads are tiny", + }, + ), + UploadOp::Delete(file_kind, _) => ( + *file_kind, + RemoteOpKind::Delete, + DontTrackSize { + reason: "should we track deletes? positive or negative sign?", + }, + ), UploadOp::Barrier(_) => { // we do not account these return None; @@ -900,20 +930,20 @@ impl RemoteTimelineClient { } fn calls_unfinished_metric_begin(&self, op: &UploadOp) { - let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) { + let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) { Some(x) => x, None => return, }; - let guard = self.metrics.call_begin(&file_kind, &op_kind); + let guard = self.metrics.call_begin(&file_kind, &op_kind, track_bytes); guard.will_decrement_manually(); // in unfinished_ops_metric_end() } fn calls_unfinished_metric_end(&self, op: &UploadOp) { - let (file_kind, op_kind) = match self.calls_unfinished_metric_impl(op) { + let (file_kind, op_kind, track_bytes) = match self.calls_unfinished_metric_impl(op) { Some(x) => x, None => return, }; - self.metrics.call_end(&file_kind, &op_kind); + self.metrics.call_end(&file_kind, &op_kind, track_bytes); } fn stop(&self) { @@ -981,11 +1011,19 @@ impl RemoteTimelineClient { mod tests { use super::*; use crate::{ - tenant::harness::{TenantHarness, TIMELINE_ID}, + context::RequestContext, + tenant::{ + harness::{TenantHarness, TIMELINE_ID}, + Tenant, + }, DEFAULT_PG_VERSION, }; use remote_storage::{RemoteStorageConfig, 
RemoteStorageKind}; - use std::{collections::HashSet, path::Path}; + use std::{ + collections::HashSet, + path::{Path, PathBuf}, + }; + use tokio::runtime::EnterGuard; use utils::lsn::Lsn; pub(super) fn dummy_contents(name: &str) -> Vec { @@ -1034,39 +1072,80 @@ mod tests { assert_eq!(found, expected); } + struct TestSetup { + runtime: &'static tokio::runtime::Runtime, + entered_runtime: EnterGuard<'static>, + harness: TenantHarness<'static>, + tenant: Arc, + tenant_ctx: RequestContext, + remote_fs_dir: PathBuf, + client: Arc, + } + + impl TestSetup { + fn new(test_name: &str) -> anyhow::Result { + // Use a current-thread runtime in the test + let runtime = Box::leak(Box::new( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?, + )); + let entered_runtime = runtime.enter(); + + let test_name = Box::leak(Box::new(format!("remote_timeline_client__{test_name}"))); + let harness = TenantHarness::create(test_name)?; + let (tenant, ctx) = runtime.block_on(harness.load()); + // create an empty timeline directory + let timeline = + tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + let _ = timeline.initialize(&ctx).unwrap(); + + let remote_fs_dir = harness.conf.workdir.join("remote_fs"); + std::fs::create_dir_all(remote_fs_dir)?; + let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?; + + let storage_config = RemoteStorageConfig { + max_concurrent_syncs: std::num::NonZeroUsize::new( + remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, + ) + .unwrap(), + max_sync_errors: std::num::NonZeroU32::new( + remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS, + ) + .unwrap(), + storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), + }; + + let storage = GenericRemoteStorage::from_config(&storage_config).unwrap(); + + let client = Arc::new(RemoteTimelineClient { + conf: harness.conf, + runtime, + tenant_id: harness.tenant_id, + timeline_id: TIMELINE_ID, + storage_impl: storage, + 
upload_queue: Mutex::new(UploadQueue::Uninitialized), + metrics: Arc::new(RemoteTimelineClientMetrics::new( + &harness.tenant_id, + &TIMELINE_ID, + )), + }); + + Ok(Self { + runtime, + entered_runtime, + harness, + tenant, + tenant_ctx: ctx, + remote_fs_dir, + client, + }) + } + } + // Test scheduling #[test] fn upload_scheduling() -> anyhow::Result<()> { - // Use a current-thread runtime in the test - let runtime = Box::leak(Box::new( - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?, - )); - let _entered = runtime.enter(); - - let harness = TenantHarness::create("upload_scheduling")?; - let (tenant, ctx) = runtime.block_on(harness.load()); - let _timeline = - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let timeline_path = harness.timeline_path(&TIMELINE_ID); - - let remote_fs_dir = harness.conf.workdir.join("remote_fs"); - std::fs::create_dir_all(remote_fs_dir)?; - let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?; - - let storage_config = RemoteStorageConfig { - max_concurrent_syncs: std::num::NonZeroUsize::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS, - ) - .unwrap(), - max_sync_errors: std::num::NonZeroU32::new( - remote_storage::DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS, - ) - .unwrap(), - storage: RemoteStorageKind::LocalFs(remote_fs_dir.clone()), - }; - // Test outline: // // Schedule upload of a bunch of layers. Check that they are started immediately, not queued @@ -1081,21 +1160,19 @@ mod tests { // Schedule another deletion. Check that it's launched immediately. // Schedule index upload. 
Check that it's queued - println!("workdir: {}", harness.conf.workdir.display()); - - let storage_impl = GenericRemoteStorage::from_config(&storage_config)?; - let client = Arc::new(RemoteTimelineClient { - conf: harness.conf, + let TestSetup { runtime, - tenant_id: harness.tenant_id, - timeline_id: TIMELINE_ID, - storage_impl, - upload_queue: Mutex::new(UploadQueue::Uninitialized), - metrics: Arc::new(RemoteTimelineClientMetrics::new( - &harness.tenant_id, - &TIMELINE_ID, - )), - }); + entered_runtime: _entered_runtime, + harness, + tenant: _tenant, + tenant_ctx: _tenant_ctx, + remote_fs_dir, + client, + } = TestSetup::new("upload_scheduling").unwrap(); + + let timeline_path = harness.timeline_path(&TIMELINE_ID); + + println!("workdir: {}", harness.conf.workdir.display()); let remote_timeline_dir = remote_fs_dir.join(timeline_path.strip_prefix(&harness.conf.workdir)?); @@ -1216,4 +1293,90 @@ mod tests { Ok(()) } + + #[test] + fn bytes_unfinished_gauge_for_layer_file_uploads() -> anyhow::Result<()> { + // Setup + + let TestSetup { + runtime, + harness, + client, + .. 
+ } = TestSetup::new("metrics")?; + + let metadata = dummy_metadata(Lsn(0x10)); + client.init_upload_queue_for_empty_remote(&metadata)?; + + let timeline_path = harness.timeline_path(&TIMELINE_ID); + + let layer_file_name_1: LayerFileName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(); + let content_1 = dummy_contents("foo"); + std::fs::write( + timeline_path.join(layer_file_name_1.file_name()), + &content_1, + )?; + + #[derive(Debug, PartialEq)] + struct BytesStartedFinished { + started: Option, + finished: Option, + } + let get_bytes_started_stopped = || { + let started = client + .metrics + .get_bytes_started_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload) + .map(|v| v.try_into().unwrap()); + let stopped = client + .metrics + .get_bytes_finished_counter_value(&RemoteOpFileKind::Layer, &RemoteOpKind::Upload) + .map(|v| v.try_into().unwrap()); + BytesStartedFinished { + started, + finished: stopped, + } + }; + + // Test + + let init = get_bytes_started_stopped(); + + client.schedule_layer_file_upload( + &layer_file_name_1, + &LayerFileMetadata::new(content_1.len() as u64), + )?; + + let pre = get_bytes_started_stopped(); + + runtime.block_on(client.wait_completion())?; + + let post = get_bytes_started_stopped(); + + // Validate + + assert_eq!( + init, + BytesStartedFinished { + started: None, + finished: None + } + ); + assert_eq!( + pre, + BytesStartedFinished { + started: Some(content_1.len()), + // assert that the _finished metric is created eagerly so that subtractions work on first sample + finished: Some(0), + } + ); + assert_eq!( + post, + BytesStartedFinished { + started: Some(content_1.len()), + finished: Some(content_1.len()) + } + ); + + Ok(()) + } } diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index c88b985c8e..5fed6fcf84 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -45,6 +45,8 @@ 
PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS: Tuple[str, ...] = ( *[f"pageserver_remote_timeline_client_calls_started_{x}" for x in ["bucket", "count", "sum"]], *[f"pageserver_remote_operation_seconds_{x}" for x in ["bucket", "count", "sum"]], "pageserver_remote_physical_size", + "pageserver_remote_timeline_client_bytes_started_total", + "pageserver_remote_timeline_client_bytes_finished_total", ) PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (