diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index a4c578c592..61e6ce3cc3 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1560,7 +1560,7 @@ impl Tenant { // No timeout here, GC & Compaction should be responsive to the // `TimelineState::Stopping` change. info!("waiting for layer_removal_cs.lock()"); - let layer_removal_guard = timeline.lcache.delete_guard().await; + let layer_removal_guard = timeline.lcache.delete_guard_write().await; info!("got layer_removal_cs.lock(), deleting layer files"); // NB: storage_sync upload tasks that reference these layers have been cancelled diff --git a/pageserver/src/tenant/layer_cache.rs b/pageserver/src/tenant/layer_cache.rs index 599040303b..1df56c21ce 100644 --- a/pageserver/src/tenant/layer_cache.rs +++ b/pageserver/src/tenant/layer_cache.rs @@ -13,7 +13,7 @@ pub struct LayerCache { /// This lock is acquired in [`Timeline::gc`], [`Timeline::compact`], /// and [`Tenant::delete_timeline`]. This is an `Arc` lock because we need an owned /// lock guard in functions that will be spawned to tokio I/O pool (which requires `'static`). - pub layers_removal_lock: Arc>, + pub layers_removal_lock: Arc>, /// We need this lock b/c we do not have any way to prevent GC/compaction from removing files in-use. /// We need to do reference counting on Arc to prevent this from happening, and we can safely remove this lock. @@ -36,13 +36,16 @@ pub struct LayerInUseWrite(tokio::sync::OwnedRwLockWriteGuard<()>); pub struct LayerInUseRead(tokio::sync::OwnedRwLockReadGuard<()>); #[derive(Clone)] -pub struct DeleteGuard(Arc>); +pub struct DeleteGuardRead(Arc>); + +#[derive(Clone)] +pub struct DeleteGuardWrite(Arc>); impl LayerCache { pub fn new(timeline: Weak, tenant_id: TenantId, timeline_id: TimelineId) -> Self { Self { layers_operation_lock: Arc::new(tokio::sync::RwLock::new(())), - layers_removal_lock: Arc::new(tokio::sync::Mutex::new(())), + layers_removal_lock: Arc::new(tokio::sync::RwLock::new(())), mapping: Mutex::new(HashMap::new()), timeline: timeline, tenant_id: tenant_id, @@ -70,9 +73,16 @@ impl LayerCache { } /// Ensures only one of compaction / gc can happen at a time. - pub async fn delete_guard(&self) -> DeleteGuard { - DeleteGuard(Arc::new( - self.layers_removal_lock.clone().lock_owned().await, + pub async fn delete_guard_read(&self) -> DeleteGuardRead { + DeleteGuardRead(Arc::new( + self.layers_removal_lock.clone().read_owned().await, + )) + } + + /// Ensures only one of compaction / gc can happen at a time. + pub async fn delete_guard_write(&self) -> DeleteGuardWrite { + DeleteGuardWrite(Arc::new( + self.layers_removal_lock.clone().write_owned().await, )) } diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 360818b5a7..e33d1aec55 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -14,35 +14,41 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::completion; +use super::timeline::ENABLE_TIERED_COMPACTION; + /// Start per tenant background loops: compaction and gc. pub fn start_background_loops( tenant: &Arc, background_jobs_can_start: Option<&completion::Barrier>, ) { let tenant_id = tenant.tenant_id; - task_mgr::spawn( - BACKGROUND_RUNTIME.handle(), - TaskKind::Compaction, - Some(tenant_id), - None, - &format!("compactor for tenant {tenant_id}"), - false, - { - let tenant = Arc::clone(tenant); - let background_jobs_can_start = background_jobs_can_start.cloned(); - async move { - let cancel = task_mgr::shutdown_token(); - tokio::select! { - _ = cancel.cancelled() => { return Ok(()) }, - _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} - }; - compaction_loop(tenant, cancel) - .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) - .await; - Ok(()) - } - }, - ); + // start two compaction threads + let range = if ENABLE_TIERED_COMPACTION { 0..2 } else { 0..1 }; + for _ in range { + task_mgr::spawn( + BACKGROUND_RUNTIME.handle(), + TaskKind::Compaction, + Some(tenant_id), + None, + &format!("compactor for tenant {tenant_id}"), + false, + { + let tenant = Arc::clone(tenant); + let background_jobs_can_start = background_jobs_can_start.cloned(); + async move { + let cancel = task_mgr::shutdown_token(); + tokio::select! { + _ = cancel.cancelled() => { return Ok(()) }, + _ = completion::Barrier::maybe_wait(background_jobs_can_start) => {} + }; + compaction_loop(tenant, cancel) + .instrument(info_span!("compaction_loop", tenant_id = %tenant_id)) + .await; + Ok(()) + } + }, + ); + } task_mgr::spawn( BACKGROUND_RUNTIME.handle(), TaskKind::GarbageCollector, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 7d0c3ea23b..30a17e88d0 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -22,7 +22,7 @@ use tracing::*; use utils::id::TenantTimelineId; use std::cmp::{max, min, Ordering}; -use std::collections::{BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap, HashSet}; use std::fs; use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; @@ -79,7 +79,7 @@ use self::eviction_task::EvictionTaskTimelineState; use self::walreceiver::{WalReceiver, WalReceiverConf}; use super::config::TenantConf; -use super::layer_cache::{DeleteGuard, LayerCache}; +use super::layer_cache::{DeleteGuardRead, LayerCache, DeleteGuardWrite}; use super::layer_map::BatchedUpdates; use super::remote_timeline_client::index::IndexPart; use super::remote_timeline_client::RemoteTimelineClient; @@ -87,7 +87,7 @@ use super::storage_layer::{ DeltaLayer, ImageLayer, Layer, LayerAccessStatsReset, PersistentLayerDesc, }; -const ENABLE_TIERED_COMPACTION: bool = false; +pub const ENABLE_TIERED_COMPACTION: bool = false; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(super) enum FlushLoopState { @@ -155,6 +155,8 @@ pub struct Timeline { pub(super) lcache: LayerCache, + compacting: Mutex>, + /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. /// It is used by compaction task when it checks if new image layer should be created. @@ -840,7 +842,7 @@ impl Timeline { // Below are functions compact_level0() and create_image_layers() // but they are a bit ad hoc and don't quite work like it's explained // above. Rewrite it. - let layer_removal_cs = self.lcache.delete_guard().await; + let layer_removal_cs = self.lcache.delete_guard_read().await; // Is the timeline being deleted? if self.is_stopping() { return Err(anyhow::anyhow!("timeline is Stopping").into()); @@ -875,7 +877,7 @@ impl Timeline { // 3. Compact let timer = self.metrics.compact_time_histo.start_timer(); - self.compact_level0(layer_removal_cs.clone(), target_file_size, ctx) + self.compact_level0(layer_removal_cs, target_file_size, ctx) .await?; timer.stop_and_record(); } @@ -1174,7 +1176,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; // now lock out layer removal (compaction, gc, timeline deletion) - let layer_removal_guard = self.lcache.delete_guard().await; + let layer_removal_guard = self.lcache.delete_guard_write().await; { // to avoid racing with detach and delete_timeline @@ -1212,7 +1214,7 @@ impl Timeline { fn evict_layer_batch_impl( &self, - _layer_removal_cs: &DeleteGuard, + _layer_removal_cs: &DeleteGuardWrite, local_layer: &Arc, batch_updates: &mut BatchedUpdates<'_>, ) -> anyhow::Result { @@ -1439,6 +1441,7 @@ impl Timeline { layers: tokio::sync::RwLock::new((LayerMap::default(), LayerMapping::new())), lcache: LayerCache::new(myself.clone(), tenant_id, timeline_id), wanted_image_layers: Mutex::new(None), + compacting: Mutex::new(HashSet::default()), walredo_mgr, walreceiver: Mutex::new(None), @@ -2306,7 +2309,7 @@ impl Timeline { fn delete_historic_layer( &self, // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: DeleteGuard, + _layer_removal_cs: DeleteGuardWrite, layer: Arc, _updates: &mut BatchedUpdates<'_>, ) -> anyhow::Result<()> { @@ -2319,7 +2322,7 @@ impl Timeline { fn delete_historic_layer_new( &self, // we cannot remove layers otherwise, since gc and compaction will race - _layer_removal_cs: DeleteGuard, + _layer_removal_cs: DeleteGuardWrite, layer: Arc, updates: &mut BatchedUpdates<'_>, ) -> anyhow::Result<()> { @@ -3406,7 +3409,7 @@ impl Timeline { /// start of level0 files compaction, the on-demand download should be revisited as well. async fn compact_level0_phase1( &self, - _layer_removal_cs: DeleteGuard, + layer_removal_cs: DeleteGuardRead, target_file_size: u64, ctx: &RequestContext, ) -> Result { @@ -3756,7 +3759,7 @@ impl Timeline { /// async fn compact_level0( self: &Arc, - layer_removal_cs: DeleteGuard, + layer_removal_cs: DeleteGuardRead, target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { @@ -3773,6 +3776,9 @@ impl Timeline { .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx) .await?; + drop(layer_removal_cs); + let layer_removal_cs = self.lcache.delete_guard_write().await; + if new_layers.is_empty() && deltas_to_compact.is_empty() { // nothing to do return Ok(()); @@ -3848,7 +3854,10 @@ impl Timeline { Ok(()) } - fn get_compact_task(tier_sizes: Vec<(usize, u64)>) -> Option> { + fn get_compact_task( + skip_tiers: &HashSet, + tier_sizes: Vec<(usize, u64)>, + ) -> Option> { let size_ratio = 1.25; let space_amplification_ratio = 2.0; let max_merge_width = 20; @@ -3858,15 +3867,19 @@ impl Timeline { let (_, last_tier_size) = *tier_sizes.last().unwrap(); let estimated_space_amp = (total_tier_size - last_tier_size) as f64 / last_tier_size as f64; if estimated_space_amp > space_amplification_ratio { - info!("full compaction triggered by space amplification"); - let tiers = tier_sizes - .iter() - .rev() - .take(max_merge_width) - .rev() - .map(|(tier_id, _)| *tier_id) - .collect::>(); - return Some(tiers); + if tier_sizes.is_empty() { + info!("full compaction cannot be triggered as some layers are being compacted"); + } else { + info!("full compaction triggered by space amplification"); + let tiers = tier_sizes + .iter() + .rev() + .take(max_merge_width) + .rev() + .map(|(tier_id, _)| *tier_id) + .collect::>(); + return Some(tiers); + } } // Trigger 2: by size ratio @@ -3884,6 +3897,9 @@ impl Timeline { .collect_vec(); return Some(compact_tiers); } + if skip_tiers.contains(&tier_id) { + break; + } total_size_up_to_lvl += size; compact_tiers.push(tier_id); } @@ -3892,7 +3908,7 @@ impl Timeline { async fn compact_tiered_phase1( &self, - _layer_removal_cs: DeleteGuard, + _layer_removal_cs: DeleteGuardRead, target_file_size: u64, ctx: &RequestContext, ) -> Result, CompactionError> { @@ -3918,19 +3934,24 @@ impl Timeline { // Gather the files to compact in this iteration. let tier_sizes: Vec<(usize, u64)> = layers.sorted_runs.compute_tier_sizes(); + let mut compacting_tiers = self.compacting.lock().unwrap(); - let Some(tier_to_compact) = Self::get_compact_task(tier_sizes) else { + let Some(tier_to_compact) = Self::get_compact_task(&compacting_tiers, tier_sizes) else { return Ok(None); }; - println!("tier_to_compact: {tier_to_compact:?}"); - let min_merge_width = 3; if tier_to_compact.len() < min_merge_width { return Ok(None); } + println!("tier_to_compact: {tier_to_compact:?}"); + for &tier in &tier_to_compact { + compacting_tiers.insert(tier); + } + drop(compacting_tiers); + let mut deltas_to_compact_layers = vec![]; for (tier_id, layers) in layers.sorted_runs.runs.iter() { if tier_to_compact.contains(tier_id) { @@ -4179,6 +4200,11 @@ impl Timeline { drop(all_keys_iter); // So that deltas_to_compact is no longer borrowed + let mut compacting_tiers = self.compacting.lock().unwrap(); + for &tier in &tier_to_compact { + compacting_tiers.remove(&tier); + } + Ok(Some(CompactTieredPhase1Result { new_layers, new_tier_at: *tier_to_compact.last().unwrap(), @@ -4190,7 +4216,7 @@ impl Timeline { /// Tiered Compaction for level > 0 async fn compact_tiered( self: &Arc, - layer_removal_cs: DeleteGuard, + layer_removal_cs: DeleteGuardRead, target_file_size: u64, ctx: &RequestContext, ) -> Result<(), CompactionError> { @@ -4202,6 +4228,9 @@ impl Timeline { .compact_tiered_phase1(layer_removal_cs.clone(), target_file_size, ctx) .await? else { return Ok(()); }; + drop(layer_removal_cs); + let layer_removal_cs = self.lcache.delete_guard_write().await; + println!("new_tier_at: {:?}", new_tier_at); println!("removed_tiers: {:?}", removed_tiers); @@ -4406,7 +4435,7 @@ impl Timeline { fail_point!("before-timeline-gc"); - let layer_removal_cs = self.lcache.delete_guard().await; + let layer_removal_cs = self.lcache.delete_guard_write().await; // Is the timeline being deleted? if self.is_stopping() { anyhow::bail!("timeline is Stopping"); @@ -4444,7 +4473,7 @@ impl Timeline { async fn gc_timeline( &self, - layer_removal_cs: DeleteGuard, + layer_removal_cs: DeleteGuardWrite, horizon_cutoff: Lsn, pitr_cutoff: Lsn, retain_lsns: Vec,