From e67a07f1b7927c0b413daafde007b590109298a8 Mon Sep 17 00:00:00 2001 From: John Spray Date: Thu, 9 May 2024 10:19:34 +0100 Subject: [PATCH] pageserver: refactor secondary layer download into fn --- pageserver/src/tenant/secondary/downloader.rs | 132 ++++++++++-------- 1 file changed, 73 insertions(+), 59 deletions(-) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 2a8f83be95..ac113d16aa 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -874,66 +874,9 @@ impl<'a> TenantDownloader<'a> { } } - // Failpoint for simulating slow remote storage - failpoint_support::sleep_millis_async!( - "secondary-layer-download-sleep", - &self.secondary_state.cancel - ); + self.download_layer(tenant_shard_id, &timeline.timeline_id, &layer, ctx) + .await?; - // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally - let downloaded_bytes = match download_layer_file( - self.conf, - self.remote_storage, - *tenant_shard_id, - timeline.timeline_id, - &layer.name, - &LayerFileMetadata::from(&layer.metadata), - &self.secondary_state.cancel, - ctx, - ) - .await - { - Ok(bytes) => bytes, - Err(DownloadError::NotFound) => { - // A heatmap might be out of date and refer to a layer that doesn't exist any more. - // This is harmless: continue to download the next layer. It is expected during compaction - // GC. - tracing::debug!( - "Skipped downloading missing layer {}, raced with compaction/gc?", - layer.name - ); - continue; - } - Err(e) => return Err(e.into()), - }; - - if downloaded_bytes != layer.metadata.file_size { - let local_path = local_layer_path( - self.conf, - tenant_shard_id, - &timeline.timeline_id, - &layer.name, - &layer.metadata.generation, - ); - - tracing::warn!( - "Downloaded layer {} with unexpected size {} != {}. Removing download.", - layer.name, - downloaded_bytes, - layer.metadata.file_size - ); - - tokio::fs::remove_file(&local_path) - .await - .or_else(fs_ext::ignore_not_found)?; - } else { - tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes); - let mut progress = self.secondary_state.progress.lock().unwrap(); - progress.bytes_downloaded += downloaded_bytes; - progress.layers_downloaded += 1; - } - - SECONDARY_MODE.download_layer.inc(); touched.push(layer) } @@ -966,6 +909,77 @@ impl<'a> TenantDownloader<'a> { Ok(()) } + + async fn download_layer( + &self, + tenant_shard_id: &TenantShardId, + timeline_id: &TimelineId, + layer: &HeatMapLayer, + ctx: &RequestContext, + ) -> Result<(), UpdateError> { + // Failpoint for simulating slow remote storage + failpoint_support::sleep_millis_async!( + "secondary-layer-download-sleep", + &self.secondary_state.cancel + ); + + // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally + let downloaded_bytes = match download_layer_file( + self.conf, + self.remote_storage, + *tenant_shard_id, + *timeline_id, + &layer.name, + &LayerFileMetadata::from(&layer.metadata), + &self.secondary_state.cancel, + ctx, + ) + .await + { + Ok(bytes) => bytes, + Err(DownloadError::NotFound) => { + // A heatmap might be out of date and refer to a layer that doesn't exist any more. + // This is harmless: continue to download the next layer. It is expected during compaction + // GC. + tracing::debug!( + "Skipped downloading missing layer {}, raced with compaction/gc?", + layer.name + ); + return Ok(()); + } + Err(e) => return Err(e.into()), + }; + + if downloaded_bytes != layer.metadata.file_size { + let local_path = local_layer_path( + self.conf, + tenant_shard_id, + timeline_id, + &layer.name, + &layer.metadata.generation, + ); + + tracing::warn!( + "Downloaded layer {} with unexpected size {} != {}. Removing download.", + layer.name, + downloaded_bytes, + layer.metadata.file_size + ); + + tokio::fs::remove_file(&local_path) + .await + .or_else(fs_ext::ignore_not_found)?; + } else { + tracing::info!("Downloaded layer {}, size {}", layer.name, downloaded_bytes); + let mut progress = self.secondary_state.progress.lock().unwrap(); + progress.bytes_downloaded += downloaded_bytes; + progress.layers_downloaded += 1; + } + + SECONDARY_MODE.download_layer.inc(); + + Ok(()) + } } /// Scan local storage and build up Layer objects based on the metadata in a HeatMapTimeline