mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 09:22:55 +00:00
Merge branch 'main' into problame/move-pageserver-config-into-api-crate
This commit is contained in:
@@ -31,7 +31,10 @@ use crate::{
|
||||
|
||||
use super::{
|
||||
heatmap::HeatMapLayer,
|
||||
scheduler::{self, Completion, JobGenerator, SchedulingResult, TenantBackgroundJobs},
|
||||
scheduler::{
|
||||
self, period_jitter, period_warmup, Completion, JobGenerator, SchedulingResult,
|
||||
TenantBackgroundJobs,
|
||||
},
|
||||
SecondaryTenant,
|
||||
};
|
||||
|
||||
@@ -45,7 +48,6 @@ use chrono::format::{DelayedFormat, StrftimeItems};
|
||||
use futures::Future;
|
||||
use pageserver_api::models::SecondaryProgress;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::Rng;
|
||||
use remote_storage::{DownloadError, Etag, GenericRemoteStorage};
|
||||
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -274,7 +276,7 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
// Update freshened_at even if there was an error: we don't want errored tenants to implicitly
|
||||
// take priority to run again.
|
||||
let mut detail = secondary_state.detail.lock().unwrap();
|
||||
detail.next_download = Some(Instant::now() + DOWNLOAD_FRESHEN_INTERVAL);
|
||||
detail.next_download = Some(Instant::now() + period_jitter(DOWNLOAD_FRESHEN_INTERVAL, 5));
|
||||
}
|
||||
|
||||
async fn schedule(&mut self) -> SchedulingResult<PendingDownload> {
|
||||
@@ -305,11 +307,9 @@ impl JobGenerator<PendingDownload, RunningDownload, CompleteDownload, DownloadCo
|
||||
}
|
||||
|
||||
if detail.next_download.is_none() {
|
||||
// Initialize with a jitter: this spreads initial downloads on startup
|
||||
// or mass-attach across our freshen interval.
|
||||
let jittered_period =
|
||||
rand::thread_rng().gen_range(Duration::ZERO..DOWNLOAD_FRESHEN_INTERVAL);
|
||||
detail.next_download = Some(now.checked_add(jittered_period).expect(
|
||||
// Initialize randomly in the range from 0 to our interval: this uniformly spreads the start times. Subsequent
|
||||
// rounds will use a smaller jitter to avoid accidentally synchronizing later.
|
||||
detail.next_download = Some(now.checked_add(period_warmup(DOWNLOAD_FRESHEN_INTERVAL)).expect(
|
||||
"Using our constant, which is known to be small compared with clock range",
|
||||
));
|
||||
}
|
||||
|
||||
@@ -20,12 +20,14 @@ use crate::{
|
||||
|
||||
use futures::Future;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use rand::Rng;
|
||||
use remote_storage::{GenericRemoteStorage, TimeoutOrCancel};
|
||||
|
||||
use super::{
|
||||
heatmap::HeatMapTenant,
|
||||
scheduler::{self, JobGenerator, RunningJob, SchedulingResult, TenantBackgroundJobs},
|
||||
scheduler::{
|
||||
self, period_jitter, period_warmup, JobGenerator, RunningJob, SchedulingResult,
|
||||
TenantBackgroundJobs,
|
||||
},
|
||||
CommandRequest, UploadCommand,
|
||||
};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -181,15 +183,11 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
|
||||
let state = self
|
||||
.tenants
|
||||
.entry(*tenant.get_tenant_shard_id())
|
||||
.or_insert_with(|| {
|
||||
let jittered_period = rand::thread_rng().gen_range(Duration::ZERO..period);
|
||||
|
||||
UploaderTenantState {
|
||||
tenant: Arc::downgrade(&tenant),
|
||||
last_upload: None,
|
||||
next_upload: Some(now.checked_add(jittered_period).unwrap_or(now)),
|
||||
last_digest: None,
|
||||
}
|
||||
.or_insert_with(|| UploaderTenantState {
|
||||
tenant: Arc::downgrade(&tenant),
|
||||
last_upload: None,
|
||||
next_upload: Some(now.checked_add(period_warmup(period)).unwrap_or(now)),
|
||||
last_digest: None,
|
||||
});
|
||||
|
||||
// Decline to do the upload if insufficient time has passed
|
||||
@@ -274,7 +272,7 @@ impl JobGenerator<UploadPending, WriteInProgress, WriteComplete, UploadCommand>
|
||||
|
||||
let next_upload = tenant
|
||||
.get_heatmap_period()
|
||||
.and_then(|period| now.checked_add(period));
|
||||
.and_then(|period| now.checked_add(period_jitter(period, 5)));
|
||||
|
||||
WriteComplete {
|
||||
tenant_shard_id: *tenant.get_tenant_shard_id(),
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use futures::Future;
|
||||
use rand::Rng;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
marker::PhantomData,
|
||||
@@ -19,6 +20,26 @@ use super::{CommandRequest, CommandResponse};
|
||||
const MAX_SCHEDULING_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const MIN_SCHEDULING_INTERVAL: Duration = Duration::from_secs(1);
|
||||
|
||||
/// Jitter a Duration by an integer percentage. Returned values are uniform
|
||||
/// in the range 100-pct..100+pct (i.e. a 5% jitter is 5% either way: a ~10% range)
|
||||
pub(super) fn period_jitter(d: Duration, pct: u32) -> Duration {
|
||||
if d == Duration::ZERO {
|
||||
d
|
||||
} else {
|
||||
rand::thread_rng().gen_range((d * (100 - pct)) / 100..(d * (100 + pct)) / 100)
|
||||
}
|
||||
}
|
||||
|
||||
/// When a periodic task first starts, it should wait for some time in the range 0..period, so
|
||||
/// that starting many such tasks at the same time spreads them across the time range.
|
||||
pub(super) fn period_warmup(period: Duration) -> Duration {
|
||||
if period == Duration::ZERO {
|
||||
period
|
||||
} else {
|
||||
rand::thread_rng().gen_range(Duration::ZERO..period)
|
||||
}
|
||||
}
|
||||
|
||||
/// Scheduling helper for background work across many tenants.
|
||||
///
|
||||
/// Systems that need to run background work across many tenants may use this type
|
||||
|
||||
Reference in New Issue
Block a user