diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index da9c095a15..e2a84d0c24 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -89,16 +89,112 @@ //! [`RequestContext`] argument. Functions in the middle of the call chain //! only need to pass it on. -use crate::task_mgr::TaskKind; +use std::sync::Arc; + +use once_cell::sync::Lazy; +use tracing::warn; +use utils::{id::TimelineId, shard::TenantShardId}; + +use crate::{ + metrics::{StorageIoSizeMetrics, TimelineMetrics}, + task_mgr::TaskKind, + tenant::Timeline, +}; // The main structure of this module, see module-level comment. -#[derive(Debug)] pub struct RequestContext { task_kind: TaskKind, download_behavior: DownloadBehavior, access_stats_behavior: AccessStatsBehavior, page_content_kind: PageContentKind, read_path_debug: bool, + scope: Scope, +} + +#[derive(Clone)] +pub(crate) enum Scope { + Global { + io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics, + }, + SecondaryTenant { + io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics, + }, + SecondaryTimeline { + io_size_metrics: crate::metrics::StorageIoSizeMetrics, + }, + Timeline { + // We wrap the `Arc`s inside another Arc to avoid child + // context creation contending for the ref counters of the Arc, + // which are shared among all tasks that operate on the timeline, especially + // concurrent page_service connections. + #[allow(clippy::redundant_allocation)] + arc_arc: Arc>, + }, + #[cfg(test)] + UnitTest { + io_size_metrics: &'static crate::metrics::StorageIoSizeMetrics, + }, +} + +static GLOBAL_IO_SIZE_METRICS: Lazy = + Lazy::new(|| crate::metrics::StorageIoSizeMetrics::new("*", "*", "*")); + +impl Scope { + pub(crate) fn new_global() -> Self { + Scope::Global { + io_size_metrics: &GLOBAL_IO_SIZE_METRICS, + } + } + /// NB: this allocates, so, use only at relatively long-lived roots, e.g., at start + /// of a compaction iteration. + pub(crate) fn new_timeline(timeline: &Timeline) -> Self { + Scope::Timeline { + arc_arc: Arc::new(Arc::clone(&timeline.metrics)), + } + } + pub(crate) fn new_page_service_pagestream( + timeline_handle: &crate::tenant::timeline::handle::Handle< + crate::page_service::TenantManagerTypes, + >, + ) -> Self { + Scope::Timeline { + arc_arc: Arc::clone(&timeline_handle.metrics), + } + } + pub(crate) fn new_secondary_timeline( + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + ) -> Self { + // TODO(https://github.com/neondatabase/neon/issues/11156): secondary timelines have no infrastructure for metrics lifecycle. + + let tenant_id = tenant_shard_id.tenant_id.to_string(); + let shard_id = tenant_shard_id.shard_slug().to_string(); + let timeline_id = timeline_id.to_string(); + + let io_size_metrics = + crate::metrics::StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id); + Scope::SecondaryTimeline { io_size_metrics } + } + pub(crate) fn new_secondary_tenant(_tenant_shard_id: &TenantShardId) -> Self { + // Before propagating metrics via RequestContext, the labels were inferred from file path. + // The only user of VirtualFile at tenant scope is the heatmap download & read. + // The inferred labels for the path of the heatmap file on local disk were that of the global metric (*,*,*). + // Thus, we do the same here, and extend that for anything secondary-tenant scoped. 
+ // + // If we want to have (tenant_id, shard_id, '*') labels for secondary tenants in the future, + // we will need to think about the metric lifecycle, i.e., remove them during secondary tenant shutdown, + // like we do for attached timelines. (We don't have attached-tenant-scoped usage of VirtualFile + // at this point, so, we were able to completely side-step tenant-scoped stuff there). + Scope::SecondaryTenant { + io_size_metrics: &GLOBAL_IO_SIZE_METRICS, + } + } + #[cfg(test)] + pub(crate) fn new_unit_test() -> Self { + Scope::UnitTest { + io_size_metrics: &GLOBAL_IO_SIZE_METRICS, + } + } } /// The kind of access to the page cache. @@ -157,6 +253,7 @@ impl RequestContextBuilder { access_stats_behavior: AccessStatsBehavior::Update, page_content_kind: PageContentKind::Unknown, read_path_debug: false, + scope: Scope::new_global(), }, } } @@ -171,10 +268,16 @@ impl RequestContextBuilder { access_stats_behavior: original.access_stats_behavior, page_content_kind: original.page_content_kind, read_path_debug: original.read_path_debug, + scope: original.scope.clone(), }, } } + pub fn task_kind(mut self, k: TaskKind) -> Self { + self.inner.task_kind = k; + self + } + /// Configure the DownloadBehavior of the context: whether to /// download missing layers, and/or warn on the download. pub fn download_behavior(mut self, b: DownloadBehavior) -> Self { @@ -199,6 +302,11 @@ impl RequestContextBuilder { self } + pub(crate) fn scope(mut self, s: Scope) -> Self { + self.inner.scope = s; + self + } + pub fn build(self) -> RequestContext { self.inner } @@ -281,7 +389,50 @@ impl RequestContext { } fn child_impl(&self, task_kind: TaskKind, download_behavior: DownloadBehavior) -> Self { - Self::new(task_kind, download_behavior) + RequestContextBuilder::extend(self) + .task_kind(task_kind) + .download_behavior(download_behavior) + .build() + } + + pub fn with_scope_timeline(&self, timeline: &Arc) -> Self { + RequestContextBuilder::extend(self) + .scope(Scope::new_timeline(timeline)) + .build() + } + + pub(crate) fn with_scope_page_service_pagestream( + &self, + timeline_handle: &crate::tenant::timeline::handle::Handle< + crate::page_service::TenantManagerTypes, + >, + ) -> Self { + RequestContextBuilder::extend(self) + .scope(Scope::new_page_service_pagestream(timeline_handle)) + .build() + } + + pub fn with_scope_secondary_timeline( + &self, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + ) -> Self { + RequestContextBuilder::extend(self) + .scope(Scope::new_secondary_timeline(tenant_shard_id, timeline_id)) + .build() + } + + pub fn with_scope_secondary_tenant(&self, tenant_shard_id: &TenantShardId) -> Self { + RequestContextBuilder::extend(self) + .scope(Scope::new_secondary_tenant(tenant_shard_id)) + .build() + } + + #[cfg(test)] + pub fn with_scope_unit_test(&self) -> Self { + RequestContextBuilder::new(TaskKind::UnitTest) + .scope(Scope::new_unit_test()) + .build() } pub fn task_kind(&self) -> TaskKind { @@ -303,4 +454,38 @@ impl RequestContext { pub(crate) fn read_path_debug(&self) -> bool { self.read_path_debug } + + pub(crate) fn io_size_metrics(&self) -> &StorageIoSizeMetrics { + match &self.scope { + Scope::Global { io_size_metrics } => { + let is_unit_test = cfg!(test); + let is_regress_test_build = cfg!(feature = "testing"); + if is_unit_test || is_regress_test_build { + panic!("all VirtualFile instances are timeline-scoped"); + } else { + use once_cell::sync::Lazy; + use std::sync::Mutex; + use std::time::Duration; + use utils::rate_limit::RateLimit; + static LIMIT: Lazy> = + 
Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1)))); + let mut guard = LIMIT.lock().unwrap(); + guard.call2(|rate_limit_stats| { + warn!( + %rate_limit_stats, + backtrace=%std::backtrace::Backtrace::force_capture(), + "all VirtualFile instances are timeline-scoped", + ); + }); + + io_size_metrics + } + } + Scope::Timeline { arc_arc } => &arc_arc.storage_io_size, + Scope::SecondaryTimeline { io_size_metrics } => io_size_metrics, + Scope::SecondaryTenant { io_size_metrics } => io_size_metrics, + #[cfg(test)] + Scope::UnitTest { io_size_metrics } => io_size_metrics, + } + } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3c0c23a56d..77bfab47e0 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -55,6 +55,7 @@ use utils::id::{TenantId, TimelineId}; use utils::lsn::Lsn; use crate::config::PageServerConf; +use crate::context; use crate::context::{DownloadBehavior, RequestContext, RequestContextBuilder}; use crate::deletion_queue::DeletionQueueClient; use crate::pgdatadir_mapping::LsnForTimestamp; @@ -953,12 +954,13 @@ async fn timeline_detail_handler( tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; let timeline = tenant.get_timeline(timeline_id, false)?; + let ctx = &ctx.with_scope_timeline(&timeline); let timeline_info = build_timeline_info( &timeline, include_non_incremental_logical_size.unwrap_or(false), force_await_initial_logical_size.unwrap_or(false), - &ctx, + ctx, ) .await .context("get local timeline info") @@ -1002,7 +1004,8 @@ async fn get_lsn_by_timestamp_handler( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download) + .with_scope_timeline(&timeline); let result = timeline .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx) .await?; @@ -1074,7 +1077,8 @@ async fn get_timestamp_of_lsn_handler( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download) + .with_scope_timeline(&timeline); let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?; match result { @@ -1429,7 +1433,8 @@ async fn timeline_layer_scan_disposable_keys( active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download) + .with_scope_timeline(&timeline); let guard = timeline.layers.read().await; let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else { @@ -1515,7 +1520,8 @@ async fn timeline_download_heatmap_layers_handler( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download) + .with_scope_timeline(&timeline); let max_concurrency = get_config(&request) .remote_storage_config @@ -1563,7 +1569,8 @@ async fn layer_download_handler( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = 
RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download) + .with_scope_timeline(&timeline); let downloaded = timeline .download_layer(&layer_name, &ctx) .await @@ -2299,8 +2306,8 @@ async fn timeline_compact_handler( .unwrap_or(false); async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download).with_scope_timeline(&timeline); if scheduled { let tenant = state .tenant_manager @@ -2407,8 +2414,8 @@ async fn timeline_checkpoint_handler( parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false); async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download).with_scope_timeline(&timeline); if wait_until_flushed { timeline.freeze_and_flush().await } else { @@ -2463,7 +2470,8 @@ async fn timeline_download_remote_layers_handler_post( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download) + .with_scope_timeline(&timeline); match timeline.spawn_download_all_remote_layers(body, &ctx).await { Ok(st) => json_response(StatusCode::ACCEPTED, st), Err(st) => json_response(StatusCode::CONFLICT, st), @@ -2546,6 +2554,7 @@ async fn timeline_detach_ancestor_handler( tracing::info!("all timeline upload queues are drained"); let timeline = tenant.get_timeline(timeline_id, true)?; + let ctx = &ctx.with_scope_timeline(&timeline); let progress = timeline .prepare_to_detach_from_ancestor(&tenant, options, ctx) @@ -2652,8 +2661,9 @@ async fn getpage_at_lsn_handler_inner( async { let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); // Enable read path debugging - let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true).build(); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; + let ctx = RequestContextBuilder::extend(&ctx).read_path_debug(true) + .scope(context::Scope::new_timeline(&timeline)).build(); // Use last_record_lsn if no lsn is provided let lsn = lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); @@ -2687,8 +2697,8 @@ async fn timeline_collect_keyspace( let at_lsn: Option = parse_query_param(&request, "at_lsn")?; async { - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download).with_scope_timeline(&timeline); let at_lsn = at_lsn.unwrap_or_else(|| timeline.get_last_record_lsn()); let (dense_ks, sparse_ks) = timeline .collect_keyspace(at_lsn, &ctx) @@ -3325,7 +3335,7 @@ async fn put_tenant_timeline_import_basebackup( tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; - let timeline = tenant + let (timeline, timeline_ctx) = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) 
.map_err(ApiError::InternalServerError) .await?; @@ -3344,7 +3354,13 @@ async fn put_tenant_timeline_import_basebackup( info!("importing basebackup"); timeline - .import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &ctx) + .import_basebackup_from_tar( + tenant.clone(), + &mut body, + base_lsn, + broker_client, + &timeline_ctx, + ) .await .map_err(ApiError::InternalServerError)?; @@ -3384,6 +3400,7 @@ async fn put_tenant_timeline_import_wal( let state = get_state(&request); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, TenantShardId::unsharded(tenant_id), timeline_id).await?; + let ctx = RequestContextBuilder::extend(&ctx).scope(context::Scope::new_timeline(&timeline)).build(); let mut body = StreamReader::new(request.into_body().map(|res| { res.map_err(|error| { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index b5b4e5c91f..fd90ef8cd7 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1227,11 +1227,24 @@ impl StorageIoTime { pub(crate) static STORAGE_IO_TIME_METRIC: Lazy = Lazy::new(StorageIoTime::new); -const STORAGE_IO_SIZE_OPERATIONS: &[&str] = &["read", "write"]; +#[derive(Clone, Copy)] +#[repr(usize)] +enum StorageIoSizeOperation { + Read, + Write, +} + +impl StorageIoSizeOperation { + const VARIANTS: &'static [&'static str] = &["read", "write"]; + + fn as_str(&self) -> &'static str { + Self::VARIANTS[*self as usize] + } +} // Needed for the https://neonprod.grafana.net/d/5uK9tHL4k/picking-tenant-for-relocation?orgId=1 -pub(crate) static STORAGE_IO_SIZE: Lazy = Lazy::new(|| { - register_int_gauge_vec!( +static STORAGE_IO_SIZE: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( "pageserver_io_operations_bytes_total", "Total amount of bytes read/written in IO operations", &["operation", "tenant_id", "shard_id", "timeline_id"] @@ -1239,6 +1252,34 @@ pub(crate) static STORAGE_IO_SIZE: Lazy = Lazy::new(|| { .expect("failed to define a metric") }); +#[derive(Clone, Debug)] +pub(crate) struct StorageIoSizeMetrics { + pub read: UIntGauge, + pub write: UIntGauge, +} + +impl StorageIoSizeMetrics { + pub(crate) fn new(tenant_id: &str, shard_id: &str, timeline_id: &str) -> Self { + let read = STORAGE_IO_SIZE + .get_metric_with_label_values(&[ + StorageIoSizeOperation::Read.as_str(), + tenant_id, + shard_id, + timeline_id, + ]) + .unwrap(); + let write = STORAGE_IO_SIZE + .get_metric_with_label_values(&[ + StorageIoSizeOperation::Write.as_str(), + tenant_id, + shard_id, + timeline_id, + ]) + .unwrap(); + Self { read, write } + } +} + #[cfg(not(test))] pub(crate) mod virtual_file_descriptor_cache { use super::*; @@ -2821,6 +2862,7 @@ pub(crate) struct TimelineMetrics { /// Number of valid LSN leases. 
pub valid_lsn_lease_count_gauge: UIntGauge, pub wal_records_received: IntCounter, + pub storage_io_size: StorageIoSizeMetrics, shutdown: std::sync::atomic::AtomicBool, } @@ -2956,6 +2998,8 @@ impl TimelineMetrics { .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) .unwrap(); + let storage_io_size = StorageIoSizeMetrics::new(&tenant_id, &shard_id, &timeline_id); + TimelineMetrics { tenant_id, shard_id, @@ -2985,6 +3029,7 @@ impl TimelineMetrics { evictions_with_low_residence_duration: std::sync::RwLock::new( evictions_with_low_residence_duration, ), + storage_io_size, valid_lsn_lease_count_gauge, wal_records_received, shutdown: std::sync::atomic::AtomicBool::default(), @@ -3175,7 +3220,7 @@ impl TimelineMetrics { ]); } - for op in STORAGE_IO_SIZE_OPERATIONS { + for op in StorageIoSizeOperation::VARIANTS { let _ = STORAGE_IO_SIZE.remove_label_values(&[op, tenant_id, shard_id, timeline_id]); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index ba2ed9dc81..f2d2ab05ad 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -56,6 +56,7 @@ use crate::config::PageServerConf; use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::{ self, COMPUTE_COMMANDS_COUNTERS, ComputeCommandKind, LIVE_CONNECTIONS, SmgrOpTimer, + TimelineMetrics, }; use crate::pgdatadir_mapping::Version; use crate::span::{ @@ -423,6 +424,9 @@ impl timeline::handle::Types for TenantManagerTypes { pub(crate) struct TenantManagerCacheItem { pub(crate) timeline: Arc, + // allow() for cheap propagation through RequestContext inside a task + #[allow(clippy::redundant_allocation)] + pub(crate) metrics: Arc>, #[allow(dead_code)] // we store it to keep the gate open pub(crate) gate_guard: GateGuard, } @@ -506,8 +510,11 @@ impl timeline::handle::TenantManager for TenantManagerWrappe } }; + let metrics = Arc::new(Arc::clone(&timeline.metrics)); + Ok(TenantManagerCacheItem { timeline, + metrics, gate_guard, }) } @@ -1238,6 +1245,14 @@ impl PageServerHandler { ), QueryError, > { + macro_rules! 
upgrade_handle_and_set_context { + ($shard:ident) => {{ + let weak_handle = &$shard; + let handle = weak_handle.upgrade()?; + let ctx = ctx.with_scope_page_service_pagestream(&handle); + (handle, ctx) + }}; + } Ok(match batch { BatchedFeMessage::Exists { span, @@ -1246,9 +1261,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::exists"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx) + self.handle_get_rel_exists_request(&shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1264,9 +1280,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx) + self.handle_get_nblocks_request(&shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1282,17 +1299,18 @@ impl PageServerHandler { pages, } => { fail::fail_point!("ps::handle-pagerequest-message::getpage"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( { let npages = pages.len(); trace!(npages, "handling getpage request"); let res = self .handle_get_page_at_lsn_request_batched( - &*shard.upgrade()?, + &shard, effective_request_lsn, pages, io_concurrency, - ctx, + &ctx, ) .instrument(span.clone()) .await; @@ -1309,9 +1327,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_db_size_request(&*shard.upgrade()?, &req, ctx) + self.handle_db_size_request(&shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1327,9 +1346,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![ - self.handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx) + self.handle_get_slru_segment_request(&shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1345,12 +1365,13 @@ impl PageServerHandler { requests, } => { fail::fail_point!("ps::handle-pagerequest-message::test"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( { let npages = requests.len(); trace!(npages, "handling getpage request"); let res = self - .handle_test_request_batch(&*shard.upgrade()?, requests, ctx) + .handle_test_request_batch(&shard, requests, &ctx) .instrument(span.clone()) .await; assert_eq!(res.len(), npages); @@ -2126,6 +2147,7 @@ impl PageServerHandler { .get(tenant_id, timeline_id, ShardSelector::Zero) .await?; set_tracing_field_shard_id(&timeline); + let ctx = ctx.with_scope_timeline(&timeline); if timeline.is_archived() == Some(true) { tracing::info!( @@ -2143,7 +2165,7 @@ impl PageServerHandler { lsn, crate::tenant::timeline::WaitLsnWaiter::PageService, crate::tenant::timeline::WaitLsnTimeout::Default, - ctx, + &ctx, ) .await?; timeline @@ -2169,7 +2191,7 @@ impl PageServerHandler { prev_lsn, full_backup, replica, - ctx, + &ctx, ) .await .map_err(map_basebackup_error)?; @@ -2192,7 +2214,7 @@ impl PageServerHandler { prev_lsn, full_backup, replica, - ctx, + &ctx, ) .await .map_err(map_basebackup_error)?; @@ -2209,7 +2231,7 @@ impl PageServerHandler { prev_lsn, full_backup, replica, - ctx, + &ctx, ) .await .map_err(map_basebackup_error)?; diff --git a/pageserver/src/pgdatadir_mapping.rs 
b/pageserver/src/pgdatadir_mapping.rs index 8bcc6d58ec..4685f9383b 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -2758,7 +2758,7 @@ mod tests { TimelineId::from_array(hex!("11223344556677881122334455667788")); let (tenant, ctx) = harness.load().await; - let tline = tenant + let (tline, ctx) = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx) .await?; let tline = tline.raw_timeline().unwrap(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c78d15c9b5..3a34c8e254 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -77,6 +77,8 @@ use self::timeline::{ EvictionTaskTenantState, GcCutoffs, TimelineDeleteProgress, TimelineResources, WaitLsnError, }; use crate::config::PageServerConf; +use crate::context; +use crate::context::RequestContextBuilder; use crate::context::{DownloadBehavior, RequestContext}; use crate::deletion_queue::{DeletionQueueClient, DeletionQueueError}; use crate::l0_flush::L0FlushGlobalState; @@ -1114,7 +1116,7 @@ impl Tenant { } }; - let timeline = self.create_timeline_struct( + let (timeline, timeline_ctx) = self.create_timeline_struct( timeline_id, &metadata, previous_heatmap, @@ -1124,6 +1126,7 @@ impl Tenant { idempotency.clone(), index_part.gc_compaction.clone(), index_part.rel_size_migration.clone(), + ctx, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( @@ -1257,7 +1260,7 @@ impl Tenant { match activate { ActivateTimelineArgs::Yes { broker_client } => { info!("activating timeline after reload from pgdata import task"); - timeline.activate(self.clone(), broker_client, None, ctx); + timeline.activate(self.clone(), broker_client, None, &timeline_ctx); } ActivateTimelineArgs::No => (), } @@ -1765,6 +1768,7 @@ impl Tenant { import_pgdata, ActivateTimelineArgs::No, guard, + ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), )); } } @@ -1782,6 +1786,7 @@ impl Tenant { timeline_id, &index_part.metadata, remote_timeline_client, + ctx, ) .instrument(tracing::info_span!("timeline_delete", %timeline_id)) .await @@ -2219,7 +2224,7 @@ impl Tenant { self.clone(), broker_client.clone(), background_jobs_can_start, - &ctx, + &ctx.with_scope_timeline(&timeline), ); } @@ -2416,8 +2421,8 @@ impl Tenant { new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, - _ctx: &RequestContext, - ) -> anyhow::Result { + ctx: &RequestContext, + ) -> anyhow::Result<(UninitializedTimeline, RequestContext)> { anyhow::ensure!( self.is_active(), "Cannot create empty timelines on inactive tenant" @@ -2452,6 +2457,7 @@ impl Tenant { initdb_lsn, None, None, + ctx, ) .await } @@ -2469,7 +2475,7 @@ impl Tenant { pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let uninit_tl = self + let (uninit_tl, ctx) = self .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) .await?; let tline = uninit_tl.raw_timeline().expect("we just created it"); @@ -2481,7 +2487,7 @@ impl Tenant { .init_empty_test_timeline() .context("init_empty_test_timeline")?; modification - .commit(ctx) + .commit(&ctx) .await .context("commit init_empty_test_timeline modification")?; @@ -2699,7 +2705,12 @@ impl Tenant { // doing stuff before the IndexPart is durable in S3, which is done by the previous section. 
let activated_timeline = match result { CreateTimelineResult::Created(timeline) => { - timeline.activate(self.clone(), broker_client, None, ctx); + timeline.activate( + self.clone(), + broker_client, + None, + &ctx.with_scope_timeline(&timeline), + ); timeline } CreateTimelineResult::Idempotent(timeline) => { @@ -2761,10 +2772,9 @@ impl Tenant { } }; - let mut uninit_timeline = { + let (mut uninit_timeline, timeline_ctx) = { let this = &self; let initdb_lsn = Lsn(0); - let _ctx = ctx; async move { let new_metadata = TimelineMetadata::new( // Initialize disk_consistent LSN to 0, The caller must import some data to @@ -2784,6 +2794,7 @@ impl Tenant { initdb_lsn, None, None, + ctx, ) .await } @@ -2813,6 +2824,7 @@ impl Tenant { index_part, activate, timeline_create_guard, + timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn), )); // NB: the timeline doesn't exist in self.timelines at this point @@ -2826,6 +2838,7 @@ impl Tenant { index_part: import_pgdata::index_part_format::Root, activate: ActivateTimelineArgs, timeline_create_guard: TimelineCreateGuard, + ctx: RequestContext, ) { debug_assert_current_span_has_tenant_and_timeline_id(); info!("starting"); @@ -2837,6 +2850,7 @@ impl Tenant { index_part, activate, timeline_create_guard, + ctx, ) .await; if let Err(err) = &res { @@ -2852,9 +2866,8 @@ impl Tenant { index_part: import_pgdata::index_part_format::Root, activate: ActivateTimelineArgs, timeline_create_guard: TimelineCreateGuard, + ctx: RequestContext, ) -> Result<(), anyhow::Error> { - let ctx = RequestContext::new(TaskKind::ImportPgdata, DownloadBehavior::Warn); - info!("importing pgdata"); import_pgdata::doit(&timeline, index_part, &ctx, self.cancel.clone()) .await @@ -3063,6 +3076,7 @@ impl Tenant { let mut has_pending_l0 = false; for timeline in compact_l0 { + let ctx = &ctx.with_scope_timeline(&timeline); let outcome = timeline .compact(cancel, CompactFlags::OnlyL0Compaction.into(), ctx) .instrument(info_span!("compact_timeline", timeline_id = %timeline.timeline_id)) @@ -3096,6 +3110,7 @@ impl Tenant { if !timeline.is_active() { continue; } + let ctx = &ctx.with_scope_timeline(&timeline); let mut outcome = timeline .compact(cancel, EnumSet::default(), ctx) @@ -3321,7 +3336,7 @@ impl Tenant { self.clone(), broker_client.clone(), background_jobs_can_start, - ctx, + &ctx.with_scope_timeline(timeline), ); activated_timelines += 1; } @@ -4136,7 +4151,8 @@ impl Tenant { create_idempotency: CreateTimelineIdempotency, gc_compaction_state: Option, rel_size_v2_status: Option, - ) -> anyhow::Result> { + ctx: &RequestContext, + ) -> anyhow::Result<(Arc, RequestContext)> { let state = match cause { CreateTimelineCause::Load => { let ancestor_id = new_metadata.ancestor_timeline(); @@ -4172,7 +4188,11 @@ impl Tenant { self.cancel.child_token(), ); - Ok(timeline) + let timeline_ctx = RequestContextBuilder::extend(ctx) + .scope(context::Scope::new_timeline(&timeline)) + .build(); + + Ok((timeline, timeline_ctx)) } /// [`Tenant::shutdown`] must be called before dropping the returned [`Tenant`] object @@ -4588,6 +4608,7 @@ impl Tenant { // Ensures all timelines use the same start time when computing the time cutoff. 
let now_ts_for_pitr_calc = SystemTime::now(); for timeline in timelines.iter() { + let ctx = &ctx.with_scope_timeline(timeline); let cutoff = timeline .get_last_record_lsn() .checked_sub(horizon) @@ -4761,7 +4782,7 @@ impl Tenant { src_timeline: &Arc, dst_id: TimelineId, start_lsn: Option, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> Result { let src_id = src_timeline.timeline_id; @@ -4864,7 +4885,7 @@ impl Tenant { src_timeline.pg_version, ); - let uninitialized_timeline = self + let (uninitialized_timeline, _timeline_ctx) = self .prepare_new_timeline( dst_id, &metadata, @@ -4872,6 +4893,7 @@ impl Tenant { start_lsn + 1, Some(Arc::clone(src_timeline)), Some(src_timeline.get_rel_size_v2_status()), + ctx, ) .await?; @@ -5138,7 +5160,7 @@ impl Tenant { pgdata_lsn, pg_version, ); - let mut raw_timeline = self + let (mut raw_timeline, timeline_ctx) = self .prepare_new_timeline( timeline_id, &new_metadata, @@ -5146,6 +5168,7 @@ impl Tenant { pgdata_lsn, None, None, + ctx, ) .await?; @@ -5156,7 +5179,7 @@ impl Tenant { &unfinished_timeline, &pgdata_path, pgdata_lsn, - ctx, + &timeline_ctx, ) .await .with_context(|| { @@ -5217,6 +5240,7 @@ impl Tenant { /// An empty layer map is initialized, and new data and WAL can be imported starting /// at 'disk_consistent_lsn'. After any initial data has been imported, call /// `finish_creation` to insert the Timeline into the timelines map. + #[allow(clippy::too_many_arguments)] async fn prepare_new_timeline<'a>( &'a self, new_timeline_id: TimelineId, @@ -5225,7 +5249,8 @@ impl Tenant { start_lsn: Lsn, ancestor: Option>, rel_size_v2_status: Option, - ) -> anyhow::Result> { + ctx: &RequestContext, + ) -> anyhow::Result<(UninitializedTimeline<'a>, RequestContext)> { let tenant_shard_id = self.tenant_shard_id; let resources = self.build_timeline_resources(new_timeline_id); @@ -5233,7 +5258,7 @@ impl Tenant { .remote_client .init_upload_queue_for_empty_remote(new_metadata, rel_size_v2_status.clone())?; - let timeline_struct = self + let (timeline_struct, timeline_ctx) = self .create_timeline_struct( new_timeline_id, new_metadata, @@ -5244,6 +5269,7 @@ impl Tenant { create_guard.idempotency.clone(), None, rel_size_v2_status, + ctx, ) .context("Failed to create timeline data structure")?; @@ -5264,10 +5290,13 @@ impl Tenant { "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}" ); - Ok(UninitializedTimeline::new( - self, - new_timeline_id, - Some((timeline_struct, create_guard)), + Ok(( + UninitializedTimeline::new( + self, + new_timeline_id, + Some((timeline_struct, create_guard)), + ), + timeline_ctx, )) } @@ -5802,7 +5831,8 @@ pub(crate) mod harness { } pub(crate) async fn load(&self) -> (Arc, RequestContext) { - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error) + .with_scope_unit_test(); ( self.do_try_load(&ctx) .await @@ -6825,7 +6855,7 @@ mod tests { let (tenant, ctx) = harness.load().await; let io_concurrency = IoConcurrency::spawn_for_test(); - let tline = tenant + let (tline, ctx) = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) .await?; let tline = tline.raw_timeline().unwrap(); @@ -7447,7 +7477,7 @@ mod tests { .await; let initdb_lsn = Lsn(0x20); - let utline = tenant + let (utline, ctx) = tenant .create_empty_timeline(TIMELINE_ID, initdb_lsn, DEFAULT_PG_VERSION, &ctx) .await?; let tline = utline.raw_timeline().unwrap(); @@ -7514,7 +7544,7 @@ mod tests { let harness = 
TenantHarness::create(name).await?; { let (tenant, ctx) = harness.load().await; - let tline = tenant + let (tline, _ctx) = tenant .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) .await?; // Leave the timeline ID in [`Tenant::timelines_creating`] to exclude attempting to create it again diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index b16a88eaa4..ff9a7e57b6 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -471,7 +471,8 @@ pub(crate) mod tests { blobs: &[Vec], compression: bool, ) -> Result<(), Error> { - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = write_maybe_compressed::(blobs, compression, &ctx).await?; diff --git a/pageserver/src/tenant/disk_btree.rs b/pageserver/src/tenant/disk_btree.rs index 73c105b34e..1791e5996c 100644 --- a/pageserver/src/tenant/disk_btree.rs +++ b/pageserver/src/tenant/disk_btree.rs @@ -32,8 +32,7 @@ use hex; use thiserror::Error; use tracing::error; -use crate::context::{DownloadBehavior, RequestContext}; -use crate::task_mgr::TaskKind; +use crate::context::RequestContext; use crate::tenant::block_io::{BlockReader, BlockWriter}; // The maximum size of a value stored in the B-tree. 5 bytes is enough currently. @@ -477,16 +476,15 @@ where } #[allow(dead_code)] - pub async fn dump(&self) -> Result<()> { + pub async fn dump(&self, ctx: &RequestContext) -> Result<()> { let mut stack = Vec::new(); - let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error); stack.push((self.root_blk, String::new(), 0, 0, 0)); let block_cursor = self.reader.block_cursor(); while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() { - let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?; + let blk = block_cursor.read_blk(self.start_blk + blknum, ctx).await?; let buf: &[u8] = blk.as_ref(); let node = OnDiskNode::::deparse(buf)?; @@ -835,6 +833,8 @@ pub(crate) mod tests { use rand::Rng; use super::*; + use crate::context::DownloadBehavior; + use crate::task_mgr::TaskKind; use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef}; #[derive(Clone, Default)] @@ -869,7 +869,8 @@ pub(crate) mod tests { let mut disk = TestDisk::new(); let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let all_keys: Vec<&[u8; 6]> = vec![ b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb", @@ -887,7 +888,7 @@ pub(crate) mod tests { let reader = DiskBtreeReader::new(0, root_offset, disk); - reader.dump().await?; + reader.dump(&ctx).await?; // Test the `get` function on all the keys. 
for (key, val) in all_data.iter() { @@ -979,7 +980,8 @@ pub(crate) mod tests { async fn lots_of_keys() -> Result<()> { let mut disk = TestDisk::new(); let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); const NUM_KEYS: u64 = 1000; @@ -997,7 +999,7 @@ pub(crate) mod tests { let reader = DiskBtreeReader::new(0, root_offset, disk); - reader.dump().await?; + reader.dump(&ctx).await?; use std::sync::Mutex; @@ -1167,7 +1169,8 @@ pub(crate) mod tests { // Build a tree from it let mut disk = TestDisk::new(); let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); for (key, val) in disk_btree_test_data::TEST_DATA { writer.append(&key, val)?; @@ -1198,7 +1201,7 @@ pub(crate) mod tests { .await?; assert_eq!(count, disk_btree_test_data::TEST_DATA.len()); - reader.dump().await?; + reader.dump(&ctx).await?; Ok(()) } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index cb25fa6185..f048a355a8 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -351,7 +351,8 @@ mod tests { let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap(); fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?; - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); Ok((conf, tenant_shard_id, timeline_id, ctx)) } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 5f3a0932c4..1cf0241631 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -491,7 +491,10 @@ impl JobGenerator TenantDownloader<'a> { // Download the layers in the heatmap for timeline in heatmap.timelines { + let ctx = &ctx.with_scope_secondary_timeline(tenant_shard_id, &timeline.timeline_id); let timeline_state = timeline_states .remove(&timeline.timeline_id) .expect("Just populated above"); diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index ed6b351c75..8cc94b4e4d 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -474,7 +474,7 @@ async fn fill_logical_sizes( if cached_size.is_none() { let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap()); let parallel_size_calcs = Arc::clone(limit); - let ctx = ctx.attached_child(); + let ctx = ctx.attached_child().with_scope_timeline(&timeline); joinset.spawn( calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx) .in_current_span(), diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 83ac6aab51..62adae1680 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -1334,7 +1334,7 @@ impl DeltaLayerInner { block_reader, ); - tree_reader.dump().await?; + tree_reader.dump(ctx).await?; let keys = self.index_entries(ctx).await?; @@ -1972,6 +1972,7 @@ pub(crate) mod test { .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, ctx) .await .unwrap(); + let ctx = 
&ctx.with_scope_timeline(&timeline); let initdb_layer = timeline .layers diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 0db9e8c845..2e6cee036c 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -199,7 +199,7 @@ impl ImageLayerInner { block_reader, ); - tree_reader.dump().await?; + tree_reader.dump(ctx).await?; tree_reader .visit( diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index a7f3c6b8c5..7086429bfe 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -8,7 +8,6 @@ use utils::id::TimelineId; use super::failpoints::{Failpoint, FailpointKind}; use super::*; use crate::context::DownloadBehavior; -use crate::task_mgr::TaskKind; use crate::tenant::harness::{TenantHarness, test_img}; use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint}; @@ -27,11 +26,9 @@ async fn smoke_test() { let h = TenantHarness::create("smoke_test").await.unwrap(); let span = h.span(); let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1)); - let (tenant, _) = h.load().await; + let (tenant, ctx) = h.load().await; let io_concurrency = IoConcurrency::spawn_for_test(); - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download); - let image_layers = vec![( Lsn(0x40), vec![( @@ -56,6 +53,7 @@ async fn smoke_test() { ) .await .unwrap(); + let ctx = &ctx.with_scope_timeline(&timeline); // Grab one of the timeline's layers to exercise in the test, and the other layer that is just // there to avoid the timeline being illegally empty @@ -94,7 +92,7 @@ async fn smoke_test() { controlfile_keyspace.clone(), Lsn(0x10)..Lsn(0x11), &mut data, - &ctx, + ctx, ) .await .unwrap(); @@ -129,7 +127,7 @@ async fn smoke_test() { controlfile_keyspace.clone(), Lsn(0x10)..Lsn(0x11), &mut data, - &ctx, + ctx, ) .instrument(download_span.clone()) .await @@ -179,7 +177,7 @@ async fn smoke_test() { // plain downloading is rarely needed layer - .download_and_keep_resident(&ctx) + .download_and_keep_resident(ctx) .instrument(download_span) .await .unwrap(); @@ -341,6 +339,7 @@ fn read_wins_pending_eviction() { .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await .unwrap(); + let ctx = ctx.with_scope_timeline(&timeline); let layer = { let mut layers = { @@ -473,6 +472,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) { .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await .unwrap(); + let ctx = ctx.with_scope_timeline(&timeline); let layer = { let mut layers = { @@ -642,12 +642,12 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await .unwrap(); + let ctx = ctx.with_scope_timeline(&timeline); // This test does downloads let ctx = RequestContextBuilder::extend(&ctx) .download_behavior(DownloadBehavior::Download) .build(); - let layer = { let mut layers = { let layers = timeline.layers.read().await; @@ -727,6 +727,7 @@ async fn evict_and_wait_does_not_wait_for_download() { .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx) .await .unwrap(); + let ctx = ctx.with_scope_timeline(&timeline); // This test does downloads let ctx = RequestContextBuilder::extend(&ctx) diff --git a/pageserver/src/tenant/timeline.rs 
b/pageserver/src/tenant/timeline.rs index 4483ecfe94..e01c3dbd4d 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -287,7 +287,7 @@ pub struct Timeline { // The LSN of gc-compaction that was last applied to this timeline. gc_compaction_state: ArcSwap>, - pub(super) metrics: TimelineMetrics, + pub(crate) metrics: Arc, // `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code // in `crate::page_service` writes these metrics. @@ -2685,14 +2685,14 @@ impl Timeline { } Arc::new_cyclic(|myself| { - let metrics = TimelineMetrics::new( + let metrics = Arc::new(TimelineMetrics::new( &tenant_shard_id, &timeline_id, crate::metrics::EvictionsWithLowResidenceDurationBuilder::new( "mtime", evictions_low_residence_duration_metric_threshold, ), - ); + )); let aux_file_metrics = metrics.aux_file_size_gauge.clone(); let mut result = Timeline { @@ -2876,7 +2876,7 @@ impl Timeline { "layer flush task", async move { let _guard = guard; - let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error); + let background_ctx = RequestContext::todo_child(TaskKind::LayerFlushTask, DownloadBehavior::Error).with_scope_timeline(&self_clone); self_clone.flush_loop(layer_flush_start_rx, &background_ctx).await; let mut flush_loop_state = self_clone.flush_loop_state.lock().unwrap(); assert!(matches!(*flush_loop_state, FlushLoopState::Running{..})); @@ -7127,6 +7127,7 @@ mod tests { ) .await .unwrap(); + let ctx = &ctx.with_scope_timeline(&timeline); // Layer visibility is an input to heatmap generation, so refresh it first timeline.update_layer_visibility().await.unwrap(); @@ -7192,7 +7193,7 @@ mod tests { eprintln!("Downloading {layer} and re-generating heatmap"); - let ctx = &RequestContextBuilder::extend(&ctx) + let ctx = &RequestContextBuilder::extend(ctx) .download_behavior(crate::context::DownloadBehavior::Download) .build(); diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index c9666bb4e1..740f590735 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -11,6 +11,7 @@ use utils::id::TimelineId; use utils::{crashsafe, fs_ext, pausable_failpoint}; use crate::config::PageServerConf; +use crate::context::RequestContext; use crate::task_mgr::{self, TaskKind}; use crate::tenant::metadata::TimelineMetadata; use crate::tenant::remote_timeline_client::{ @@ -291,10 +292,11 @@ impl DeleteTimelineFlow { timeline_id: TimelineId, local_metadata: &TimelineMetadata, remote_client: RemoteTimelineClient, + ctx: &RequestContext, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. 
- let timeline = tenant + let (timeline, _timeline_ctx) = tenant .create_timeline_struct( timeline_id, local_metadata, @@ -307,6 +309,7 @@ impl DeleteTimelineFlow { crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here None, // doesn't matter what we put here None, // doesn't matter what we put here + ctx, ) .context("create_timeline_struct")?; diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 187d9f248e..397e8e8978 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -93,7 +93,8 @@ impl Timeline { } } - let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn); + let ctx = RequestContext::new(TaskKind::Eviction, DownloadBehavior::Warn) + .with_scope_timeline(&self); loop { let policy = self.get_eviction_policy(); let cf = self diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs index dcf17a376c..166917d674 100644 --- a/pageserver/src/tenant/vectored_blob_io.rs +++ b/pageserver/src/tenant/vectored_blob_io.rs @@ -961,7 +961,8 @@ mod tests { } async fn round_trip_test_compressed(blobs: &[Vec], compression: bool) -> Result<(), Error> { - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let (_temp_dir, pathbuf, offsets) = write_maybe_compressed::(blobs, compression, &ctx).await?; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index b47aecf8a6..1da3130df0 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -26,15 +26,14 @@ use owned_buffers_io::io_buf_aligned::{IoBufAligned, IoBufAlignedMut}; use owned_buffers_io::io_buf_ext::FullSlice; use pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT; pub use pageserver_api::models::virtual_file as api; -use pageserver_api::shard::TenantShardId; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use tokio::time::Instant; use tokio_epoll_uring::{BoundedBuf, IoBuf, IoBufMut, Slice}; +use crate::assert_u64_eq_usize::UsizeIsU64; use crate::context::RequestContext; -use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC, StorageIoOperation}; +use crate::metrics::{STORAGE_IO_TIME_METRIC, StorageIoOperation}; use crate::page_cache::{PAGE_SZ, PageWriteGuard}; -use crate::tenant::TENANTS_SEGMENT_NAME; pub(crate) mod io_engine; pub use io_engine::{ FeatureTestResult as IoEngineFeatureTestResult, feature_test as io_engine_feature_test, @@ -121,7 +120,7 @@ impl VirtualFile { pub async fn open_with_options>( path: P, open_options: &OpenOptions, - ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ + ctx: &RequestContext, ) -> Result { let inner = VirtualFileInner::open_with_options(path, open_options, ctx).await?; Ok(VirtualFile { @@ -133,7 +132,7 @@ impl VirtualFile { pub async fn open_with_options_v2>( path: P, open_options: &OpenOptions, - ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ + ctx: &RequestContext, ) -> Result { let file = match get_io_mode() { IoMode::Buffered => { @@ -304,13 +303,6 @@ pub struct VirtualFileInner { /// storing it here. 
pub path: Utf8PathBuf, open_options: OpenOptions, - - // These are strings becase we only use them for metrics, and those expect strings. - // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into - // strings. - tenant_id: String, - shard_id: String, - timeline_id: String, } #[derive(Debug, PartialEq, Clone, Copy)] @@ -592,36 +584,16 @@ impl VirtualFileInner { pub async fn open_with_options>( path: P, open_options: &OpenOptions, - _ctx: &RequestContext, /* TODO: carry a pointer to the metrics in the RequestContext instead of the parsing https://github.com/neondatabase/neon/issues/6107 */ + _ctx: &RequestContext, ) -> Result { - let path_ref = path.as_ref(); - let path_str = path_ref.to_string(); - let parts = path_str.split('/').collect::>(); - let (tenant_id, shard_id, timeline_id) = - if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME { - let tenant_shard_part = parts[parts.len() - 4]; - let (tenant_id, shard_id) = match tenant_shard_part.parse::() { - Ok(tenant_shard_id) => ( - tenant_shard_id.tenant_id.to_string(), - format!("{}", tenant_shard_id.shard_slug()), - ), - Err(_) => { - // Malformed path: this ID is just for observability, so tolerate it - // and pass through - (tenant_shard_part.to_string(), "*".to_string()) - } - }; - (tenant_id, shard_id, parts[parts.len() - 2].to_string()) - } else { - ("*".to_string(), "*".to_string(), "*".to_string()) - }; + let path = path.as_ref(); let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case // where our caller doesn't get to use the returned VirtualFile before its // slot gets re-used by someone else. let file = observe_duration!(StorageIoOperation::Open, { - open_options.open(path_ref.as_std_path()).await? + open_options.open(path.as_std_path()).await? }); // Strip all options other than read and write. 
@@ -637,11 +609,8 @@ impl VirtualFileInner { let vfile = VirtualFileInner { handle: RwLock::new(handle), pos: 0, - path: path_ref.to_path_buf(), + path: path.to_owned(), open_options: reopen_options, - tenant_id, - shard_id, - timeline_id, }; // TODO: Under pressure, it's likely the slot will get re-used and @@ -944,7 +913,7 @@ impl VirtualFileInner { &self, buf: tokio_epoll_uring::Slice, offset: u64, - _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ + ctx: &RequestContext, ) -> (tokio_epoll_uring::Slice, Result) where Buf: tokio_epoll_uring::IoBufMut + Send, @@ -962,14 +931,7 @@ impl VirtualFileInner { let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await; let res = res.maybe_fatal_err("io_engine read_at inside VirtualFileInner::read_at"); if let Ok(size) = res { - STORAGE_IO_SIZE - .with_label_values(&[ - "read", - &self.tenant_id, - &self.shard_id, - &self.timeline_id, - ]) - .add(size as i64); + ctx.io_size_metrics().read.add(size.into_u64()); } (buf, res) }) @@ -980,9 +942,9 @@ impl VirtualFileInner { &self, buf: FullSlice, offset: u64, - _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ + ctx: &RequestContext, ) -> (FullSlice, Result) { - let (slice, result) = self.write_at_inner(buf, offset, _ctx).await; + let (slice, result) = self.write_at_inner(buf, offset, ctx).await; let result = result.maybe_fatal_err("write_at"); (slice, result) } @@ -991,7 +953,7 @@ impl VirtualFileInner { &self, buf: FullSlice, offset: u64, - _ctx: &RequestContext, /* TODO: use for metrics: https://github.com/neondatabase/neon/issues/6107 */ + ctx: &RequestContext, ) -> (FullSlice, Result) { let file_guard = match self.lock_file().await { Ok(file_guard) => file_guard, @@ -1001,14 +963,7 @@ impl VirtualFileInner { let ((_file_guard, buf), result) = io_engine::get().write_at(file_guard, offset, buf).await; if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&[ - "write", - &self.tenant_id, - &self.shard_id, - &self.timeline_id, - ]) - .add(size as i64); + ctx.io_size_metrics().write.add(size.into_u64()); } (buf, result) }) @@ -1593,7 +1548,8 @@ mod tests { where A: Adapter, { - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let testdir = crate::config::PageServerConf::test_repo_dir(testname); std::fs::create_dir_all(&testdir)?; @@ -1720,7 +1676,8 @@ mod tests { const THREADS: usize = 100; const SAMPLE: [u8; SIZE] = [0xADu8; SIZE]; - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let testdir = crate::config::PageServerConf::test_repo_dir("vfile_concurrency"); std::fs::create_dir_all(&testdir)?; @@ -1779,7 +1736,8 @@ mod tests { #[tokio::test] async fn test_atomic_overwrite_basic() { - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test(); let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic"); std::fs::create_dir_all(&testdir).unwrap(); @@ -1807,7 +1765,8 @@ mod tests { #[tokio::test] async fn test_atomic_overwrite_preexisting_tmp() { - let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error); + let ctx = + 
RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
         let testdir =
             crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
         std::fs::create_dir_all(&testdir).unwrap();
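
Not part of the patch: a self-contained sketch of the pattern the hunks above introduce. VirtualFile stops parsing tenant/shard/timeline IDs out of file paths and instead reads a StorageIoSizeMetrics handle from the RequestContext, whose new Scope is bound per timeline (or falls back to a global "*"/"*"/"*" metric). The sketch below makes simplifying assumptions: AtomicU64 stands in for the prometheus UIntGauge counters, only the Global and Timeline scope variants are modeled, and there is no rate-limited warning on the global fallback. Names such as Scope, with_scope_timeline, and io_size_metrics mirror the diff; everything else is illustrative.

    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Simplified stand-in for the per-timeline `StorageIoSizeMetrics` in the patch;
    /// the real type wraps prometheus `UIntGauge`s labeled by tenant/shard/timeline.
    #[derive(Default)]
    struct StorageIoSizeMetrics {
        read: AtomicU64,
        write: AtomicU64,
    }

    /// Simplified stand-in for `TimelineMetrics`: only the field this sketch needs.
    #[derive(Default)]
    struct TimelineMetrics {
        storage_io_size: StorageIoSizeMetrics,
    }

    /// Mirrors the patch's `Scope`: either a process-wide fallback or a timeline binding.
    /// The double `Arc` copies the patch's trick of cloning an outer `Arc` per child
    /// context instead of contending on the timeline-wide inner `Arc`'s refcount.
    enum Scope {
        Global { io_size_metrics: &'static StorageIoSizeMetrics },
        Timeline { arc_arc: Arc<Arc<TimelineMetrics>> },
    }

    struct RequestContext {
        scope: Scope,
    }

    static GLOBAL_IO_SIZE_METRICS: StorageIoSizeMetrics = StorageIoSizeMetrics {
        read: AtomicU64::new(0),
        write: AtomicU64::new(0),
    };

    impl RequestContext {
        fn new_global() -> Self {
            Self { scope: Scope::Global { io_size_metrics: &GLOBAL_IO_SIZE_METRICS } }
        }

        /// Counterpart of `with_scope_timeline`: rebind the context to a timeline's metrics.
        fn with_scope_timeline(&self, timeline_metrics: &Arc<TimelineMetrics>) -> Self {
            Self { scope: Scope::Timeline { arc_arc: Arc::new(Arc::clone(timeline_metrics)) } }
        }

        /// Counterpart of `ctx.io_size_metrics()` used by the VirtualFile read/write paths.
        fn io_size_metrics(&self) -> &StorageIoSizeMetrics {
            match &self.scope {
                Scope::Global { io_size_metrics } => io_size_metrics,
                Scope::Timeline { arc_arc } => &arc_arc.storage_io_size,
            }
        }
    }

    fn main() {
        let timeline_metrics = Arc::new(TimelineMetrics::default());
        let ctx = RequestContext::new_global().with_scope_timeline(&timeline_metrics);

        // What the VirtualFile read/write paths do in the patch, minus the real I/O:
        // attribute the bytes to whatever scope the context carries.
        ctx.io_size_metrics().read.fetch_add(1024, Ordering::Relaxed);
        ctx.io_size_metrics().write.fetch_add(8192, Ordering::Relaxed);

        assert_eq!(timeline_metrics.storage_io_size.read.load(Ordering::Relaxed), 1024);
        assert_eq!(timeline_metrics.storage_io_size.write.load(Ordering::Relaxed), 8192);
    }

Cloning the outer Arc for each child context keeps the per-timeline inner Arc's reference count from becoming a contention point across concurrent page_service connections, which is what the `arc_arc` field and the `#[allow(clippy::redundant_allocation)]` annotations in the patch are for.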