diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index d2120690ac..981d4f1d11 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -24,12 +24,13 @@ mod twcs; mod window; use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Instant; use api::v1::region::compact_request; use api::v1::region::compact_request::Options; use common_base::Plugins; +use common_base::cancellation::CancellationHandle; use common_memory_manager::OnExhaustedPolicy; use common_meta::key::SchemaMetadataManagerRef; use common_telemetry::{debug, error, info, warn}; @@ -53,9 +54,9 @@ use crate::compaction::picker::{CompactionTask, PickerOutput, new_picker}; use crate::compaction::task::CompactionTaskImpl; use crate::config::MitoConfig; use crate::error::{ - CompactRegionSnafu, Error, GetSchemaMetadataSnafu, ManualCompactionOverrideSnafu, - RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, RemoteCompactionSnafu, Result, - TimeRangePredicateOverflowSnafu, TimeoutSnafu, + CompactRegionSnafu, CompactionCancelledSnafu, Error, GetSchemaMetadataSnafu, + ManualCompactionOverrideSnafu, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, + RemoteCompactionSnafu, Result, TimeRangePredicateOverflowSnafu, TimeoutSnafu, }; use crate::metrics::{COMPACTION_STAGE_ELAPSED, INFLIGHT_COMPACTION_COUNT}; use crate::read::BoxedRecordBatchStream; @@ -186,7 +187,7 @@ impl CompactionScheduler { } // The region can compact directly. - let mut status: CompactionStatus = + let mut status = CompactionStatus::new(region_id, version_control.clone(), access_layer.clone()); let request = status.new_compaction_request( self.request_sender.clone(), @@ -199,17 +200,25 @@ impl CompactionScheduler { max_parallelism, ); - let result = self + let result = match self .schedule_compaction_request(request, compact_options) - .await; - if matches!(result, Ok(true)) { - // Only if the compaction request is scheduled successfully, - // we insert the region into the status map. - self.region_status.insert(region_id, status); - } + .await + { + Ok(Some(active_compaction)) => { + // Publish CompactionStatus only after a task has been accepted by the scheduler. + // This avoids exposing a half-initialized region status that could collect pending + // DDL/compaction state even though no compaction is actually running. + status.active_compaction = Some(active_compaction); + self.region_status.insert(region_id, status); + + Ok(()) + } + Ok(None) => Ok(()), + Err(e) => Err(e), + }; self.listener.on_compaction_scheduled(region_id); - result.map(|_| ()) + result } // Handle pending manual compaction request for the region. @@ -251,14 +260,16 @@ impl CompactionScheduler { }; match self.schedule_compaction_request(request, options).await { - Ok(true) => { + Ok(Some(active_compaction)) => { + let status = self.region_status.get_mut(®ion_id).unwrap(); + status.active_compaction = Some(active_compaction); debug!( "Successfully scheduled manual compaction for region id: {}", region_id ); true } - Ok(false) => { + Ok(None) => { // We still need to handle the pending DDL requests. // So we can't return early here. false @@ -278,6 +289,11 @@ impl CompactionScheduler { manifest_ctx: &ManifestContextRef, schema_metadata_manager: SchemaMetadataManagerRef, ) -> Vec { + let Some(status) = self.region_status.get_mut(®ion_id) else { + return Vec::new(); + }; + status.clear_running_task(); + // If there a pending compaction request, handle it first // and defer returning the pending DDL requests to the caller. if self @@ -297,7 +313,6 @@ impl CompactionScheduler { return Vec::new(); }; - // Notify all waiters that compaction is finished. for waiter in std::mem::take(&mut status.waiters) { waiter.send(Ok(0)); } @@ -331,13 +346,17 @@ impl CompactionScheduler { ) .await { - Ok(true) => { + Ok(Some(active_compaction)) => { + self.region_status + .get_mut(®ion_id) + .unwrap() + .active_compaction = Some(active_compaction); debug!( "Successfully scheduled next compaction for region id: {}", region_id ); } - Ok(false) => { + Ok(None) => { // No further compaction tasks can be scheduled; cleanup the `CompactionStatus` for this region. // All DDL requests and pending compaction requests have already been processed. // Safe to remove the region from status tracking. @@ -352,6 +371,14 @@ impl CompactionScheduler { Vec::new() } + /// Notifies the scheduler that the compaction job is cancelled cooperatively. + pub(crate) async fn on_compaction_cancelled( + &mut self, + region_id: RegionId, + ) -> Vec { + self.remove_region_on_cancel(region_id) + } + /// Notifies the scheduler that the compaction job is failed. pub(crate) fn on_compaction_failed(&mut self, region_id: RegionId, err: Arc) { error!(err; "Region {} failed to compact, cancel all pending tasks", region_id); @@ -406,20 +433,23 @@ impl CompactionScheduler { has_pending } - /// Returns true if the region is compacting. - pub(crate) fn is_compacting(&self, region_id: RegionId) -> bool { - self.region_status.contains_key(®ion_id) + pub(crate) fn request_cancel(&mut self, region_id: RegionId) -> RequestCancelResult { + let Some(status) = self.region_status.get_mut(®ion_id) else { + return RequestCancelResult::NotRunning; + }; + + status.request_cancel() } /// Schedules a compaction request. /// - /// Returns true if the compaction request is scheduled successfully. - /// Returns false if no compaction task can be scheduled for this region. + /// Returns the active compaction state if the request is scheduled successfully. + /// Returns `None` if no compaction task can be scheduled for this region. async fn schedule_compaction_request( &mut self, request: CompactionRequest, options: compact_request::Options, - ) -> Result { + ) -> Result> { let region_id = request.region_id(); let (dynamic_compaction_opts, ttl) = find_dynamic_options( region_id.table_id(), @@ -492,7 +522,7 @@ impl CompactionScheduler { for waiter in waiters { waiter.send(Ok(0)); } - return Ok(false); + return Ok(None); }; // If specified to run compaction remotely, we schedule the compaction job remotely. @@ -523,7 +553,7 @@ impl CompactionScheduler { job_id, region_id ); INFLIGHT_COMPACTION_COUNT.inc(); - return Ok(true); + return Ok(Some(ActiveCompaction::Remote)); } Err(e) => { if !dynamic_compaction_opts.fallback_to_local() { @@ -555,21 +585,25 @@ impl CompactionScheduler { // Create a local compaction task. let estimated_bytes = estimate_compaction_bytes(&picker_output); + + let cancel_handle = Arc::new(CancellationHandle::default()); + let state = LocalCompactionState::new(cancel_handle.clone()); let local_compaction_task = Box::new(CompactionTaskImpl { + state: state.clone(), request_sender, waiters, start_time, listener, picker_output, compaction_region, - compactor: Arc::new(DefaultCompactor::default()), + compactor: Arc::new(DefaultCompactor::with_cancel_handle(cancel_handle.clone())), memory_manager: self.memory_manager.clone(), memory_policy: self.memory_policy, estimated_memory_bytes: estimated_bytes, }); self.submit_compaction_task(local_compaction_task, region_id) - .map(|_| true) + .map(|_| Some(ActiveCompaction::Local { state })) } fn submit_compaction_task( @@ -597,6 +631,77 @@ impl CompactionScheduler { // Notifies all pending tasks. status.on_failure(err); } + + fn remove_region_on_cancel(&mut self, region_id: RegionId) -> Vec { + let Some(status) = self.region_status.remove(®ion_id) else { + return Vec::new(); + }; + + status.on_cancel() + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalCompactionState { + cancel_handle: Arc, + commit_started: Arc>, +} + +#[derive(Debug)] +enum ActiveCompaction { + Local { state: LocalCompactionState }, + Remote, +} + +impl LocalCompactionState { + fn new(cancel_handle: Arc) -> Self { + Self { + cancel_handle, + commit_started: Arc::new(Mutex::new(false)), + } + } + + /// Returns the cancellation handle for this compaction task. + pub(crate) fn cancel_handle(&self) -> Arc { + self.cancel_handle.clone() + } + + /// Marks the compaction task as started to commit, + /// which means the compaction task is in the final stage and is about to update region version and manifest. + /// It will reject cancellation request after this method is called. + /// + /// Returns true if this is the first time to mark commit started, false otherwise. + pub(crate) fn mark_commit_started(&self) -> bool { + let mut commit_started = self.commit_started.lock().unwrap(); + if self.cancel_handle.is_cancelled() { + return false; + } + *commit_started = true; + true + } + + /// Request cancellation for this compaction task. + pub(crate) fn request_cancel(&self) -> RequestCancelResult { + // The cancel handle must under the lock of `commit_started` to avoid racing between cancellation and commit. + let commit_started = self.commit_started.lock().unwrap(); + if *commit_started { + return RequestCancelResult::TooLateToCancel; + } + if self.cancel_handle.is_cancelled() { + return RequestCancelResult::AlreadyCancelling; + } + + self.cancel_handle.cancel(); + RequestCancelResult::CancelIssued + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum RequestCancelResult { + CancelIssued, + AlreadyCancelling, + TooLateToCancel, + NotRunning, } impl Drop for CompactionScheduler { @@ -703,6 +808,8 @@ struct CompactionStatus { pending_request: Option, /// Pending DDL requests that should run when compaction is done. pending_ddl_requests: Vec, + /// Active compaction state. + active_compaction: Option, } impl CompactionStatus { @@ -719,9 +826,39 @@ impl CompactionStatus { waiters: Vec::new(), pending_request: None, pending_ddl_requests: Vec::new(), + active_compaction: None, } } + #[cfg(test)] + fn start_local_task(&mut self) -> LocalCompactionState { + let state = LocalCompactionState::new(Arc::new(CancellationHandle::default())); + self.active_compaction = Some(ActiveCompaction::Local { + state: state.clone(), + }); + state + } + + #[cfg(test)] + fn start_remote_task(&mut self) { + self.active_compaction = Some(ActiveCompaction::Remote); + } + + fn request_cancel(&mut self) -> RequestCancelResult { + let Some(active_compaction) = &self.active_compaction else { + return RequestCancelResult::NotRunning; + }; + + match active_compaction { + ActiveCompaction::Local { state, .. } => state.request_cancel(), + ActiveCompaction::Remote => RequestCancelResult::TooLateToCancel, + } + } + + fn clear_running_task(&mut self) -> bool { + self.active_compaction.take().is_some() + } + /// Merge the waiter to the pending compaction. fn merge_waiter(&mut self, mut waiter: OptionOutputTx) { if let Some(waiter) = waiter.take_inner() { @@ -764,6 +901,23 @@ impl CompactionStatus { } } + #[must_use] + fn on_cancel(mut self) -> Vec { + for waiter in self.waiters.drain(..) { + waiter.send(CompactionCancelledSnafu.fail()); + } + + if let Some(pending_compaction) = self.pending_request { + pending_compaction.waiter.send( + Err(Arc::new(CompactionCancelledSnafu.build())).context(CompactRegionSnafu { + region_id: self.region_id, + }), + ); + } + + std::mem::take(&mut self.pending_ddl_requests) + } + /// Creates a new compaction request for compaction picker. /// /// It consumes all pending compaction waiters. @@ -1362,6 +1516,58 @@ mod tests { ); } + #[tokio::test] + async fn test_schedule_compaction_does_not_publish_status_when_schedule_fails() { + common_telemetry::init_default_ut_logging(); + let env = SchedulerEnv::new() + .await + .scheduler(Arc::new(FailingScheduler)); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let mut builder = VersionControlBuilder::new(); + let end = 1000 * 1000; + let version_control = Arc::new( + builder + .push_l0_file(0, end) + .push_l0_file(10, end) + .push_l0_file(50, end) + .push_l0_file(80, end) + .push_l0_file(90, end) + .build(), + ); + let region_id = builder.region_id(); + let manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager(); + schema_metadata_manager + .register_region_table_info( + builder.region_id().table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + kv_backend, + ) + .await; + + let result = scheduler + .schedule_compaction( + region_id, + compact_request::Options::Regular(Default::default()), + &version_control, + &env.access_layer, + OptionOutputTx::none(), + &manifest_ctx, + schema_metadata_manager, + 1, + ) + .await; + + assert!(result.is_err()); + assert!(!scheduler.region_status.contains_key(®ion_id)); + } + #[tokio::test] async fn test_manual_compaction_when_compaction_in_progress() { common_telemetry::init_default_ut_logging(); @@ -1542,6 +1748,11 @@ mod tests { region_id, CompactionStatus::new(region_id, version_control, env.access_layer.clone()), ); + scheduler + .region_status + .get_mut(®ion_id) + .unwrap() + .start_local_task(); let (output_tx, _output_rx) = oneshot::channel(); scheduler.add_ddl_request_to_pending(SenderDdlRequest { @@ -1558,6 +1769,142 @@ mod tests { assert!(scheduler.has_pending_ddls(region_id)); } + #[tokio::test] + async fn test_request_cancel_state_transitions() { + let env = SchedulerEnv::new().await; + let builder = VersionControlBuilder::new(); + let region_id = builder.region_id(); + let version_control = Arc::new(builder.build()); + let mut status = + CompactionStatus::new(region_id, version_control, env.access_layer.clone()); + let state = status.start_local_task(); + + assert_eq!(status.request_cancel(), RequestCancelResult::CancelIssued); + assert!(state.cancel_handle().is_cancelled()); + assert_eq!( + status.request_cancel(), + RequestCancelResult::AlreadyCancelling + ); + + assert!(!state.mark_commit_started()); + assert_eq!( + status.request_cancel(), + RequestCancelResult::AlreadyCancelling + ); + + assert!(status.clear_running_task()); + assert_eq!(status.request_cancel(), RequestCancelResult::NotRunning); + } + + #[tokio::test] + async fn test_request_cancel_remote_compaction_is_too_late() { + let env = SchedulerEnv::new().await; + let builder = VersionControlBuilder::new(); + let region_id = builder.region_id(); + let version_control = Arc::new(builder.build()); + let mut status = + CompactionStatus::new(region_id, version_control, env.access_layer.clone()); + + status.start_remote_task(); + + assert_eq!( + status.request_cancel(), + RequestCancelResult::TooLateToCancel + ); + assert!(status.active_compaction.is_some()); + } + + #[tokio::test] + async fn test_on_compaction_cancelled_returns_pending_ddl_requests() { + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + let _manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + scheduler + .region_status + .get_mut(®ion_id) + .unwrap() + .start_local_task(); + + let (output_tx, _output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + let pending_ddls = scheduler.on_compaction_cancelled(region_id).await; + + assert_eq!(pending_ddls.len(), 1); + assert!(!scheduler.has_pending_ddls(region_id)); + assert!(!scheduler.region_status.contains_key(®ion_id)); + assert_eq!(job_scheduler.num_jobs(), 0); + } + + #[tokio::test] + async fn test_on_compaction_cancelled_prioritizes_pending_ddls_over_pending_compaction() { + let job_scheduler = Arc::new(VecScheduler::default()); + let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone()); + let (tx, _rx) = mpsc::channel(4); + let mut scheduler = env.mock_compaction_scheduler(tx); + let builder = VersionControlBuilder::new(); + let version_control = Arc::new(builder.build()); + let region_id = builder.region_id(); + let _manifest_ctx = env + .mock_manifest_context(version_control.current().version.metadata.clone()) + .await; + let (_schema_metadata_manager, _kv_backend) = mock_schema_metadata_manager(); + + scheduler.region_status.insert( + region_id, + CompactionStatus::new(region_id, version_control, env.access_layer.clone()), + ); + let status = scheduler.region_status.get_mut(®ion_id).unwrap(); + status.start_local_task(); + let (manual_tx, manual_rx) = oneshot::channel(); + status.set_pending_request(PendingCompaction { + options: compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }), + waiter: OptionOutputTx::from(manual_tx), + max_parallelism: 1, + }); + + let (output_tx, _output_rx) = oneshot::channel(); + scheduler.add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender: OptionOutputTx::from(output_tx), + request: crate::request::DdlRequest::EnterStaging( + store_api::region_request::EnterStagingRequest { + partition_directive: + store_api::region_request::StagingPartitionDirective::RejectAllWrites, + }, + ), + }); + + let pending_ddls = scheduler.on_compaction_cancelled(region_id).await; + + assert_eq!(pending_ddls.len(), 1); + assert!(!scheduler.region_status.contains_key(®ion_id)); + assert_eq!(job_scheduler.num_jobs(), 0); + assert_matches!(manual_rx.await.unwrap(), Err(_)); + } + #[tokio::test] async fn test_pending_ddl_request_failed_on_compaction_failed() { let env = SchedulerEnv::new().await; @@ -1713,6 +2060,11 @@ mod tests { region_id, CompactionStatus::new(region_id, version_control, env.access_layer.clone()), ); + scheduler + .region_status + .get_mut(®ion_id) + .unwrap() + .start_local_task(); let (output_tx, _output_rx) = oneshot::channel(); scheduler.add_ddl_request_to_pending(SenderDdlRequest { @@ -1752,6 +2104,7 @@ mod tests { let (manual_tx, manual_rx) = oneshot::channel(); let mut status = CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone()); + status.start_local_task(); status.set_pending_request(PendingCompaction { options: compact_request::Options::Regular(Default::default()), waiter: OptionOutputTx::from(manual_tx), @@ -1827,6 +2180,7 @@ mod tests { let (manual_tx, manual_rx) = oneshot::channel(); let mut status = CompactionStatus::new(region_id, version_control.clone(), env.access_layer.clone()); + status.start_local_task(); status.set_pending_request(PendingCompaction { options: compact_request::Options::Regular(Default::default()), waiter: OptionOutputTx::from(manual_tx), @@ -1873,6 +2227,11 @@ mod tests { region_id, CompactionStatus::new(region_id, version_control, env.access_layer.clone()), ); + scheduler + .region_status + .get_mut(®ion_id) + .unwrap() + .start_local_task(); let pending_ddls = scheduler .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) @@ -1910,6 +2269,11 @@ mod tests { region_id, CompactionStatus::new(region_id, version_control, env.access_layer.clone()), ); + scheduler + .region_status + .get_mut(®ion_id) + .unwrap() + .start_local_task(); let pending_ddls = scheduler .on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager) diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 827fb9ebdd..d28f2db189 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -16,8 +16,9 @@ use std::num::NonZero; use std::sync::Arc; use std::time::Duration; +use common_base::cancellation::{CancellableFuture, CancellationHandle}; use common_meta::key::SchemaMetadataManagerRef; -use common_telemetry::{info, warn}; +use common_telemetry::{debug, info, warn}; use common_time::TimeToLive; use either::Either; use itertools::Itertools; @@ -438,19 +439,25 @@ impl SstMerger for DefaultSstMerger { /// implementations in tests. pub struct DefaultCompactor { merger: M, + cancel_handle: Arc, } -impl Default for DefaultCompactor { - fn default() -> Self { +#[cfg(test)] +impl DefaultCompactor { + pub fn with_merger(merger: M) -> Self { Self { - merger: DefaultSstMerger, + merger, + cancel_handle: Arc::new(CancellationHandle::default()), } } } -impl DefaultCompactor { - pub fn with_merger(merger: M) -> Self { - Self { merger } +impl DefaultCompactor { + pub fn with_cancel_handle(cancel_handle: Arc) -> Self { + Self { + merger: DefaultSstMerger, + cancel_handle, + } } } @@ -503,7 +510,7 @@ where chunk.push(task); } } - let spawned: Vec<_> = chunk + let mut spawned: Vec<_> = chunk .into_iter() .map(|(inputs, fut)| { let handle = common_runtime::spawn_compact(fut); @@ -511,30 +518,56 @@ where }) .collect(); - for (inputs, handle) in spawned { - match handle.await { - Ok(Ok(files)) => { + while let Some((inputs, handle)) = spawned.pop() { + let abort_handle = handle.abort_handle(); + match CancellableFuture::new(handle, self.cancel_handle.clone()).await { + Ok(Ok(Ok(files))) => { output_files.extend(files); compacted_inputs.extend(inputs); } - Ok(Err(e)) => { + Ok(Ok(Err(e))) => { warn!( - e; "Region {} failed to merge compaction output with inputs: [{}], skipping", + e; "Failed to merge compaction output for region: {}, inputs: [{}]", region_id, inputs.iter().map(|f| f.file_id.to_string()).join(",") ); } - Err(e) => { + Ok(Err(e)) => { warn!( "Region {} compaction task join error for inputs: [{}], skipping: {}", region_id, inputs.iter().map(|f| f.file_id.to_string()).join(","), e ); + // If the cancel handle is cancelled, + // cancel the remaining tasks before returns the error. + if self.cancel_handle.is_cancelled() { + abort_handle.abort(); + for (_, handle) in spawned { + handle.abort(); + } + } return Err(e).context(error::JoinSnafu); } + Err(_) => { + debug!( + "Compaction merge cancelled for region: {}, aborting remaining {} spawned tasks", + region_id, + spawned.len(), + ); + abort_handle.abort(); + for (_, handle) in spawned { + handle.abort(); + } + break; + } } } + + if self.cancel_handle.is_cancelled() { + info!("Compaction merge cancelled for region: {}", region_id); + break; + } } // Include expired SSTs in removals — these don't depend on merge success. @@ -584,14 +617,17 @@ where #[cfg(test)] mod tests { - use std::sync::Mutex; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + use std::time::Duration; use store_api::storage::{FileId, RegionId}; + use tokio::time::sleep; - use super::*; + use super::{DefaultCompactor, *}; use crate::cache::CacheManager; use crate::compaction::picker::PickerOutput; + use crate::error::Result; use crate::sst::file::FileHandle; use crate::sst::file_purger::NoopFilePurger; use crate::sst::version::SstVersion; @@ -821,4 +857,85 @@ mod tests { expired_meta.file_id ); } + + #[derive(Clone)] + struct BlockingMerger { + call_idx: Arc, + } + + #[async_trait::async_trait] + impl SstMerger for BlockingMerger { + async fn merge_single_output( + &self, + _compaction_region: CompactionRegion, + _output: CompactionOutput, + _write_opts: WriteOptions, + ) -> Result> { + self.call_idx.fetch_add(1, Ordering::SeqCst); + std::future::pending().await + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_merge_ssts_cancels_spawned_tasks() { + common_telemetry::init_default_ut_logging(); + + let mut compaction_region = new_test_compaction_region().await; + compaction_region.max_parallelism = 2; + + let cancel_handle = Arc::new(CancellationHandle::default()); + let call_idx = Arc::new(AtomicUsize::new(0)); + let compactor = DefaultCompactor { + merger: BlockingMerger { + call_idx: call_idx.clone(), + }, + cancel_handle: cancel_handle.clone(), + }; + + let picker_output = PickerOutput { + outputs: vec![ + CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(dummy_file_meta())], + filter_deleted: false, + output_time_range: None, + }, + CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(dummy_file_meta())], + filter_deleted: false, + output_time_range: None, + }, + CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(dummy_file_meta())], + filter_deleted: false, + output_time_range: None, + }, + ], + expired_ssts: vec![], + time_window_size: 3600, + max_file_size: None, + }; + + let task = tokio::spawn(async move { + compactor + .merge_ssts(&compaction_region, picker_output) + .await + }); + + sleep(Duration::from_millis(100)).await; + cancel_handle.cancel(); + + let merge_output = task + .await + .expect("merge_ssts should stop after cancellation") + .unwrap(); + + let started = call_idx.load(Ordering::SeqCst); + + assert!(merge_output.files_to_add.is_empty()); + assert!(merge_output.files_to_remove.is_empty()); + assert_eq!(started, 2); + } } diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs index e21db2bab8..a8182353aa 100644 --- a/src/mito2/src/compaction/task.rs +++ b/src/mito2/src/compaction/task.rs @@ -16,13 +16,15 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use std::time::Instant; +use common_base::cancellation::CancellableFuture; use common_memory_manager::OnExhaustedPolicy; use common_telemetry::{error, info, warn}; use itertools::Itertools; use snafu::ResultExt; use tokio::sync::mpsc; -use crate::compaction::compactor::{CompactionRegion, Compactor}; +use crate::compaction::LocalCompactionState; +use crate::compaction::compactor::{CompactionRegion, Compactor, MergeOutput}; use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager}; use crate::compaction::picker::{CompactionTask, PickerOutput}; use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu}; @@ -30,8 +32,8 @@ use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED}; use crate::region::RegionRoleState; use crate::request::{ - BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, RegionEditResult, - WorkerRequest, WorkerRequestWithTime, + BackgroundNotify, CompactionCancelled, CompactionFailed, CompactionFinished, OutputTx, + RegionEditResult, WorkerRequest, WorkerRequestWithTime, }; use crate::sst::file::FileMeta; use crate::worker::WorkerListener; @@ -41,6 +43,8 @@ use crate::{error, metrics}; pub const MAX_PARALLEL_COMPACTION: usize = 1; pub(crate) struct CompactionTaskImpl { + /// Shared local-compaction state for cooperative cancellation. + pub(crate) state: LocalCompactionState, pub compaction_region: CompactionRegion, /// Request sender to notify the worker. pub(crate) request_sender: mpsc::Sender, @@ -184,9 +188,7 @@ impl CompactionTaskImpl { ); } - async fn handle_expiration_and_compaction(&mut self) -> error::Result { - self.mark_files_compacting(true); - + async fn handle_expiration(&mut self) { // 1. In case of local compaction, we can delete expired ssts in advance. if !self.picker_output.expired_ssts.is_empty() { let remove_timer = COMPACTION_STAGE_ELAPSED @@ -203,7 +205,9 @@ impl CompactionTaskImpl { .await; remove_timer.observe_duration(); } + } + async fn handle_compaction(&mut self) -> error::Result { // 2. Merge inputs let merge_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["merge"]) @@ -239,6 +243,13 @@ impl CompactionTaskImpl { .on_merge_ssts_finished(self.compaction_region.region_id) .await; + Ok(compaction_result) + } + + async fn update_manifest( + &self, + compaction_result: crate::compaction::compactor::MergeOutput, + ) -> error::Result { let _manifest_timer = COMPACTION_STAGE_ELAPSED .with_label_values(&["write_manifest"]) .start_timer(); @@ -296,14 +307,61 @@ impl CompactionTask for CompactionTaskImpl { } }; - let notify = match self.handle_expiration_and_compaction().await { - Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished { - region_id: self.compaction_region.region_id, - senders: std::mem::take(&mut self.waiters), - start_time: self.start_time, - edit, - }), - Err(e) => { + // Marks files compacting before compaction and unmark after compaction (even if compaction is cancelled or failed), so that they won't be picked by other compaction tasks. + self.mark_files_compacting(true); + self.handle_expiration().await; + + let cancel_handle = self.state.cancel_handle(); + // Run compaction with cooperative cancellation. + let notify = match CancellableFuture::new( + async { self.handle_compaction().await }, + cancel_handle, + ) + .await + { + Ok(Ok(merge_output)) => { + // Stop accepting cancellation once we are about to publish the compaction edit. + if !self.state.mark_commit_started() { + let senders = std::mem::take(&mut self.waiters); + BackgroundNotify::CompactionCancelled(CompactionCancelled { + region_id: self.compaction_region.region_id, + senders, + }) + } else { + match self.update_manifest(merge_output).await { + Ok(edit) => { + let senders = std::mem::take(&mut self.waiters); + BackgroundNotify::CompactionFinished(CompactionFinished { + region_id: self.compaction_region.region_id, + senders, + start_time: self.start_time, + edit, + }) + } + Err(e) => { + error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id); + let err = Arc::new(e); + self.on_failure(err.clone()); + BackgroundNotify::CompactionFailed(CompactionFailed { + region_id: self.compaction_region.region_id, + err, + }) + } + } + } + } + Err(_) => { + info!( + "Compaction cancelled, region id: {}", + self.compaction_region.region_id + ); + let senders = std::mem::take(&mut self.waiters); + BackgroundNotify::CompactionCancelled(CompactionCancelled { + region_id: self.compaction_region.region_id, + senders, + }) + } + Ok(Err(e)) => { error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id); let err = Arc::new(e); // notify compaction waiters @@ -334,7 +392,7 @@ mod tests { fn test_picker_output_with_expired_ssts() { // Test that PickerOutput correctly includes expired_ssts // This verifies that expired SSTs are properly identified and included - // in the picker output, which is then handled by handle_expiration_and_compaction + // in the picker output, which is then handled by handle_expiration() let file_ids = (0..3).map(|_| FileId::random()).collect::>(); let expired_ssts = vec![ @@ -382,6 +440,6 @@ mod tests { // // The behavior is tested indirectly through integration tests: // - remove_expired() logs errors but doesn't stop compaction - // - handle_expiration_and_compaction() continues even if remove_expired() encounters errors - // - The function is designed to be non-blocking for compaction + // - handle_expiration() continues even if remove_expired() encounters errors + // - The expiration stage is designed to be non-blocking for compaction } diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index cbcad3a58a..dc8aa94fdf 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -18,6 +18,8 @@ use std::sync::Arc; use std::time::Duration; use api::v1::{ColumnSchema, Rows}; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; use datatypes::arrow::array::AsArray; use datatypes::arrow::datatypes::TimestampMillisecondType; @@ -650,7 +652,7 @@ async fn test_readonly_during_compaction_with_format(flat_format: bool) { } #[tokio::test] -async fn test_enter_staging_deferred_by_inflight_compaction() { +async fn test_enter_staging_cancels_inflight_local_compaction_before_commit() { common_telemetry::init_default_ut_logging(); let mut env = TestEnv::new().await; let listener = Arc::new(CompactionListener::default()); @@ -706,17 +708,91 @@ async fn test_enter_staging_deferred_by_inflight_compaction() { }), ) .await - .unwrap(); }); tokio::time::sleep(Duration::from_millis(100)).await; - assert!(!enter_staging.is_finished()); + // The enter staging should finished, and the compaction should be cancelled. + assert!(enter_staging.is_finished()); + let _ = enter_staging.await.unwrap().unwrap(); +} - listener.wake(); - enter_staging.await.unwrap(); +#[tokio::test] +async fn test_manual_compaction_returns_cancelled_when_enter_staging_cancels_it() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new().await; + let listener = Arc::new(CompactionListener::default()); + let engine = env + .create_engine_with( + MitoConfig { + max_background_purges: 1, + ..Default::default() + }, + None, + Some(listener.clone()), + None, + ) + .await; - let region = engine.get_region(region_id).unwrap(); - assert!(region.is_staging()); + let region_id = RegionId::new(2050, 1); + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let request = CreateRequestBuilder::new() + .insert_option("compaction.type", "twcs") + .build(); + let column_schemas = request + .column_metadatas + .iter() + .map(column_metadata_to_column_schema) + .collect::>(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + put_and_flush(&engine, region_id, &column_schemas, 0..10).await; + put_and_flush(&engine, region_id, &column_schemas, 5..20).await; + + let engine_cloned = engine.clone(); + let compact = tokio::spawn(async move { + engine_cloned + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) + .await + }); + + listener.wait_handle_finished().await; + + let engine_cloned = engine.clone(); + let enter_staging = tokio::spawn(async move { + engine_cloned + .handle_request( + region_id, + RegionRequest::EnterStaging(EnterStagingRequest { + partition_directive: StagingPartitionDirective::RejectAllWrites, + }), + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(100)).await; + assert!(compact.is_finished()); + assert!(enter_staging.is_finished()); + + let err = compact.await.unwrap().unwrap_err(); + assert_eq!(err.status_code(), StatusCode::Cancelled); + + let _ = enter_staging.await.unwrap(); } #[tokio::test] diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index c6b69fe607..eb802d50b7 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -1073,6 +1073,9 @@ pub enum Error { #[snafu(display("Manual compaction is override by following operations."))] ManualCompactionOverride {}, + #[snafu(display("Compaction is cancelled."))] + CompactionCancelled {}, + #[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))] CompactionMemoryExhausted { region_id: RegionId, @@ -1389,7 +1392,7 @@ impl ErrorExt for Error { #[cfg(feature = "vector_index")] VectorIndexBuild { .. } | VectorIndexFinish { .. } => StatusCode::Internal, - ManualCompactionOverride {} => StatusCode::Cancelled, + ManualCompactionOverride {} | CompactionCancelled {} => StatusCode::Cancelled, CompactionMemoryExhausted { source, .. } => source.status_code(), diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 4f8f1b0d51..cc965f48df 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -46,9 +46,9 @@ use store_api::storage::{FileId, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::error::{ - CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu, - FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu, - Result, UnexpectedSnafu, + CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, + Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, + MissingPartitionExprSnafu, Result, UnexpectedSnafu, }; use crate::flush::FlushReason; use crate::manifest::action::{RegionEdit, TruncateKind}; @@ -895,6 +895,8 @@ pub(crate) enum BackgroundNotify { IndexBuildFailed(IndexBuildFailed), /// Compaction has finished. CompactionFinished(CompactionFinished), + /// Compaction has been cancelled cooperatively. + CompactionCancelled(CompactionCancelled), /// Compaction has failed. CompactionFailed(CompactionFailed), /// Truncate result. @@ -991,6 +993,24 @@ pub(crate) struct CompactionFinished { pub(crate) edit: RegionEdit, } +/// Notifies a compaction job has been cancelled cooperatively. +#[derive(Debug)] +pub(crate) struct CompactionCancelled { + /// Region id. + pub(crate) region_id: RegionId, + /// Waiters to wake once the cancellation has been observed by the worker. + pub(crate) senders: Vec, +} + +impl CompactionCancelled { + pub(crate) fn on_success(self) { + for sender in self.senders { + sender.send(CompactionCancelledSnafu {}.fail()); + } + info!("Compaction cancelled for region: {}", self.region_id); + } +} + impl CompactionFinished { pub fn on_success(self) { // only update compaction time on success @@ -1149,10 +1169,13 @@ pub(crate) struct CopyRegionFromRequest { mod tests { use api::v1::value::ValueData; use api::v1::{Row, SemanticType}; + use common_error::ext::ErrorExt; + use common_error::status_code::StatusCode; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnDefaultConstraint; use mito_codec::test_util::i64_value; use store_api::metadata::RegionMetadataBuilder; + use tokio::sync::oneshot; use super::*; use crate::error::Error; @@ -1216,6 +1239,21 @@ mod tests { assert_eq!(None, request.column_index_by_name("c2")); } + #[test] + fn test_compaction_cancelled_sends_cancelled_error() { + let (tx, rx) = oneshot::channel(); + let request = CompactionCancelled { + region_id: RegionId::new(1, 1), + senders: vec![OutputTx::new(tx)], + }; + + request.on_success(); + + let err = rx.blocking_recv().unwrap().unwrap_err(); + assert!(matches!(err, Error::CompactionCancelled { .. })); + assert_eq!(err.status_code(), StatusCode::Cancelled); + } + #[test] fn test_write_request_column_num() { let rows = Rows { diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index fd5ad82f3f..3fb6a63184 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -1196,6 +1196,9 @@ impl RegionWorkerLoop { BackgroundNotify::CompactionFinished(req) => { self.handle_compaction_finished(region_id, req).await } + BackgroundNotify::CompactionCancelled(req) => { + self.handle_compaction_cancelled(region_id, req).await + } BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await, BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await, BackgroundNotify::RegionChange(req) => { diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index 841d10025d..fd021771b1 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -23,7 +23,8 @@ use crate::error::RegionNotFoundSnafu; use crate::metrics::COMPACTION_REQUEST_COUNT; use crate::region::MitoRegionRef; use crate::request::{ - BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx, + BuildIndexRequest, CompactionCancelled, CompactionFailed, CompactionFinished, OnFailure, + OptionOutputTx, }; use crate::sst::index::IndexBuildType; use crate::worker::RegionWorkerLoop; @@ -119,6 +120,28 @@ impl RegionWorkerLoop { self.handle_ddl_requests(&mut pending_ddls).await; } + pub(crate) async fn handle_compaction_cancelled( + &mut self, + region_id: RegionId, + request: CompactionCancelled, + ) where + S: LogStore, + { + request.on_success(); + + // Reuse the scheduler's finish path to wake pending DDLs after a cooperative stop. + let mut pending_ddls = match self.regions.get_region(region_id) { + Some(_) => { + self.compaction_scheduler + .on_compaction_cancelled(region_id) + .await + } + None => Vec::new(), + }; + + self.handle_ddl_requests(&mut pending_ddls).await; + } + /// When compaction fails, we simply log the error. pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) { error!(req.err; "Failed to compact region: {}", req.region_id); diff --git a/src/mito2/src/worker/handle_enter_staging.rs b/src/mito2/src/worker/handle_enter_staging.rs index 83bd51df15..75e7071468 100644 --- a/src/mito2/src/worker/handle_enter_staging.rs +++ b/src/mito2/src/worker/handle_enter_staging.rs @@ -19,6 +19,7 @@ use store_api::logstore::LogStore; use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective}; use store_api::storage::RegionId; +use crate::compaction::RequestCancelResult; use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu}; use crate::flush::FlushReason; use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange}; @@ -98,18 +99,24 @@ impl RegionWorkerLoop { return; } - if self.compaction_scheduler.is_compacting(region_id) { - // Safety: region is compacting, add ddl request to pending queue. - self.compaction_scheduler - .add_ddl_request_to_pending(SenderDdlRequest { - region_id, - sender, - request: DdlRequest::EnterStaging(EnterStagingRequest { - partition_directive, - }), - }); + match self.compaction_scheduler.request_cancel(region_id) { + RequestCancelResult::CancelIssued + | RequestCancelResult::AlreadyCancelling + | RequestCancelResult::TooLateToCancel => { + // Safety: region is compacting or has entered the non-cancellable publish stage, + // keep the DDL pending until the current task finishes or acknowledges cancellation. + self.compaction_scheduler + .add_ddl_request_to_pending(SenderDdlRequest { + region_id, + sender, + request: DdlRequest::EnterStaging(EnterStagingRequest { + partition_directive, + }), + }); - return; + return; + } + RequestCancelResult::NotRunning => {} } self.handle_enter_staging(region, partition_directive, sender);