add random init delay for background tasks (#3655)

Fixes #3649.
This commit is contained in:
Joonas Koivunen
2023-02-21 13:42:11 +02:00
committed by GitHub
parent 7de373210d
commit b220ba6cd1
2 changed files with 73 additions and 6 deletions

View File

@@ -11,6 +11,7 @@ use crate::task_mgr;
use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::mgr;
use crate::tenant::{Tenant, TenantState};
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TenantId;
@@ -53,12 +54,14 @@ async fn compaction_loop(tenant_id: TenantId) {
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
@@ -68,9 +71,19 @@ async fn compaction_loop(tenant_id: TenantId) {
},
};
let period = tenant.get_compaction_period();
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let started_at = Instant::now();
let period = tenant.get_compaction_period();
let sleep_duration = if period == Duration::ZERO {
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
@@ -89,7 +102,7 @@ async fn compaction_loop(tenant_id: TenantId) {
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -111,14 +124,16 @@ async fn gc_loop(tenant_id: TenantId) {
info!("starting");
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let cancel = task_mgr::shutdown_token();
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
loop {
trace!("waking up");
let tenant = tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request");
return;
},
@@ -128,9 +143,17 @@ async fn gc_loop(tenant_id: TenantId) {
},
};
let period = tenant.get_gc_period();
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let started_at = Instant::now();
let period = tenant.get_gc_period();
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
info!("automatic GC is disabled");
@@ -151,7 +174,7 @@ async fn gc_loop(tenant_id: TenantId) {
// Sleep
tokio::select! {
_ = task_mgr::shutdown_watcher() => {
_ = cancel.cancelled() => {
info!("received cancellation request during idling");
break;
},
@@ -207,6 +230,37 @@ async fn wait_for_active_tenant(
}
}
#[derive(thiserror::Error, Debug)]
#[error("cancelled")]
pub(crate) struct Cancelled;
/// Provide a random delay for background task initialization.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn random_init_delay(
period: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
use rand::Rng;
let d = {
let mut rng = rand::thread_rng();
// gen_range asserts that the range cannot be empty, which it could be because period can
// be set to zero to disable gc or compaction, so lets set it to be at least 10s.
let period = std::cmp::max(period, Duration::from_secs(10));
// semi-ok default as the source of jitter
rng.gen_range(Duration::ZERO..=period)
};
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(d) => Ok(()),
}
}
pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
// Duration::ZERO will happen because it's the "disable [bgtask]" value.
if elapsed >= period && period != Duration::ZERO {

View File

@@ -41,6 +41,19 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
use crate::tenant::tasks::random_init_delay;
{
let policy = self.get_eviction_policy();
let period = match policy {
EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &cancel).await.is_err() {
info!("shutting down");
return;
}
}
loop {
let policy = self.get_eviction_policy();
let cf = self.eviction_iteration(&policy, cancel.clone()).await;