diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 6f9e5c0261..1ea42a91b8 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use api::v1::region::compact_request;
+use api::v1::region::compact_request::Options;
 use common_base::Plugins;
 use common_meta::key::SchemaMetadataManagerRef;
 use common_telemetry::{debug, error, info, warn};
@@ -50,9 +51,9 @@ use crate::compaction::picker::{new_picker, CompactionTask};
 use crate::compaction::task::CompactionTaskImpl;
 use crate::config::MitoConfig;
 use crate::error::{
-    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, RegionClosedSnafu, RegionDroppedSnafu,
-    RegionTruncatedSnafu, RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu,
-    TimeoutSnafu,
+    CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu,
+    RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result,
+    TimeRangePredicateOverflowSnafu, TimeoutSnafu,
 };
 use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT};
 use crate::read::projection::ProjectionMapper;
@@ -93,13 +94,6 @@ impl CompactionRequest {
     pub(crate) fn region_id(&self) -> RegionId {
         self.current_version.metadata.region_id
     }
-
-    /// Push waiter to the request.
-    pub(crate) fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
-        if let Some(waiter) = waiter.take_inner() {
-            self.waiters.push(waiter);
-        }
-    }
 }
 
 /// Compaction scheduler tracks and manages compaction tasks.
@@ -150,8 +144,24 @@
         max_parallelism: usize,
     ) -> Result<()> {
         if let Some(status) = self.region_status.get_mut(&region_id) {
-            // Region is compacting. Add the waiter to pending list.
-            status.merge_waiter(waiter);
+            match compact_options {
+                Options::Regular(_) => {
+                    // Region is compacting. Add the waiter to pending list.
+                    status.merge_waiter(waiter);
+                }
+                options @ Options::StrictWindow(_) => {
+                    // The incoming compaction request is manually triggered.
+                    status.set_pending_request(PendingCompaction {
+                        options,
+                        waiter,
+                        max_parallelism,
+                    });
+                    info!(
+                        "Region {} is compacting, manual compaction will be re-scheduled.",
+                        region_id
+                    );
+                }
+            }
             return Ok(());
         }
@@ -188,6 +198,35 @@
             return;
         };
 
+        if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
+            let PendingCompaction {
+                options,
+                waiter,
+                max_parallelism,
+            } = pending_request;
+
+            let request = status.new_compaction_request(
+                self.request_sender.clone(),
+                waiter,
+                self.engine_config.clone(),
+                self.cache_manager.clone(),
+                manifest_ctx,
+                self.listener.clone(),
+                schema_metadata_manager,
+                max_parallelism,
+            );
+
+            if let Err(e) = self.schedule_compaction_request(request, options).await {
+                error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
+            } else {
+                debug!(
+                    "Successfully scheduled manual compaction for region id: {}",
+                    region_id
+                );
+            }
+            return;
+        }
+
         // We should always try to compact the region until picker returns None.
         let request = status.new_compaction_request(
             self.request_sender.clone(),
@@ -424,27 +463,6 @@ impl Drop for CompactionScheduler {
     }
 }
 
-/// Pending compaction tasks.
-struct PendingCompaction {
-    waiters: Vec<OutputTx>,
-}
-
-impl PendingCompaction {
-    /// Push waiter to the request.
-    fn push_waiter(&mut self, mut waiter: OptionOutputTx) {
-        if let Some(waiter) = waiter.take_inner() {
-            self.waiters.push(waiter);
-        }
-    }
-
-    /// Send compaction error to waiter.
-    fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
-        for waiter in self.waiters.drain(..) {
-            waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id }));
-        }
-    }
-}
-
 /// Finds TTL of table by first examine table options then database options.
 async fn find_ttl(
     table_id: TableId,
@@ -478,10 +496,10 @@ struct CompactionStatus {
     version_control: VersionControlRef,
     /// Access layer of the region.
     access_layer: AccessLayerRef,
-    /// Compaction pending to schedule.
-    ///
-    /// For simplicity, we merge all pending compaction requests into one.
-    pending_compaction: Option<PendingCompaction>,
+    /// Pending waiters for compaction.
+    waiters: Vec<OutputTx>,
+    /// Pending compaction that is supposed to run as soon as the current compaction task finishes.
+    pending_request: Option<PendingCompaction>,
 }
 
 impl CompactionStatus {
@@ -495,23 +513,44 @@
             region_id,
             version_control,
             access_layer,
-            pending_compaction: None,
+            waiters: Vec::new(),
+            pending_request: None,
         }
     }
 
-    /// Merge the watier to the pending compaction.
-    fn merge_waiter(&mut self, waiter: OptionOutputTx) {
-        let pending = self
-            .pending_compaction
-            .get_or_insert_with(|| PendingCompaction {
-                waiters: Vec::new(),
-            });
-        pending.push_waiter(waiter);
+    /// Merge the waiter to the pending compaction.
+    fn merge_waiter(&mut self, mut waiter: OptionOutputTx) {
+        if let Some(waiter) = waiter.take_inner() {
+            self.waiters.push(waiter);
+        }
     }
 
-    fn on_failure(self, err: Arc<Error>) {
-        if let Some(mut pending) = self.pending_compaction {
-            pending.on_failure(self.region_id, err.clone());
+    /// Sets the pending compaction request, replacing the current value if one already exists.
+    fn set_pending_request(&mut self, pending: PendingCompaction) {
+        if let Some(mut prev) = self.pending_request.replace(pending) {
+            debug!(
+                "Replace pending compaction options with new request {:?} for region: {}",
+                prev.options, self.region_id
+            );
+            if let Some(waiter) = prev.waiter.take_inner() {
+                waiter.send(ManualCompactionOverrideSnafu.fail());
+            }
+        }
+    }
+
+    fn on_failure(mut self, err: Arc<Error>) {
+        for waiter in self.waiters.drain(..) {
+            waiter.send(Err(err.clone()).context(CompactRegionSnafu {
+                region_id: self.region_id,
+            }));
+        }
+
+        if let Some(pending_compaction) = self.pending_request {
+            pending_compaction
+                .waiter
+                .send(Err(err.clone()).context(CompactRegionSnafu {
+                    region_id: self.region_id,
+                }));
         }
     }
 
@@ -522,7 +561,7 @@ impl CompactionStatus {
     fn new_compaction_request(
         &mut self,
         request_sender: Sender<WorkerRequest>,
-        waiter: OptionOutputTx,
+        mut waiter: OptionOutputTx,
         engine_config: Arc<MitoConfig>,
         cache_manager: CacheManagerRef,
         manifest_ctx: &ManifestContextRef,
@@ -532,26 +571,26 @@
     ) -> CompactionRequest {
         let current_version = CompactionVersion::from(self.version_control.current().version);
         let start_time = Instant::now();
-        let mut req = CompactionRequest {
+        let mut waiters = Vec::with_capacity(self.waiters.len() + 1);
+        waiters.extend(std::mem::take(&mut self.waiters));
+
+        if let Some(waiter) = waiter.take_inner() {
+            waiters.push(waiter);
+        }
+
+        CompactionRequest {
             engine_config,
             current_version,
             access_layer: self.access_layer.clone(),
             request_sender: request_sender.clone(),
-            waiters: Vec::new(),
+            waiters,
             start_time,
             cache_manager,
             manifest_ctx: manifest_ctx.clone(),
             listener,
             schema_metadata_manager,
             max_parallelism,
-        };
-
-        if let Some(pending) = self.pending_compaction.take() {
-            req.waiters = pending.waiters;
         }
-
-        req.push_waiter(waiter);
-
-        req
     }
 }
@@ -689,8 +728,20 @@ fn get_expired_ssts(
         .collect()
 }
 
+/// Pending compaction request that is supposed to run after the current task finishes,
+/// typically used for manual compactions.
+struct PendingCompaction {
+    /// Compaction options. Currently, it can only be [StrictWindow].
+    pub(crate) options: compact_request::Options,
+    /// Waiter of the pending request.
+    pub(crate) waiter: OptionOutputTx,
+    /// Max parallelism for the pending compaction.
+    pub(crate) max_parallelism: usize,
+}
+
 #[cfg(test)]
 mod tests {
+    use api::v1::region::StrictWindow;
     use tokio::sync::oneshot;
 
     use super::*;
@@ -763,6 +814,7 @@ mod tests {
 
     #[tokio::test]
     async fn test_schedule_on_finished() {
+        common_telemetry::init_default_ut_logging();
         let job_scheduler = Arc::new(VecScheduler::default());
         let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
         let (tx, _rx) = mpsc::channel(4);
@@ -828,6 +880,119 @@ mod tests {
             purger.clone(),
         );
         // The task is pending.
+        let (tx, _rx) = oneshot::channel();
+        scheduler
+            .schedule_compaction(
+                region_id,
+                compact_request::Options::Regular(Default::default()),
+                &version_control,
+                &env.access_layer,
+                OptionOutputTx::new(Some(OutputTx::new(tx))),
+                &manifest_ctx,
+                schema_metadata_manager.clone(),
+                1,
+            )
+            .await
+            .unwrap();
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(1, job_scheduler.num_jobs());
+        assert!(!scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .waiters
+            .is_empty());
+
+        // On compaction finished and schedule next compaction.
+        scheduler
+            .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
+            .await;
+        assert_eq!(1, scheduler.region_status.len());
+        assert_eq!(2, job_scheduler.num_jobs());
+
+        // 5 files for next compaction.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &[],
+            purger.clone(),
+        );
+        let (tx, _rx) = oneshot::channel();
+        // The task is pending.
+        scheduler
+            .schedule_compaction(
+                region_id,
+                compact_request::Options::Regular(Default::default()),
+                &version_control,
+                &env.access_layer,
+                OptionOutputTx::new(Some(OutputTx::new(tx))),
+                &manifest_ctx,
+                schema_metadata_manager,
+                1,
+            )
+            .await
+            .unwrap();
+        assert_eq!(2, job_scheduler.num_jobs());
+        assert!(!scheduler
+            .region_status
+            .get(&builder.region_id())
+            .unwrap()
+            .waiters
+            .is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_manual_compaction_when_compaction_in_progress() {
+        common_telemetry::init_default_ut_logging();
+        let job_scheduler = Arc::new(VecScheduler::default());
+        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
+        let (tx, _rx) = mpsc::channel(4);
+        let mut scheduler = env.mock_compaction_scheduler(tx);
+        let mut builder = VersionControlBuilder::new();
+        let purger = builder.file_purger();
+        let region_id = builder.region_id();
+
+        let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
+        schema_metadata_manager
+            .register_region_table_info(
+                builder.region_id().table_id(),
+                "test_table",
+                "test_catalog",
+                "test_schema",
+                None,
+                kv_backend,
+            )
+            .await;
+
+        // 5 files to compact.
+        let end = 1000 * 1000;
+        let version_control = Arc::new(
+            builder
+                .push_l0_file(0, end)
+                .push_l0_file(10, end)
+                .push_l0_file(50, end)
+                .push_l0_file(80, end)
+                .push_l0_file(90, end)
+                .build(),
+        );
+        let manifest_ctx = env
+            .mock_manifest_context(version_control.current().version.metadata.clone())
+            .await;
+
+        let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
+            .files
+            .values()
+            .map(|file| file.meta_ref().clone())
+            .collect();
+
+        // 5 files for next compaction and removes old files.
+        apply_edit(
+            &version_control,
+            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
+            &file_metas,
+            purger.clone(),
+        );
+
         scheduler
             .schedule_compaction(
                 region_id,
@@ -841,14 +1006,36 @@ mod tests {
             )
             .await
             .unwrap();
+        // Should schedule 1 compaction.
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(1, job_scheduler.num_jobs());
         assert!(scheduler
             .region_status
-            .get(&builder.region_id())
+            .get(&region_id)
             .unwrap()
-            .pending_compaction
-            .is_some());
+            .pending_request
+            .is_none());
+
+        // Schedule another manual compaction.
+        let (tx, _rx) = oneshot::channel();
+        scheduler
+            .schedule_compaction(
+                region_id,
+                compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
+                &version_control,
+                &env.access_layer,
+                OptionOutputTx::new(Some(OutputTx::new(tx))),
+                &manifest_ctx,
+                schema_metadata_manager.clone(),
+                1,
+            )
+            .await
+            .unwrap();
+        assert_eq!(1, scheduler.region_status.len());
+        // Current job num should be 1 since compaction is in progress.
+        assert_eq!(1, job_scheduler.num_jobs());
+        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
+        assert!(status.pending_request.is_some());
 
         // On compaction finished and schedule next compaction.
         scheduler
@@ -856,33 +1043,8 @@ mod tests {
             .await;
         assert_eq!(1, scheduler.region_status.len());
         assert_eq!(2, job_scheduler.num_jobs());
-        // 5 files for next compaction.
-        apply_edit(
-            &version_control,
-            &[(0, end), (20, end), (40, end), (60, end), (80, end)],
-            &[],
-            purger.clone(),
-        );
-        // The task is pending.
-        scheduler
-            .schedule_compaction(
-                region_id,
-                compact_request::Options::Regular(Default::default()),
-                &version_control,
-                &env.access_layer,
-                OptionOutputTx::none(),
-                &manifest_ctx,
-                schema_metadata_manager,
-                1,
-            )
-            .await
-            .unwrap();
-        assert_eq!(2, job_scheduler.num_jobs());
-        assert!(scheduler
-            .region_status
-            .get(&builder.region_id())
-            .unwrap()
-            .pending_compaction
-            .is_some());
+
+        let status = scheduler.region_status.get(&builder.region_id()).unwrap();
+        assert!(status.pending_request.is_none());
     }
 }
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 9f55c45804..235ed4ca0b 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -939,6 +939,9 @@ pub enum Error {
         column: String,
         default_value: String,
     },
+
+    #[snafu(display("Manual compaction is overridden by subsequent operations."))]
+    ManualCompactionOverride {},
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1082,6 +1085,8 @@ impl ErrorExt for Error {
             PushBloomFilterValue { source, .. } | BloomFilterFinish { source, .. } => {
                 source.status_code()
            }
+
+            ManualCompactionOverride {} => StatusCode::Cancelled,
         }
     }
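
Note for reviewers: the behavioral change above boils down to this policy. While a compaction is running, a `Regular` request is merged into `CompactionStatus::waiters`, whereas a manual `StrictWindow` request is stashed as the single `pending_request` (failing any previously stashed waiter with `ManualCompactionOverride`, which maps to `StatusCode::Cancelled`) and is re-scheduled first once the running task finishes. The sketch below illustrates that policy in isolation; `Options`, `Waiter`, `PendingCompaction`, and `CompactionStatus` here are simplified stand-ins, not the actual mito2 types.

```rust
// Minimal sketch of the pending-request policy, using stand-in types.

#[derive(Debug)]
enum Options {
    Regular,
    StrictWindow { window_seconds: i64 },
}

/// Stand-in for an OptionOutputTx waiter; holds a label for demonstration.
struct Waiter(String);

struct PendingCompaction {
    options: Options,
    waiter: Waiter,
}

#[derive(Default)]
struct CompactionStatus {
    /// Waiters piggybacking on the compaction that is currently running.
    waiters: Vec<Waiter>,
    /// At most one stashed manual request; a newer one replaces it.
    pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
    /// Called when a request arrives while a compaction is already running.
    fn schedule_while_running(&mut self, options: Options, waiter: Waiter) {
        match options {
            // Regular requests simply wait for the running task to finish.
            Options::Regular => self.waiters.push(waiter),
            // Manual requests are stashed for re-scheduling; a previously
            // stashed request is overridden and its waiter fails.
            manual @ Options::StrictWindow { .. } => {
                let pending = PendingCompaction { options: manual, waiter };
                if let Some(prev) = self.pending_request.replace(pending) {
                    println!(
                        "{} fails: ManualCompactionOverride ({:?})",
                        prev.waiter.0, prev.options
                    );
                }
            }
        }
    }

    /// Called when the running compaction finishes: the stashed manual
    /// request, if any, is scheduled before any regular follow-up pick.
    fn on_finished(&mut self) {
        for w in self.waiters.drain(..) {
            println!("{} resolves Ok", w.0);
        }
        if let Some(pending) = self.pending_request.take() {
            println!(
                "re-scheduling manual compaction {:?} for {}",
                pending.options, pending.waiter.0
            );
        }
    }
}

fn main() {
    let mut status = CompactionStatus::default();
    status.schedule_while_running(Options::Regular, Waiter("w1".into()));
    status.schedule_while_running(
        Options::StrictWindow { window_seconds: 3600 },
        Waiter("w2".into()),
    );
    // w2's request is replaced; its waiter receives the override error.
    status.schedule_while_running(
        Options::StrictWindow { window_seconds: 60 },
        Waiter("w3".into()),
    );
    // w1 resolves with the finished task; w3's manual request is re-scheduled.
    status.on_finished();
}
```

Keeping at most one pending manual request gives last-writer-wins semantics for repeated manual compactions: unbounded queuing is avoided at the cost of cancelling earlier manual requests, which is why the new error variant maps to `StatusCode::Cancelled`.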