From 3764dd2e84db2e2bcf2df065df25304d4dddcaf6 Mon Sep 17 00:00:00 2001 From: John Spray Date: Mon, 6 May 2024 14:07:07 +0100 Subject: [PATCH] pageserver: call maybe_freeze_ephemeral_layer from a dedicated task (#7594) ## Problem In testing of the earlier fix for OOMs under heavy write load (https://github.com/neondatabase/neon/pull/7218), we saw that the limit on ephemeral layer size wasn't being reliably enforced. That was diagnosed as being due to overwhelmed compaction loops: most tenants were waiting on the semaphore for background tasks, and thereby not running the function that proactively rolls layers frequently enough. Related: https://github.com/neondatabase/neon/issues/6939 ## Summary of changes - Create a new per-tenant background loop for "ingest housekeeping", which invokes maybe_freeze_ephemeral_layer() without taking the background task semaphore. - Downgrade to DEBUG a log line in maybe_freeze_ephemeral_layer that had been INFO, but turns out to be pretty common in the field. There's some discussion on the issue (https://github.com/neondatabase/neon/issues/6939#issuecomment-2083554275) about alternatives for calling this maybe_freeze_epemeral_layer periodically without it getting stuck behind compaction. A whole task just for this feels like kind of a big hammer, but we may in future find that there are other pieces of lightweight housekeeping that we want to do here too. Why is it okay to call maybe_freeze_ephemeral_layer outside of the background tasks semaphore? - this is the same work we would do anyway if we receive writes from the safekeeper, just done a bit sooner. - The period of the new task is generously jittered (+/- 5%), so when the ephemeral layer size tips over the threshold, we shouldn't see an excessively aggressive thundering herd of layer freezes (and only layers larger than the mean layer size will be frozen) - All that said, this is an imperfect approach that relies on having a generous amount of RAM to dip into when we need to freeze somewhat urgently. It would be nice in future to also block compaction/GC when we recognize resource stress and need to do other work (like layer freezing) to reduce memory footprint. --- pageserver/src/task_mgr.rs | 3 ++ pageserver/src/tenant.rs | 28 ++++++++++ pageserver/src/tenant/tasks.rs | 85 ++++++++++++++++++++++++++++++- pageserver/src/tenant/timeline.rs | 23 +++------ 4 files changed, 122 insertions(+), 17 deletions(-) diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 0c245580ee..01a8974494 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -319,6 +319,9 @@ pub enum TaskKind { // Eviction. One per timeline. Eviction, + // Ingest housekeeping (flushing ephemeral layers on time threshold or disk pressure) + IngestHousekeeping, + /// See [`crate::disk_usage_eviction_task`]. DiskUsageEviction, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index fdc49ae295..2d7a2e0f9d 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1676,6 +1676,34 @@ impl Tenant { Ok(()) } + // Call through to all timelines to freeze ephemeral layers if needed. Usually + // this happens during ingest: this background housekeeping is for freezing layers + // that are open but haven't been written to for some time. + async fn ingest_housekeeping(&self) { + // Scan through the hashmap and collect a list of all the timelines, + // while holding the lock. Then drop the lock and actually perform the + // compactions. We don't want to block everything else while the + // compaction runs. + let timelines = { + self.timelines + .lock() + .unwrap() + .values() + .filter_map(|timeline| { + if timeline.is_active() { + Some(timeline.clone()) + } else { + None + } + }) + .collect::>() + }; + + for timeline in &timelines { + timeline.maybe_freeze_ephemeral_layer().await; + } + } + pub fn current_state(&self) -> TenantState { self.state.borrow().clone() } diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 41b77c1f4a..f153719f98 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -2,6 +2,7 @@ //! such as compaction and GC use std::ops::ControlFlow; +use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -9,9 +10,11 @@ use crate::context::{DownloadBehavior, RequestContext}; use crate::metrics::TENANT_TASK_EVENTS; use crate::task_mgr; use crate::task_mgr::{TaskKind, BACKGROUND_RUNTIME}; +use crate::tenant::config::defaults::DEFAULT_COMPACTION_PERIOD; use crate::tenant::throttle::Stats; use crate::tenant::timeline::CompactionError; use crate::tenant::{Tenant, TenantState}; +use rand::Rng; use tokio_util::sync::CancellationToken; use tracing::*; use utils::{backoff, completion}; @@ -44,6 +47,7 @@ pub(crate) enum BackgroundLoopKind { Compaction, Gc, Eviction, + IngestHouseKeeping, ConsumptionMetricsCollectMetrics, ConsumptionMetricsSyntheticSizeWorker, InitialLogicalSizeCalculation, @@ -132,6 +136,30 @@ pub fn start_background_loops( } }, ); + + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::IngestHousekeeping, + Some(tenant_shard_id), + None, + &format!("ingest housekeeping for tenant {tenant_shard_id}"), + false, + { + let tenant = Arc::clone(tenant); + let background_jobs_can_start = background_jobs_can_start.cloned(); + async move { + let cancel = task_mgr::shutdown_token(); + tokio::select! { + _ = cancel.cancelled() => { return Ok(()) }, + _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + }; + ingest_housekeeping_loop(tenant, cancel) + .instrument(info_span!("ingest_housekeeping_loop", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug())) + .await; + Ok(()) + } + }, + ); } /// @@ -379,6 +407,61 @@ async fn gc_loop(tenant: Arc, cancel: CancellationToken) { TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); } +async fn ingest_housekeeping_loop(tenant: Arc, cancel: CancellationToken) { + TENANT_TASK_EVENTS.with_label_values(&["start"]).inc(); + async { + loop { + tokio::select! { + _ = cancel.cancelled() => { + return; + }, + tenant_wait_result = wait_for_active_tenant(&tenant) => match tenant_wait_result { + ControlFlow::Break(()) => return, + ControlFlow::Continue(()) => (), + }, + } + + // We run ingest housekeeping with the same frequency as compaction: it is not worth + // having a distinct setting. But we don't run it in the same task, because compaction + // blocks on acquiring the background job semaphore. + let period = tenant.get_compaction_period(); + + // If compaction period is set to zero (to disable it), then we will use a reasonable default + let period = if period == Duration::ZERO { + humantime::Duration::from_str(DEFAULT_COMPACTION_PERIOD) + .unwrap() + .into() + } else { + period + }; + + // Jitter the period by +/- 5% + let period = + rand::thread_rng().gen_range((period * (95)) / 100..(period * (105)) / 100); + + // Always sleep first: we do not need to do ingest housekeeping early in the lifetime of + // a tenant, since it won't have started writing any ephemeral files yet. + if tokio::time::timeout(period, cancel.cancelled()) + .await + .is_ok() + { + break; + } + + let started_at = Instant::now(); + tenant.ingest_housekeeping().await; + + warn_when_period_overrun( + started_at.elapsed(), + period, + BackgroundLoopKind::IngestHouseKeeping, + ); + } + } + .await; + TENANT_TASK_EVENTS.with_label_values(&["stop"]).inc(); +} + async fn wait_for_active_tenant(tenant: &Arc) -> ControlFlow<()> { // if the tenant has a proper status already, no need to wait for anything if tenant.current_state() == TenantState::Active { @@ -420,8 +503,6 @@ pub(crate) async fn random_init_delay( period: Duration, cancel: &CancellationToken, ) -> Result<(), Cancelled> { - use rand::Rng; - if period == Duration::ZERO { return Ok(()); } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index c7a5598cec..3748036e4f 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1498,11 +1498,11 @@ impl Timeline { self.flush_frozen_layers_and_wait(to_lsn).await } - /// If there is no writer, and conditions for rolling the latest layer are met, then freeze it. - /// - /// This is for use in background housekeeping, to provide guarantees of layers closing eventually - /// even if there are no ongoing writes to drive that. - async fn maybe_freeze_ephemeral_layer(&self) { + // Check if an open ephemeral layer should be closed: this provides + // background enforcement of checkpoint interval if there is no active WAL receiver, to avoid keeping + // an ephemeral layer open forever when idle. It also freezes layers if the global limit on + // ephemeral layer bytes has been breached. + pub(super) async fn maybe_freeze_ephemeral_layer(&self) { let Ok(_write_guard) = self.write_lock.try_lock() else { // If the write lock is held, there is an active wal receiver: rolling open layers // is their responsibility while they hold this lock. @@ -1529,13 +1529,11 @@ impl Timeline { // we are a sharded tenant and have skipped some WAL let last_freeze_ts = *self.last_freeze_ts.read().unwrap(); if last_freeze_ts.elapsed() >= self.get_checkpoint_timeout() { - // This should be somewhat rare, so we log it at INFO level. - // - // We checked for checkpoint timeout so that a shard without any - // data ingested (yet) doesn't write a remote index as soon as it + // Only do this if have been layer-less longer than get_checkpoint_timeout, so that a shard + // without any data ingested (yet) doesn't write a remote index as soon as it // sees its LSN advance: we only do this if we've been layer-less // for some time. - tracing::info!( + tracing::debug!( "Advancing disk_consistent_lsn past WAL ingest gap {} -> {}", disk_consistent_lsn, last_record_lsn @@ -1625,11 +1623,6 @@ impl Timeline { (guard, permit) }; - // Prior to compaction, check if an open ephemeral layer should be closed: this provides - // background enforcement of checkpoint interval if there is no active WAL receiver, to avoid keeping - // an ephemeral layer open forever when idle. - self.maybe_freeze_ephemeral_layer().await; - // this wait probably never needs any "long time spent" logging, because we already nag if // compaction task goes over it's period (20s) which is quite often in production. let (_guard, _permit) = tokio::select! {