feat(pageserver): add automatic trigger for gc-compaction (#10798)

## Problem

part of https://github.com/neondatabase/neon/issues/9114

## Summary of changes

Add the automatic trigger for gc-compaction. For each timeline it computes two values: the L1 size (layer files above the L2 LSN but at or below the gc cutoff) and the L2 size (layer files at or below the L2 LSN, i.e. already gc-compacted). When no gc-compaction has completed yet (L2 size is 0) and the L1 size reaches `gc_compaction_initial_threshold_kb`, we trigger the initial gc-compaction. After that, when `l1_size / l2_size >= gc_compaction_ratio_percent / 100`, we trigger the "tiered" gc-compaction. With the defaults (5GB threshold, 100% ratio), the first compaction fires once L1 reaches 5GB, and later compactions fire whenever L1 grows to at least the size of L2. The auto trigger is skipped for child timelines and when either size exceeds 150GB.
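
A minimal sketch of the trigger decision, mirroring the `trigger_compaction` helper added to `compaction.rs` in this commit (the standalone function name and comments here are illustrative, not the exact pageserver code):

```rust
/// Illustrative sketch of the auto-trigger decision; see the GcCompactionQueue
/// changes in this commit for the real implementation.
fn should_trigger_gc_compaction(
    l1_size: u64,                            // bytes between the L2 LSN and the gc cutoff
    l2_size: u64,                            // bytes at or below the L2 LSN
    gc_compaction_initial_threshold_kb: u64, // default: 5 * 1024 * 1024 (5GB)
    gc_compaction_ratio_percent: u64,        // default: 100
) -> bool {
    const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
    if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
        // Do not auto-trigger on very large timelines.
        return false;
    }
    if l2_size == 0 {
        // Initial trigger: nothing gc-compacted yet and L1 is large enough.
        return l1_size >= gc_compaction_initial_threshold_kb * 1024;
    }
    // Tiered trigger: L1 has grown to the configured fraction of L2.
    l1_size as f64 / l2_size as f64 >= gc_compaction_ratio_percent as f64 / 100.0
}
```

The threshold is expressed in KB while layer sizes are measured in bytes, hence the `* 1024`.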

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
Author: Alex Chi Z.
Date: 2025-02-25 09:50:39 -05:00
Committed by: GitHub
Parent: b7fcf2c7a7
Commit: 015092d259
11 changed files with 446 additions and 71 deletions

View File

@@ -584,7 +584,7 @@ pub mod tenant_conf_defaults {
// image layers should be created.
pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_GC_COMPACTION_ENABLED: bool = false;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 10240000;
pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_KB: u64 = 5 * 1024 * 1024; // 5GB
pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100;
}

View File

@@ -2396,7 +2396,8 @@ async fn timeline_checkpoint_handler(
CompactionError::ShuttingDown => ApiError::ShuttingDown,
CompactionError::Offload(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::CollectKeySpaceError(e) => ApiError::InternalServerError(anyhow::anyhow!(e)),
CompactionError::Other(e) => ApiError::InternalServerError(e)
CompactionError::Other(e) => ApiError::InternalServerError(e),
CompactionError::AlreadyRunning(_) => ApiError::InternalServerError(anyhow::anyhow!(e)),
}
)?;
}

View File

@@ -34,6 +34,7 @@ use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;
use remote_storage::GenericRemoteStorage;
use remote_storage::TimeoutOrCancel;
use remote_timeline_client::index::GcCompactionState;
use remote_timeline_client::manifest::{
OffloadedTimelineManifest, TenantManifest, LATEST_TENANT_MANIFEST_VERSION,
};
@@ -1168,6 +1169,7 @@ impl Tenant {
resources,
CreateTimelineCause::Load,
idempotency.clone(),
index_part.gc_compaction.clone(),
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
@@ -3125,20 +3127,19 @@ impl Tenant {
// If we're done compacting, check the scheduled GC compaction queue for more work.
if outcome == CompactionOutcome::Done {
let queue = self
.scheduled_compaction_tasks
.lock()
.unwrap()
.get(&timeline.timeline_id)
.cloned();
if let Some(queue) = queue {
outcome = queue
.iteration(cancel, ctx, &self.gc_block, &timeline)
.instrument(
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
)
.await?;
}
let queue = {
let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
guard
.entry(timeline.timeline_id)
.or_insert_with(|| Arc::new(GcCompactionQueue::new()))
.clone()
};
outcome = queue
.iteration(cancel, ctx, &self.gc_block, &timeline)
.instrument(
info_span!("gc_compact_timeline", timeline_id = %timeline.timeline_id),
)
.await?;
}
// If we're done compacting, offload the timeline if requested.
@@ -3195,6 +3196,7 @@ impl Tenant {
.unwrap()
.fail(&CIRCUIT_BREAKERS_BROKEN, err);
}
CompactionError::AlreadyRunning(_) => {}
}
}
@@ -4150,6 +4152,7 @@ impl Tenant {
resources: TimelineResources,
cause: CreateTimelineCause,
create_idempotency: CreateTimelineIdempotency,
gc_compaction_state: Option<GcCompactionState>,
) -> anyhow::Result<Arc<Timeline>> {
let state = match cause {
CreateTimelineCause::Load => {
@@ -4181,6 +4184,7 @@ impl Tenant {
state,
self.attach_wal_lag_cooldown.clone(),
create_idempotency,
gc_compaction_state,
self.cancel.child_token(),
);
@@ -5246,6 +5250,7 @@ impl Tenant {
resources,
CreateTimelineCause::Load,
create_guard.idempotency.clone(),
None,
)
.context("Failed to create timeline data structure")?;

View File

@@ -184,6 +184,7 @@ use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
pub(crate) use download::download_initdb_tar_zst;
use index::GcCompactionState;
use pageserver_api::models::TimelineArchivalState;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;
@@ -913,6 +914,18 @@ impl RemoteTimelineClient {
Ok(())
}
/// Launch an index-file upload operation in the background, setting `import_pgdata` field.
pub(crate) fn schedule_index_upload_for_gc_compaction_state_update(
self: &Arc<Self>,
gc_compaction_state: GcCompactionState,
) -> anyhow::Result<()> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
upload_queue.dirty.gc_compaction = Some(gc_compaction_state);
self.schedule_index_upload(upload_queue);
Ok(())
}
///
/// Launch an index-file upload operation in the background, if necessary.
///

View File

@@ -85,9 +85,36 @@ pub struct IndexPart {
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) rel_size_migration: Option<RelSizeMigration>,
/// The LSN of gc-compaction horizon. Once gc-compaction is finished for all layer files below an LSN, this LSN will be updated.
/// Not used anymore -- kept here for backwards compatibility. Merged into the `gc_compaction` field.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) l2_lsn: Option<Lsn>,
l2_lsn: Option<Lsn>,
/// State for the garbage-collecting compaction pass.
///
/// Garbage-collecting compaction (gc-compaction) prunes `Value`s that are outside
/// the PITR window and not needed by child timelines.
///
/// A commonly used synonym for this compaction pass is
/// "bottommost-compaction" because the affected LSN range
/// is the "bottom" of the (key,lsn) map.
///
/// Gc-compaction is a quite expensive operation; that's why we use
/// a trigger condition.
/// This field here holds the state pertaining to that trigger condition
/// and (in future) to the progress of the gc-compaction, so that it's
/// resumable across restarts & migrations.
///
/// Note that the underlying algorithm is _also_ called `gc-compaction`
/// in most places & design docs; but in fact it is more flexible than
/// just the specific use case here; it needs a new name.
#[serde(skip_serializing_if = "Option::is_none", default)]
pub(crate) gc_compaction: Option<GcCompactionState>,
}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct GcCompactionState {
/// The upper bound of the last completed garbage-collecting compaction, aka. L2 LSN.
pub(crate) last_completed_lsn: Lsn,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -123,10 +150,11 @@ impl IndexPart {
/// - 10: +import_pgdata
/// - 11: +rel_size_migration
/// - 12: +l2_lsn
const LATEST_VERSION: usize = 12;
/// - 13: +gc_compaction
const LATEST_VERSION: usize = 13;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -144,6 +172,7 @@ impl IndexPart {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
}
}
@@ -450,6 +479,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -497,6 +527,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -545,6 +576,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -596,6 +628,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap();
@@ -642,6 +675,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -691,6 +725,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -745,6 +780,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -804,6 +840,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -864,6 +901,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -929,6 +967,7 @@ mod tests {
import_pgdata: None,
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1007,6 +1046,7 @@ mod tests {
}))),
rel_size_migration: None,
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1086,6 +1126,7 @@ mod tests {
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: None,
gc_compaction: None,
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();
@@ -1093,7 +1134,7 @@ mod tests {
}
#[test]
fn v12_l2_lsn_is_parsed() {
fn v12_v13_l2_gc_ompaction_is_parsed() {
let example = r#"{
"version": 12,
"layer_metadata":{
@@ -1124,7 +1165,10 @@ mod tests {
}
},
"rel_size_migration": "legacy",
"l2_lsn": "0/16960E8"
"l2_lsn": "0/16960E8",
"gc_compaction": {
"last_completed_lsn": "0/16960E8"
}
}"#;
let expected = IndexPart {
@@ -1166,6 +1210,9 @@ mod tests {
}))),
rel_size_migration: Some(RelSizeMigration::Legacy),
l2_lsn: Some("0/16960E8".parse::<Lsn>().unwrap()),
gc_compaction: Some(GcCompactionState {
last_completed_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
}),
};
let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap();

View File

@@ -295,6 +295,7 @@ fn log_compaction_error(
let level = match err {
ShuttingDown => return,
Offload(_) => Level::ERROR,
AlreadyRunning(_) => Level::ERROR,
CollectKeySpaceError(CollectKeySpaceError::Cancelled) => Level::INFO,
CollectKeySpaceError(_) => Level::ERROR,
_ if task_cancelled => Level::INFO,

View File

@@ -19,7 +19,7 @@ use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use compaction::CompactionOutcome;
use compaction::{CompactionOutcome, GcCompactionCombinedSettings};
use enumset::EnumSet;
use fail::fail_point;
use futures::FutureExt;
@@ -148,6 +148,7 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::index::GcCompactionState;
use super::{
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
MaybeOffloaded,
@@ -323,6 +324,9 @@ pub struct Timeline {
ancestor_timeline: Option<Arc<Timeline>>,
ancestor_lsn: Lsn,
// The LSN of gc-compaction that was last applied to this timeline.
gc_compaction_state: ArcSwap<Option<GcCompactionState>>,
pub(super) metrics: TimelineMetrics,
// `Timeline` doesn't write these metrics itself, but it manages the lifetime. Code
@@ -1889,6 +1893,7 @@ impl Timeline {
// abruptly stall nor resume L0 flushes in these cases.
Err(CompactionError::Offload(_)) => {}
Err(CompactionError::ShuttingDown) => {}
Err(CompactionError::AlreadyRunning(_)) => {}
};
result
@@ -2531,6 +2536,31 @@ impl Timeline {
)
}
fn get_gc_compaction_settings(&self) -> GcCompactionCombinedSettings {
let tenant_conf = &self.tenant_conf.load();
let gc_compaction_enabled = tenant_conf
.tenant_conf
.gc_compaction_enabled
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_enabled);
let gc_compaction_initial_threshold_kb = tenant_conf
.tenant_conf
.gc_compaction_initial_threshold_kb
.unwrap_or(
self.conf
.default_tenant_conf
.gc_compaction_initial_threshold_kb,
);
let gc_compaction_ratio_percent = tenant_conf
.tenant_conf
.gc_compaction_ratio_percent
.unwrap_or(self.conf.default_tenant_conf.gc_compaction_ratio_percent);
GcCompactionCombinedSettings {
gc_compaction_enabled,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
}
}
fn get_image_creation_preempt_threshold(&self) -> usize {
let tenant_conf = self.tenant_conf.load();
tenant_conf
@@ -2609,6 +2639,7 @@ impl Timeline {
state: TimelineState,
attach_wal_lag_cooldown: Arc<OnceLock<WalLagCooldown>>,
create_idempotency: crate::tenant::CreateTimelineIdempotency,
gc_compaction_state: Option<GcCompactionState>,
cancel: CancellationToken,
) -> Arc<Self> {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -2667,6 +2698,8 @@ impl Timeline {
}),
disk_consistent_lsn: AtomicLsn::new(disk_consistent_lsn.0),
gc_compaction_state: ArcSwap::new(Arc::new(gc_compaction_state)),
last_freeze_at: AtomicLsn::new(disk_consistent_lsn.0),
last_freeze_ts: RwLock::new(Instant::now()),
@@ -2831,6 +2864,20 @@ impl Timeline {
);
}
pub(crate) fn update_gc_compaction_state(
&self,
gc_compaction_state: GcCompactionState,
) -> anyhow::Result<()> {
self.gc_compaction_state
.store(Arc::new(Some(gc_compaction_state.clone())));
self.remote_client
.schedule_index_upload_for_gc_compaction_state_update(gc_compaction_state)
}
pub(crate) fn get_gc_compaction_state(&self) -> Option<GcCompactionState> {
self.gc_compaction_state.load_full().as_ref().clone()
}
/// Creates and starts the wal receiver.
///
/// This function is expected to be called at most once per Timeline's lifecycle
@@ -5373,6 +5420,8 @@ pub(crate) enum CompactionError {
CollectKeySpaceError(CollectKeySpaceError),
#[error(transparent)]
Other(anyhow::Error),
#[error("Compaction already running: {0}")]
AlreadyRunning(&'static str),
}
impl From<OffloadError> for CompactionError {

View File

@@ -20,11 +20,13 @@ use bytes::Bytes;
use enumset::EnumSet;
use fail::fail_point;
use itertools::Itertools;
use once_cell::sync::Lazy;
use pageserver_api::key::KEY_SIZE;
use pageserver_api::keyspace::ShardedRange;
use pageserver_api::models::CompactInfoResponse;
use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
use serde::Serialize;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utils::critical;
@@ -37,6 +39,7 @@ use crate::statvfs::Statvfs;
use crate::tenant::checks::check_valid_layermap;
use crate::tenant::gc_block::GcBlock;
use crate::tenant::layer_map::LayerMap;
use crate::tenant::remote_timeline_client::index::GcCompactionState;
use crate::tenant::remote_timeline_client::WaitCompletionError;
use crate::tenant::storage_layer::batch_split_writer::{
BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -77,13 +80,22 @@ impl std::fmt::Display for GcCompactionJobId {
}
}
pub struct GcCompactionCombinedSettings {
pub gc_compaction_enabled: bool,
pub gc_compaction_initial_threshold_kb: u64,
pub gc_compaction_ratio_percent: u64,
}
#[derive(Debug, Clone)]
pub enum GcCompactionQueueItem {
Manual(CompactOptions),
MetaJob {
/// Compaction options
options: CompactOptions,
/// Whether the compaction is triggered automatically (determines whether we need to update L2 LSN)
auto: bool,
},
SubCompactionJob(CompactOptions),
#[allow(dead_code)]
UpdateL2Lsn(Lsn),
Notify(GcCompactionJobId),
Notify(GcCompactionJobId, Option<Lsn>),
}
impl GcCompactionQueueItem {
@@ -93,7 +105,7 @@ impl GcCompactionQueueItem {
running: bool,
) -> Option<CompactInfoResponse> {
match self {
GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
GcCompactionQueueItem::MetaJob { options, .. } => Some(CompactInfoResponse {
compact_key_range: options.compact_key_range,
compact_lsn_range: options.compact_lsn_range,
sub_compaction: options.sub_compaction,
@@ -107,17 +119,22 @@ impl GcCompactionQueueItem {
running,
job_id: id.0,
}),
GcCompactionQueueItem::UpdateL2Lsn(_) => None,
GcCompactionQueueItem::Notify(_) => None,
GcCompactionQueueItem::Notify(_, _) => None,
}
}
}
#[derive(Default)]
struct GcCompactionGuardItems {
notify: Option<tokio::sync::oneshot::Sender<()>>,
gc_guard: Option<gc_block::Guard>,
permit: Option<OwnedSemaphorePermit>,
}
struct GcCompactionQueueInner {
running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
guards: HashMap<GcCompactionJobId, GcCompactionGuardItems>,
last_id: GcCompactionJobId,
}
@@ -137,14 +154,18 @@ pub struct GcCompactionQueue {
consumer_lock: tokio::sync::Mutex<()>,
}
static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
// Only allow two timelines on one pageserver to run gc compaction at a time.
Arc::new(Semaphore::new(2))
});
impl GcCompactionQueue {
pub fn new() -> Self {
GcCompactionQueue {
inner: std::sync::Mutex::new(GcCompactionQueueInner {
running: None,
queued: VecDeque::new(),
notify: HashMap::new(),
gc_guards: HashMap::new(),
guards: HashMap::new(),
last_id: GcCompactionJobId(0),
}),
consumer_lock: tokio::sync::Mutex::new(()),
@@ -154,8 +175,9 @@ impl GcCompactionQueue {
pub fn cancel_scheduled(&self) {
let mut guard = self.inner.lock().unwrap();
guard.queued.clear();
guard.notify.clear();
guard.gc_guards.clear();
// TODO: if there is a running job, we should keep the gc guard. However, currently, the cancel
// API is only used for testing purposes, so we can drop everything here.
guard.guards.clear();
}
/// Schedule a manual compaction job.
@@ -166,29 +188,162 @@ impl GcCompactionQueue {
) -> GcCompactionJobId {
let mut guard = self.inner.lock().unwrap();
let id = guard.next_id();
guard
.queued
.push_back((id, GcCompactionQueueItem::Manual(options)));
if let Some(notify) = notify {
guard.notify.insert(id, notify);
}
guard.queued.push_back((
id,
GcCompactionQueueItem::MetaJob {
options,
auto: false,
},
));
guard.guards.entry(id).or_default().notify = notify;
info!("scheduled compaction job id={}", id);
id
}
/// Schedule an auto compaction job.
fn schedule_auto_compaction(
&self,
options: CompactOptions,
permit: OwnedSemaphorePermit,
) -> GcCompactionJobId {
let mut guard = self.inner.lock().unwrap();
let id = guard.next_id();
guard.queued.push_back((
id,
GcCompactionQueueItem::MetaJob {
options,
auto: true,
},
));
guard.guards.entry(id).or_default().permit = Some(permit);
id
}
/// Trigger an auto compaction.
#[allow(dead_code)]
pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
pub async fn trigger_auto_compaction(&self, timeline: &Arc<Timeline>) {
let GcCompactionCombinedSettings {
gc_compaction_enabled,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
} = timeline.get_gc_compaction_settings();
if !gc_compaction_enabled {
return;
}
if self.remaining_jobs_num() > 0 {
// Only schedule auto compaction when the queue is empty
return;
}
if timeline.ancestor_timeline().is_some() {
// Do not trigger auto compaction for child timelines. We haven't tested
// it enough in staging yet.
return;
}
let Ok(permit) = CONCURRENT_GC_COMPACTION_TASKS.clone().try_acquire_owned() else {
// Only allow one compaction run at a time. TODO: As we do `try_acquire_owned`, we cannot ensure
// the fairness of the lock across timelines. We should listen for both `acquire` and `l0_compaction_trigger`
// to ensure fairness while avoiding starvation of other tasks.
return;
};
let gc_compaction_state = timeline.get_gc_compaction_state();
let l2_lsn = gc_compaction_state
.map(|x| x.last_completed_lsn)
.unwrap_or(Lsn::INVALID);
let layers = {
let guard = timeline.layers.read().await;
let layer_map = guard.layer_map().unwrap();
layer_map.iter_historic_layers().collect_vec()
};
let mut l2_size: u64 = 0;
let mut l1_size = 0;
let gc_cutoff = *timeline.get_applied_gc_cutoff_lsn();
for layer in layers {
if layer.lsn_range.start <= l2_lsn {
l2_size += layer.file_size();
} else if layer.lsn_range.start <= gc_cutoff {
l1_size += layer.file_size();
}
}
fn trigger_compaction(
l1_size: u64,
l2_size: u64,
gc_compaction_initial_threshold_kb: u64,
gc_compaction_ratio_percent: u64,
) -> bool {
const AUTO_TRIGGER_LIMIT: u64 = 150 * 1024 * 1024 * 1024; // 150GB
if l1_size >= AUTO_TRIGGER_LIMIT || l2_size >= AUTO_TRIGGER_LIMIT {
// Do not auto-trigger when physical size >= 150GB
return false;
}
// initial trigger
if l2_size == 0 && l1_size >= gc_compaction_initial_threshold_kb * 1024 {
info!(
"trigger auto-compaction because l1_size={} >= gc_compaction_initial_threshold_kb={}",
l1_size,
gc_compaction_initial_threshold_kb
);
return true;
}
// size ratio trigger
if l2_size == 0 {
return false;
}
if l1_size as f64 / l2_size as f64 >= (gc_compaction_ratio_percent as f64 / 100.0) {
info!(
"trigger auto-compaction because l1_size={} / l2_size={} > gc_compaction_ratio_percent={}",
l1_size,
l2_size,
gc_compaction_ratio_percent
);
return true;
}
false
}
if trigger_compaction(
l1_size,
l2_size,
gc_compaction_initial_threshold_kb,
gc_compaction_ratio_percent,
) {
self.schedule_auto_compaction(
CompactOptions {
flags: {
let mut flags = EnumSet::new();
flags |= CompactFlags::EnhancedGcBottomMostCompaction;
flags
},
sub_compaction: true,
compact_key_range: None,
compact_lsn_range: None,
sub_compaction_max_job_size_mb: None,
},
permit,
);
info!(
"scheduled auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
l1_size, l2_size, l2_lsn, gc_cutoff
);
} else {
info!(
"did not trigger auto gc-compaction: l1_size={}, l2_size={}, l2_lsn={}, gc_cutoff={}",
l1_size, l2_size, l2_lsn, gc_cutoff
);
}
}
/// Notify the caller the job has finished and unblock GC.
fn notify_and_unblock(&self, id: GcCompactionJobId) {
info!("compaction job id={} finished", id);
let mut guard = self.inner.lock().unwrap();
if let Some(blocking) = guard.gc_guards.remove(&id) {
drop(blocking)
}
if let Some(tx) = guard.notify.remove(&id) {
let _ = tx.send(());
if let Some(items) = guard.guards.remove(&id) {
drop(items.gc_guard);
if let Some(tx) = items.notify {
let _ = tx.send(());
}
}
}
@@ -198,9 +353,10 @@ impl GcCompactionQueue {
options: CompactOptions,
timeline: &Arc<Timeline>,
gc_block: &GcBlock,
auto: bool,
) -> Result<(), CompactionError> {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs: Vec<GcCompactJob> = timeline
let jobs = timeline
.gc_compaction_split_jobs(
GcCompactJob::from_compact_options(options.clone()),
options.sub_compaction_max_job_size_mb,
@@ -223,6 +379,9 @@ impl GcCompactionQueue {
let jobs_len = jobs.len();
let mut pending_tasks = Vec::new();
// gc-compaction might pick more layers or fewer layers to compact. The L2 LSN does not need to be accurate.
// And therefore, we simply assume the maximum LSN of all jobs is the expected L2 LSN.
let expected_l2_lsn = jobs.iter().map(|job| job.compact_lsn_range.end).max();
for job in jobs {
// Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
// until we do further refactors to allow directly call `compact_with_gc`.
@@ -240,10 +399,16 @@ impl GcCompactionQueue {
};
pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
}
pending_tasks.push(GcCompactionQueueItem::Notify(id));
if !auto {
pending_tasks.push(GcCompactionQueueItem::Notify(id, None));
} else {
pending_tasks.push(GcCompactionQueueItem::Notify(id, expected_l2_lsn));
}
{
let mut guard = self.inner.lock().unwrap();
guard.gc_guards.insert(id, gc_guard);
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
let mut tasks = Vec::new();
for task in pending_tasks {
let id = guard.next_id();
@@ -267,29 +432,41 @@ impl GcCompactionQueue {
gc_block: &GcBlock,
timeline: &Arc<Timeline>,
) -> Result<CompactionOutcome, CompactionError> {
let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
let has_pending_tasks;
let (id, item) = {
let mut guard = self.inner.lock().unwrap();
let Some((id, item)) = guard.queued.pop_front() else {
return Ok(CompactionOutcome::Done);
};
guard.running = Some((id, item.clone()));
has_pending_tasks = !guard.queued.is_empty();
(id, item)
let Ok(_one_op_at_a_time_guard) = self.consumer_lock.try_lock() else {
return Err(CompactionError::AlreadyRunning("cannot run gc-compaction because another gc-compaction is running. This should not happen because we only call this function from the gc-compaction queue."));
};
let has_pending_tasks;
let Some((id, item)) = ({
let mut guard = self.inner.lock().unwrap();
if let Some((id, item)) = guard.queued.pop_front() {
guard.running = Some((id, item.clone()));
has_pending_tasks = !guard.queued.is_empty();
Some((id, item))
} else {
has_pending_tasks = false;
None
}
}) else {
self.trigger_auto_compaction(timeline).await;
// Always yield after triggering auto-compaction. Gc-compaction is a low-priority task and we
// have not implemented preemption mechanism yet. We always want to yield it to more important
// tasks if there is one.
return Ok(CompactionOutcome::Done);
};
match item {
GcCompactionQueueItem::Manual(options) => {
GcCompactionQueueItem::MetaJob { options, auto } => {
if !options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
} else if options.sub_compaction {
self.handle_sub_compaction(id, options, timeline, gc_block)
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
self.handle_sub_compaction(id, options, timeline, gc_block, auto)
.await?;
} else {
// Auto compaction always enables sub-compaction so we don't need to handle update_l2_lsn
// in this branch.
let gc_guard = match gc_block.start().await {
Ok(guard) => guard,
Err(e) => {
@@ -301,20 +478,37 @@ impl GcCompactionQueue {
};
{
let mut guard = self.inner.lock().unwrap();
guard.gc_guards.insert(id, gc_guard);
guard.guards.entry(id).or_default().gc_guard = Some(gc_guard);
}
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
self.notify_and_unblock(id);
}
}
GcCompactionQueueItem::SubCompactionJob(options) => {
// TODO: error handling, clear the queue if any task fails?
let _ = timeline.compact_with_options(cancel, options, ctx).await?;
}
GcCompactionQueueItem::Notify(id) => {
GcCompactionQueueItem::Notify(id, l2_lsn) => {
self.notify_and_unblock(id);
}
GcCompactionQueueItem::UpdateL2Lsn(_) => {
unreachable!()
if let Some(l2_lsn) = l2_lsn {
let current_l2_lsn = timeline
.get_gc_compaction_state()
.map(|x| x.last_completed_lsn)
.unwrap_or(Lsn::INVALID);
if l2_lsn >= current_l2_lsn {
info!("l2_lsn updated to {}", l2_lsn);
timeline
.update_gc_compaction_state(GcCompactionState {
last_completed_lsn: l2_lsn,
})
.map_err(CompactionError::Other)?;
} else {
warn!(
"l2_lsn updated to {} but it is less than the current l2_lsn {}",
l2_lsn, current_l2_lsn
);
}
}
}
}
{
@@ -339,7 +533,6 @@ impl GcCompactionQueue {
(guard.running.clone(), guard.queued.clone())
}
#[allow(dead_code)]
pub fn remaining_jobs_num(&self) -> usize {
let guard = self.inner.lock().unwrap();
guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }

View File

@@ -137,6 +137,11 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
timelines.remove(&timeline.timeline_id).expect(
"timeline that we were deleting was concurrently removed from 'timelines' map",
);
tenant
.scheduled_compaction_tasks
.lock()
.unwrap()
.remove(&timeline.timeline_id);
}
TimelineOrOffloaded::Offloaded(timeline) => {
let offloaded_timeline = timelines_offloaded
@@ -300,6 +305,7 @@ impl DeleteTimelineFlow {
// Thus we need to skip the validation here.
CreateTimelineCause::Delete,
crate::tenant::CreateTimelineIdempotency::FailWithConflict, // doesn't matter what we put here
None, // doesn't matter what we put here
)
.context("create_timeline_struct")?;

View File

@@ -143,5 +143,12 @@ fn remove_timeline_from_tenant(
.remove(&timeline.timeline_id)
.expect("timeline that we were deleting was concurrently removed from 'timelines' map");
// Clear the compaction queue for this timeline
tenant
.scheduled_compaction_tasks
.lock()
.unwrap()
.remove(&timeline.timeline_id);
Arc::strong_count(&timeline)
}

View File

@@ -466,6 +466,59 @@ def test_pageserver_gc_compaction_interrupt(neon_env_builder: NeonEnvBuilder):
ps_http.timeline_gc(tenant_id, timeline_id, None)
@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_trigger(neon_env_builder: NeonEnvBuilder):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
"compaction_period": "5s",
# No PiTR interval and small GC horizon
"pitr_interval": "0s",
"gc_horizon": f"{1024 * 16}",
"lsn_lease_length": "0s",
"gc_compaction_enabled": "true",
"gc_compaction_initial_threshold_kb": "16",
"gc_compaction_ratio_percent": "50",
# Do not generate image layers with create_image_layers
"image_layer_creation_check_threshold": "100",
}
env = neon_env_builder.init_start(initial_tenant_conf=SMOKE_CONF)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
row_count = 10000
churn_rounds = 20
ps_http = env.pageserver.http_client()
workload = Workload(env, tenant_id, timeline_id)
workload.init(env.pageserver.id)
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)
ps_http.timeline_gc(
tenant_id, timeline_id, None
) # Force refresh gc info to have gc_cutoff generated
def compaction_finished():
queue_depth = len(ps_http.timeline_compact_info(tenant_id, timeline_id))
assert queue_depth == 0
for i in range(1, churn_rounds + 1):
log.info(f"Running churn round {i}/{churn_rounds} ...")
workload.churn_rows(row_count, env.pageserver.id, upload=True)
wait_until(compaction_finished, timeout=60)
workload.validate(env.pageserver.id)
# ensure gc_compaction is scheduled and it's actually running (instead of skipping due to no layers picked)
env.pageserver.assert_log_contains("gc_compact_timeline.*picked .* layers for compaction")
log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
# Stripe sizes in number of pages.
TINY_STRIPES = 16
LARGE_STRIPES = 32768