pageserver: misc task cleanups (#10723)

This patch does a bunch of superficial cleanups of `tenant::tasks` to
avoid noise in subsequent PRs. There are no functional changes.

PS: enable "hide whitespace" when reviewing, due to the unindentation of
large async blocks.
This commit is contained in:
Erik Grinaker
2025-02-08 12:02:13 +01:00
committed by GitHub
parent 6cd3b501ec
commit 874accd6ed
4 changed files with 342 additions and 401 deletions

View File

@@ -61,6 +61,7 @@ use crate::{
remote_timeline_client::LayerFileMetadata,
secondary::SecondaryTenant,
storage_layer::{AsLayerDesc, EvictionError, Layer, LayerName, LayerVisibilityHint},
tasks::sleep_random,
},
CancellableTask, DiskUsageEvictionTask,
};
@@ -210,14 +211,8 @@ async fn disk_usage_eviction_task(
info!("disk usage based eviction task finishing");
};
use crate::tenant::tasks::random_init_delay;
{
if random_init_delay(task_config.period, &cancel)
.await
.is_err()
{
return;
}
if sleep_random(task_config.period, &cancel).await.is_err() {
return;
}
let mut iteration_no = 0;

View File

@@ -1,15 +1,17 @@
//! This module contains functions to serve per-tenant background processes,
//! such as compaction and GC
//! This module contains per-tenant background processes, e.g. compaction and GC.
use std::cmp::max;
use std::ops::ControlFlow;
use std::future::Future;
use std::ops::{ControlFlow, RangeInclusive};
use std::pin::pin;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use rand::Rng;
use tokio::sync::Semaphore;
use scopeguard::defer;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio_util::sync::CancellationToken;
use tracing::*;
@@ -20,8 +22,10 @@ use crate::tenant::throttle::Stats;
use crate::tenant::timeline::compaction::CompactionOutcome;
use crate::tenant::timeline::CompactionError;
use crate::tenant::{Tenant, TenantState};
use pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD;
use utils::completion::Barrier;
use utils::rate_limit::RateLimit;
use utils::{backoff, completion, pausable_failpoint};
use utils::{backoff, pausable_failpoint};
/// Semaphore limiting concurrent background tasks (across all tenants).
///
@@ -52,6 +56,10 @@ static CONCURRENT_COMPACTION_TASKS: Lazy<Semaphore> = Lazy::new(|| {
Semaphore::new(permits)
});
/// Background jobs.
///
/// NB: not all of these acquire a CONCURRENT_BACKGROUND_TASKS semaphore permit, only the ones that
/// do any significant IO.
#[derive(
Debug,
PartialEq,
@@ -76,15 +84,15 @@ pub(crate) enum BackgroundLoopKind {
}
pub struct BackgroundLoopSemaphorePermit<'a> {
_permit: tokio::sync::SemaphorePermit<'static>,
_permit: SemaphorePermit<'static>,
_recorder: BackgroundLoopSemaphoreMetricsRecorder<'a>,
}
/// Cancellation safe.
pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
_ctx: &RequestContext,
/// Acquires a semaphore permit, to limit concurrent background jobs.
pub(crate) async fn acquire_concurrency_permit(
loop_kind: BackgroundLoopKind,
use_compaction_semaphore: bool,
_ctx: &RequestContext,
) -> BackgroundLoopSemaphorePermit<'static> {
// TODO: use a lower threshold and remove the pacer once we resolve some blockage.
const WARN_THRESHOLD: Duration = Duration::from_secs(600);
@@ -121,12 +129,10 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
}
}
/// Start per tenant background loops: compaction and gc.
pub fn start_background_loops(
tenant: &Arc<Tenant>,
background_jobs_can_start: Option<&completion::Barrier>,
) {
/// Start per tenant background loops: compaction, GC, and ingest housekeeping.
pub fn start_background_loops(tenant: &Arc<Tenant>, can_start: Option<&Barrier>) {
let tenant_shard_id = tenant.tenant_shard_id;
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::Compaction,
@@ -135,13 +141,15 @@ pub fn start_background_loops(
&format!("compactor for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
let can_start = can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
compaction_loop(tenant, cancel)
// If you rename this span, change the RUST_LOG env variable in test_runner/performance/test_branch_creation.py
.instrument(info_span!("compaction_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
@@ -150,6 +158,7 @@ pub fn start_background_loops(
}
},
);
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::GarbageCollector,
@@ -158,13 +167,15 @@ pub fn start_background_loops(
&format!("garbage collector for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
let can_start = can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
gc_loop(tenant, cancel)
.instrument(info_span!("gc_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
@@ -181,13 +192,15 @@ pub fn start_background_loops(
&format!("ingest housekeeping for tenant {tenant_shard_id}"),
{
let tenant = Arc::clone(tenant);
let background_jobs_can_start = background_jobs_can_start.cloned();
let can_start = can_start.cloned();
async move {
let cancel = task_mgr::shutdown_token();
let cancel = task_mgr::shutdown_token(); // NB: must be in async context
tokio::select! {
_ = cancel.cancelled() => { return Ok(()) },
_ = completion::Barrier::maybe_wait(background_jobs_can_start) => {}
_ = cancel.cancelled() => return Ok(()),
_ = Barrier::maybe_wait(can_start) => {}
};
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
defer!(TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc());
ingest_housekeeping_loop(tenant, cancel)
.instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
.await;
@@ -197,372 +210,309 @@ pub fn start_background_loops(
);
}
///
/// Compaction task's main loop
///
/// Compaction task's main loop.
async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
let ctx = RequestContext::todo_child(TaskKind::Compaction, DownloadBehavior::Download);
let mut first = true;
let mut error_run = 0; // consecutive errors
let period = tenant.get_compaction_period();
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if random_init_delay(period, &cancel).await.is_err() {
break;
}
}
let period = tenant.get_compaction_period();
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::from_secs(1)
} else {
period
};
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
);
sleep_duration = wait_duration;
}
}
// the duration is recorded by performance tests by enabling debug in this function
tracing::debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
// TODO: we shouldn't need to await to find tenant and this could be moved outside of
// loop, #3501. There are also additional "allowed_errors" in tests.
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
break;
}
}
let sleep_duration;
if period == Duration::ZERO {
#[cfg(not(feature = "testing"))]
info!("automatic compaction is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10)
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Compaction,
};
// Run compaction
let IterationResult { output, elapsed } = iteration
.run(tenant.compaction_iteration(&cancel, &ctx))
.await;
match output {
Ok(outcome) => {
error_run = 0;
// schedule the next compaction immediately in case there is a pending compaction task
sleep_duration = if let CompactionOutcome::Pending = outcome {
Duration::from_secs(1)
} else {
period
};
}
Err(err) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(&err, error_run, &wait_duration, cancel.is_cancelled());
sleep_duration = wait_duration;
}
}
// the duration is recorded by performance tests by enabling debug in this function
debug!(
elapsed_ms = elapsed.as_millis(),
"compaction iteration complete"
);
};
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
// TODO: move this to a separate task (housekeeping loop) that isn't affected by the back-off,
// so we get some upper bound guarantee on when walredo quiesce / this throttling reporting here happens.
if let Some(walredo_mgr) = &tenant.walredo_mgr {
walredo_mgr.maybe_quiesce(period * 10);
}
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
fn log_compaction_error(
e: &CompactionError,
error_run_count: u32,
sleep_duration: &std::time::Duration,
err: &CompactionError,
error_count: u32,
sleep_duration: &Duration,
task_cancelled: bool,
) {
use crate::tenant::upload_queue::NotInitialized;
use crate::tenant::PageReconstructError;
use CompactionError::*;
enum LooksLike {
Info,
Error,
}
let level = match err {
ShuttingDown => return,
Offload(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,
Other(err) => {
let root_cause = err.root_cause();
let decision = match e {
ShuttingDown => None,
Offload(_) => Some(LooksLike::Error),
_ if task_cancelled => Some(LooksLike::Info),
Other(e) => {
let root_cause = e.root_cause();
let is_stopping = {
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
upload_queue || timeline
};
let upload_queue = root_cause
.downcast_ref::<NotInitialized>()
.is_some_and(|e| e.is_stopping());
let timeline = root_cause
.downcast_ref::<PageReconstructError>()
.is_some_and(|e| e.is_stopping());
let is_stopping = upload_queue || timeline;
if is_stopping {
Some(LooksLike::Info)
Level::INFO
} else {
Some(LooksLike::Error)
Level::ERROR
}
}
};
match decision {
Some(LooksLike::Info) => info!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:#}",
),
Some(LooksLike::Error) => error!(
"Compaction failed {error_run_count} times, retrying in {sleep_duration:?}: {e:?}",
),
None => {}
match level {
Level::ERROR => {
error!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
}
Level::INFO => {
info!("Compaction failed {error_count} times, retrying in {sleep_duration:?}: {err:#}")
}
level => unimplemented!("unexpected level {level:?}"),
}
}
///
/// GC task's main loop
///
/// GC task's main loop.
async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
const MAX_BACKOFF_SECS: f64 = 300.0;
// How many errors we have seen consequtively
let mut error_run_count = 0;
let mut error_run = 0; // consecutive errors
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx =
RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
// GC might require downloading, to find the cutoff LSN that corresponds to the
// cutoff specified as time.
let ctx = RequestContext::todo_child(TaskKind::GarbageCollector, DownloadBehavior::Download);
let mut first = true;
let mut first = true;
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
let period = tenant.get_gc_period();
let period = tenant.get_gc_period();
if first {
first = false;
let delays = async {
random_init_delay(period, &cancel).await?;
Ok::<_, Cancelled>(())
};
if delays.await.is_err() {
break;
}
}
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration;
if period == Duration::ZERO || gc_horizon == 0 {
#[cfg(not(feature = "testing"))]
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::Gc,
};
// Run gc
let IterationResult { output, elapsed: _ } =
iteration.run(tenant.gc_iteration(None, gc_horizon, tenant.get_pitr_interval(), &cancel, &ctx))
.await;
match output {
Ok(_) => {
error_run_count = 0;
sleep_duration = period;
}
Err(crate::tenant::GcError::TenantCancelled) => {
return;
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}");
}
sleep_duration = wait_duration;
}
}
};
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
if first {
first = false;
if sleep_random(period, &cancel).await.is_err() {
break;
}
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
TENANT_TASK_EVENTS.with_label_values(&["start"]).inc();
async {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
tokio::select! {
_ = cancel.cancelled() => {
return;
},
tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result {
ControlFlow::Break(()) => return,
ControlFlow::Continue(()) => (),
},
}
// We run ingest housekeeping with the same frequency as compaction: it is not worth
// having a distinct setting. But we don't run it in the same task, because compaction
// blocks on acquiring the background job semaphore.
let period = tenant.get_compaction_period();
// If compaction period is set to zero (to disable it), then we will use a reasonable default
let period = if period == Duration::ZERO {
humantime::Duration::from_str(
pageserver_api::config::tenant_conf_defaults::DEFAULT_COMPACTION_PERIOD,
)
.unwrap()
.into()
} else {
period
};
// Jitter the period by +/- 5%
let period =
rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100);
// Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
// a tenant, since it won't have started writing any ephemeral files yet.
if tokio::time::timeout(period, cancel.cancelled())
.await
.is_ok()
{
break;
}
let gc_horizon = tenant.get_gc_horizon();
let sleep_duration;
if period == Duration::ZERO || gc_horizon == 0 {
#[cfg(not(feature = "testing"))]
info!("automatic GC is disabled");
// check again in 10 seconds, in case it's been enabled again.
sleep_duration = Duration::from_secs(10);
} else {
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::IngestHouseKeeping,
kind: BackgroundLoopKind::Gc,
};
iteration.run(tenant.ingest_housekeeping()).await;
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
// Or just spawn another background loop for this throttle, it's not like it's super costly.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
if count_throttled == 0 {
// Run gc
let IterationResult { output, elapsed: _ } = iteration
.run(tenant.gc_iteration(
None,
gc_horizon,
tenant.get_pitr_interval(),
&cancel,
&ctx,
))
.await;
match output {
Ok(_) => {
error_run = 0;
sleep_duration = period;
}
Err(crate::tenant::GcError::TenantCancelled) => {
return;
}
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
.await;
TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc();
}
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
// if the tenant has a proper status already, no need to wait for anything
if tenant.current_state() == TenantState::Active {
ControlFlow::Continue(())
} else {
let mut tenant_state_updates = tenant.subscribe_for_state_updates();
loop {
match tenant_state_updates.changed().await {
Ok(()) => {
let new_state = &*tenant_state_updates.borrow();
match new_state {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => {
debug!("Not running the task loop, tenant is not active: {state:?}");
continue;
}
if matches!(e, crate::tenant::GcError::TimelineCancelled) {
// Timeline was cancelled during gc. We might either be in an event
// that affects the entire tenant (tenant deletion, pageserver shutdown),
// or in one that affects the timeline only (timeline deletion).
// Therefore, don't exit the loop.
info!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
} else {
error!("Gc failed {error_run} times, retrying in {wait_duration:?}: {e:?}");
}
}
Err(_sender_dropped_error) => {
return ControlFlow::Break(());
sleep_duration = wait_duration;
}
}
};
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
.is_ok()
{
break;
}
}
}
/// Ingest housekeeping's main loop.
async fn ingest_housekeeping_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
let mut last_throttle_flag_reset_at = Instant::now();
loop {
if wait_for_active_tenant(&tenant, &cancel).await.is_break() {
return;
}
// We run ingest housekeeping with the same frequency as compaction: it is not worth
// having a distinct setting. But we don't run it in the same task, because compaction
// blocks on acquiring the background job semaphore.
let mut period = tenant.get_compaction_period();
// If compaction period is set to zero (to disable it), then we will use a reasonable default
if period == Duration::ZERO {
period = humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD)
.unwrap()
.into()
}
// Always sleep first: we do not need to do ingest housekeeping early in the lifetime of
// a tenant, since it won't have started writing any ephemeral files yet. Jitter the
// period by ±5%.
let Ok(period) = sleep_jitter(period, period * 5 / 100, &cancel).await else {
break;
};
let iteration = Iteration {
started_at: Instant::now(),
period,
kind: BackgroundLoopKind::IngestHouseKeeping,
};
iteration.run(tenant.ingest_housekeeping()).await;
// TODO: rename the background loop kind to something more generic, like, tenant housekeeping.
// Or just spawn another background loop for this throttle, it's not like it's super costly.
info_span!(parent: None, "pagestream_throttle", tenant_id=%tenant.tenant_shard_id, shard_id=%tenant.tenant_shard_id.shard_slug()).in_scope(|| {
let now = Instant::now();
let prev = std::mem::replace(&mut last_throttle_flag_reset_at, now);
let Stats { count_accounted_start, count_accounted_finish, count_throttled, sum_throttled_usecs} = tenant.pagestream_throttle.reset_stats();
if count_throttled == 0 {
return;
}
let allowed_rps = tenant.pagestream_throttle.steady_rps();
let delta = now - prev;
info!(
n_seconds=%format_args!("{:.3}", delta.as_secs_f64()),
count_accounted = count_accounted_finish, // don't break existing log scraping
count_throttled,
sum_throttled_usecs,
count_accounted_start, // log after pre-existing fields to not break existing log scraping
allowed_rps=%format_args!("{allowed_rps:.0}"),
"shard was throttled in the last n_seconds"
);
});
}
}
/// Waits until the tenant becomes active, or returns `ControlFlow::Break()` to shut down.
async fn wait_for_active_tenant(
tenant: &Arc<Tenant>,
cancel: &CancellationToken,
) -> ControlFlow<()> {
if tenant.current_state() == TenantState::Active {
return ControlFlow::Continue(());
}
let mut update_rx = tenant.subscribe_for_state_updates();
loop {
tokio::select! {
_ = cancel.cancelled() => return ControlFlow::Break(()),
result = update_rx.changed() => if result.is_err() {
return ControlFlow::Break(());
}
}
match &*update_rx.borrow() {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => debug!("Not running the task loop, tenant is not active: {state:?}"),
}
}
}
@@ -571,26 +521,41 @@ async fn wait_for_active_tenant(tenant: &Arc<Tenant>) -> ControlFlow<()> {
#[error("cancelled")]
pub(crate) struct Cancelled;
/// Provide a random delay for background task initialization.
/// Sleeps for a random interval up to the given max value.
///
/// This delay prevents a thundering herd of background tasks and will likely keep them running on
/// different periods for more stable load.
pub(crate) async fn random_init_delay(
period: Duration,
pub(crate) async fn sleep_random(
max: Duration,
cancel: &CancellationToken,
) -> Result<(), Cancelled> {
if period == Duration::ZERO {
return Ok(());
}
) -> Result<Duration, Cancelled> {
sleep_random_range(Duration::ZERO..=max, cancel).await
}
let d = {
let mut rng = rand::thread_rng();
rng.gen_range(Duration::ZERO..=period)
};
match tokio::time::timeout(d, cancel.cancelled()).await {
Ok(_) => Err(Cancelled),
Err(_) => Ok(()),
/// Sleeps for a random interval in the given range. Returns the duration.
pub(crate) async fn sleep_random_range(
interval: RangeInclusive<Duration>,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let delay = rand::thread_rng().gen_range(interval);
if delay == Duration::ZERO {
return Ok(delay);
}
tokio::select! {
_ = cancel.cancelled() => Err(Cancelled),
_ = tokio::time::sleep(delay) => Ok(delay),
}
}
/// Sleeps for an interval with a random jitter.
pub(crate) async fn sleep_jitter(
duration: Duration,
jitter: Duration,
cancel: &CancellationToken,
) -> Result<Duration, Cancelled> {
let from = duration.saturating_sub(jitter);
let to = duration.saturating_add(jitter);
sleep_random_range(from..=to, cancel).await
}
struct Iteration {
@@ -606,42 +571,25 @@ struct IterationResult<O> {
impl Iteration {
#[instrument(skip_all)]
pub(crate) async fn run<Fut, O>(self, fut: Fut) -> IterationResult<O>
where
Fut: std::future::Future<Output = O>,
{
let Self {
started_at,
period,
kind,
} = self;
let mut fut = std::pin::pin!(fut);
pub(crate) async fn run<F: Future<Output = O>, O>(self, fut: F) -> IterationResult<O> {
let mut fut = pin!(fut);
// Wrap `fut` into a future that logs a message every `period` so that we get a
// very obvious breadcrumb in the logs _while_ a slow iteration is happening.
let liveness_logger = async move {
loop {
match tokio::time::timeout(period, &mut fut).await {
Ok(x) => return x,
Err(_) => {
// info level as per the same rationale why warn_when_period_overrun is info
// => https://github.com/neondatabase/neon/pull/5724
info!("still running");
}
}
let output = loop {
match tokio::time::timeout(self.period, &mut fut).await {
Ok(r) => break r,
Err(_) => info!("still running"),
}
};
let output = liveness_logger.await;
let elapsed = started_at.elapsed();
warn_when_period_overrun(elapsed, period, kind);
let elapsed = self.started_at.elapsed();
warn_when_period_overrun(elapsed, self.period, self.kind);
IterationResult { output, elapsed }
}
}
/// Attention: the `task` and `period` beocme labels of a pageserver-wide prometheus metric.
// NB: the `task` and `period` are used for metrics labels.
pub(crate) fn warn_when_period_overrun(
elapsed: Duration,
period: Duration,

View File

@@ -1718,10 +1718,10 @@ impl Timeline {
let prepare = async move {
let guard = self.compaction_lock.lock().await;
let permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
ctx,
let permit = super::tasks::acquire_concurrency_permit(
BackgroundLoopKind::Compaction,
self.conf.use_compaction_semaphore,
ctx,
)
.await;
@@ -3057,10 +3057,10 @@ impl Timeline {
let self_ref = &self;
let skip_concurrency_limiter = &skip_concurrency_limiter;
async move {
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
background_ctx,
let wait_for_permit = super::tasks::acquire_concurrency_permit(
BackgroundLoopKind::InitialLogicalSizeCalculation,
false,
background_ctx,
);
use crate::metrics::initial_logical_size::StartCircumstances;

View File

@@ -32,7 +32,7 @@ use crate::{
tenant::{
size::CalculateSyntheticSizeError,
storage_layer::LayerVisibilityHint,
tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit},
tasks::{sleep_random, BackgroundLoopKind, BackgroundLoopSemaphorePermit},
timeline::EvictionError,
LogicalSizeCalculationCause, Tenant,
},
@@ -83,8 +83,6 @@ impl Timeline {
#[instrument(skip_all, fields(tenant_id = %self.tenant_shard_id.tenant_id, shard_id = %self.tenant_shard_id.shard_slug(), timeline_id = %self.timeline_id))]
async fn eviction_task(self: Arc<Self>, tenant: Arc<Tenant>) {
use crate::tenant::tasks::random_init_delay;
// acquire the gate guard only once within a useful span
let Ok(guard) = self.gate.enter() else {
return;
@@ -97,7 +95,7 @@ impl Timeline {
EvictionPolicy::OnlyImitiate(lat) => lat.period,
EvictionPolicy::NoEviction => Duration::from_secs(10),
};
if random_init_delay(period, &self.cancel).await.is_err() {
if sleep_random(period, &self.cancel).await.is_err() {
return;
}
}
@@ -334,10 +332,10 @@ impl Timeline {
cancel: &CancellationToken,
ctx: &RequestContext,
) -> ControlFlow<(), BackgroundLoopSemaphorePermit<'static>> {
let acquire_permit = crate::tenant::tasks::concurrent_background_tasks_rate_limit_permit(
ctx,
let acquire_permit = crate::tenant::tasks::acquire_concurrency_permit(
BackgroundLoopKind::Eviction,
false,
ctx,
);
tokio::select! {