From f09843ef17003f015b7469ffc0437794997e2e45 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 27 Feb 2025 10:26:25 +0100 Subject: [PATCH] refactor(pageserver): propagate RequestContext to layer downloads (#11001) For some reason the layer download API never fully got `RequestContext`-infected. This PR fixes that as a precursor to - https://github.com/neondatabase/neon/issues/6107 --- pageserver/compaction/src/compact_tiered.rs | 4 +- pageserver/compaction/src/interface.rs | 1 + pageserver/compaction/src/simulator.rs | 1 + pageserver/src/http/routes.rs | 18 +++++--- .../src/tenant/storage_layer/delta_layer.rs | 2 +- pageserver/src/tenant/storage_layer/layer.rs | 46 +++++++++---------- .../src/tenant/storage_layer/layer/tests.rs | 22 ++++++--- pageserver/src/tenant/timeline.rs | 21 +++++++-- pageserver/src/tenant/timeline/compaction.rs | 11 +++-- .../src/tenant/timeline/detach_ancestor.rs | 2 +- .../timeline/heatmap_layers_downloader.rs | 9 +++- 11 files changed, 84 insertions(+), 53 deletions(-) diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs index 7779ffaf8b..02b11910ce 100644 --- a/pageserver/compaction/src/compact_tiered.rs +++ b/pageserver/compaction/src/compact_tiered.rs @@ -307,7 +307,7 @@ where let mut layer_ids: Vec = Vec::new(); for layer_id in &job.input_layers { let layer = &self.layers[layer_id.0].layer; - if let Some(dl) = self.executor.downcast_delta_layer(layer).await? { + if let Some(dl) = self.executor.downcast_delta_layer(layer, ctx).await? { deltas.push(dl.clone()); layer_ids.push(*layer_id); } @@ -536,7 +536,7 @@ where let mut deltas: Vec = Vec::new(); for layer_id in &job.input_layers { let l = &self.layers[layer_id.0]; - if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? { + if let Some(dl) = self.executor.downcast_delta_layer(&l.layer, ctx).await? { deltas.push(dl.clone()); } } diff --git a/pageserver/compaction/src/interface.rs b/pageserver/compaction/src/interface.rs index 8ed393a645..92723faeaf 100644 --- a/pageserver/compaction/src/interface.rs +++ b/pageserver/compaction/src/interface.rs @@ -55,6 +55,7 @@ pub trait CompactionJobExecutor { fn downcast_delta_layer( &self, layer: &Self::Layer, + ctx: &Self::RequestContext, ) -> impl Future>> + Send; // ---- diff --git a/pageserver/compaction/src/simulator.rs b/pageserver/compaction/src/simulator.rs index 673b80c313..341fceba6f 100644 --- a/pageserver/compaction/src/simulator.rs +++ b/pageserver/compaction/src/simulator.rs @@ -487,6 +487,7 @@ impl interface::CompactionJobExecutor for MockTimeline { async fn downcast_delta_layer( &self, layer: &MockLayer, + _ctx: &MockRequestContext, ) -> anyhow::Result>> { Ok(match layer { MockLayer::Delta(l) => Some(l.clone()), diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index dd5a24a41f..b738d22740 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -927,11 +927,10 @@ async fn get_lsn_by_timestamp_handler( let with_lease = parse_query_param(&request, "with_lease")?.unwrap_or(false); - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); - let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let result = timeline .find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx) .await?; @@ -1000,10 +999,10 @@ async fn get_timestamp_of_lsn_handler( .with_context(|| format!("Invalid LSN: {lsn_str:?}")) .map_err(ApiError::BadRequest)?; - let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?; match result { @@ -1368,7 +1367,7 @@ async fn timeline_layer_scan_disposable_keys( }; let resident_layer = layer - .download_and_keep_resident() + .download_and_keep_resident(&ctx) .await .map_err(|err| match err { tenant::storage_layer::layer::DownloadError::TimelineShutdown @@ -1443,6 +1442,7 @@ async fn timeline_download_heatmap_layers_handler( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let max_concurrency = get_config(&request) .remote_storage_config @@ -1451,7 +1451,9 @@ async fn timeline_download_heatmap_layers_handler( .unwrap_or(DEFAULT_MAX_CONCURRENCY); let concurrency = std::cmp::min(max_concurrency, desired_concurrency); - timeline.start_heatmap_layers_download(concurrency).await?; + timeline + .start_heatmap_layers_download(concurrency, &ctx) + .await?; json_response(StatusCode::ACCEPTED, ()) } @@ -1490,8 +1492,9 @@ async fn layer_download_handler( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); let downloaded = timeline - .download_layer(&layer_name) + .download_layer(&layer_name, &ctx) .await .map_err(|e| match e { tenant::storage_layer::layer::DownloadError::TimelineShutdown @@ -2389,7 +2392,8 @@ async fn timeline_download_remote_layers_handler_post( let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id) .await?; - match timeline.spawn_download_all_remote_layers(body).await { + let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download); + match timeline.spawn_download_all_remote_layers(body, &ctx).await { Ok(st) => json_response(StatusCode::ACCEPTED, st), Err(st) => json_response(StatusCode::CONFLICT, st), } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index d9afdc2405..83ac6aab51 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -2083,7 +2083,7 @@ pub(crate) mod test { .await .unwrap(); - let new_layer = new_layer.download_and_keep_resident().await.unwrap(); + let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap(); new_layer .copy_delta_prefix(&mut writer, truncate_at, ctx) diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs index ae06aca63b..bde7fbc1f9 100644 --- a/pageserver/src/tenant/storage_layer/layer.rs +++ b/pageserver/src/tenant/storage_layer/layer.rs @@ -324,16 +324,16 @@ impl Layer { reconstruct_data: &mut ValuesReconstructState, ctx: &RequestContext, ) -> Result<(), GetVectoredError> { - let downloaded = self - .0 - .get_or_maybe_download(true, Some(ctx)) - .await - .map_err(|err| match err { - DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => { - GetVectoredError::Cancelled - } - other => GetVectoredError::Other(anyhow::anyhow!(other)), - })?; + let downloaded = + self.0 + .get_or_maybe_download(true, ctx) + .await + .map_err(|err| match err { + DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => { + GetVectoredError::Cancelled + } + other => GetVectoredError::Other(anyhow::anyhow!(other)), + })?; let this = ResidentLayer { downloaded: downloaded.clone(), owner: self.clone(), @@ -356,8 +356,8 @@ impl Layer { /// Download the layer if evicted. /// /// Will not error when the layer is already downloaded. - pub(crate) async fn download(&self) -> Result<(), DownloadError> { - self.0.get_or_maybe_download(true, None).await?; + pub(crate) async fn download(&self, ctx: &RequestContext) -> Result<(), DownloadError> { + self.0.get_or_maybe_download(true, ctx).await?; Ok(()) } @@ -392,8 +392,11 @@ impl Layer { } /// Downloads if necessary and creates a guard, which will keep this layer from being evicted. - pub(crate) async fn download_and_keep_resident(&self) -> Result { - let downloaded = self.0.get_or_maybe_download(true, None).await?; + pub(crate) async fn download_and_keep_resident( + &self, + ctx: &RequestContext, + ) -> Result { + let downloaded = self.0.get_or_maybe_download(true, ctx).await?; Ok(ResidentLayer { downloaded, @@ -446,7 +449,7 @@ impl Layer { if verbose { // for now, unconditionally download everything, even if that might not be wanted. - let l = self.0.get_or_maybe_download(true, Some(ctx)).await?; + let l = self.0.get_or_maybe_download(true, ctx).await?; l.dump(&self.0, ctx).await? } @@ -945,7 +948,7 @@ impl LayerInner { async fn get_or_maybe_download( self: &Arc, allow_download: bool, - ctx: Option<&RequestContext>, + ctx: &RequestContext, ) -> Result, DownloadError> { let (weak, permit) = { // get_or_init_detached can: @@ -1035,21 +1038,14 @@ impl LayerInner { return Err(DownloadError::NotFile(ft)); } - if let Some(ctx) = ctx { - self.check_expected_download(ctx)?; - } + self.check_expected_download(ctx)?; if !allow_download { // this is only used from tests, but it is hard to test without the boolean return Err(DownloadError::DownloadRequired); } - let download_ctx = ctx - .map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download)) - .unwrap_or(RequestContext::new( - TaskKind::LayerDownload, - DownloadBehavior::Download, - )); + let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download); async move { tracing::info!(%reason, "downloading on-demand"); diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs index 724150d27f..d43dfefdbc 100644 --- a/pageserver/src/tenant/storage_layer/layer/tests.rs +++ b/pageserver/src/tenant/storage_layer/layer/tests.rs @@ -178,7 +178,7 @@ async fn smoke_test() { // plain downloading is rarely needed layer - .download_and_keep_resident() + .download_and_keep_resident(&ctx) .instrument(download_span) .await .unwrap(); @@ -379,7 +379,7 @@ fn read_wins_pending_eviction() { // because no actual eviction happened, we get to just reinitialize the DownloadedLayer layer .0 - .get_or_maybe_download(false, None) + .get_or_maybe_download(false, &ctx) .instrument(download_span) .await .expect("should had reinitialized without downloading"); @@ -514,7 +514,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) { // because no actual eviction happened, we get to just reinitialize the DownloadedLayer layer .0 - .get_or_maybe_download(false, None) + .get_or_maybe_download(false, &ctx) .instrument(download_span) .await .expect("should had reinitialized without downloading"); @@ -642,6 +642,11 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { .await .unwrap(); + // This test does downloads + let ctx = RequestContextBuilder::extend(&ctx) + .download_behavior(DownloadBehavior::Download) + .build(); + let layer = { let mut layers = { let layers = timeline.layers.read().await; @@ -674,7 +679,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { // simulate a cancelled read which is cancelled before it gets to re-initialize let e = layer .0 - .get_or_maybe_download(false, None) + .get_or_maybe_download(false, &ctx) .await .unwrap_err(); assert!( @@ -698,7 +703,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() { // failpoint is still enabled, but it is not hit let e = layer .0 - .get_or_maybe_download(false, None) + .get_or_maybe_download(false, &ctx) .await .unwrap_err(); assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}"); @@ -722,6 +727,11 @@ async fn evict_and_wait_does_not_wait_for_download() { .await .unwrap(); + // This test does downloads + let ctx = RequestContextBuilder::extend(&ctx) + .download_behavior(DownloadBehavior::Download) + .build(); + let layer = { let mut layers = { let layers = timeline.layers.read().await; @@ -768,7 +778,7 @@ async fn evict_and_wait_does_not_wait_for_download() { let mut download = std::pin::pin!( layer .0 - .get_or_maybe_download(true, None) + .get_or_maybe_download(true, &ctx) .instrument(download_span) ); diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index dfa50d498c..3164cdbdd2 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2197,6 +2197,7 @@ impl Timeline { pub(crate) async fn download_layer( &self, layer_file_name: &LayerName, + ctx: &RequestContext, ) -> Result, super::storage_layer::layer::DownloadError> { let Some(layer) = self .find_layer(layer_file_name) @@ -2210,7 +2211,7 @@ impl Timeline { return Ok(None); }; - layer.download().await?; + layer.download(ctx).await?; Ok(Some(true)) } @@ -6210,6 +6211,7 @@ impl Timeline { pub(crate) async fn spawn_download_all_remote_layers( self: Arc, request: DownloadRemoteLayersTaskSpawnRequest, + ctx: &RequestContext, ) -> Result { use pageserver_api::models::DownloadRemoteLayersTaskState; @@ -6230,6 +6232,10 @@ impl Timeline { } let self_clone = Arc::clone(&self); + let task_ctx = ctx.detached_child( + TaskKind::DownloadAllRemoteLayers, + DownloadBehavior::Download, + ); let task_id = task_mgr::spawn( task_mgr::BACKGROUND_RUNTIME.handle(), task_mgr::TaskKind::DownloadAllRemoteLayers, @@ -6237,7 +6243,7 @@ impl Timeline { Some(self.timeline_id), "download all remote layers task", async move { - self_clone.download_all_remote_layers(request).await; + self_clone.download_all_remote_layers(request, &task_ctx).await; let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap(); match &mut *status_guard { None => { @@ -6272,6 +6278,7 @@ impl Timeline { async fn download_all_remote_layers( self: &Arc, request: DownloadRemoteLayersTaskSpawnRequest, + ctx: &RequestContext, ) { use pageserver_api::models::DownloadRemoteLayersTaskState; @@ -6328,9 +6335,10 @@ impl Timeline { let span = tracing::info_span!("download", layer = %next); + let ctx = ctx.attached_child(); js.spawn( async move { - let res = next.download().await; + let res = next.download(&ctx).await; (next, res) } .instrument(span), @@ -6920,6 +6928,7 @@ mod tests { use utils::lsn::Lsn; use super::HeatMapTimeline; + use crate::context::RequestContextBuilder; use crate::tenant::harness::{TenantHarness, test_img}; use crate::tenant::layer_map::LayerMap; use crate::tenant::storage_layer::{Layer, LayerName, LayerVisibilityHint}; @@ -7056,8 +7065,12 @@ mod tests { eprintln!("Downloading {layer} and re-generating heatmap"); + let ctx = &RequestContextBuilder::extend(&ctx) + .download_behavior(crate::context::DownloadBehavior::Download) + .build(); + let _resident = layer - .download_and_keep_resident() + .download_and_keep_resident(ctx) .instrument(tracing::info_span!( parent: None, "download_layer", diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 091bd583d7..3f2f1a6e5f 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1153,7 +1153,7 @@ impl Timeline { // - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are: // - GC, which at worst witnesses us "undelete" a layer that they just deleted. // - ingestion, which only inserts layers, therefore cannot collide with us. - let resident = layer.download_and_keep_resident().await?; + let resident = layer.download_and_keep_resident(ctx).await?; let keys_written = resident .filter(&self.shard_identity, &mut image_layer_writer, ctx) @@ -1381,14 +1381,14 @@ impl Timeline { let mut fully_compacted = true; - deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?); + deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?); for l in level0_deltas_iter { let lsn_range = &l.layer_desc().lsn_range; if lsn_range.start != prev_lsn_end { break; } - deltas_to_compact.push(l.download_and_keep_resident().await?); + deltas_to_compact.push(l.download_and_keep_resident(ctx).await?); deltas_to_compact_bytes += l.metadata().file_size; prev_lsn_end = lsn_range.end; @@ -2828,7 +2828,7 @@ impl Timeline { total_downloaded_size += layer.layer_desc().file_size; } total_layer_size += layer.layer_desc().file_size; - let resident_layer = layer.download_and_keep_resident().await?; + let resident_layer = layer.download_and_keep_resident(ctx).await?; downloaded_layers.push(resident_layer); } info!( @@ -3404,6 +3404,7 @@ impl CompactionJobExecutor for TimelineAdaptor { async fn downcast_delta_layer( &self, layer: &OwnArc, + ctx: &RequestContext, ) -> anyhow::Result> { // this is a lot more complex than a simple downcast... if layer.is_delta() { @@ -3411,7 +3412,7 @@ impl CompactionJobExecutor for TimelineAdaptor { let guard = self.timeline.layers.read().await; guard.get_from_desc(layer) }; - let result = l.download_and_keep_resident().await?; + let result = l.download_and_keep_resident(ctx).await?; Ok(Some(ResidentDeltaLayer(result))) } else { diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 71bd196344..b08003d04a 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -629,7 +629,7 @@ async fn copy_lsn_prefix( .with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}")) .map_err(Error::Prepare)?; - let resident = layer.download_and_keep_resident().await.map_err(|e| { + let resident = layer.download_and_keep_resident(ctx).await.map_err(|e| { if e.is_cancelled() { Error::ShuttingDown } else { diff --git a/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs b/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs index 27243ba378..184c830464 100644 --- a/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs +++ b/pageserver/src/tenant/timeline/heatmap_layers_downloader.rs @@ -10,6 +10,8 @@ use http_utils::error::ApiError; use tokio_util::sync::CancellationToken; use utils::sync::gate::Gate; +use crate::context::RequestContext; + use super::Timeline; // This status is not strictly necessary now, but gives us a nice place @@ -30,6 +32,7 @@ impl HeatmapLayersDownloader { fn new( timeline: Arc, concurrency: usize, + ctx: RequestContext, ) -> Result { let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?; @@ -63,6 +66,7 @@ impl HeatmapLayersDownloader { let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map( |layer| { + let ctx = ctx.attached_child(); let tl = timeline.clone(); let dl_guard = match downloads_guard.enter() { Ok(g) => g, @@ -75,7 +79,7 @@ impl HeatmapLayersDownloader { Some(async move { let _dl_guard = dl_guard; - let res = tl.download_layer(&layer.name).await; + let res = tl.download_layer(&layer.name, &ctx).await; if let Err(err) = res { if !err.is_cancelled() { tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}") @@ -139,10 +143,11 @@ impl Timeline { pub(crate) async fn start_heatmap_layers_download( self: &Arc, concurrency: usize, + ctx: &RequestContext, ) -> Result<(), ApiError> { let mut locked = self.heatmap_layers_downloader.lock().unwrap(); if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) { - let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?; + let dl = HeatmapLayersDownloader::new(self.clone(), concurrency, ctx.attached_child())?; *locked = Some(dl); Ok(()) } else {