From babbe125dabdd528843d78c97874833ae67c314e Mon Sep 17 00:00:00 2001 From: John Spray Date: Fri, 28 Jun 2024 18:05:09 +0100 Subject: [PATCH] pageserver: drop out of secondary download if iteration time has passed (#8198) ## Problem Very long running downloads can be wasteful, because the heatmap they're using is outdated after a few minutes. Closes: https://github.com/neondatabase/neon/issues/8182 ## Summary of changes - Impose a deadline on timeline downloads, using the same period as we use for scheduling, and returning an UpdateError::Restart when it is reached. This restart will involve waiting for a scheduling interval, but that's a good thing: it helps let other tenants proceed. - Refactor download_timeline so that the part where we update the state for local layers is done even if we fall out of the layer download loop with an error: this is important, especially for big tenants, because only layers in the SecondaryDetail state will be considered for eviction. --- pageserver/src/tenant/secondary/downloader.rs | 126 ++++++++++++++---- 1 file changed, 97 insertions(+), 29 deletions(-) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 24176ecf19..f6f30641db 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -262,6 +262,7 @@ impl scheduler::RunningJob for RunningDownload { struct CompleteDownload { secondary_state: Arc, completed_at: Instant, + result: Result<(), UpdateError>, } impl scheduler::Completion for CompleteDownload { @@ -286,21 +287,33 @@ impl JobGenerator { + // Start downloading again as soon as we can. This will involve waiting for the scheduler's + // scheduling interval. This slightly reduces the peak download speed of tenants that hit their + // deadline and keep restarting, but that also helps give other tenants a chance to execute rather + // that letting one big tenant dominate for a long time. + detail.next_download = Some(Instant::now()); + } + _ => { + let period = detail + .last_download + .as_ref() + .map(|d| d.upload_period) + .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL); - // We advance next_download irrespective of errors: we don't want error cases to result in - // expensive busy-polling. - detail.next_download = Some(Instant::now() + period_jitter(period, 5)); + // We advance next_download irrespective of errors: we don't want error cases to result in + // expensive busy-polling. + detail.next_download = Some(Instant::now() + period_jitter(period, 5)); + } + } } async fn schedule(&mut self) -> SchedulingResult { @@ -396,9 +409,10 @@ impl JobGenerator { tracing::info!("No heatmap found for tenant. This is fine if it is new."); @@ -415,6 +429,9 @@ impl JobGenerator { tracing::error!("Error while downloading tenant: {e}"); }, + Err(UpdateError::Restart) => { + tracing::info!("Download reached deadline & will restart to update heatmap") + } Ok(()) => {} }; @@ -436,6 +453,7 @@ impl JobGenerator { /// Errors that may be encountered while updating a tenant #[derive(thiserror::Error, Debug)] enum UpdateError { + /// This is not a true failure, but it's how a download indicates that it would like to be restarted by + /// the scheduler, to pick up the latest heatmap + #[error("Reached deadline, restarting downloads")] + Restart, + #[error("No remote data found")] NoData, #[error("Insufficient local storage space")] @@ -603,6 +626,26 @@ impl<'a> TenantDownloader<'a> { self.prepare_timelines(&heatmap, heatmap_mtime).await?; } + // Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again, + // so that we are always using reasonably a fresh heatmap. Otherwise, if we had really huge content to download, we might + // spend 10s of minutes downloading layers we don't need. + // (see https://github.com/neondatabase/neon/issues/8182) + let deadline = { + let period = self + .secondary_state + .detail + .lock() + .unwrap() + .last_download + .as_ref() + .map(|d| d.upload_period) + .unwrap_or(DEFAULT_DOWNLOAD_INTERVAL); + + // Use double the period: we are not promising to complete within the period, this is just a heuristic + // to keep using a "reasonably fresh" heatmap. + Instant::now() + period * 2 + }; + // Download the layers in the heatmap for timeline in heatmap.timelines { let timeline_state = timeline_states @@ -618,7 +661,7 @@ impl<'a> TenantDownloader<'a> { } let timeline_id = timeline.timeline_id; - self.download_timeline(timeline, timeline_state, ctx) + self.download_timeline(timeline, timeline_state, deadline, ctx) .instrument(tracing::info_span!( "secondary_download_timeline", tenant_id=%tenant_shard_id.tenant_id, @@ -827,26 +870,28 @@ impl<'a> TenantDownloader<'a> { .and_then(|x| x) } - async fn download_timeline( + /// Download heatmap layers that are not present on local disk, or update their + /// access time if they are already present. + async fn download_timeline_layers( &self, + tenant_shard_id: &TenantShardId, timeline: HeatMapTimeline, timeline_state: SecondaryDetailTimeline, + deadline: Instant, ctx: &RequestContext, - ) -> Result<(), UpdateError> { - debug_assert_current_span_has_tenant_and_timeline_id(); - let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); - + ) -> (Result<(), UpdateError>, Vec) { // Accumulate updates to the state let mut touched = Vec::new(); - tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); - - // Download heatmap layers that are not present on local disk, or update their - // access time if they are already present. for layer in timeline.layers { if self.secondary_state.cancel.is_cancelled() { tracing::debug!("Cancelled -- dropping out of layer loop"); - return Err(UpdateError::Cancelled); + return (Err(UpdateError::Cancelled), touched); + } + + if Instant::now() > deadline { + // We've been running downloads for a while, restart to download latest heatmap. + return (Err(UpdateError::Restart), touched); } // Existing on-disk layers: just update their access time. @@ -916,20 +961,43 @@ impl<'a> TenantDownloader<'a> { match self .download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx) - .await? + .await { - Some(layer) => touched.push(layer), - None => { + Ok(Some(layer)) => touched.push(layer), + Ok(None) => { // Not an error but we didn't download it: remote layer is missing. Don't add it to the list of // things to consider touched. } + Err(e) => { + return (Err(e), touched); + } } } - // Write updates to state to record layers we just downloaded or touched. + (Ok(()), touched) + } + + async fn download_timeline( + &self, + timeline: HeatMapTimeline, + timeline_state: SecondaryDetailTimeline, + deadline: Instant, + ctx: &RequestContext, + ) -> Result<(), UpdateError> { + debug_assert_current_span_has_tenant_and_timeline_id(); + let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); + let timeline_id = timeline.timeline_id; + + tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len()); + + let (result, touched) = self + .download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx) + .await; + + // Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful { let mut detail = self.secondary_state.detail.lock().unwrap(); - let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default(); + let timeline_detail = detail.timelines.entry(timeline_id).or_default(); tracing::info!("Wrote timeline_detail for {} touched layers", touched.len()); @@ -943,14 +1011,14 @@ impl<'a> TenantDownloader<'a> { let local_path = local_layer_path( self.conf, tenant_shard_id, - &timeline.timeline_id, + &timeline_id, &t.name, &t.metadata.generation, ); e.insert(OnDiskState::new( self.conf, tenant_shard_id, - &timeline.timeline_id, + &timeline_id, t.name, t.metadata.clone(), t.access_time, @@ -961,7 +1029,7 @@ impl<'a> TenantDownloader<'a> { } } - Ok(()) + result } /// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics