refactor(pageserver): propagate RequestContext to layer downloads (#11001)

For some reason the layer download API never fully got
`RequestContext`-infected.

This PR fixes that as a precursor to
- https://github.com/neondatabase/neon/issues/6107
This commit is contained in:
Christian Schwarz
2025-02-27 10:26:25 +01:00
committed by GitHub
parent c92a36740b
commit f09843ef17
11 changed files with 84 additions and 53 deletions

View File

@@ -307,7 +307,7 @@ where
let mut layer_ids: Vec<LayerId> = Vec::new();
for layer_id in &job.input_layers {
let layer = &self.layers[layer_id.0].layer;
if let Some(dl) = self.executor.downcast_delta_layer(layer).await? {
if let Some(dl) = self.executor.downcast_delta_layer(layer, ctx).await? {
deltas.push(dl.clone());
layer_ids.push(*layer_id);
}
@@ -536,7 +536,7 @@ where
let mut deltas: Vec<E::DeltaLayer> = Vec::new();
for layer_id in &job.input_layers {
let l = &self.layers[layer_id.0];
if let Some(dl) = self.executor.downcast_delta_layer(&l.layer).await? {
if let Some(dl) = self.executor.downcast_delta_layer(&l.layer, ctx).await? {
deltas.push(dl.clone());
}
}

View File

@@ -55,6 +55,7 @@ pub trait CompactionJobExecutor {
fn downcast_delta_layer(
&self,
layer: &Self::Layer,
ctx: &Self::RequestContext,
) -> impl Future<Output = anyhow::Result<Option<Self::DeltaLayer>>> + Send;
// ----

View File

@@ -487,6 +487,7 @@ impl interface::CompactionJobExecutor for MockTimeline {
async fn downcast_delta_layer(
&self,
layer: &MockLayer,
_ctx: &MockRequestContext,
) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
Ok(match layer {
MockLayer::Delta(l) => Some(l.clone()),

View File

@@ -927,11 +927,10 @@ async fn get_lsn_by_timestamp_handler(
let with_lease = parse_query_param(&request, "with_lease")?.unwrap_or(false);
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let result = timeline
.find_lsn_for_timestamp(timestamp_pg, &cancel, &ctx)
.await?;
@@ -1000,10 +999,10 @@ async fn get_timestamp_of_lsn_handler(
.with_context(|| format!("Invalid LSN: {lsn_str:?}"))
.map_err(ApiError::BadRequest)?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let result = timeline.get_timestamp_for_lsn(lsn, &ctx).await?;
match result {
@@ -1368,7 +1367,7 @@ async fn timeline_layer_scan_disposable_keys(
};
let resident_layer = layer
.download_and_keep_resident()
.download_and_keep_resident(&ctx)
.await
.map_err(|err| match err {
tenant::storage_layer::layer::DownloadError::TimelineShutdown
@@ -1443,6 +1442,7 @@ async fn timeline_download_heatmap_layers_handler(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let max_concurrency = get_config(&request)
.remote_storage_config
@@ -1451,7 +1451,9 @@ async fn timeline_download_heatmap_layers_handler(
.unwrap_or(DEFAULT_MAX_CONCURRENCY);
let concurrency = std::cmp::min(max_concurrency, desired_concurrency);
timeline.start_heatmap_layers_download(concurrency).await?;
timeline
.start_heatmap_layers_download(concurrency, &ctx)
.await?;
json_response(StatusCode::ACCEPTED, ())
}
@@ -1490,8 +1492,9 @@ async fn layer_download_handler(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let downloaded = timeline
.download_layer(&layer_name)
.download_layer(&layer_name, &ctx)
.await
.map_err(|e| match e {
tenant::storage_layer::layer::DownloadError::TimelineShutdown
@@ -2389,7 +2392,8 @@ async fn timeline_download_remote_layers_handler_post(
let timeline =
active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id)
.await?;
match timeline.spawn_download_all_remote_layers(body).await {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
match timeline.spawn_download_all_remote_layers(body, &ctx).await {
Ok(st) => json_response(StatusCode::ACCEPTED, st),
Err(st) => json_response(StatusCode::CONFLICT, st),
}

View File

@@ -2083,7 +2083,7 @@ pub(crate) mod test {
.await
.unwrap();
let new_layer = new_layer.download_and_keep_resident().await.unwrap();
let new_layer = new_layer.download_and_keep_resident(ctx).await.unwrap();
new_layer
.copy_delta_prefix(&mut writer, truncate_at, ctx)

View File

@@ -324,16 +324,16 @@ impl Layer {
reconstruct_data: &mut ValuesReconstructState,
ctx: &RequestContext,
) -> Result<(), GetVectoredError> {
let downloaded = self
.0
.get_or_maybe_download(true, Some(ctx))
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let downloaded =
self.0
.get_or_maybe_download(true, ctx)
.await
.map_err(|err| match err {
DownloadError::TimelineShutdown | DownloadError::DownloadCancelled => {
GetVectoredError::Cancelled
}
other => GetVectoredError::Other(anyhow::anyhow!(other)),
})?;
let this = ResidentLayer {
downloaded: downloaded.clone(),
owner: self.clone(),
@@ -356,8 +356,8 @@ impl Layer {
/// Download the layer if evicted.
///
/// Will not error when the layer is already downloaded.
pub(crate) async fn download(&self) -> Result<(), DownloadError> {
self.0.get_or_maybe_download(true, None).await?;
pub(crate) async fn download(&self, ctx: &RequestContext) -> Result<(), DownloadError> {
self.0.get_or_maybe_download(true, ctx).await?;
Ok(())
}
@@ -392,8 +392,11 @@ impl Layer {
}
/// Downloads if necessary and creates a guard, which will keep this layer from being evicted.
pub(crate) async fn download_and_keep_resident(&self) -> Result<ResidentLayer, DownloadError> {
let downloaded = self.0.get_or_maybe_download(true, None).await?;
pub(crate) async fn download_and_keep_resident(
&self,
ctx: &RequestContext,
) -> Result<ResidentLayer, DownloadError> {
let downloaded = self.0.get_or_maybe_download(true, ctx).await?;
Ok(ResidentLayer {
downloaded,
@@ -446,7 +449,7 @@ impl Layer {
if verbose {
// for now, unconditionally download everything, even if that might not be wanted.
let l = self.0.get_or_maybe_download(true, Some(ctx)).await?;
let l = self.0.get_or_maybe_download(true, ctx).await?;
l.dump(&self.0, ctx).await?
}
@@ -945,7 +948,7 @@ impl LayerInner {
async fn get_or_maybe_download(
self: &Arc<Self>,
allow_download: bool,
ctx: Option<&RequestContext>,
ctx: &RequestContext,
) -> Result<Arc<DownloadedLayer>, DownloadError> {
let (weak, permit) = {
// get_or_init_detached can:
@@ -1035,21 +1038,14 @@ impl LayerInner {
return Err(DownloadError::NotFile(ft));
}
if let Some(ctx) = ctx {
self.check_expected_download(ctx)?;
}
self.check_expected_download(ctx)?;
if !allow_download {
// this is only used from tests, but it is hard to test without the boolean
return Err(DownloadError::DownloadRequired);
}
let download_ctx = ctx
.map(|ctx| ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download))
.unwrap_or(RequestContext::new(
TaskKind::LayerDownload,
DownloadBehavior::Download,
));
let download_ctx = ctx.detached_child(TaskKind::LayerDownload, DownloadBehavior::Download);
async move {
tracing::info!(%reason, "downloading on-demand");

View File

@@ -178,7 +178,7 @@ async fn smoke_test() {
// plain downloading is rarely needed
layer
.download_and_keep_resident()
.download_and_keep_resident(&ctx)
.instrument(download_span)
.await
.unwrap();
@@ -379,7 +379,7 @@ fn read_wins_pending_eviction() {
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
layer
.0
.get_or_maybe_download(false, None)
.get_or_maybe_download(false, &ctx)
.instrument(download_span)
.await
.expect("should had reinitialized without downloading");
@@ -514,7 +514,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
// because no actual eviction happened, we get to just reinitialize the DownloadedLayer
layer
.0
.get_or_maybe_download(false, None)
.get_or_maybe_download(false, &ctx)
.instrument(download_span)
.await
.expect("should had reinitialized without downloading");
@@ -642,6 +642,11 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
.await
.unwrap();
// This test does downloads
let ctx = RequestContextBuilder::extend(&ctx)
.download_behavior(DownloadBehavior::Download)
.build();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
@@ -674,7 +679,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
// simulate a cancelled read which is cancelled before it gets to re-initialize
let e = layer
.0
.get_or_maybe_download(false, None)
.get_or_maybe_download(false, &ctx)
.await
.unwrap_err();
assert!(
@@ -698,7 +703,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
// failpoint is still enabled, but it is not hit
let e = layer
.0
.get_or_maybe_download(false, None)
.get_or_maybe_download(false, &ctx)
.await
.unwrap_err();
assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
@@ -722,6 +727,11 @@ async fn evict_and_wait_does_not_wait_for_download() {
.await
.unwrap();
// This test does downloads
let ctx = RequestContextBuilder::extend(&ctx)
.download_behavior(DownloadBehavior::Download)
.build();
let layer = {
let mut layers = {
let layers = timeline.layers.read().await;
@@ -768,7 +778,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
let mut download = std::pin::pin!(
layer
.0
.get_or_maybe_download(true, None)
.get_or_maybe_download(true, &ctx)
.instrument(download_span)
);

View File

@@ -2197,6 +2197,7 @@ impl Timeline {
pub(crate) async fn download_layer(
&self,
layer_file_name: &LayerName,
ctx: &RequestContext,
) -> Result<Option<bool>, super::storage_layer::layer::DownloadError> {
let Some(layer) = self
.find_layer(layer_file_name)
@@ -2210,7 +2211,7 @@ impl Timeline {
return Ok(None);
};
layer.download().await?;
layer.download(ctx).await?;
Ok(Some(true))
}
@@ -6210,6 +6211,7 @@ impl Timeline {
pub(crate) async fn spawn_download_all_remote_layers(
self: Arc<Self>,
request: DownloadRemoteLayersTaskSpawnRequest,
ctx: &RequestContext,
) -> Result<DownloadRemoteLayersTaskInfo, DownloadRemoteLayersTaskInfo> {
use pageserver_api::models::DownloadRemoteLayersTaskState;
@@ -6230,6 +6232,10 @@ impl Timeline {
}
let self_clone = Arc::clone(&self);
let task_ctx = ctx.detached_child(
TaskKind::DownloadAllRemoteLayers,
DownloadBehavior::Download,
);
let task_id = task_mgr::spawn(
task_mgr::BACKGROUND_RUNTIME.handle(),
task_mgr::TaskKind::DownloadAllRemoteLayers,
@@ -6237,7 +6243,7 @@ impl Timeline {
Some(self.timeline_id),
"download all remote layers task",
async move {
self_clone.download_all_remote_layers(request).await;
self_clone.download_all_remote_layers(request, &task_ctx).await;
let mut status_guard = self_clone.download_all_remote_layers_task_info.write().unwrap();
match &mut *status_guard {
None => {
@@ -6272,6 +6278,7 @@ impl Timeline {
async fn download_all_remote_layers(
self: &Arc<Self>,
request: DownloadRemoteLayersTaskSpawnRequest,
ctx: &RequestContext,
) {
use pageserver_api::models::DownloadRemoteLayersTaskState;
@@ -6328,9 +6335,10 @@ impl Timeline {
let span = tracing::info_span!("download", layer = %next);
let ctx = ctx.attached_child();
js.spawn(
async move {
let res = next.download().await;
let res = next.download(&ctx).await;
(next, res)
}
.instrument(span),
@@ -6920,6 +6928,7 @@ mod tests {
use utils::lsn::Lsn;
use super::HeatMapTimeline;
use crate::context::RequestContextBuilder;
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::layer_map::LayerMap;
use crate::tenant::storage_layer::{Layer, LayerName, LayerVisibilityHint};
@@ -7056,8 +7065,12 @@ mod tests {
eprintln!("Downloading {layer} and re-generating heatmap");
let ctx = &RequestContextBuilder::extend(&ctx)
.download_behavior(crate::context::DownloadBehavior::Download)
.build();
let _resident = layer
.download_and_keep_resident()
.download_and_keep_resident(ctx)
.instrument(tracing::info_span!(
parent: None,
"download_layer",

View File

@@ -1153,7 +1153,7 @@ impl Timeline {
// - We do not run concurrently with other kinds of compaction, so the only layer map writes we race with are:
// - GC, which at worst witnesses us "undelete" a layer that they just deleted.
// - ingestion, which only inserts layers, therefore cannot collide with us.
let resident = layer.download_and_keep_resident().await?;
let resident = layer.download_and_keep_resident(ctx).await?;
let keys_written = resident
.filter(&self.shard_identity, &mut image_layer_writer, ctx)
@@ -1381,14 +1381,14 @@ impl Timeline {
let mut fully_compacted = true;
deltas_to_compact.push(first_level0_delta.download_and_keep_resident().await?);
deltas_to_compact.push(first_level0_delta.download_and_keep_resident(ctx).await?);
for l in level0_deltas_iter {
let lsn_range = &l.layer_desc().lsn_range;
if lsn_range.start != prev_lsn_end {
break;
}
deltas_to_compact.push(l.download_and_keep_resident().await?);
deltas_to_compact.push(l.download_and_keep_resident(ctx).await?);
deltas_to_compact_bytes += l.metadata().file_size;
prev_lsn_end = lsn_range.end;
@@ -2828,7 +2828,7 @@ impl Timeline {
total_downloaded_size += layer.layer_desc().file_size;
}
total_layer_size += layer.layer_desc().file_size;
let resident_layer = layer.download_and_keep_resident().await?;
let resident_layer = layer.download_and_keep_resident(ctx).await?;
downloaded_layers.push(resident_layer);
}
info!(
@@ -3404,6 +3404,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
async fn downcast_delta_layer(
&self,
layer: &OwnArc<PersistentLayerDesc>,
ctx: &RequestContext,
) -> anyhow::Result<Option<ResidentDeltaLayer>> {
// this is a lot more complex than a simple downcast...
if layer.is_delta() {
@@ -3411,7 +3412,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
let guard = self.timeline.layers.read().await;
guard.get_from_desc(layer)
};
let result = l.download_and_keep_resident().await?;
let result = l.download_and_keep_resident(ctx).await?;
Ok(Some(ResidentDeltaLayer(result)))
} else {

View File

@@ -629,7 +629,7 @@ async fn copy_lsn_prefix(
.with_context(|| format!("prepare to copy lsn prefix of ancestors {layer}"))
.map_err(Error::Prepare)?;
let resident = layer.download_and_keep_resident().await.map_err(|e| {
let resident = layer.download_and_keep_resident(ctx).await.map_err(|e| {
if e.is_cancelled() {
Error::ShuttingDown
} else {

View File

@@ -10,6 +10,8 @@ use http_utils::error::ApiError;
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;
use crate::context::RequestContext;
use super::Timeline;
// This status is not strictly necessary now, but gives us a nice place
@@ -30,6 +32,7 @@ impl HeatmapLayersDownloader {
fn new(
timeline: Arc<Timeline>,
concurrency: usize,
ctx: RequestContext,
) -> Result<HeatmapLayersDownloader, ApiError> {
let tl_guard = timeline.gate.enter().map_err(|_| ApiError::Cancelled)?;
@@ -63,6 +66,7 @@ impl HeatmapLayersDownloader {
let stream = futures::stream::iter(heatmap.layers.into_iter().filter_map(
|layer| {
let ctx = ctx.attached_child();
let tl = timeline.clone();
let dl_guard = match downloads_guard.enter() {
Ok(g) => g,
@@ -75,7 +79,7 @@ impl HeatmapLayersDownloader {
Some(async move {
let _dl_guard = dl_guard;
let res = tl.download_layer(&layer.name).await;
let res = tl.download_layer(&layer.name, &ctx).await;
if let Err(err) = res {
if !err.is_cancelled() {
tracing::warn!(layer=%layer.name,"Failed to download heatmap layer: {err}")
@@ -139,10 +143,11 @@ impl Timeline {
pub(crate) async fn start_heatmap_layers_download(
self: &Arc<Self>,
concurrency: usize,
ctx: &RequestContext,
) -> Result<(), ApiError> {
let mut locked = self.heatmap_layers_downloader.lock().unwrap();
if locked.as_ref().map(|dl| dl.is_complete()).unwrap_or(true) {
let dl = HeatmapLayersDownloader::new(self.clone(), concurrency)?;
let dl = HeatmapLayersDownloader::new(self.clone(), concurrency, ctx.attached_child())?;
*locked = Some(dl);
Ok(())
} else {