diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index dec585ff65..78be98bd34 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -283,6 +283,52 @@ pub(crate) enum SpawnMode { Lazy, } +/// A notifier, shared by all timelines of a tenant, that can be used to trigger compaction. +/// +/// It is used to notify the compaction loop that there is work to be done. It is also used +/// to balance the compaction load between timelines: if there is pending L0 compaction on +/// one of the timelines, it can preempt long-running compaction jobs (e.g., image compaction) +/// on other timelines. +pub struct CompactionNotifier { + notify: Notify, + l0_count: std::sync::Mutex<HashMap<TimelineId, usize>>, +} + +impl CompactionNotifier { + pub fn new() -> Self { + Self { + notify: Notify::new(), + l0_count: std::sync::Mutex::new(HashMap::new()), + } + } + + pub fn notify_one(&self) { + self.notify.notify_one(); + } + + pub fn notified(&self) -> tokio::sync::futures::Notified<'_> { + self.notify.notified() + } + + pub fn on_l0_update(&self, timeline_id: TimelineId, l0_count: usize) { + let mut guard = self.l0_count.lock().unwrap(); + guard.insert(timeline_id, l0_count); + } + + pub fn on_shutdown(&self, timeline_id: TimelineId) { + let mut guard = self.l0_count.lock().unwrap(); + guard.remove(&timeline_id); + } + + pub fn get_max_l0_count(&self) -> Option<(usize, TimelineId)> { + let guard = self.l0_count.lock().unwrap(); + guard + .iter() + .max_by_key(|(_, count)| *count) + .map(|(timeline_id, count)| (*count, *timeline_id)) + } +} + /// /// Tenant consists of multiple timelines. Keep them in a hash table. /// @@ -358,7 +404,7 @@ pub struct Tenant { compaction_circuit_breaker: std::sync::Mutex, /// Signals the tenant compaction loop that there is L0 compaction work to be done. - pub(crate) l0_compaction_trigger: Arc<Notify>, + pub(crate) l0_compaction_trigger: Arc<CompactionNotifier>, /// Scheduled gc-compaction tasks. 
scheduled_compaction_tasks: std::sync::Mutex>>, @@ -4242,7 +4288,7 @@ impl Tenant { // use an extremely long backoff. Some(Duration::from_secs(3600 * 24)), )), - l0_compaction_trigger: Arc::new(Notify::new()), + l0_compaction_trigger: Arc::new(CompactionNotifier::new()), scheduled_compaction_tasks: Mutex::new(Default::default()), activate_now_sem: tokio::sync::Semaphore::new(0), attach_wal_lag_cooldown: Arc::new(std::sync::OnceLock::new()), diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 782b7d88b0..8f37982b24 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -47,7 +47,7 @@ use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::runtime::Handle; use tokio::sync::mpsc::Sender; -use tokio::sync::{oneshot, watch, Notify}; +use tokio::sync::{oneshot, watch}; use tokio_util::sync::CancellationToken; use tracing::*; use utils::critical; @@ -151,7 +151,8 @@ use super::{ MaybeOffloaded, }; use super::{ - debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, HeatMapTimeline, + debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, CompactionNotifier, + HeatMapTimeline, }; use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe}; use super::{ @@ -224,7 +225,7 @@ pub struct TimelineResources { pub remote_client: RemoteTimelineClient, pub pagestream_throttle: Arc, pub pagestream_throttle_metrics: Arc, - pub l0_compaction_trigger: Arc<Notify>, + pub l0_compaction_trigger: Arc<CompactionNotifier>, pub l0_flush_global_state: l0_flush::L0FlushGlobalState, } @@ -428,7 +429,7 @@ pub struct Timeline { compaction_failed: AtomicBool, /// Notifies the tenant compaction loop that there is pending L0 compaction work. - l0_compaction_trigger: Arc<Notify>, + l0_compaction_trigger: Arc<CompactionNotifier>, /// Make sure we only have one running gc at a time. /// @@ -1965,6 +1966,8 @@ impl Timeline { // ... and inform any waiters for newer LSNs that there won't be any. 
self.last_record_lsn.shutdown(); + self.l0_compaction_trigger.on_shutdown(self.timeline_id); + if let ShutdownMode::FreezeAndFlush = mode { let do_flush = if let Some((open, frozen)) = self .layers @@ -4119,6 +4122,8 @@ impl Timeline { if l0_count >= self.get_compaction_threshold() { self.l0_compaction_trigger.notify_one(); } + self.l0_compaction_trigger + .on_l0_update(self.timeline_id, l0_count); // Delay the next flush to backpressure if compaction can't keep up. We delay by the // flush duration such that the flush takes 2x as long. This is propagated up to WAL @@ -5067,20 +5072,22 @@ impl Timeline { // image layer generation taking too long time and blocking L0 compaction. So in this // mode, we also inspect the current number of L0 layers and skip image layer generation // if there are too many of them. - let num_of_l0_layers = { - let layers = self.layers.read().await; - layers.layer_map()?.level0_deltas().len() - }; - let image_preempt_threshold = self.get_image_creation_preempt_threshold() - * self.get_compaction_threshold(); - if image_preempt_threshold != 0 && num_of_l0_layers >= image_preempt_threshold { - tracing::info!( - "preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {}", - partition.start().unwrap(), partition.end().unwrap(), num_of_l0_layers - ); - last_partition_processed = Some(partition.clone()); - all_generated = false; - break; + if let Some((max_num_of_l0_layers, timeline_id)) = + self.l0_compaction_trigger.get_max_l0_count() + { + let image_preempt_threshold = self.get_image_creation_preempt_threshold() + * self.get_compaction_threshold(); + if image_preempt_threshold != 0 + && max_num_of_l0_layers >= image_preempt_threshold + { + tracing::info!( + "preempt image layer generation at {lsn} when processing partition {}..{}: too many L0 layers {} on timeline {}", + partition.start().unwrap(), partition.end().unwrap(), max_num_of_l0_layers, timeline_id + ); + last_partition_processed = 
Some(partition.clone()); + all_generated = false; + break; + } } } } @@ -5459,8 +5466,13 @@ impl Timeline { self.remote_client .schedule_compaction_update(&remove_layers, new_deltas)?; + let l0_count = guard.layer_map()?.level0_deltas().len(); + drop_wlock(guard); + self.l0_compaction_trigger + .on_l0_update(self.timeline_id, l0_count); + Ok(()) }