diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 944c51ebd6..d2120690ac 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -562,7 +562,7 @@ impl CompactionScheduler {
             listener,
             picker_output,
             compaction_region,
-            compactor: Arc::new(DefaultCompactor {}),
+            compactor: Arc::new(DefaultCompactor::default()),
             memory_manager: self.memory_manager.clone(),
             memory_policy: self.memory_policy,
             estimated_memory_bytes: estimated_bytes,
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index ff4317331f..fd3d01b276 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -38,11 +38,10 @@ use crate::compaction::picker::{PickerOutput, new_picker};
 use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
 use crate::config::MitoConfig;
 use crate::error::{
-    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
+    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
 };
 use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
 use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
-use crate::metrics;
 use crate::read::FlatSource;
 use crate::region::options::RegionOptions;
 use crate::region::version::VersionRef;
@@ -56,6 +55,7 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory;
 use crate::sst::location::region_dir_from_table_dir;
 use crate::sst::parquet::WriteOptions;
 use crate::sst::version::{SstVersion, SstVersionRef};
+use crate::{error, metrics};
 
 /// Region version for compaction that does not hold memtables.
 #[derive(Clone)]
@@ -299,12 +299,28 @@ pub trait Compactor: Send + Sync + 'static {
     ) -> Result<()>;
 }
 
-/// DefaultCompactor is the default implementation of Compactor.
-pub struct DefaultCompactor;
-
-impl DefaultCompactor {
-    /// Merge a single compaction output into SST files.
+/// Trait for merging a single compaction output into SST files.
+///
+/// This is extracted from `DefaultCompactor` to allow injecting mock
+/// implementations in tests.
+#[async_trait::async_trait]
+pub trait SstMerger: Send + Sync + 'static {
     async fn merge_single_output(
+        &self,
+        compaction_region: CompactionRegion,
+        output: CompactionOutput,
+        write_opts: WriteOptions,
+    ) -> Result<Vec<FileMeta>>;
+}
+
+/// The production [`SstMerger`] that reads, merges, and writes SST files.
+#[derive(Clone)]
+pub struct DefaultSstMerger;
+
+#[async_trait::async_trait]
+impl SstMerger for DefaultSstMerger {
+    async fn merge_single_output(
+        &self,
         compaction_region: CompactionRegion,
         output: CompactionOutput,
         write_opts: WriteOptions,
@@ -424,54 +440,113 @@ impl DefaultCompactor {
     }
 }
 
+/// DefaultCompactor is the default implementation of Compactor.
+///
+/// It is parameterized by an [`SstMerger`] to allow injecting mock
+/// implementations in tests.
+pub struct DefaultCompactor<M: SstMerger = DefaultSstMerger> {
+    merger: M,
+}
+
+impl Default for DefaultCompactor<DefaultSstMerger> {
+    fn default() -> Self {
+        Self {
+            merger: DefaultSstMerger,
+        }
+    }
+}
+
+impl<M: SstMerger> DefaultCompactor<M> {
+    pub fn with_merger(merger: M) -> Self {
+        Self { merger }
+    }
+}
+
 #[async_trait::async_trait]
-impl Compactor for DefaultCompactor {
+impl<M: SstMerger> Compactor for DefaultCompactor<M>
+where
+    M: Clone,
+{
     async fn merge_ssts(
         &self,
         compaction_region: &CompactionRegion,
         mut picker_output: PickerOutput,
     ) -> Result<MergeOutput> {
-        let mut futs = Vec::with_capacity(picker_output.outputs.len());
-        let mut compacted_inputs =
-            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
         let internal_parallelism = compaction_region.max_parallelism.max(1);
         let compaction_time_window = picker_output.time_window_size;
+        let region_id = compaction_region.region_id;
+
+        // Build tasks along with their input file metas so we can track which
+        // inputs correspond to each task.
+        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());
 
         for output in picker_output.outputs.drain(..) {
             let inputs_to_remove: Vec<_> =
                 output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
-            compacted_inputs.extend(inputs_to_remove.iter().cloned());
 
             let write_opts = WriteOptions {
                 write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
                 max_file_size: picker_output.max_file_size,
                 ..Default::default()
            };
 
-            futs.push(Self::merge_single_output(
-                compaction_region.clone(),
-                output,
-                write_opts,
-            ));
-        }
-        let mut output_files = Vec::with_capacity(futs.len());
-        while !futs.is_empty() {
-            let mut task_chunk = Vec::with_capacity(internal_parallelism);
-            for _ in 0..internal_parallelism {
-                if let Some(task) = futs.pop() {
-                    task_chunk.push(common_runtime::spawn_compact(task));
-                }
-            }
-            let metas = futures::future::try_join_all(task_chunk)
-                .await
-                .context(JoinSnafu)?
-                .into_iter()
-                .collect::<Result<Vec<Vec<_>>>>()?;
-            output_files.extend(metas.into_iter().flatten());
+            let merger = self.merger.clone();
+            let compaction_region = compaction_region.clone();
+            let fut = async move {
+                merger
+                    .merge_single_output(compaction_region, output, write_opts)
+                    .await
+            };
+            tasks.push((inputs_to_remove, fut));
         }
 
-        // In case of remote compaction, we still allow the region edit after merge to
-        // clean expired ssts.
-        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
-        inputs.extend(
+        let mut output_files = Vec::with_capacity(tasks.len());
+        let mut compacted_inputs = Vec::with_capacity(
+            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
+                + picker_output.expired_ssts.len(),
+        );
+
+        while !tasks.is_empty() {
+            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
+            for _ in 0..internal_parallelism {
+                if let Some(task) = tasks.pop() {
+                    chunk.push(task);
+                }
+            }
+            let spawned: Vec<_> = chunk
+                .into_iter()
+                .map(|(inputs, fut)| {
+                    let handle = common_runtime::spawn_compact(fut);
+                    (inputs, handle)
+                })
+                .collect();
+
+            for (inputs, handle) in spawned {
+                match handle.await {
+                    Ok(Ok(files)) => {
+                        output_files.extend(files);
+                        compacted_inputs.extend(inputs);
+                    }
+                    Ok(Err(e)) => {
+                        warn!(
+                            e; "Region {} failed to merge compaction output with inputs: [{}], skipping",
+                            region_id,
+                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
+                        );
+                    }
+                    Err(e) => {
+                        warn!(
+                            "Region {} compaction task join error for inputs: [{}]: {}",
+                            region_id,
+                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
+                            e
+                        );
+                        return Err(e).context(error::JoinSnafu);
+                    }
+                }
+            }
+        }
+
+        // Include expired SSTs in removals; they are cleaned up regardless of merge success.
+        compacted_inputs.extend(
             picker_output
                 .expired_ssts
                 .iter()
@@ -480,7 +555,7 @@ impl Compactor for DefaultCompactor {
 
         Ok(MergeOutput {
             files_to_add: output_files,
-            files_to_remove: inputs,
+            files_to_remove: compacted_inputs,
             compaction_time_window: Some(compaction_time_window),
         })
     }
@@ -558,3 +633,244 @@ impl Compactor for DefaultCompactor {
         Ok(())
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Mutex;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    use store_api::storage::{FileId, RegionId};
+
+    use super::*;
+    use crate::cache::CacheManager;
+    use crate::compaction::picker::PickerOutput;
+    use crate::sst::file::FileHandle;
+    use crate::sst::file_purger::NoopFilePurger;
+    use crate::sst::version::SstVersion;
+    use crate::test_util::memtable_util::metadata_for_test;
+    use crate::test_util::scheduler_util::SchedulerEnv;
+
+    fn dummy_file_meta() -> FileMeta {
+        FileMeta {
+            region_id: RegionId::new(1, 1),
+            file_id: FileId::random(),
+            file_size: 100,
+            ..Default::default()
+        }
+    }
+
+    fn new_file_handle(meta: FileMeta) -> FileHandle {
+        FileHandle::new(meta, Arc::new(NoopFilePurger))
+    }
+
+    /// Build a minimal [`CompactionRegion`] suitable for tests where the
+    /// [`SstMerger`] is mocked and never touches the access layer.
+    async fn new_test_compaction_region() -> CompactionRegion {
+        let env = SchedulerEnv::new().await;
+        let metadata = metadata_for_test();
+        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
+        CompactionRegion {
+            region_id: RegionId::new(1, 1),
+            region_options: RegionOptions::default(),
+            engine_config: Arc::new(MitoConfig::default()),
+            region_metadata: metadata.clone(),
+            cache_manager: Arc::new(CacheManager::default()),
+            access_layer: env.access_layer.clone(),
+            manifest_ctx,
+            current_version: CompactionVersion {
+                metadata,
+                options: RegionOptions::default(),
+                ssts: Arc::new(SstVersion::new()),
+                compaction_time_window: None,
+            },
+            file_purger: None,
+            ttl: None,
+            max_parallelism: 1,
+        }
+    }
+
+    /// An [`SstMerger`] that returns pre-configured results per call index.
+    ///
+    /// Call 0 gets `results[0]`, call 1 gets `results[1]`, etc.
+    #[derive(Clone)]
+    struct MockMerger {
+        results: Arc<Mutex<Vec<Result<Vec<FileMeta>>>>>,
+        call_idx: Arc<AtomicUsize>,
+    }
+
+    impl MockMerger {
+        fn new(results: Vec<Result<Vec<FileMeta>>>) -> Self {
+            Self {
+                results: Arc::new(Mutex::new(results)),
+                call_idx: Arc::new(AtomicUsize::new(0)),
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl SstMerger for MockMerger {
+        async fn merge_single_output(
+            &self,
+            _compaction_region: CompactionRegion,
+            _output: CompactionOutput,
+            _write_opts: WriteOptions,
+        ) -> Result<Vec<FileMeta>> {
+            let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
+            match self.results.lock().unwrap().get(idx) {
+                Some(Ok(files)) => Ok(files.clone()),
+                Some(Err(_)) => error::InvalidMetaSnafu {
+                    reason: format!("simulated failure at index {idx}"),
+                }
+                .fail(),
+                None => panic!("MockMerger: no result configured for call index {idx}"),
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_partial_merge_failure_collects_only_successful_outputs() {
+        common_telemetry::init_default_ut_logging();
+
+        let compaction_region = new_test_compaction_region().await;
+
+        // Prepare 3 compaction outputs: output 0 and 2 succeed, output 1 fails.
+        let input_meta_0 = dummy_file_meta();
+        let input_meta_1 = dummy_file_meta();
+        let input_meta_2 = dummy_file_meta();
+
+        let output_meta_0 = vec![dummy_file_meta()];
+        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];
+
+        let merger = MockMerger::new(vec![
+            Ok(output_meta_0.clone()),
+            Err(error::InvalidMetaSnafu {
+                reason: "boom".to_string(),
+            }
+            .build()),
+            Ok(output_meta_2.clone()),
+        ]);
+        let compactor = DefaultCompactor::with_merger(merger);
+
+        let picker_output = PickerOutput {
+            outputs: vec![
+                CompactionOutput {
+                    output_level: 1,
+                    inputs: vec![new_file_handle(input_meta_0.clone())],
+                    filter_deleted: false,
+                    output_time_range: None,
+                },
+                CompactionOutput {
+                    output_level: 1,
+                    inputs: vec![new_file_handle(input_meta_1.clone())],
+                    filter_deleted: false,
+                    output_time_range: None,
+                },
+                CompactionOutput {
+                    output_level: 1,
+                    inputs: vec![new_file_handle(input_meta_2.clone())],
+                    filter_deleted: false,
+                    output_time_range: None,
+                },
+            ],
+            expired_ssts: vec![],
+            time_window_size: 3600,
+            max_file_size: None,
+        };
+
+        let merge_output = compactor
+            .merge_ssts(&compaction_region, picker_output)
+            .await
+            .unwrap();
+
+        // Outputs 0 and 2 succeeded (1 + 2 = 3 files added).
+        assert_eq!(merge_output.files_to_add.len(), 3);
+        // Only inputs from successful merges should be removed.
+        assert_eq!(merge_output.files_to_remove.len(), 2);
+
+        let removed_ids: Vec<_> = merge_output
+            .files_to_remove
+            .iter()
+            .map(|f| f.file_id)
+            .collect();
+        assert!(removed_ids.contains(&input_meta_0.file_id));
+        assert!(removed_ids.contains(&input_meta_2.file_id));
+        // The failed output's input must NOT be removed.
+        assert!(!removed_ids.contains(&input_meta_1.file_id));
+    }
+
+    #[tokio::test]
+    async fn test_all_outputs_succeed() {
+        common_telemetry::init_default_ut_logging();
+
+        let compaction_region = new_test_compaction_region().await;
+        let input_meta = dummy_file_meta();
+        let output_meta = vec![dummy_file_meta()];
+
+        let merger = MockMerger::new(vec![Ok(output_meta.clone())]);
+        let compactor = DefaultCompactor::with_merger(merger);
+
+        let picker_output = PickerOutput {
+            outputs: vec![CompactionOutput {
+                output_level: 1,
+                inputs: vec![new_file_handle(input_meta.clone())],
+                filter_deleted: false,
+                output_time_range: None,
+            }],
+            expired_ssts: vec![],
+            time_window_size: 3600,
+            max_file_size: None,
+        };
+
+        let merge_output = compactor
+            .merge_ssts(&compaction_region, picker_output)
+            .await
+            .unwrap();
+
+        assert_eq!(merge_output.files_to_add.len(), 1);
+        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
+        assert_eq!(merge_output.files_to_remove.len(), 1);
+        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
+    }
+
+    #[tokio::test]
+    async fn test_expired_ssts_always_removed() {
+        common_telemetry::init_default_ut_logging();
+
+        let compaction_region = new_test_compaction_region().await;
+        let input_meta = dummy_file_meta();
+        let expired_meta = dummy_file_meta();
+
+        // The single merge output fails, but expired SSTs should still be removed.
+        let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu {
+            reason: "fail".to_string(),
+        }
+        .build())]);
+        let compactor = DefaultCompactor::with_merger(merger);
+
+        let picker_output = PickerOutput {
+            outputs: vec![CompactionOutput {
+                output_level: 1,
+                inputs: vec![new_file_handle(input_meta.clone())],
+                filter_deleted: false,
+                output_time_range: None,
+            }],
+            expired_ssts: vec![new_file_handle(expired_meta.clone())],
+            time_window_size: 3600,
+            max_file_size: None,
+        };
+
+        let merge_output = compactor
+            .merge_ssts(&compaction_region, picker_output)
+            .await
+            .unwrap();
+
+        // No files added (merge failed).
+        assert!(merge_output.files_to_add.is_empty());
+        // Only the expired SST should be in files_to_remove (not the failed merge's input).
+        assert_eq!(merge_output.files_to_remove.len(), 1);
+        assert_eq!(
+            merge_output.files_to_remove[0].file_id,
+            expired_meta.file_id
+        );
+    }
+}
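
For reference, the chunked spawn-and-await pattern that the reworked merge_ssts relies on can be reduced to the following standalone sketch. It is illustrative only and not part of the change: it assumes a plain tokio runtime and String errors in place of common_runtime::spawn_compact and crate::error, and the run_chunked name is made up for this sketch. Failed tasks are logged and skipped, while join errors abort the whole run.

async fn run_chunked<F>(mut tasks: Vec<F>, parallelism: usize) -> Result<Vec<u64>, String>
where
    F: std::future::Future<Output = Result<Vec<u64>, String>> + Send + 'static,
{
    let mut collected = Vec::new();
    while !tasks.is_empty() {
        // Spawn at most `parallelism` tasks, then await the whole chunk before
        // starting the next one, mirroring the internal_parallelism loop above.
        let chunk: Vec<_> = (0..parallelism.max(1))
            .filter_map(|_| tasks.pop())
            .map(tokio::spawn)
            .collect();
        for handle in chunk {
            match handle.await {
                // Task ran and produced output: keep its results.
                Ok(Ok(items)) => collected.extend(items),
                // Task ran but failed: log and continue with the remaining tasks.
                Ok(Err(e)) => eprintln!("merge task failed, skipping: {e}"),
                // The task could not be joined (panic or cancellation): abort.
                Err(e) => return Err(format!("join error: {e}")),
            }
        }
    }
    Ok(collected)
}

In the actual change, each chunk element also carries the input FileMetas of its output, so only the inputs of merges that succeeded end up in files_to_remove.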