From b688d1e80c6849d6d0eb3eef810973643f55851a Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Mon, 24 Feb 2025 18:09:24 +0100 Subject: [PATCH] half-borked attempt at propagating to more parts of the implementation; page_service's shard swapping turns out to be painful / requires `Handle: Clone`, don't want that --- pageserver/src/context.rs | 44 ++++++++++++++++-- pageserver/src/http/routes.rs | 4 +- pageserver/src/page_service.rs | 36 ++++++++++----- pageserver/src/tenant.rs | 57 +++++++++++++++--------- pageserver/src/tenant/ephemeral_file.rs | 1 + pageserver/src/tenant/timeline.rs | 17 +------ pageserver/src/tenant/timeline/delete.rs | 12 ++--- 7 files changed, 115 insertions(+), 56 deletions(-) diff --git a/pageserver/src/context.rs b/pageserver/src/context.rs index 3aaa131766..e1d08dfc5e 100644 --- a/pageserver/src/context.rs +++ b/pageserver/src/context.rs @@ -92,6 +92,7 @@ use std::sync::Arc; use once_cell::sync::Lazy; +use tracing::warn; use crate::{ task_mgr::TaskKind, @@ -120,6 +121,10 @@ pub(crate) enum Scope { Timeline { timeline: Arc, }, + TimelineHandle { + timeline_handle: + crate::tenant::timeline::handle::Handle, + }, } impl Scope { @@ -140,17 +145,22 @@ impl Scope { timeline: Arc::clone(timeline), } } + pub(crate) fn new_timeline_handle( + timeline_handle: crate::tenant::timeline::handle::Handle< + crate::page_service::TenantManagerTypes, + >, + ) -> Self { + Scope::TimelineHandle { timeline_handle } + } pub(crate) fn io_size_metrics(&self) -> &crate::metrics::StorageIoSizeMetrics { match self { Scope::Global { io_size_metrics } => io_size_metrics, Scope::Tenant { tenant } => &tenant.virtual_file_io_metrics, Scope::Timeline { timeline } => &timeline.metrics.storage_io_size, + Scope::TimelineHandle { timeline_handle } => &timeline_handle.metrics.storage_io_size, } } - pub(crate) fn is_timeline(&self) -> bool { - matches!(self, Scope::Timeline { .. }) - } } /// The kind of access to the page cache. @@ -366,4 +376,32 @@ impl RequestContext { pub(crate) fn scope(&self) -> &Scope { &self.scope } + + pub(crate) fn scope_mut(&mut self) -> &mut Scope { + &mut self.scope + } + + pub(crate) fn assert_is_timeline_scoped(&self, what: &str) { + if let Scope::Timeline { .. } = self.scope() { + return; + } + if cfg!(debug_assertions) || cfg!(feature = "testing") { + panic!("RequestContext must be timeline-scoped what={what}"); + } else { + use once_cell::sync::Lazy; + use std::sync::Mutex; + use std::time::Duration; + use utils::rate_limit::RateLimit; + static LIMIT: Lazy> = + Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1)))); + let mut guard = LIMIT.lock().unwrap(); + guard.call2(|rate_limit_stats| { + warn!( + %rate_limit_stats, + what, + "RequestContext must be timeline-scoped", + ); + }); + } + } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e49a0e969d..9d2f04dc4d 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3286,7 +3286,7 @@ async fn put_tenant_timeline_import_basebackup( tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?; - let timeline = tenant + let (timeline, timeline_ctx) = tenant .create_empty_timeline(timeline_id, base_lsn, pg_version, &ctx) .map_err(ApiError::InternalServerError) .await?; @@ -3305,7 +3305,7 @@ async fn put_tenant_timeline_import_basebackup( info!("importing basebackup"); timeline - .import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &ctx) + .import_basebackup_from_tar(tenant.clone(), &mut body, base_lsn, broker_client, &timeline_ctx) .await .map_err(ApiError::InternalServerError)?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 7285697040..d918972c17 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -53,7 +53,7 @@ use utils::{ use crate::auth::check_permission; use crate::basebackup::BasebackupError; use crate::config::PageServerConf; -use crate::context::{DownloadBehavior, RequestContext}; +use crate::context::{self, DownloadBehavior, RequestContext, RequestContextBuilder}; use crate::metrics::{self, SmgrOpTimer}; use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS}; use crate::pgdatadir_mapping::Version; @@ -1086,6 +1086,16 @@ impl PageServerHandler { batch }; + macro_rules! upgrade_handle_and_set_context { + ($shard:expr) => {{ + let shard = $shard.upgrade()?; + let ctx = RequestContextBuilder::extend(ctx) + .scope(context::Scope::new_timeline_handle(shard.clone())) + .build(); + (shard, ctx) + }}; + } + // invoke handler function let (mut handler_results, span): ( Vec>, @@ -1098,9 +1108,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::exists"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![self - .handle_get_rel_exists_request(&*shard.upgrade()?, &req, ctx) + .handle_get_rel_exists_request(&*shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1115,9 +1126,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::nblocks"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![self - .handle_get_nblocks_request(&*shard.upgrade()?, &req, ctx) + .handle_get_nblocks_request(&*shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1132,17 +1144,18 @@ impl PageServerHandler { pages, } => { fail::fail_point!("ps::handle-pagerequest-message::getpage"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( { let npages = pages.len(); trace!(npages, "handling getpage request"); let res = self .handle_get_page_at_lsn_request_batched( - &*shard.upgrade()?, + &*shard, effective_request_lsn, pages, io_concurrency, - ctx, + &ctx, ) .instrument(span.clone()) .await; @@ -1159,9 +1172,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::dbsize"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![self - .handle_db_size_request(&*shard.upgrade()?, &req, ctx) + .handle_db_size_request(&*shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1176,9 +1190,10 @@ impl PageServerHandler { req, } => { fail::fail_point!("ps::handle-pagerequest-message::slrusegment"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( vec![self - .handle_get_slru_segment_request(&*shard.upgrade()?, &req, ctx) + .handle_get_slru_segment_request(&*shard, &req, &ctx) .instrument(span.clone()) .await .map(|msg| (msg, timer)) @@ -1193,12 +1208,13 @@ impl PageServerHandler { requests, } => { fail::fail_point!("ps::handle-pagerequest-message::test"); + let (shard, ctx) = upgrade_handle_and_set_context!(shard); ( { let npages = requests.len(); trace!(npages, "handling getpage request"); let res = self - .handle_test_request_batch(&*shard.upgrade()?, requests, ctx) + .handle_test_request_batch(&*shard, requests, ctx) .instrument(span.clone()) .await; assert_eq!(res.len(), npages); @@ -1631,7 +1647,7 @@ impl PageServerHandler { // let executor = pipeline_stage!("executor", self.cancel.clone(), move |cancel| { - let ctx = ctx.attached_child(); + let mut ctx = ctx.attached_child(); async move { let _cancel_batcher = cancel_batcher.drop_guard(); loop { @@ -1658,7 +1674,7 @@ impl PageServerHandler { io_concurrency.clone(), &cancel, protocol_version, - &ctx, + &mut ctx, ), ) .await?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8f484f2dd7..2449df4a26 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1164,7 +1164,7 @@ impl Tenant { } }; - let timeline = self.create_timeline_struct( + let (timeline, timeline_ctx) = self.create_timeline_struct( timeline_id, &metadata, previous_heatmap, @@ -1172,6 +1172,7 @@ impl Tenant { resources, CreateTimelineCause::Load, idempotency.clone(), + ctx, )?; let disk_consistent_lsn = timeline.get_disk_consistent_lsn(); anyhow::ensure!( @@ -1793,6 +1794,7 @@ impl Tenant { timeline_id, &index_part.metadata, remote_timeline_client, + ctx, ) .instrument(tracing::info_span!("timeline_delete", %timeline_id)) .await @@ -2426,8 +2428,8 @@ impl Tenant { new_timeline_id: TimelineId, initdb_lsn: Lsn, pg_version: u32, - _ctx: &RequestContext, - ) -> anyhow::Result { + ctx: &RequestContext, + ) -> anyhow::Result<(UninitializedTimeline, RequestContext)> { anyhow::ensure!( self.is_active(), "Cannot create empty timelines on inactive tenant" @@ -2461,6 +2463,7 @@ impl Tenant { create_guard, initdb_lsn, None, + ctx, ) .await } @@ -2478,7 +2481,7 @@ impl Tenant { pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let uninit_tl = self + let (uninit_tl, ctx) = self .create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx) .await?; let tline = uninit_tl.raw_timeline().expect("we just created it"); @@ -2490,7 +2493,7 @@ impl Tenant { .init_empty_test_timeline() .context("init_empty_test_timeline")?; modification - .commit(ctx) + .commit(&ctx) .await .context("commit init_empty_test_timeline modification")?; @@ -2762,10 +2765,9 @@ impl Tenant { } }; - let mut uninit_timeline = { + let (mut uninit_timeline, timeline_ctx) = { let this = &self; let initdb_lsn = Lsn(0); - let _ctx = ctx; async move { let new_metadata = TimelineMetadata::new( // Initialize disk_consistent LSN to 0, The caller must import some data to @@ -2784,6 +2786,7 @@ impl Tenant { timeline_create_guard, initdb_lsn, None, + &ctx, ) .await } @@ -4130,7 +4133,8 @@ impl Tenant { resources: TimelineResources, cause: CreateTimelineCause, create_idempotency: CreateTimelineIdempotency, - ) -> anyhow::Result> { + ctx: &RequestContext, + ) -> anyhow::Result<(Arc, RequestContext)> { let state = match cause { CreateTimelineCause::Load => { let ancestor_id = new_metadata.ancestor_timeline(); @@ -4164,7 +4168,11 @@ impl Tenant { self.cancel.child_token(), ); - Ok(timeline) + let timeline_ctx = RequestContextBuilder::extend(ctx) + .scope(context::Scope::new_timeline(&timeline)) + .build(); + + Ok((timeline, timeline_ctx)) } /// [`Tenant::shutdown`] must be called before dropping the returned [`Tenant`] object @@ -4283,7 +4291,9 @@ impl Tenant { pagestream_throttle_metrics: Arc::new( crate::metrics::tenant_throttling::Pagestream::new(&tenant_shard_id), ), - virtual_file_io_metrics: crate::metrics::StorageIoSizeMetrics::new_tenant(&tenant_shard_id), + virtual_file_io_metrics: crate::metrics::StorageIoSizeMetrics::new_tenant( + &tenant_shard_id, + ), tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)), ongoing_timeline_detach: std::sync::Mutex::default(), gc_block: Default::default(), @@ -4754,7 +4764,7 @@ impl Tenant { src_timeline: &Arc, dst_id: TimelineId, start_lsn: Option, - _ctx: &RequestContext, + ctx: &RequestContext, ) -> Result { let src_id = src_timeline.timeline_id; @@ -4854,13 +4864,14 @@ impl Tenant { src_timeline.pg_version, ); - let uninitialized_timeline = self + let (uninitialized_timeline, _timeline_ctx) = self .prepare_new_timeline( dst_id, &metadata, timeline_create_guard, start_lsn + 1, Some(Arc::clone(src_timeline)), + &ctx, ) .await?; @@ -5125,13 +5136,14 @@ impl Tenant { pgdata_lsn, pg_version, ); - let mut raw_timeline = self + let (mut raw_timeline, timeline_ctx) = self .prepare_new_timeline( timeline_id, &new_metadata, timeline_create_guard, pgdata_lsn, None, + ctx, ) .await?; @@ -5142,7 +5154,7 @@ impl Tenant { &unfinished_timeline, &pgdata_path, pgdata_lsn, - ctx, + &timeline_ctx, ) .await .with_context(|| { @@ -5210,7 +5222,8 @@ impl Tenant { create_guard: TimelineCreateGuard, start_lsn: Lsn, ancestor: Option>, - ) -> anyhow::Result> { + ctx: &RequestContext, + ) -> anyhow::Result<(UninitializedTimeline<'a>, RequestContext)> { let tenant_shard_id = self.tenant_shard_id; let resources = self.build_timeline_resources(new_timeline_id); @@ -5218,7 +5231,7 @@ impl Tenant { .remote_client .init_upload_queue_for_empty_remote(new_metadata)?; - let timeline_struct = self + let (timeline_struct, timeline_ctx) = self .create_timeline_struct( new_timeline_id, new_metadata, @@ -5227,6 +5240,7 @@ impl Tenant { resources, CreateTimelineCause::Load, create_guard.idempotency.clone(), + ctx, ) .context("Failed to create timeline data structure")?; @@ -5245,10 +5259,13 @@ impl Tenant { "Successfully created initial files for timeline {tenant_shard_id}/{new_timeline_id}" ); - Ok(UninitializedTimeline::new( - self, - new_timeline_id, - Some((timeline_struct, create_guard)), + Ok(( + UninitializedTimeline::new( + self, + new_timeline_id, + Some((timeline_struct, create_guard)), + ), + timeline_ctx, )) } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index ba79672bc7..60883f8aac 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -132,6 +132,7 @@ impl EphemeralFile { srcbuf: &[u8], ctx: &RequestContext, ) -> std::io::Result { + ctx.assert_is_timeline_scoped("EphemeralFile::write_raw"); let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?; if let Some(control) = control { control.release().await; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b60da0463c..ad93661204 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1131,21 +1131,6 @@ impl Timeline { lsn: Lsn, ctx: &RequestContext, ) -> Result { - if !ctx.scope().is_timeline() { - if cfg!(debug_assertions) || cfg!(feature = "testing") { - panic!("get() called with RequestContext in non-timeline scope"); - } else { - static LIMIT: Lazy> = - Lazy::new(|| Mutex::new(RateLimit::new(Duration::from_secs(1)))); - let mut guard = LIMIT.lock().unwrap(); - guard.call2(|rate_limit_stats| { - warn!( - %rate_limit_stats, - "get() called with RequestContext in non-timeline scope", - ); - }); - } - } if !lsn.is_valid() { return Err(PageReconstructError::Other(anyhow::anyhow!("Invalid LSN"))); } @@ -1314,6 +1299,8 @@ impl Timeline { reconstruct_state: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result>, GetVectoredError> { + ctx.assert_is_timeline_scoped("Timeline::get_vectored_impl"); + let read_path = if self.conf.enable_read_path_debugging || ctx.read_path_debug() { Some(ReadPath::new(keyspace.clone(), lsn)) } else { diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 841b2fa1c7..1761d7bc71 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -6,20 +6,18 @@ use std::{ use anyhow::Context; use pageserver_api::{models::TimelineState, shard::TenantShardId}; use remote_storage::DownloadError; +use reqwest::Request; use tokio::sync::OwnedMutexGuard; use tracing::{error, info, info_span, instrument, Instrument}; use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; use crate::{ - config::PageServerConf, - task_mgr::{self, TaskKind}, - tenant::{ + config::PageServerConf, context::RequestContext, task_mgr::{self, TaskKind}, tenant::{ metadata::TimelineMetadata, remote_timeline_client::{PersistIndexPartWithDeletedFlagError, RemoteTimelineClient}, CreateTimelineCause, DeleteTimelineError, MaybeDeletedIndexPart, Tenant, TenantManifestError, Timeline, TimelineOrOffloaded, - }, - virtual_file::MaybeFatalIo, + }, virtual_file::MaybeFatalIo }; /// Mark timeline as deleted in S3 so we won't pick it up next time @@ -286,10 +284,11 @@ impl DeleteTimelineFlow { timeline_id: TimelineId, local_metadata: &TimelineMetadata, remote_client: RemoteTimelineClient, + ctx: &RequestContext, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. - let timeline = tenant + let (timeline, timeline_ctx) = tenant .create_timeline_struct( timeline_id, local_metadata, @@ -300,6 +299,7 @@ impl DeleteTimelineFlow { // Thus we need to skip the validation here. CreateTimelineCause::Delete, crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here + ctx, ) .context("create_timeline_struct")?;