From b5d54ba52a92f263c087eff56ab7ced9499c1ae6 Mon Sep 17 00:00:00 2001
From: "Alex Chi Z." <4198311+skyzh@users.noreply.github.com>
Date: Fri, 10 Jan 2025 15:53:00 -0500
Subject: [PATCH] refactor(pageserver): move queue logic to compaction.rs
 (#10330)

## Problem

Close https://github.com/neondatabase/neon/issues/10031; part of
https://github.com/neondatabase/neon/issues/9114.

## Summary of changes

Move the compaction job generation to `compaction.rs`, making the code more
readable and debuggable. The compaction-info API now also returns the
currently running job; previously it returned only the scheduled jobs.
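For illustration (not part of the patch), here is a minimal sketch of the new
response shape with the added `running` and `job_id` fields, using simplified
stand-ins for the real key/LSN range types from `pageserver_api::models` and
assuming `serde`/`serde_json` as dependencies:

```rust
// Illustrative sketch only: simplified stand-ins for the real types in
// `pageserver_api::models`, showing how `running` and `job_id` surface.
use serde::Serialize;

#[derive(Serialize)]
struct CompactInfoResponse {
    compact_key_range: Option<(u64, u64)>, // simplified; really a key range type
    compact_lsn_range: Option<(u64, u64)>, // simplified; really an LSN range type
    sub_compaction: bool,
    running: bool, // new: true for the job currently being executed
    job_id: usize, // new: stable id assigned by the queue
}

fn main() {
    let jobs = vec![
        CompactInfoResponse {
            compact_key_range: None,
            compact_lsn_range: None,
            sub_compaction: true,
            running: true, // the in-flight job is now reported too
            job_id: 0,
        },
        CompactInfoResponse {
            compact_key_range: None,
            compact_lsn_range: None,
            sub_compaction: false,
            running: false, // still waiting in the queue
            job_id: 1,
        },
    ];
    println!("{}", serde_json::to_string_pretty(&jobs).unwrap());
}
```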
---------

Signed-off-by: Alex Chi Z
---
 libs/pageserver_api/src/models.rs            |   2 +
 pageserver/src/http/routes.rs                |  14 +-
 pageserver/src/tenant.rs                     | 175 +++--------
 pageserver/src/tenant/timeline/compaction.rs | 293 ++++++++++++++++++-
 4 files changed, 324 insertions(+), 160 deletions(-)

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 39390d7647..9af6c4021d 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -272,6 +272,8 @@ pub struct CompactInfoResponse {
     pub compact_key_range: Option<CompactKeyRange>,
     pub compact_lsn_range: Option<CompactLsnRange>,
     pub sub_compaction: bool,
+    pub running: bool,
+    pub job_id: usize,
 }
 
 #[derive(Serialize, Deserialize, Clone)]
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 60ef4c3702..94e0b101bd 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
 use crate::DEFAULT_PG_VERSION;
 use crate::{disk_usage_eviction_task, tenant};
 use pageserver_api::models::{
-    CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
-    TimelineGcRequest, TimelineInfo,
+    StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
+    TimelineInfo,
 };
 use utils::{
     auth::SwappableJwtAuth,
@@ -2052,15 +2052,7 @@ async fn timeline_compact_info_handler(
         let tenant = state
             .tenant_manager
             .get_attached_tenant_shard(tenant_shard_id)?;
-        let res = tenant.get_scheduled_compaction_tasks(timeline_id);
-        let mut resp = Vec::new();
-        for item in res {
-            resp.push(CompactInfoResponse {
-                compact_key_range: item.compact_key_range,
-                compact_lsn_range: item.compact_lsn_range,
-                sub_compaction: item.sub_compaction,
-            });
-        }
+        let resp = tenant.get_scheduled_compaction_tasks(timeline_id);
         json_response(StatusCode::OK, resp)
     }
     .instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
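The `tenant.rs` changes below rewire `cancel_scheduled_compaction` and
`get_scheduled_compaction_tasks` onto the new per-timeline queue. As a minimal
sketch of the cancel semantics (stand-in types, not from the patch): queued
jobs are dropped, while a job that is already running is not interrupted.

```rust
use std::collections::VecDeque;

struct QueueState {
    running: Option<&'static str>,
    queued: VecDeque<&'static str>,
}

impl QueueState {
    fn cancel_scheduled(&mut self) {
        // Mirrors GcCompactionQueue::cancel_scheduled below: queued jobs (and,
        // in the real code, their notify channels and gc guards) are dropped...
        self.queued.clear();
        // ...but the currently running job is left untouched.
    }
}

fn main() {
    let mut q = QueueState {
        running: Some("job-0"),
        queued: VecDeque::from(vec!["job-1", "job-2"]),
    };
    q.cancel_scheduled();
    assert_eq!(q.running, Some("job-0"));
    assert!(q.queued.is_empty());
}
```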
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 8e61d09de7..2928c435cb 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -21,6 +21,7 @@ use enumset::EnumSet;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use pageserver_api::models;
+use pageserver_api::models::CompactInfoResponse;
 use pageserver_api::models::LsnLease;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::models::TimelineState;
@@ -37,21 +38,17 @@ use remote_timeline_client::manifest::{
 };
 use remote_timeline_client::UploadQueueNotReadyError;
 use std::collections::BTreeMap;
-use std::collections::VecDeque;
 use std::fmt;
 use std::future::Future;
 use std::sync::atomic::AtomicBool;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
-use timeline::compaction::GcCompactJob;
-use timeline::compaction::ScheduledCompactionTask;
+use timeline::compaction::GcCompactionQueue;
 use timeline::import_pgdata;
 use timeline::offload::offload_timeline;
 use timeline::offload::OffloadError;
-use timeline::CompactFlags;
 use timeline::CompactOptions;
-use timeline::CompactionError;
 use timeline::ShutdownMode;
 use tokio::io::BufReader;
 use tokio::sync::watch;
@@ -347,10 +344,8 @@ pub struct Tenant {
     /// Overhead of mutex is acceptable because compaction is done with a multi-second period.
     compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,
 
-    /// Scheduled compaction tasks. Currently, this can only be populated by triggering
-    /// a manual gc-compaction from the manual compaction API.
-    scheduled_compaction_tasks:
-        std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
+    /// Scheduled gc-compaction tasks.
+    scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
 
     /// If the tenant is in Activating state, notify this to encourage it
     /// to proceed to Active as soon as possible, rather than waiting for lazy
@@ -2997,104 +2992,18 @@ impl Tenant {
         if has_pending_l0_compaction_task {
             Some(true)
         } else {
-            let mut has_pending_scheduled_compaction_task;
-            let next_scheduled_compaction_task = {
-                let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-                if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
-                    if !tline_pending_tasks.is_empty() {
-                        info!(
-                            "{} tasks left in the compaction schedule queue",
-                            tline_pending_tasks.len()
-                        );
-                    }
-                    let next_task = tline_pending_tasks.pop_front();
-                    has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
-                    next_task
-                } else {
-                    has_pending_scheduled_compaction_task = false;
-                    None
-                }
+            let queue = {
+                let guard = self.scheduled_compaction_tasks.lock().unwrap();
+                guard.get(timeline_id).cloned()
             };
-            if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
-            {
-                if !next_scheduled_compaction_task
-                    .options
-                    .flags
-                    .contains(CompactFlags::EnhancedGcBottomMostCompaction)
-                {
-                    warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
-                } else if next_scheduled_compaction_task.options.sub_compaction {
-                    info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
-                    let jobs: Vec<GcCompactJob> = timeline
-                        .gc_compaction_split_jobs(
-                            GcCompactJob::from_compact_options(
-                                next_scheduled_compaction_task.options.clone(),
-                            ),
-                            next_scheduled_compaction_task
-                                .options
-                                .sub_compaction_max_job_size_mb,
-                        )
-                        .await
-                        .map_err(CompactionError::Other)?;
-                    if jobs.is_empty() {
-                        info!("no jobs to run, skipping scheduled compaction task");
-                    } else {
-                        has_pending_scheduled_compaction_task = true;
-                        let jobs_len = jobs.len();
-                        let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-                        let tline_pending_tasks = guard.entry(*timeline_id).or_default();
-                        for (idx, job) in jobs.into_iter().enumerate() {
-                            // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
-                            // until we do further refactors to allow directly call `compact_with_gc`.
-                            let mut flags: EnumSet<CompactFlags> = EnumSet::default();
-                            flags |= CompactFlags::EnhancedGcBottomMostCompaction;
-                            if job.dry_run {
-                                flags |= CompactFlags::DryRun;
-                            }
-                            let options = CompactOptions {
-                                flags,
-                                sub_compaction: false,
-                                compact_key_range: Some(job.compact_key_range.into()),
-                                compact_lsn_range: Some(job.compact_lsn_range.into()),
-                                sub_compaction_max_job_size_mb: None,
-                            };
-                            tline_pending_tasks.push_back(if idx == jobs_len - 1 {
-                                ScheduledCompactionTask {
-                                    options,
-                                    // The last job in the queue sends the signal and releases the gc guard
-                                    result_tx: next_scheduled_compaction_task
-                                        .result_tx
-                                        .take(),
-                                    gc_block: next_scheduled_compaction_task
-                                        .gc_block
-                                        .take(),
-                                }
-                            } else {
-                                ScheduledCompactionTask {
-                                    options,
-                                    result_tx: None,
-                                    gc_block: None,
-                                }
-                            });
-                        }
-                        info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
-                    }
-                } else {
-                    let _ = timeline
-                        .compact_with_options(
-                            cancel,
-                            next_scheduled_compaction_task.options,
-                            ctx,
-                        )
-                        .instrument(info_span!("scheduled_compact_timeline", %timeline_id))
-                        .await?;
-                    if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
-                        // TODO: we can send compaction statistics in the future
-                        tx.send(()).ok();
-                    }
-                }
+            if let Some(queue) = queue {
+                let has_pending_tasks = queue
+                    .iteration(cancel, ctx, &self.gc_block, timeline)
+                    .await?;
+                Some(has_pending_tasks)
+            } else {
+                Some(false)
             }
-            Some(has_pending_scheduled_compaction_task)
         }
     } else {
         None
@@ -3124,34 +3033,32 @@ impl Tenant {
     }
 
     /// Cancel scheduled compaction tasks
-    pub(crate) fn cancel_scheduled_compaction(
-        &self,
-        timeline_id: TimelineId,
-    ) -> Vec<ScheduledCompactionTask> {
+    pub(crate) fn cancel_scheduled_compaction(&self, timeline_id: TimelineId) {
         let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-        if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
-            let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
-            current_tline_pending_tasks.into_iter().collect()
-        } else {
-            Vec::new()
+        if let Some(q) = guard.get_mut(&timeline_id) {
+            q.cancel_scheduled();
         }
     }
 
     pub(crate) fn get_scheduled_compaction_tasks(
         &self,
         timeline_id: TimelineId,
-    ) -> Vec<CompactOptions> {
-        use itertools::Itertools;
-        let guard = self.scheduled_compaction_tasks.lock().unwrap();
-        guard
-            .get(&timeline_id)
-            .map(|tline_pending_tasks| {
-                tline_pending_tasks
-                    .iter()
-                    .map(|x| x.options.clone())
-                    .collect_vec()
-            })
-            .unwrap_or_default()
+    ) -> Vec<CompactInfoResponse> {
+        let res = {
+            let guard = self.scheduled_compaction_tasks.lock().unwrap();
+            guard.get(&timeline_id).map(|q| q.remaining_jobs())
+        };
+        let Some((running, remaining)) = res else {
+            return Vec::new();
+        };
+        let mut result = Vec::new();
+        if let Some((id, running)) = running {
+            result.extend(running.into_compact_info_resp(id, true));
+        }
+        for (id, job) in remaining {
+            result.extend(job.into_compact_info_resp(id, false));
+        }
+        result
     }
 
     /// Schedule a compaction task for a timeline.
@@ -3160,20 +3067,12 @@ impl Tenant {
         timeline_id: TimelineId,
         options: CompactOptions,
     ) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
-        let gc_guard = match self.gc_block.start().await {
-            Ok(guard) => guard,
-            Err(e) => {
-                bail!("cannot run gc-compaction because gc is blocked: {}", e);
-            }
-        };
         let (tx, rx) = tokio::sync::oneshot::channel();
         let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-        let tline_pending_tasks = guard.entry(timeline_id).or_default();
-        tline_pending_tasks.push_back(ScheduledCompactionTask {
-            options,
-            result_tx: Some(tx),
-            gc_block: Some(gc_guard),
-        });
+        let q = guard
+            .entry(timeline_id)
+            .or_insert_with(|| Arc::new(GcCompactionQueue::new()));
+        q.schedule_manual_compaction(options, Some(tx));
         Ok(rx)
     }
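To make the new control flow concrete, here is a minimal sketch (stand-in
types, not the patch itself) of the pattern `compaction_iteration` now
follows: clone the per-timeline queue `Arc` out of the map, release the lock,
then drive one `iteration`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type TimelineId = u64; // stand-in for utils::id::TimelineId

struct GcCompactionQueue; // stand-in for the queue type added by this patch

impl GcCompactionQueue {
    /// Stand-in for the real async `iteration`: pop one job, run it, and
    /// report whether more jobs remain in the queue.
    fn iteration(&self) -> bool {
        false
    }
}

struct Tenant {
    scheduled_compaction_tasks: Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,
}

impl Tenant {
    fn compaction_iteration(&self, timeline_id: TimelineId) -> bool {
        // Clone the Arc and drop the map lock before running the (long) job,
        // so scheduling new jobs is never blocked on a running compaction.
        let queue = {
            let guard = self.scheduled_compaction_tasks.lock().unwrap();
            guard.get(&timeline_id).cloned()
        };
        match queue {
            Some(queue) => queue.iteration(), // true => more jobs pending
            None => false,
        }
    }
}

fn main() {
    let tenant = Tenant {
        scheduled_compaction_tasks: Mutex::new(HashMap::new()),
    };
    assert!(!tenant.compaction_iteration(1));
}
```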
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 55cde8603e..05f8d476f9 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -4,7 +4,7 @@
 //!
 //! The old legacy algorithm is implemented directly in `timeline.rs`.
 
-use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
 use std::ops::{Deref, Range};
 use std::sync::Arc;
 
@@ -16,10 +16,12 @@ use super::{
 
 use anyhow::{anyhow, bail, Context};
 use bytes::Bytes;
+use enumset::EnumSet;
 use fail::fail_point;
 use itertools::Itertools;
 use pageserver_api::key::KEY_SIZE;
 use pageserver_api::keyspace::ShardedRange;
+use pageserver_api::models::CompactInfoResponse;
 use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
 use serde::Serialize;
 use tokio_util::sync::CancellationToken;
@@ -30,6 +32,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}
 use crate::page_cache;
 use crate::statvfs::Statvfs;
 use crate::tenant::checks::check_valid_layermap;
+use crate::tenant::gc_block::GcBlock;
 use crate::tenant::remote_timeline_client::WaitCompletionError;
 use crate::tenant::storage_layer::batch_split_writer::{
     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -63,16 +66,284 @@ use super::CompactionError;
 
 /// Maximum number of deltas before generating an image layer in bottom-most compaction.
 const COMPACTION_DELTA_THRESHOLD: usize = 5;
 
-/// A scheduled compaction task.
-pub(crate) struct ScheduledCompactionTask {
-    /// It's unfortunate that we need to store a compact options struct here because the only outer
-    /// API we can call here is `compact_with_options` which does a few setup calls before starting the
-    /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
-    pub options: CompactOptions,
-    /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
-    pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
-    /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
-    pub gc_block: Option<gc_block::Guard>,
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct GcCompactionJobId(pub usize);
+
+impl std::fmt::Display for GcCompactionJobId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum GcCompactionQueueItem {
+    Manual(CompactOptions),
+    SubCompactionJob(CompactOptions),
+    #[allow(dead_code)]
+    UpdateL2Lsn(Lsn),
+    Notify(GcCompactionJobId),
+}
+
+impl GcCompactionQueueItem {
+    pub fn into_compact_info_resp(
+        self,
+        id: GcCompactionJobId,
+        running: bool,
+    ) -> Option<CompactInfoResponse> {
+        match self {
+            GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
+                compact_key_range: options.compact_key_range,
+                compact_lsn_range: options.compact_lsn_range,
+                sub_compaction: options.sub_compaction,
+                running,
+                job_id: id.0,
+            }),
+            GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
+                compact_key_range: options.compact_key_range,
+                compact_lsn_range: options.compact_lsn_range,
+                sub_compaction: options.sub_compaction,
+                running,
+                job_id: id.0,
+            }),
+            GcCompactionQueueItem::UpdateL2Lsn(_) => None,
+            GcCompactionQueueItem::Notify(_) => None,
+        }
+    }
+}
+
+struct GcCompactionQueueInner {
+    running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
+    queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
+    notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
+    gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
+    last_id: GcCompactionJobId,
+}
+
+impl GcCompactionQueueInner {
+    fn next_id(&mut self) -> GcCompactionJobId {
+        let id = self.last_id;
+        self.last_id = GcCompactionJobId(id.0 + 1);
+        id
+    }
+}
+
+/// A structure to store gc_compaction jobs.
+pub struct GcCompactionQueue {
+    /// All items in the queue, and the currently-running job.
+    inner: std::sync::Mutex<GcCompactionQueueInner>,
+    /// Ensure only one thread is consuming the queue.
+    consumer_lock: tokio::sync::Mutex<()>,
+}
+
+impl GcCompactionQueue {
+    pub fn new() -> Self {
+        GcCompactionQueue {
+            inner: std::sync::Mutex::new(GcCompactionQueueInner {
+                running: None,
+                queued: VecDeque::new(),
+                notify: HashMap::new(),
+                gc_guards: HashMap::new(),
+                last_id: GcCompactionJobId(0),
+            }),
+            consumer_lock: tokio::sync::Mutex::new(()),
+        }
+    }
+
+    pub fn cancel_scheduled(&self) {
+        let mut guard = self.inner.lock().unwrap();
+        guard.queued.clear();
+        guard.notify.clear();
+        guard.gc_guards.clear();
+    }
+
+    /// Schedule a manual compaction job.
+    pub fn schedule_manual_compaction(
+        &self,
+        options: CompactOptions,
+        notify: Option<tokio::sync::oneshot::Sender<()>>,
+    ) -> GcCompactionJobId {
+        let mut guard = self.inner.lock().unwrap();
+        let id = guard.next_id();
+        guard
+            .queued
+            .push_back((id, GcCompactionQueueItem::Manual(options)));
+        if let Some(notify) = notify {
+            guard.notify.insert(id, notify);
+        }
+        info!("scheduled compaction job id={}", id);
+        id
+    }
+
+    /// Trigger an auto compaction.
+    #[allow(dead_code)]
+    pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
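A small self-contained sketch (simplified, hypothetical `Queue` type, not the
patch itself; assumes the `tokio` runtime with macros) of the
completion-notification contract used here: scheduling stores a oneshot
sender, and `notify_and_unblock` fires it when the job, or its last sub-job,
finishes.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::oneshot;

#[derive(Default)]
struct Queue {
    notify: Mutex<HashMap<usize, oneshot::Sender<()>>>,
}

impl Queue {
    fn schedule(&self, id: usize) -> oneshot::Receiver<()> {
        let (tx, rx) = oneshot::channel();
        self.notify.lock().unwrap().insert(id, tx);
        rx
    }

    fn notify_and_unblock(&self, id: usize) {
        // In the real code this is also where the job's gc guard is dropped.
        if let Some(tx) = self.notify.lock().unwrap().remove(&id) {
            let _ = tx.send(());
        }
    }
}

#[tokio::main]
async fn main() {
    let queue = Queue::default();
    let done = queue.schedule(0);
    queue.notify_and_unblock(0); // normally called by the consumer on completion
    done.await.expect("queue dropped the sender");
    println!("compaction job finished");
}
```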
+
+    /// Notify the caller the job has finished and unblock GC.
+    fn notify_and_unblock(&self, id: GcCompactionJobId) {
+        info!("compaction job id={} finished", id);
+        let mut guard = self.inner.lock().unwrap();
+        if let Some(blocking) = guard.gc_guards.remove(&id) {
+            drop(blocking)
+        }
+        if let Some(tx) = guard.notify.remove(&id) {
+            let _ = tx.send(());
+        }
+    }
+
+    async fn handle_sub_compaction(
+        &self,
+        id: GcCompactionJobId,
+        options: CompactOptions,
+        timeline: &Arc<Timeline>,
+        gc_block: &GcBlock,
+    ) -> Result<(), CompactionError> {
+        info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
+        let jobs: Vec<GcCompactJob> = timeline
+            .gc_compaction_split_jobs(
+                GcCompactJob::from_compact_options(options.clone()),
+                options.sub_compaction_max_job_size_mb,
+            )
+            .await
+            .map_err(CompactionError::Other)?;
+        if jobs.is_empty() {
+            info!("no jobs to run, skipping scheduled compaction task");
+            self.notify_and_unblock(id);
+        } else {
+            let gc_guard = match gc_block.start().await {
+                Ok(guard) => guard,
+                Err(e) => {
+                    return Err(CompactionError::Other(anyhow!(
+                        "cannot run gc-compaction because gc is blocked: {}",
+                        e
+                    )));
+                }
+            };
+
+            let jobs_len = jobs.len();
+            let mut pending_tasks = Vec::new();
+            for job in jobs {
+                // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions`
+                // until we do further refactors to allow directly call `compact_with_gc`.
+                let mut flags: EnumSet<CompactFlags> = EnumSet::default();
+                flags |= CompactFlags::EnhancedGcBottomMostCompaction;
+                if job.dry_run {
+                    flags |= CompactFlags::DryRun;
+                }
+                let options = CompactOptions {
+                    flags,
+                    sub_compaction: false,
+                    compact_key_range: Some(job.compact_key_range.into()),
+                    compact_lsn_range: Some(job.compact_lsn_range.into()),
+                    sub_compaction_max_job_size_mb: None,
+                };
+                pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
+            }
+            pending_tasks.push(GcCompactionQueueItem::Notify(id));
+            {
+                let mut guard = self.inner.lock().unwrap();
+                guard.gc_guards.insert(id, gc_guard);
+                let mut tasks = Vec::new();
+                for task in pending_tasks {
+                    let id = guard.next_id();
+                    tasks.push((id, task));
+                }
+                tasks.reverse();
+                for item in tasks {
+                    guard.queued.push_front(item);
+                }
+            }
+            info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
+        }
+        Ok(())
+    }
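The `reverse()` plus `push_front` dance above preserves FIFO order while
letting sub-jobs jump ahead of older queue entries; a tiny standalone
demonstration of the pattern:

```rust
// Sketch (not part of the patch): prepend a batch of jobs to the front of a
// queue so they run next, without reordering them relative to each other.
use std::collections::VecDeque;

fn prepend_in_order<T>(queue: &mut VecDeque<T>, mut jobs: Vec<T>) {
    jobs.reverse();
    for job in jobs {
        queue.push_front(job);
    }
}

fn main() {
    let mut queue: VecDeque<&str> = VecDeque::from(vec!["older-job"]);
    prepend_in_order(&mut queue, vec!["sub-0", "sub-1", "notify"]);
    assert_eq!(queue, ["sub-0", "sub-1", "notify", "older-job"]);
    println!("{queue:?}");
}
```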
+
+    /// Take a job from the queue and process it. Returns whether there are still pending tasks.
+    pub async fn iteration(
+        &self,
+        cancel: &CancellationToken,
+        ctx: &RequestContext,
+        gc_block: &GcBlock,
+        timeline: &Arc<Timeline>,
+    ) -> Result<bool, CompactionError> {
+        let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
+        let has_pending_tasks;
+        let (id, item) = {
+            let mut guard = self.inner.lock().unwrap();
+            let Some((id, item)) = guard.queued.pop_front() else {
+                return Ok(false);
+            };
+            guard.running = Some((id, item.clone()));
+            has_pending_tasks = !guard.queued.is_empty();
+            (id, item)
+        };
+
+        match item {
+            GcCompactionQueueItem::Manual(options) => {
+                if !options
+                    .flags
+                    .contains(CompactFlags::EnhancedGcBottomMostCompaction)
+                {
+                    warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
+                } else if options.sub_compaction {
+                    self.handle_sub_compaction(id, options, timeline, gc_block)
+                        .await?;
+                } else {
+                    let gc_guard = match gc_block.start().await {
+                        Ok(guard) => guard,
+                        Err(e) => {
+                            return Err(CompactionError::Other(anyhow!(
+                                "cannot run gc-compaction because gc is blocked: {}",
+                                e
+                            )));
+                        }
+                    };
+                    {
+                        let mut guard = self.inner.lock().unwrap();
+                        guard.gc_guards.insert(id, gc_guard);
+                    }
+                    let _ = timeline
+                        .compact_with_options(cancel, options, ctx)
+                        .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
+                        .await?;
+                    self.notify_and_unblock(id);
+                }
+            }
+            GcCompactionQueueItem::SubCompactionJob(options) => {
+                let _ = timeline
+                    .compact_with_options(cancel, options, ctx)
+                    .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
+                    .await?;
+            }
+            GcCompactionQueueItem::Notify(id) => {
+                self.notify_and_unblock(id);
+            }
+            GcCompactionQueueItem::UpdateL2Lsn(_) => {
+                unreachable!()
+            }
+        }
+        {
+            let mut guard = self.inner.lock().unwrap();
+            guard.running = None;
+        }
+        Ok(has_pending_tasks)
+    }
+
+    #[allow(clippy::type_complexity)]
+    pub fn remaining_jobs(
+        &self,
+    ) -> (
+        Option<(GcCompactionJobId, GcCompactionQueueItem)>,
+        VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
+    ) {
+        let guard = self.inner.lock().unwrap();
+        (guard.running.clone(), guard.queued.clone())
+    }
+
+    #[allow(dead_code)]
+    pub fn remaining_jobs_num(&self) -> usize {
+        let guard = self.inner.lock().unwrap();
+        guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
+    }
 }
 
 /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will