pageserver: use warn_when_period_overrun in secondary/heatmap

This commit is contained in:
John Spray
2023-12-19 21:28:41 +00:00
parent bc1f328d61
commit c772894c57
4 changed files with 136 additions and 35 deletions

View File

@@ -13,6 +13,7 @@ use crate::{
remote_timeline_client::{index::LayerFileMetadata, HEATMAP_BASENAME},
span::debug_assert_current_span_has_tenant_id,
storage_layer::{Layer, LayerFileName},
tasks::{warn_when_period_overrun, BackgroundLoopKind},
timeline::{DiskUsageEvictionInfo, LocalLayerInfoForDiskUsageEviction},
},
virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
@@ -31,11 +32,12 @@ use anyhow::Context;
use chrono::format::{DelayedFormat, StrftimeItems};
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::GenericRemoteStorage;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
use tracing::{info_span, instrument, Instrument};
use utils::{completion::Barrier, crashsafe::path_with_suffix_extension, fs_ext, id::TimelineId};
use super::{
@@ -86,7 +88,8 @@ pub(super) struct SecondaryDetailTimeline {
/// to TenantManager
#[derive(Default, Debug)]
pub(super) struct SecondaryDetail {
freshened_at: Option<Instant>,
last_download: Option<Instant>,
next_download: Option<Instant>,
pub(super) timelines: HashMap<TimelineId, SecondaryDetailTimeline>,
}
@@ -171,6 +174,9 @@ struct SecondaryDownloader {
struct PendingDownload {
secondary_state: Arc<SecondaryTenant>,
last_download: Option<Instant>,
target_time: Option<Instant>,
period: Option<Duration>,
}
impl TenantScoped for PendingDownload {
@@ -230,7 +236,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
// Update next_download even if there was an error: we don't want errored tenants to implicitly
// take priority to run again.
let mut detail = secondary_state.detail.lock().unwrap();
detail.freshened_at = Some(Instant::now());
detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
}
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
@@ -246,22 +252,41 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
tenants.push(secondary_state.clone());
});
// Step 2: filter out tenants which are not eligible to run yet
// Step 2: filter out tenants which are not yet eligible to run
let now = Instant::now();
let tenants = tenants.into_iter().filter(|c| {
let detail = c.detail.lock().unwrap();
match detail.freshened_at {
None => true, // Not yet freshened, therefore eligible to run
Some(t) => {
let since = now.duration_since(t);
since > DOWNLOAD_FRESHEN_INTERVAL
}
}
});
result.jobs = tenants
.map(|t| PendingDownload { secondary_state: t })
.into_iter()
.filter_map(|c| {
let (last_download, next_download) = {
let mut detail = c.detail.lock().unwrap();
if detail.next_download.is_none() {
// Initialize with a jitter: this spreads initial downloads on startup
// or mass-attach across our freshen interval.
let jittered_period =
rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
detail.next_download = Some(now.checked_add(jittered_period).expect(
"Using our constant, which is known to be small compared with clock range",
));
}
(detail.last_download, detail.next_download.unwrap())
};
if now < next_download {
Some(PendingDownload {
secondary_state: c,
last_download,
target_time: Some(next_download),
period: Some(DOWNLOAD_FRESHEN_INTERVAL),
})
} else {
None
}
})
.collect();
// Step 3: sort by target execution time to run most urgent first.
result.jobs.sort_by_key(|j| j.target_time);
result
}
@@ -278,6 +303,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
};
Ok(PendingDownload {
target_time: None,
period: None,
last_download: None,
secondary_state: tenant,
})
}
@@ -288,11 +316,17 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
result_tx: tokio::sync::mpsc::UnboundedSender<CompleteDownload>,
job: PendingDownload,
) -> RunningDownload {
let PendingDownload { secondary_state } = job;
let PendingDownload {
secondary_state,
last_download,
target_time,
period,
} = job;
let (completion, barrier) = utils::completion::channel();
let remote_storage = self.remote_storage.clone();
let conf = self.tenant_manager.get_conf();
let tenant_shard_id = *secondary_state.get_tenant_shard_id();
join_set.spawn(async move {
let _completion = completion;
@@ -303,13 +337,31 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
tracing::info!("Failed to freshen secondary content: {e:#}")
};
// If the job had a target execution time, we may check our final execution
// time against that for observability purposes.
if let (Some(target_time), Some(period)) = (target_time, period) {
// Only track execution lag if this isn't our first download: otherwise, it is expected
// that execution will have taken longer than our configured interval, for example
// when starting up a pageserver and doing the initial downloads for many tenants at once.
if last_download.is_some() {
// Elapsed time includes any scheduling lag as well as the execution of the job
let elapsed = Instant::now().duration_since(target_time);
warn_when_period_overrun(
elapsed,
period,
BackgroundLoopKind::SecondaryDownload,
);
}
}
result_tx
.send(CompleteDownload {
secondary_state,
completed_at: Instant::now(),
})
.ok();
});
}.instrument(info_span!("secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())));
RunningDownload { barrier }
}
}
@@ -462,8 +514,9 @@ impl<'a> TenantDownloader<'a> {
}
}
#[instrument(skip_all, name="secondary_download", fields(tenant_id=%self.secondary_state.get_tenant_shard_id().tenant_id, shard_id=%self.secondary_state.get_tenant_shard_id().shard_slug()))]
async fn download(&self) -> anyhow::Result<()> {
debug_assert_current_span_has_tenant_id();
// For the duration of a download, we must hold the SecondaryTenant::gate, to ensure
// we safely cover our access to local storage.
let Ok(_guard) = self.secondary_state.gate.enter() else {

View File

@@ -1,19 +1,24 @@
use std::{
collections::HashMap,
sync::{Arc, Weak},
time::Instant,
time::{Duration, Instant},
};
use crate::{
metrics::SECONDARY_MODE,
tenant::{
config::AttachmentMode, mgr::TenantManager, remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id, Tenant,
config::AttachmentMode,
mgr::TenantManager,
remote_timeline_client::remote_heatmap_path,
span::debug_assert_current_span_has_tenant_id,
tasks::{warn_when_period_overrun, BackgroundLoopKind},
Tenant,
},
};
use md5;
use pageserver_api::shard::TenantShardId;
use rand::Rng;
use remote_storage::GenericRemoteStorage;
use super::{
@@ -25,7 +30,7 @@ use super::{
};
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use tracing::{info_span, instrument, Instrument};
use utils::{backoff, completion::Barrier};
use super::{heatmap::HeatMapTenant, UploadCommand};
@@ -43,6 +48,8 @@ impl HasBarrier for WriteInProgress {
struct UploadPending {
tenant: Arc<Tenant>,
last_digest: Option<md5::Digest>,
target_time: Option<Instant>,
period: Option<Duration>,
}
impl TenantScoped for UploadPending {
@@ -154,7 +161,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let tenants = self.tenant_manager.get_attached_active_tenant_shards();
yielding_loop(1000, &self.cancel, tenants.into_iter(), |tenant| {
match tenant.get_heatmap_period() {
let period = match tenant.get_heatmap_period() {
None => {
// Heatmaps are disabled for this tenant
return;
@@ -166,9 +173,11 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
result.want_interval = match result.want_interval {
None => Some(period),
Some(existing) => Some(std::cmp::min(period, existing)),
}
};
period
}
}
};
// Stale attachments do not upload anything: if we are in this state, there is probably some
// other attachment in mode Single or Multi running on another pageserver, and we don't
@@ -182,11 +191,15 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let state = self
.tenants
.entry(*tenant.get_tenant_shard_id())
.or_insert_with(|| UploaderTenantState {
tenant: Arc::downgrade(&tenant),
last_upload: None,
next_upload: Some(Instant::now()),
last_digest: None,
.or_insert_with(|| {
let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);
UploaderTenantState {
tenant: Arc::downgrade(&tenant),
last_upload: None,
next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
last_digest: None,
}
});
// Decline to do the upload if insufficient time has passed
@@ -198,6 +211,8 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
result.jobs.push(UploadPending {
tenant,
last_digest,
target_time: state.next_upload,
period: Some(period),
});
})
.await
@@ -215,10 +230,13 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
let UploadPending {
tenant,
last_digest,
target_time,
period,
} = job;
let remote_storage = self.remote_storage.clone();
let (completion, barrier) = utils::completion::channel();
let tenant_shard_id = *tenant.get_tenant_shard_id();
join_set.spawn(async move {
// Guard for the barrier in [`WriteInProgress`]
let _completion = completion;
@@ -253,6 +271,16 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
};
let now = Instant::now();
// If the job had a target execution time, we may check our final execution
// time against that for observability purposes.
if let (Some(target_time), Some(period)) = (target_time, period) {
// Elapsed time includes any scheduling lag as well as the execution of the job
let elapsed = now.duration_since(target_time);
warn_when_period_overrun(elapsed, period, BackgroundLoopKind::HeatmapUpload);
}
let next_upload = tenant
.get_heatmap_period()
.and_then(|period| now.checked_add(period));
@@ -265,7 +293,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
next_upload,
})
.ok();
});
}.instrument(info_span!("secondary_download", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug())));
WriteInProgress { barrier }
}
@@ -284,6 +312,8 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
// Ignore our state for last digest: this forces an upload even if nothing has changed
last_digest: None,
tenant,
target_time: None,
period: None,
})
}
@@ -330,7 +360,6 @@ enum UploadHeatmapError {
/// The inner upload operation. This will skip if `last_digest` is Some and matches the digest
/// of the object we would have uploaded.
#[instrument(skip_all, fields(tenant_id = %tenant.get_tenant_shard_id().tenant_id, shard_id = %tenant.get_tenant_shard_id().shard_slug()))]
async fn upload_tenant_heatmap(
remote_storage: GenericRemoteStorage,
tenant: &Arc<Tenant>,

View File

@@ -96,6 +96,21 @@ pub(super) trait TenantScoped {
fn get_tenant_shard_id(&self) -> &TenantShardId;
}
pub(super) trait ScheduledJob {
/// Indicative intended time of execution: jobs are executed _after_ this time,
/// but how much after is an indicator of whether we're keeping up with scheduling
/// goals.
fn get_target_time(&self) -> Option<Instant> {
None
}
/// If the job runs on a periodic basis, expose the period here. This is used
/// together with the target time to determine whether the job is considered late.
fn get_period(&self) -> Option<Duration> {
None
}
}
/// For types that contain a Barrier that may be waited on
pub(super) trait HasBarrier {
fn get_barrier(&self) -> Barrier;
@@ -199,10 +214,12 @@ where
.checked_add(self.scheduling_interval)
.unwrap_or_else(|| {
tracing::warn!(
"Scheduling interval invalid ({}s), running immediately!",
"Scheduling interval invalid ({}s)",
self.scheduling_interval.as_secs_f64()
);
Instant::now()
// unwrap(): this constant is small, cannot fail to add to time unless
// we are close to the end of the universe.
Instant::now().checked_add(MIN_SCHEDULING_INTERVAL).unwrap()
});
loop {
tokio::select! {

View File

@@ -45,6 +45,8 @@ pub(crate) enum BackgroundLoopKind {
ConsumptionMetricsCollectMetrics,
ConsumptionMetricsSyntheticSizeWorker,
InitialLogicalSizeCalculation,
HeatmapUpload,
SecondaryDownload,
}
impl BackgroundLoopKind {