Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-14 17:02:56 +00:00
@@ -11,6 +11,7 @@ use crate::task_mgr;
 use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME};
 use crate::tenant::mgr;
 use crate::tenant::{Tenant, TenantState};
+use tokio_util::sync::CancellationToken;
 use tracing::*;
 use utils::id::TenantId;
 
@@ -53,12 +54,14 @@ async fn compaction_loop(tenant_id: TenantId) {
     info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
+        let cancel = task_mgr::shutdown_token();
         let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
+        let mut first = true;
         loop {
             trace!("waking up");
 
             let tenant = tokio::select! {
-                _ = task_mgr::shutdown_watcher() => {
+                _ = cancel.cancelled() => {
                     info!("received cancellation request");
                     return;
                 },
@@ -68,9 +71,19 @@ async fn compaction_loop(tenant_id: TenantId) {
                 },
             };
 
+            let period = tenant.get_compaction_period();
+
+            // TODO: we shouldn't need to await to find tenant and this could be moved outside of
+            // loop
+            if first {
+                first = false;
+                if random_init_delay(period, &cancel).await.is_err() {
+                    break;
+                }
+            }
+
             let started_at = Instant::now();
 
-            let period = tenant.get_compaction_period();
             let sleep_duration = if period == Duration::ZERO {
                 info!("automatic compaction is disabled");
                 // check again in 10 seconds, in case it's been enabled again.
@@ -89,7 +102,7 @@ async fn compaction_loop(tenant_id: TenantId) {
 
             // Sleep
             tokio::select! {
-                _ = task_mgr::shutdown_watcher() => {
+                _ = cancel.cancelled() => {
                     info!("received cancellation request during idling");
                     break;
                 },
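Both shutdown arms of compaction_loop now race against a single CancellationToken captured once at the top of the task (cancel = task_mgr::shutdown_token()) instead of calling task_mgr::shutdown_watcher() on every iteration. A minimal, self-contained sketch of that pattern outside the pageserver, using plain tokio plus tokio-util (the function and names here are illustrative, not part of the diff):

use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Illustrative periodic loop: the token is captured once and every idle
// sleep races against it, so a shutdown request interrupts the wait.
async fn periodic_task(cancel: CancellationToken, period: Duration) {
    loop {
        // ... one iteration of work would run here ...
        tokio::select! {
            _ = cancel.cancelled() => {
                // Shutdown requested while idling; leave the loop.
                break;
            }
            _ = tokio::time::sleep(period) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(periodic_task(cancel.clone(), Duration::from_millis(50)));
    tokio::time::sleep(Duration::from_millis(200)).await;
    cancel.cancel();
    task.await.unwrap();
}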
@@ -111,14 +124,16 @@ async fn gc_loop(tenant_id: TenantId) {
     info!("starting");
     TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
     async {
+        let cancel = task_mgr::shutdown_token();
         // GC might require downloading, to find the cutoff LSN that corresponds to the
         // cutoff specified as time.
         let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
+        let mut first = true;
         loop {
             trace!("waking up");
 
             let tenant = tokio::select! {
-                _ = task_mgr::shutdown_watcher() => {
+                _ = cancel.cancelled() => {
                     info!("received cancellation request");
                     return;
                 },
@@ -128,9 +143,17 @@ async fn gc_loop(tenant_id: TenantId) {
                 },
             };
 
+            let period = tenant.get_gc_period();
+
+            if first {
+                first = false;
+                if random_init_delay(period, &cancel).await.is_err() {
+                    break;
+                }
+            }
+
             let started_at = Instant::now();
 
-            let period = tenant.get_gc_period();
             let gc_horizon = tenant.get_gc_horizon();
             let sleep_duration = if period == Duration::ZERO || gc_horizon == 0 {
                 info!("automatic GC is disabled");
@@ -151,7 +174,7 @@ async fn gc_loop(tenant_id: TenantId) {
 
             // Sleep
             tokio::select! {
-                _ = task_mgr::shutdown_watcher() => {
+                _ = cancel.cancelled() => {
                     info!("received cancellation request during idling");
                     break;
                 },
@@ -207,6 +230,37 @@ async fn wait_for_active_tenant(
     }
 }
 
+#[derive(thiserror::Error, Debug)]
+#[error("cancelled")]
+pub(crate) struct Cancelled;
+
+/// Provide a random delay for background task initialization.
+///
+/// This delay prevents a thundering herd of background tasks and will likely keep them running on
+/// different periods for more stable load.
+pub(crate) async fn random_init_delay(
+    period: Duration,
+    cancel: &CancellationToken,
+) -> Result<(), Cancelled> {
+    use rand::Rng;
+
+    let d = {
+        let mut rng = rand::thread_rng();
+
+        // gen_range asserts that the range cannot be empty, which it could be because period can
+        // be set to zero to disable gc or compaction, so lets set it to be at least 10s.
+        let period = std::cmp::max(period, Duration::from_secs(10));
+
+        // semi-ok default as the source of jitter
+        rng.gen_range(Duration::ZERO..=period)
+    };
+
+    tokio::select! {
+        _ = cancel.cancelled() => Err(Cancelled),
+        _ = tokio::time::sleep(d) => Ok(()),
+    }
+}
+
 pub(crate) fn warn_when_period_overrun(elapsed: Duration, period: Duration, task: &str) {
     // Duration::ZERO will happen because it's the "disable [bgtask]" value.
     if elapsed >= period && period != Duration::ZERO {
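random_init_delay spreads the first wake-up of each background task uniformly over one period (clamped to at least 10 s so rand's gen_range never sees an empty range when the period is zero, i.e. when the task is disabled), and it returns Err(Cancelled) if shutdown arrives before the delay elapses. A hypothetical test sketch for that cancellation path, assuming it lives in the same module as the helper so the pub(crate) items are visible, could look like:

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    #[tokio::test]
    async fn random_init_delay_respects_cancellation() {
        // With a token that is already cancelled, the helper should bail out
        // with Err(Cancelled) instead of sleeping out the jittered delay.
        let cancel = CancellationToken::new();
        cancel.cancel();
        let res = random_init_delay(Duration::from_secs(3600), &cancel).await;
        assert!(res.is_err());
    }
}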
(second file in the diff: the Timeline eviction task)

@@ -41,6 +41,19 @@ impl Timeline {
 
     #[instrument(skip_all, fields(tenant_id = %self.tenant_id, timeline_id = %self.timeline_id))]
     async fn eviction_task(self: Arc<Self>, cancel: CancellationToken) {
+        use crate::tenant::tasks::random_init_delay;
+        {
+            let policy = self.get_eviction_policy();
+            let period = match policy {
+                EvictionPolicy::LayerAccessThreshold(lat) => lat.period,
+                EvictionPolicy::NoEviction => Duration::from_secs(10),
+            };
+            if random_init_delay(period, &cancel).await.is_err() {
+                info!("shutting down");
+                return;
+            }
+        }
+
         loop {
             let policy = self.get_eviction_policy();
             let cf = self.eviction_iteration(&policy, cancel.clone()).await;
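This hunk applies the same initial jitter to the eviction task: before the main loop, the period is taken from the eviction policy (falling back to 10 s under NoEviction, so a later policy change is still noticed) and random_init_delay is awaited, returning early with a log line if shutdown wins. In the pageserver the token reaches the task through task_mgr; purely as an illustration of how a task like this could receive its token as a child of a shutdown token (the names and spawn path below are hypothetical, not part of the diff):

use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Hypothetical stand-in for a per-timeline background task: each task gets a
// child token, so cancelling the parent during shutdown stops all of them.
async fn eviction_task_stub(cancel: CancellationToken) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => return,
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                // one eviction iteration would run here
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    let handle = tokio::spawn(eviction_task_stub(shutdown.child_token()));

    // ... later, on pageserver shutdown ...
    shutdown.cancel();
    handle.await.unwrap();
}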