diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 870475eb57..de30c4dcb6 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -93,7 +93,7 @@ pub(super) async fn downloader_task(
 
     scheduler
         .run(command_queue, background_jobs_can_start, cancel)
-        .instrument(info_span!("secondary_downloads"))
+        .instrument(info_span!("secondary_download_scheduler"))
         .await
 }
 
@@ -1013,6 +1013,11 @@ impl<'a> TenantDownloader<'a> {
         );
 
         // Note: no backoff::retry wrapper here because download_layer_file does its own retries internally
+        tracing::info!(
+            "Starting download of layer {}, size {}",
+            layer.name,
+            layer.metadata.file_size
+        );
         let downloaded_bytes = match download_layer_file(
             self.conf,
             self.remote_storage,
diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs
index fddced3ead..9c7a9c4234 100644
--- a/pageserver/src/tenant/secondary/heatmap_uploader.rs
+++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs
@@ -53,7 +53,7 @@ pub(super) async fn heatmap_uploader_task(
 
     scheduler
         .run(command_queue, background_jobs_can_start, cancel)
-        .instrument(info_span!("heatmap_uploader"))
+        .instrument(info_span!("heatmap_upload_scheduler"))
         .await
 }
 
diff --git a/pageserver/src/tenant/secondary/scheduler.rs b/pageserver/src/tenant/secondary/scheduler.rs
index 3d042f4513..0ec1c7872a 100644
--- a/pageserver/src/tenant/secondary/scheduler.rs
+++ b/pageserver/src/tenant/secondary/scheduler.rs
@@ -179,6 +179,13 @@ where
         // Schedule some work, if concurrency limit permits it
         self.spawn_pending();
 
+        // This message is printed on every scheduling iteration, as proof of liveness when reading logs
+        tracing::info!(
+            "Status: {} tasks running, {} pending",
+            self.running.len(),
+            self.pending.len()
+        );
+
         // Between scheduling iterations, we will:
         // - Drain any complete tasks and spawn pending tasks
         // - Handle incoming administrative commands
@@ -258,7 +265,11 @@ where
 
         self.tasks.spawn(fut);
 
-        self.running.insert(tenant_shard_id, in_progress);
+        let replaced = self.running.insert(tenant_shard_id, in_progress);
+        debug_assert!(replaced.is_none());
+        if replaced.is_some() {
+            tracing::warn!(%tenant_shard_id, "Unexpectedly spawned a task when one was already running")
+        }
     }
 
     /// For all pending tenants that are eligible for execution, spawn their task.
@@ -268,7 +279,9 @@ where
         while !self.pending.is_empty() && self.running.len() < self.concurrency {
             // unwrap: loop condition includes !is_empty()
             let pending = self.pending.pop_front().unwrap();
-            self.do_spawn(pending);
+            if !self.running.contains_key(pending.get_tenant_shard_id()) {
+                self.do_spawn(pending);
+            }
         }
     }
 
@@ -321,7 +334,8 @@ where
 
         let tenant_shard_id = job.get_tenant_shard_id();
         let barrier = if let Some(barrier) = self.get_running(tenant_shard_id) {
-            tracing::info!("Command already running, waiting for it");
+            tracing::info!(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
+                "Command already running, waiting for it");
             barrier
         } else {
             let running = self.spawn_now(job);