diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 2276ed428b..9693d232ee 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -279,7 +279,7 @@ impl GcCompactionQueue { gc_compaction_ratio_percent: u64, ) -> bool { const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB - if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT { + if l1_size + l2_size >= AUTO_TRIGGER_LIMIT { // Do not auto-trigger when physical size >= 150GB return false; } @@ -350,6 +350,11 @@ impl GcCompactionQueue { } } + fn clear_running_job(&self) { + let mut guard = self.inner.lock().unwrap(); + guard.running = None; + } + async fn handle_sub_compaction( &self, id: GcCompactionJobId, @@ -361,12 +366,20 @@ impl GcCompactionQueue { info!( "running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs" ); - let jobs = timeline + let res = timeline .gc_compaction_split_jobs( GcCompactJob::from_compact_options(options.clone()), options.sub_compaction_max_job_size_mb, ) - .await?; + .await; + let jobs = match res { + Ok(jobs) => jobs, + Err(err) => { + warn!("cannot split gc-compaction jobs: {}, unblocked gc", err); + self.notify_and_unblock(id); + return Err(err); + } + }; if jobs.is_empty() { info!("no jobs to run, skipping scheduled compaction task"); self.notify_and_unblock(id); @@ -446,7 +459,18 @@ impl GcCompactionQueue { if let Err(err) = &res { log_compaction_error(err, None, cancel.is_cancelled()); } - res + match res { + Ok(res) => Ok(res), + Err(CompactionError::ShuttingDown) => Err(CompactionError::ShuttingDown), + Err(_) => { + // There are some cases where traditional gc might collect some layer + // files causing gc-compaction cannot read the full history of the key. + // This needs to be resolved in the long-term by improving the compaction + // process. For now, let's simply avoid such errors triggering the + // circuit breaker. + Ok(CompactionOutcome::Skipped) + } + } } async fn iteration_inner( @@ -512,9 +536,16 @@ impl GcCompactionQueue { let mut guard = self.inner.lock().unwrap(); guard.guards.entry(id).or_default().gc_guard = Some(gc_guard); } - let compaction_result = - timeline.compact_with_options(cancel, options, ctx).await?; - self.notify_and_unblock(id); + let res = timeline.compact_with_options(cancel, options, ctx).await; + let compaction_result = match res { + Ok(res) => res, + Err(err) => { + warn!(%err, "failed to run gc-compaction, gc unblocked"); + self.notify_and_unblock(id); + self.clear_running_job(); + return Err(err); + } + }; if compaction_result == CompactionOutcome::YieldForL0 { yield_for_l0 = true; } @@ -553,10 +584,7 @@ impl GcCompactionQueue { } } } - { - let mut guard = self.inner.lock().unwrap(); - guard.running = None; - } + self.clear_running_job(); Ok(if yield_for_l0 { tracing::info!("give up gc-compaction: yield for L0 compaction"); CompactionOutcome::YieldForL0