mirror of
https://github.com/neondatabase/neon.git
synced 2026-06-04 05:50:38 +00:00
pageserver: drop out of secondary download if iteration time has passed (#8198)
## Problem Very long running downloads can be wasteful, because the heatmap they're using is outdated after a few minutes. Closes: https://github.com/neondatabase/neon/issues/8182 ## Summary of changes - Impose a deadline on timeline downloads, using the same period as we use for scheduling, and returning an UpdateError::Restart when it is reached. This restart will involve waiting for a scheduling interval, but that's a good thing: it helps let other tenants proceed. - Refactor download_timeline so that the part where we update the state for local layers is done even if we fall out of the layer download loop with an error: this is important, especially for big tenants, because only layers in the SecondaryDetail state will be considered for eviction.
This commit is contained in:
@@ -262,6 +262,7 @@ impl scheduler::RunningJob for RunningDownload {
|
||||
struct CompleteDownload {
|
||||
secondary_state: Arc<SecondaryTenant>,
|
||||
completed_at: Instant,
|
||||
result: Result<(), UpdateError>,
|
||||
}
|
||||
|
||||
impl scheduler::Completion for CompleteDownload {
|
||||
@@ -286,21 +287,33 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
let CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: _completed_at,
|
||||
result,
|
||||
} = completion;
|
||||
|
||||
tracing::debug!("Secondary tenant download completed");
|
||||
|
||||
let mut detail = secondary_state.detail.lock().unwrap();
|
||||
|
||||
let period = detail
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
match result {
|
||||
Err(UpdateError::Restart) => {
|
||||
// Start downloading again as soon as we can. This will involve waiting for the scheduler's
|
||||
// scheduling interval. This slightly reduces the peak download speed of tenants that hit their
|
||||
// deadline and keep restarting, but that also helps give other tenants a chance to execute rather
|
||||
// that letting one big tenant dominate for a long time.
|
||||
detail.next_download = Some(Instant::now());
|
||||
}
|
||||
_ => {
|
||||
let period = detail
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
|
||||
// We advance next_download irrespective of errors: we don't want error cases to result in
|
||||
// expensive busy-polling.
|
||||
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
|
||||
// We advance next_download irrespective of errors: we don't want error cases to result in
|
||||
// expensive busy-polling.
|
||||
detail.next_download = Some(Instant::now() + period_jitter(period, 5));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
|
||||
@@ -396,9 +409,10 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
(RunningDownload { barrier }, Box::pin(async move {
|
||||
let _completion = completion;
|
||||
|
||||
match TenantDownloader::new(conf, &remote_storage, &secondary_state)
|
||||
let result = TenantDownloader::new(conf, &remote_storage, &secondary_state)
|
||||
.download(&download_ctx)
|
||||
.await
|
||||
.await;
|
||||
match &result
|
||||
{
|
||||
Err(UpdateError::NoData) => {
|
||||
tracing::info!("No heatmap found for tenant. This is fine if it is new.");
|
||||
@@ -415,6 +429,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
Err(e @ (UpdateError::DownloadError(_) | UpdateError::Other(_))) => {
|
||||
tracing::error!("Error while downloading tenant: {e}");
|
||||
},
|
||||
Err(UpdateError::Restart) => {
|
||||
tracing::info!("Download reached deadline & will restart to update heatmap")
|
||||
}
|
||||
Ok(()) => {}
|
||||
};
|
||||
|
||||
@@ -436,6 +453,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
CompleteDownload {
|
||||
secondary_state,
|
||||
completed_at: Instant::now(),
|
||||
result
|
||||
}
|
||||
}.instrument(info_span!(parent: None, "secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))))
|
||||
}
|
||||
@@ -452,6 +470,11 @@ struct TenantDownloader<'a> {
|
||||
/// Errors that may be encountered while updating a tenant
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum UpdateError {
|
||||
/// This is not a true failure, but it's how a download indicates that it would like to be restarted by
|
||||
/// the scheduler, to pick up the latest heatmap
|
||||
#[error("Reached deadline, restarting downloads")]
|
||||
Restart,
|
||||
|
||||
#[error("No remote data found")]
|
||||
NoData,
|
||||
#[error("Insufficient local storage space")]
|
||||
@@ -603,6 +626,26 @@ impl<'a> TenantDownloader<'a> {
|
||||
self.prepare_timelines(&heatmap, heatmap_mtime).await?;
|
||||
}
|
||||
|
||||
// Calculate a deadline for downloads: if downloading takes longer than this, it is useful to drop out and start again,
|
||||
// so that we are always using reasonably a fresh heatmap. Otherwise, if we had really huge content to download, we might
|
||||
// spend 10s of minutes downloading layers we don't need.
|
||||
// (see https://github.com/neondatabase/neon/issues/8182)
|
||||
let deadline = {
|
||||
let period = self
|
||||
.secondary_state
|
||||
.detail
|
||||
.lock()
|
||||
.unwrap()
|
||||
.last_download
|
||||
.as_ref()
|
||||
.map(|d| d.upload_period)
|
||||
.unwrap_or(DEFAULT_DOWNLOAD_INTERVAL);
|
||||
|
||||
// Use double the period: we are not promising to complete within the period, this is just a heuristic
|
||||
// to keep using a "reasonably fresh" heatmap.
|
||||
Instant::now() + period * 2
|
||||
};
|
||||
|
||||
// Download the layers in the heatmap
|
||||
for timeline in heatmap.timelines {
|
||||
let timeline_state = timeline_states
|
||||
@@ -618,7 +661,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
|
||||
let timeline_id = timeline.timeline_id;
|
||||
self.download_timeline(timeline, timeline_state, ctx)
|
||||
self.download_timeline(timeline, timeline_state, deadline, ctx)
|
||||
.instrument(tracing::info_span!(
|
||||
"secondary_download_timeline",
|
||||
tenant_id=%tenant_shard_id.tenant_id,
|
||||
@@ -827,26 +870,28 @@ impl<'a> TenantDownloader<'a> {
|
||||
.and_then(|x| x)
|
||||
}
|
||||
|
||||
async fn download_timeline(
|
||||
/// Download heatmap layers that are not present on local disk, or update their
|
||||
/// access time if they are already present.
|
||||
async fn download_timeline_layers(
|
||||
&self,
|
||||
tenant_shard_id: &TenantShardId,
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), UpdateError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
|
||||
) -> (Result<(), UpdateError>, Vec<HeatMapLayer>) {
|
||||
// Accumulate updates to the state
|
||||
let mut touched = Vec::new();
|
||||
|
||||
tracing::debug!(timeline_id=%timeline.timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
// Download heatmap layers that are not present on local disk, or update their
|
||||
// access time if they are already present.
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return Err(UpdateError::Cancelled);
|
||||
return (Err(UpdateError::Cancelled), touched);
|
||||
}
|
||||
|
||||
if Instant::now() > deadline {
|
||||
// We've been running downloads for a while, restart to download latest heatmap.
|
||||
return (Err(UpdateError::Restart), touched);
|
||||
}
|
||||
|
||||
// Existing on-disk layers: just update their access time.
|
||||
@@ -916,20 +961,43 @@ impl<'a> TenantDownloader<'a> {
|
||||
|
||||
match self
|
||||
.download_layer(tenant_shard_id, &timeline.timeline_id, layer, ctx)
|
||||
.await?
|
||||
.await
|
||||
{
|
||||
Some(layer) => touched.push(layer),
|
||||
None => {
|
||||
Ok(Some(layer)) => touched.push(layer),
|
||||
Ok(None) => {
|
||||
// Not an error but we didn't download it: remote layer is missing. Don't add it to the list of
|
||||
// things to consider touched.
|
||||
}
|
||||
Err(e) => {
|
||||
return (Err(e), touched);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched.
|
||||
(Ok(()), touched)
|
||||
}
|
||||
|
||||
async fn download_timeline(
|
||||
&self,
|
||||
timeline: HeatMapTimeline,
|
||||
timeline_state: SecondaryDetailTimeline,
|
||||
deadline: Instant,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(), UpdateError> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
let timeline_id = timeline.timeline_id;
|
||||
|
||||
tracing::debug!(timeline_id=%timeline_id, "Downloading layers, {} in heatmap", timeline.layers.len());
|
||||
|
||||
let (result, touched) = self
|
||||
.download_timeline_layers(tenant_shard_id, timeline, timeline_state, deadline, ctx)
|
||||
.await;
|
||||
|
||||
// Write updates to state to record layers we just downloaded or touched, irrespective of whether the overall result was successful
|
||||
{
|
||||
let mut detail = self.secondary_state.detail.lock().unwrap();
|
||||
let timeline_detail = detail.timelines.entry(timeline.timeline_id).or_default();
|
||||
let timeline_detail = detail.timelines.entry(timeline_id).or_default();
|
||||
|
||||
tracing::info!("Wrote timeline_detail for {} touched layers", touched.len());
|
||||
|
||||
@@ -943,14 +1011,14 @@ impl<'a> TenantDownloader<'a> {
|
||||
let local_path = local_layer_path(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
&timeline_id,
|
||||
&t.name,
|
||||
&t.metadata.generation,
|
||||
);
|
||||
e.insert(OnDiskState::new(
|
||||
self.conf,
|
||||
tenant_shard_id,
|
||||
&timeline.timeline_id,
|
||||
&timeline_id,
|
||||
t.name,
|
||||
t.metadata.clone(),
|
||||
t.access_time,
|
||||
@@ -961,7 +1029,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
result
|
||||
}
|
||||
|
||||
/// Call this during timeline download if a layer will _not_ be downloaded, to update progress statistics
|
||||
|
||||
Reference in New Issue
Block a user