mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 02:12:56 +00:00
pageserver: fix API-driven secondary downloads possibly colliding with background downloads (#7848)
## Problem We've seen some strange behaviors when doing lots of migrations involving secondary locations. One of these was where a tenant was apparently stuck in the `Scheduler::running` list, but didn't appear to be making any progress. Another was a shutdown hang (https://github.com/neondatabase/cloud/issues/13576). ## Summary of changes - Fix one issue (probably not the only one) where a tenant in the `pending` list could proceed to `spawn` even if the same tenant already had a running task via `handle_command` (this could have resulted in a weird value of SecondaryProgress) - Add various extra logging: - log before as well as after layer downloads so that it would be obvious if we were stuck in remote storage code (we shouldn't be, it has built in timeouts) - log the number of running + pending jobs from the scheduler every time it wakes up to do a scheduling iteration (~10s) -- this is quite chatty, but not compared with the volume of logs on a busy pageserver. It should give us confidence that the scheduler loop is still alive, and visibility of how many tasks the scheduler thinks are running.
This commit is contained in:
@@ -93,7 +93,7 @@ pub(super) async fn downloader_task(
|
||||
|
||||
scheduler
|
||||
.run(command_queue, background_jobs_can_start, cancel)
|
||||
.instrument(info_span!("secondary_downloads"))
|
||||
.instrument(info_span!("secondary_download_scheduler"))
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -1013,6 +1013,11 @@ impl<'a> TenantDownloader<'a> {
|
||||
);
|
||||
|
||||
// Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
|
||||
tracing::info!(
|
||||
"Starting download of layer {}, size {}",
|
||||
layer.name,
|
||||
layer.metadata.file_size
|
||||
);
|
||||
let downloaded_bytes = match download_layer_file(
|
||||
self.conf,
|
||||
self.remote_storage,
|
||||
|
||||
@@ -53,7 +53,7 @@ pub(super) async fn heatmap_uploader_task(
|
||||
|
||||
scheduler
|
||||
.run(command_queue, background_jobs_can_start, cancel)
|
||||
.instrument(info_span!("heatmap_uploader"))
|
||||
.instrument(info_span!("heatmap_upload_scheduler"))
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
@@ -179,6 +179,13 @@ where
|
||||
// Schedule some work, if concurrency limit permits it
|
||||
self.spawn_pending();
|
||||
|
||||
// This message is printed every scheduling iteration as proof of liveness when looking at logs
|
||||
tracing::info!(
|
||||
"Status: {} tasks running, {} pending",
|
||||
self.running.len(),
|
||||
self.pending.len()
|
||||
);
|
||||
|
||||
// Between scheduling iterations, we will:
|
||||
// - Drain any complete tasks and spawn pending tasks
|
||||
// - Handle incoming administrative commands
|
||||
@@ -258,7 +265,11 @@ where
|
||||
|
||||
self.tasks.spawn(fut);
|
||||
|
||||
self.running.insert(tenant_shard_id, in_progress);
|
||||
let replaced = self.running.insert(tenant_shard_id, in_progress);
|
||||
debug_assert!(replaced.is_none());
|
||||
if replaced.is_some() {
|
||||
tracing::warn!(%tenant_shard_id, "Unexpectedly spawned a task when one was already running")
|
||||
}
|
||||
}
|
||||
|
||||
/// For all pending tenants that are elegible for execution, spawn their task.
|
||||
@@ -268,7 +279,9 @@ where
|
||||
while !self.pending.is_empty() && self.running.len() < self.concurrency {
|
||||
// unwrap: loop condition includes !is_empty()
|
||||
let pending = self.pending.pop_front().unwrap();
|
||||
self.do_spawn(pending);
|
||||
if !self.running.contains_key(pending.get_tenant_shard_id()) {
|
||||
self.do_spawn(pending);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -321,7 +334,8 @@ where
|
||||
|
||||
let tenant_shard_id = job.get_tenant_shard_id();
|
||||
let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
|
||||
tracing::info!("Command already running, waiting for it");
|
||||
tracing::info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
|
||||
"Command already running, waiting for it");
|
||||
barrier
|
||||
} else {
|
||||
let running = self.spawn_now(job);
|
||||
|
||||
Reference in New Issue
Block a user