From c772894c57b5452829a02106712bdcc448c149aa Mon Sep 17 00:00:00 2001
From: John Spray
Date: Tue, 19 Dec 2023 21:28:41 +0000
Subject: [PATCH] pageserver: use warn_when_period_overrun in secondary/heatmap

---
 pageserver/src/tenant/secondary/downloader.rs | 91 +++++++++++++++----
 .../src/tenant/secondary/heatmap_uploader.rs  | 57 +++++++++---
 pageserver/src/tenant/secondary/scheduler.rs  | 21 ++++-
 pageserver/src/tenant/tasks.rs                |  2 +
 4 files changed, 136 insertions(+), 35 deletions(-)

diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index 79e75aae44..716fdf1833 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -13,6 +13,7 @@ use crate::{
         remote_timeline_client::{index::LayerFileMetadata, HEATMAP_BASENAME},
         span::debug_assert_current_span_has_tenant_id,
         storage_layer::{Layer, LayerFileName},
+        tasks::{warn_when_period_overrun, BackgroundLoopKind},
         timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
     },
     virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
@@ -31,11 +32,12 @@ use anyhow::Context;
 use chrono::format::{DelayedFormat, StrftimeItems};
 use pageserver_api::shard::TenantShardId;
+use rand::Rng;
 use remote_storage::GenericRemoteStorage;
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
-use tracing::{instrument, Instrument};
+use tracing::{info_span, instrument, Instrument};
 use utils::{completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId};
 
 use super::{
@@ -86,7 +88,8 @@ pub(super) struct SecondaryDetailTimeline {
 /// to TenantManager
 #[derive(Default, Debug)]
 pub(super) struct SecondaryDetail {
-    freshened_at: Option<Instant>,
+    last_download: Option<Instant>,
+    next_download: Option<Instant>,
     pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
 }
@@ -171,6 +174,9 @@ struct SecondaryDownloader {
 struct PendingDownload {
     secondary_state: Arc<SecondaryTenant>,
+    last_download: Option<Instant>,
+    target_time: Option<Instant>,
+    period: Option<Duration>,
 }
 
 impl TenantScoped for PendingDownload {
@@ -230,7 +236,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
     ) -> SchedulingResult<PendingDownload> {
@@ -246,22 +252,41 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
-        let tenants = tenants.into_iter().filter(|t| {
-            let detail = t.detail.lock().unwrap();
-            match detail.freshened_at {
-                None => true, // Not yet freshened, therefore elegible to run
-                Some(t) => {
-                    let since = now.duration_since(t);
-                    since > DOWNLOAD_FRESHEN_INTERVAL
-                }
-            }
-        });
         result.jobs = tenants
-            .map(|t| PendingDownload { secondary_state: t })
+            .into_iter()
+            .filter_map(|c| {
+                let (last_download, next_download) = {
+                    let mut detail = c.detail.lock().unwrap();
+                    if detail.next_download.is_none() {
+                        // Initialize with a jitter: this spreads initial downloads on startup
+                        // or mass-attach across our freshen interval.
+                        let jittered_period =
+                            rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
+                        detail.next_download = Some(now.checked_add(jittered_period).expect(
+                            "Using our constant, which is known to be small compared with clock range",
+                        ));
+                    }
+                    (detail.last_download, detail.next_download.unwrap())
+                };
+
+                if now > next_download {
+                    Some(PendingDownload {
+                        secondary_state: c,
+                        last_download,
+                        target_time: Some(next_download),
+                        period: Some(DOWNLOAD_FRESHEN_INTERVAL),
+                    })
+                } else {
+                    None
+                }
+            })
             .collect();
+
+        // Step 3: sort by target execution time to run most urgent first.
+        result.jobs.sort_by_key(|j| j.target_time);
+
         result
     }
@@ -278,6 +303,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
         job: PendingDownload,
     ) -> RunningDownload {
-        let PendingDownload { secondary_state } = job;
+        let PendingDownload {
+            secondary_state,
+            last_download,
+            target_time,
+            period,
+        } = job;
 
         let (completion, barrier) = utils::completion::channel();
         let remote_storage = self.remote_storage.clone();
         let conf = self.tenant_manager.get_conf();
+        let tenant_shard_id = *secondary_state.get_tenant_shard_id();
         join_set.spawn(async move {
             let _completion = completion;
@@ -303,13 +337,31 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCommand>
+            // If the job had a target execution time, we may check our final execution
+            // time against that for observability purposes.
+            if let (Some(target_time), Some(period)) = (target_time, period) {
+                // Elapsed time includes any scheduling lag as well as the execution of the job
+                let elapsed = Instant::now().duration_since(target_time);
+
+                warn_when_period_overrun(elapsed, period, BackgroundLoopKind::SecondaryDownload);
+            }
-        });
+        }.instrument(info_span!("secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())));
 
         RunningDownload { barrier }
     }
 }
 
 impl<'a> TenantDownloader<'a> {
-    #[instrument(skip_all, name="secondary_download", fields(tenant_id=%self.secondary_state.get_tenant_shard_id().tenant_id, shard_id=%self.secondary_state.get_tenant_shard_id().shard_slug()))]
     async fn download(&self) -> anyhow::Result<()> {
+        debug_assert_current_span_has_tenant_id();
+
         // For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
         // cover our access to local storage.
         let Ok(_guard) = self.secondary_state.gate.enter() else {
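
Note on the scheduling change above: instead of a binary "is it stale?" filter, each tenant now carries a next_download deadline that is initialized with random jitter, so a restart or mass-attach spreads the first round of downloads across DOWNLOAD_FRESHEN_INTERVAL rather than firing them all at once. A minimal stand-alone sketch of that jittered-deadline pattern, using a hypothetical JobState struct and JOB_PERIOD constant rather than the pageserver's real types:

    use std::time::{Duration, Instant};

    use rand::Rng; // rand = "0.8"

    // Hypothetical stand-ins for the pageserver's types (illustration only).
    const JOB_PERIOD: Duration = Duration::from_secs(300);

    struct JobState {
        next_run: Option<Instant>,
    }

    /// Is this job due? On first sight, initialize `next_run` with a random
    /// jitter in [0, JOB_PERIOD) so that many jobs created at the same moment
    /// (process startup, mass tenant attach) spread across one period instead
    /// of all firing at once.
    fn is_due(state: &mut JobState, now: Instant) -> bool {
        let next_run = *state.next_run.get_or_insert_with(|| {
            let jitter = rand::thread_rng().gen_range(Duration::ZERO..JOB_PERIOD);
            now + jitter
        });
        now >= next_run
    }

    fn main() {
        let mut state = JobState { next_run: None };
        let start = Instant::now();
        // First call draws the jittered deadline; the job is usually not yet due.
        println!("due immediately: {}", is_due(&mut state, start));
        // One full period later the job is due regardless of the jitter drawn.
        println!("due after one period: {}", is_due(&mut state, start + JOB_PERIOD));
    }
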
diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs
index fc32d315dd..95022ea117 100644
--- a/pageserver/src/tenant/secondary/heatmap_uploader.rs
+++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs
@@ -1,19 +1,24 @@
 use std::{
     collections::HashMap,
     sync::{Arc, Weak},
-    time::Instant,
+    time::{Duration, Instant},
 };
 
 use crate::{
     metrics::SECONDARY_MODE,
     tenant::{
-        config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path,
-        span::debug_assert_current_span_has_tenant_id, Tenant,
+        config::AttachmentMode,
+        mgr::TenantManager,
+        remote_timeline_client::remote_heatmap_path,
+        span::debug_assert_current_span_has_tenant_id,
+        tasks::{warn_when_period_overrun, BackgroundLoopKind},
+        Tenant,
     },
 };
 
 use md5;
 use pageserver_api::shard::TenantShardId;
+use rand::Rng;
 use remote_storage::GenericRemoteStorage;
 
 use super::{
@@ -25,7 +30,7 @@ use super::{
 };
 use tokio::task::JoinSet;
 use tokio_util::sync::CancellationToken;
-use tracing::instrument;
+use tracing::{info_span, instrument, Instrument};
 use utils::{backoff, completion::Barrier};
 
 use super::{heatmap::HeatMapTenant, UploadCommand};
@@ -43,6 +48,8 @@ impl HasBarrier for WriteInProgress {
 struct UploadPending {
     tenant: Arc<Tenant>,
     last_digest: Option<md5::Digest>,
+    target_time: Option<Instant>,
+    period: Option<Duration>,
 }
 
 impl TenantScoped for UploadPending {
@@ -154,7 +161,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
         let tenants = self.tenant_manager.get_attached_active_tenant_shards();
 
         yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
-            match tenant.get_heatmap_period() {
+            let period = match tenant.get_heatmap_period() {
                 None => {
                     // Heatmaps are disabled for this tenant
                     return;
                 }
@@ -166,9 +173,11 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
                     result.want_interval = match result.want_interval {
                         None => Some(period),
                         Some(existing) => Some(std::cmp::min(period, existing)),
-                    }
+                    };
+
+                    period
                 }
-            }
+            };
 
             // Stale attachments do not upload anything: if we are in this state, there is probably some
             // other attachment in mode Single or Multi running on another pageserver, and we don't
@@ -182,11 +191,15 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
             let state = self
                 .tenants
                 .entry(*tenant.get_tenant_shard_id())
-                .or_insert_with(|| UploaderTenantState {
-                    tenant: Arc::downgrade(&tenant),
-                    last_upload: None,
-                    next_upload: Some(Instant::now()),
-                    last_digest: None,
+                .or_insert_with(|| {
+                    let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);
+
+                    UploaderTenantState {
+                        tenant: Arc::downgrade(&tenant),
+                        last_upload: None,
+                        next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
+                        last_digest: None,
+                    }
                 });
 
             // Decline to do the upload if insufficient time has passed
@@ -198,6 +211,8 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
             result.jobs.push(UploadPending {
                 tenant,
                 last_digest,
+                target_time: state.next_upload,
+                period: Some(period),
             });
         })
         .await
@@ -215,10 +230,13 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
         let UploadPending {
             tenant,
             last_digest,
+            target_time,
+            period,
         } = job;
 
         let remote_storage = self.remote_storage.clone();
         let (completion, barrier) = utils::completion::channel();
+        let tenant_shard_id = *tenant.get_tenant_shard_id();
         join_set.spawn(async move {
             // Guard for the barrier in [`WriteInProgress`]
             let _completion = completion;
@@ -253,6 +271,16 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
             };
 
             let now = Instant::now();
+
+            // If the job had a target execution time, we may check our final execution
+            // time against that for observability purposes.
+            if let (Some(target_time), Some(period)) = (target_time, period) {
+                // Elapsed time includes any scheduling lag as well as the execution of the job
+                let elapsed = now.duration_since(target_time);
+
+                warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
+            }
+
             let next_upload = tenant
                 .get_heatmap_period()
                 .and_then(|period| now.checked_add(period));
@@ -265,7 +293,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
                 next_upload,
             })
             .ok();
-        });
+        }.instrument(info_span!("heatmap_upload", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())));
 
         WriteInProgress { barrier }
     }
@@ -284,6 +312,8 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
             // Ignore our state for last digest: this forces an upload even if nothing has changed
             last_digest: None,
             tenant,
+            target_time: None,
+            period: None,
         })
     }
@@ -330,7 +360,6 @@ enum UploadHeatmapError {
 
 /// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
 /// of the object we would have uploaded.
-#[instrument(skip_all, fields(tenant_id = %tenant.get_tenant_shard_id().tenant_id, shard_id = %tenant.get_tenant_shard_id().shard_slug()))]
 async fn upload_tenant_heatmap(
     remote_storage: GenericRemoteStorage,
     tenant: &Arc<Tenant>,
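
The uploader's last_digest field exists so that unchanged heatmaps are not re-uploaded: the serialized heatmap is hashed with md5 and compared against the digest of the previous upload. A self-contained sketch of that digest gate, with a hypothetical digest_if_changed helper (the real skip logic lives in upload_tenant_heatmap):

    use md5; // md5 = "0.7", the same crate the uploader imports

    /// Returns Some(new digest) if `body` differs from the previous upload
    /// (an upload should happen), or None if it is unchanged and the upload
    /// can be skipped. `digest_if_changed` is a hypothetical helper, not the
    /// pageserver's API.
    fn digest_if_changed(body: &[u8], last_digest: Option<&md5::Digest>) -> Option<md5::Digest> {
        let digest = md5::compute(body);
        if last_digest == Some(&digest) {
            None
        } else {
            Some(digest)
        }
    }

    fn main() {
        let heatmap_v1 = br#"{"timelines":[]}"#;
        let first = digest_if_changed(heatmap_v1, None).expect("first upload always runs");
        // Identical content: skip the upload.
        assert!(digest_if_changed(heatmap_v1, Some(&first)).is_none());
        // Changed content: upload again.
        let heatmap_v2 = br#"{"timelines":[{"layers":1}]}"#;
        assert!(digest_if_changed(heatmap_v2, Some(&first)).is_some());
        println!("digest gating behaves as expected");
    }
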
diff --git a/pageserver/src/tenant/secondary/scheduler.rs b/pageserver/src/tenant/secondary/scheduler.rs
index 16c4e66c3b..fa6cd12d66 100644
--- a/pageserver/src/tenant/secondary/scheduler.rs
+++ b/pageserver/src/tenant/secondary/scheduler.rs
@@ -96,6 +96,21 @@ pub(super) trait TenantScoped {
     fn get_tenant_shard_id(&self) -> &TenantShardId;
 }
 
+pub(super) trait ScheduledJob {
+    /// Indicative intended time of execution: jobs are executed _after_ this time,
+    /// but how much after is an indicator of whether we're keeping up with scheduling
+    /// goals.
+    fn get_target_time(&self) -> Option<Instant> {
+        None
+    }
+
+    /// If the job runs on a periodic basis, expose the period here. This is used
+    /// together with the target time to determine whether the job is considered late.
+    fn get_period(&self) -> Option<Duration> {
+        None
+    }
+}
+
 /// For types that contain a Barrier that may be waited on
 pub(super) trait HasBarrier {
     fn get_barrier(&self) -> Barrier;
@@ -199,10 +214,12 @@
                 .checked_add(self.scheduling_interval)
                 .unwrap_or_else(|| {
                     tracing::warn!(
-                        "Scheduling interval invalid ({}s), running immediately!",
+                        "Scheduling interval invalid ({}s)",
                         self.scheduling_interval.as_secs_f64()
                     );
-                    Instant::now()
+                    // unwrap(): this constant is small, cannot fail to add to time unless
+                    // we are close to the end of the universe.
+                    Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
                 });
 
         loop {
             tokio::select! {
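
The defaulted accessors on ScheduledJob let the generic scheduler ask any job "when were you meant to run, and how often?" without knowing the concrete job type; non-periodic jobs (such as the command-triggered uploads above, which set target_time and period to None) inherit the defaults and are never considered late. A sketch of how a consumer might combine the two accessors into a lateness check, using a simplified copy of the trait and hypothetical PeriodicJob/OneShotJob implementors:

    use std::time::{Duration, Instant};

    // Simplified copy of the trait; the pageserver's version is pub(super).
    trait ScheduledJob {
        /// Jobs run _after_ this time; how far after indicates scheduling lag.
        fn get_target_time(&self) -> Option<Instant> {
            None
        }
        /// Period for periodic jobs, used with the target time to judge lateness.
        fn get_period(&self) -> Option<Duration> {
            None
        }
    }

    // Hypothetical implementors (illustration only).
    struct PeriodicJob {
        target: Instant,
        period: Duration,
    }

    impl ScheduledJob for PeriodicJob {
        fn get_target_time(&self) -> Option<Instant> {
            Some(self.target)
        }
        fn get_period(&self) -> Option<Duration> {
            Some(self.period)
        }
    }

    /// One-shot jobs rely entirely on the trait's None defaults.
    struct OneShotJob;
    impl ScheduledJob for OneShotJob {}

    /// How far past its period did a job finish? None means "not periodic" or
    /// "not late enough to care", mirroring how a scheduler would decide
    /// whether to emit a warning.
    fn overrun(job: &dyn ScheduledJob, finished_at: Instant) -> Option<Duration> {
        let (target, period) = (job.get_target_time()?, job.get_period()?);
        let elapsed = finished_at.duration_since(target);
        (elapsed > period).then_some(elapsed)
    }

    fn main() {
        let start = Instant::now();
        let job = PeriodicJob { target: start, period: Duration::from_millis(10) };
        assert!(overrun(&job, start + Duration::from_millis(5)).is_none());
        assert!(overrun(&job, start + Duration::from_millis(25)).is_some());
        assert!(overrun(&OneShotJob, start).is_none());
        println!("lateness checks behave as expected");
    }
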
diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs
index 7ff1873eda..aa5894cc37 100644
--- a/pageserver/src/tenant/tasks.rs
+++ b/pageserver/src/tenant/tasks.rs
@@ -45,6 +45,8 @@ pub(crate) enum BackgroundLoopKind {
     ConsumptionMetricsCollectMetrics,
     ConsumptionMetricsSyntheticSizeWorker,
     InitialLogicalSizeCalculation,
+    HeatmapUpload,
+    SecondaryDownload,
 }
 
 impl BackgroundLoopKind {
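
The two new BackgroundLoopKind variants give warn_when_period_overrun labels for the heatmap-upload and secondary-download loops. A simplified sketch of the shape of such an overrun check (the pageserver's real function also records a metric, so this stand-alone version that only logs is an approximation):

    use std::time::Duration;

    // Local copy of the two new variants (illustration only).
    #[derive(Debug, Clone, Copy)]
    enum BackgroundLoopKind {
        HeatmapUpload,
        SecondaryDownload,
    }

    /// Warn when one iteration of a periodic task ran for at least its whole
    /// period, i.e. the loop can no longer keep up with its schedule.
    fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: BackgroundLoopKind) {
        if period != Duration::ZERO && elapsed >= period {
            eprintln!(
                "WARN: {:?} iteration ran for {:?}, overrunning its {:?} period",
                task, elapsed, period
            );
        }
    }

    fn main() {
        // 12s of work against a 10s period: warns.
        warn_when_period_overrun(
            Duration::from_secs(12),
            Duration::from_secs(10),
            BackgroundLoopKind::HeatmapUpload,
        );
        // 3s of work against a 10s period: stays quiet.
        warn_when_period_overrun(
            Duration::from_secs(3),
            Duration::from_secs(10),
            BackgroundLoopKind::SecondaryDownload,
        );
    }
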