From 9f7ffb4d26f498ceb8ea24349f6e341fe29f8365 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Mon, 13 Apr 2026 16:12:11 +0800
Subject: [PATCH] feat(mito2): allow CompactionOutput to succeed independently (#7948)

* refactor(mito2): improve compaction error handling and file removal

Refactor compaction task execution to enhance error handling and robustness.

- Implemented parallel execution of compaction tasks with proper error capture and logging for individual task failures.
- Ensured JoinSnafu is no longer directly used in error propagation, instead handling errors within the task processing loop.
- Adjusted file removal logic to correctly include expired SSTs after compaction merges.

Signed-off-by: Lei, HUANG

* refactor(mito2): extract SstMerger trait for testability in compaction

Extract SstMerger trait and DefaultSstMerger implementation to improve the testability of DefaultCompactor. The DefaultCompactor is now generic over SstMerger, allowing mock implementations to be injected for unit testing without relying on the full object storage access layer.

This refactoring separates the concerns of SST file merging from the overall compaction orchestration logic.

Additionally:

- Updated CompactionScheduler to use DefaultCompactor::default().
- Added unit tests for DefaultCompactor using a MockMerger.

Signed-off-by: Lei, HUANG

* fix(compaction): propagate join error during sst flush

Correctly propagates the error when joining SST flush handles during compaction. Previously, the error was logged but not returned, leading to potential silent failures.

Also reorders some imports for consistency.

Signed-off-by: Lei, HUANG

* perf(compaction): pre-allocate capacity for compacted_inputs

Pre-allocates capacity for the compacted_inputs vector based on the estimated total size of inputs and expired SSTs. This optimization aims to reduce vector reallocations during the compaction process.

Signed-off-by: Lei, HUANG * feat/allow-partial-compaction: ### Commit Message Enhance `DefaultCompactor` and `MockMerger` for Improved Flexibility - **`compactor.rs`**: - Added `Clone` trait to `DefaultSstMerger` and `MockMerger` to allow cloning. - Removed `Arc` wrapping from `DefaultCompactor`'s `merger` field for direct usage. - Updated `merge_ssts` method to require `Clone` trait for `SstMerger`. - Modified `MockMerger` to use `Arc` for `results` and `call_idx` to ensure thread safety. - Adjusted error handling to use `error::InvalidMetaSnafu` directly. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- src/mito2/src/compaction.rs | 2 +- src/mito2/src/compaction/compactor.rs | 390 +++++++++++++++++++++++--- 2 files changed, 354 insertions(+), 38 deletions(-) diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 944c51ebd6..d2120690ac 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -562,7 +562,7 @@ impl CompactionScheduler { listener, picker_output, compaction_region, - compactor: Arc::new(DefaultCompactor {}), + compactor: Arc::new(DefaultCompactor::default()), memory_manager: self.memory_manager.clone(), memory_policy: self.memory_policy, estimated_memory_bytes: estimated_bytes, diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index ff4317331f..fd3d01b276 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -38,11 +38,10 @@ use crate::compaction::picker::{PickerOutput, new_picker}; use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options}; use crate::config::MitoConfig; use crate::error::{ - EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result, + EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result, }; use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; use 
crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; -use crate::metrics; use crate::read::FlatSource; use crate::region::options::RegionOptions; use crate::region::version::VersionRef; @@ -56,6 +55,7 @@ use crate::sst::index::puffin_manager::PuffinManagerFactory; use crate::sst::location::region_dir_from_table_dir; use crate::sst::parquet::WriteOptions; use crate::sst::version::{SstVersion, SstVersionRef}; +use crate::{error, metrics}; /// Region version for compaction that does not hold memtables. #[derive(Clone)] @@ -299,12 +299,28 @@ pub trait Compactor: Send + Sync + 'static { ) -> Result<()>; } -/// DefaultCompactor is the default implementation of Compactor. -pub struct DefaultCompactor; - -impl DefaultCompactor { - /// Merge a single compaction output into SST files. +/// Trait for merging a single compaction output into SST files. +/// +/// This is extracted from `DefaultCompactor` to allow injecting mock +/// implementations in tests. +#[async_trait::async_trait] +pub trait SstMerger: Send + Sync + 'static { async fn merge_single_output( + &self, + compaction_region: CompactionRegion, + output: CompactionOutput, + write_opts: WriteOptions, + ) -> Result>; +} + +/// The production [`SstMerger`] that reads, merges, and writes SST files. +#[derive(Clone)] +pub struct DefaultSstMerger; + +#[async_trait::async_trait] +impl SstMerger for DefaultSstMerger { + async fn merge_single_output( + &self, compaction_region: CompactionRegion, output: CompactionOutput, write_opts: WriteOptions, @@ -424,54 +440,113 @@ impl DefaultCompactor { } } +/// DefaultCompactor is the default implementation of Compactor. +/// +/// It is parameterized by an [`SstMerger`] to allow injecting mock +/// implementations in tests. 
+pub struct DefaultCompactor { + merger: M, +} + +impl Default for DefaultCompactor { + fn default() -> Self { + Self { + merger: DefaultSstMerger, + } + } +} + +impl DefaultCompactor { + pub fn with_merger(merger: M) -> Self { + Self { merger } + } +} + #[async_trait::async_trait] -impl Compactor for DefaultCompactor { +impl Compactor for DefaultCompactor +where + M: Clone, +{ async fn merge_ssts( &self, compaction_region: &CompactionRegion, mut picker_output: PickerOutput, ) -> Result { - let mut futs = Vec::with_capacity(picker_output.outputs.len()); - let mut compacted_inputs = - Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum()); let internal_parallelism = compaction_region.max_parallelism.max(1); let compaction_time_window = picker_output.time_window_size; + let region_id = compaction_region.region_id; + + // Build tasks along with their input file metas so we can track which + // inputs correspond to each task. + let mut tasks: Vec<(Vec, _)> = Vec::with_capacity(picker_output.outputs.len()); for output in picker_output.outputs.drain(..) { let inputs_to_remove: Vec<_> = output.inputs.iter().map(|f| f.meta_ref().clone()).collect(); - compacted_inputs.extend(inputs_to_remove.iter().cloned()); let write_opts = WriteOptions { write_buffer_size: compaction_region.engine_config.sst_write_buffer_size, max_file_size: picker_output.max_file_size, ..Default::default() }; - futs.push(Self::merge_single_output( - compaction_region.clone(), - output, - write_opts, - )); - } - let mut output_files = Vec::with_capacity(futs.len()); - while !futs.is_empty() { - let mut task_chunk = Vec::with_capacity(internal_parallelism); - for _ in 0..internal_parallelism { - if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_compact(task)); - } - } - let metas = futures::future::try_join_all(task_chunk) - .await - .context(JoinSnafu)? 
- .into_iter() - .collect::>>>()?; - output_files.extend(metas.into_iter().flatten()); + let merger = self.merger.clone(); + let compaction_region = compaction_region.clone(); + let fut = async move { + merger + .merge_single_output(compaction_region, output, write_opts) + .await + }; + tasks.push((inputs_to_remove, fut)); } - // In case of remote compaction, we still allow the region edit after merge to - // clean expired ssts. - let mut inputs: Vec<_> = compacted_inputs.into_iter().collect(); - inputs.extend( + let mut output_files = Vec::with_capacity(tasks.len()); + let mut compacted_inputs = Vec::with_capacity( + tasks.iter().map(|(inputs, _)| inputs.len()).sum::() + + picker_output.expired_ssts.len(), + ); + + while !tasks.is_empty() { + let mut chunk: Vec<(Vec, _)> = Vec::with_capacity(internal_parallelism); + for _ in 0..internal_parallelism { + if let Some(task) = tasks.pop() { + chunk.push(task); + } + } + let spawned: Vec<_> = chunk + .into_iter() + .map(|(inputs, fut)| { + let handle = common_runtime::spawn_compact(fut); + (inputs, handle) + }) + .collect(); + + for (inputs, handle) in spawned { + match handle.await { + Ok(Ok(files)) => { + output_files.extend(files); + compacted_inputs.extend(inputs); + } + Ok(Err(e)) => { + warn!( + e; "Region {} failed to merge compaction output with inputs: [{}], skipping", + region_id, + inputs.iter().map(|f| f.file_id.to_string()).join(",") + ); + } + Err(e) => { + warn!( + "Region {} compaction task join error for inputs: [{}], skipping: {}", + region_id, + inputs.iter().map(|f| f.file_id.to_string()).join(","), + e + ); + return Err(e).context(error::JoinSnafu); + } + } + } + } + + // Include expired SSTs in removals — these don't depend on merge success. 
+ compacted_inputs.extend( picker_output .expired_ssts .iter() @@ -480,7 +555,7 @@ impl Compactor for DefaultCompactor { Ok(MergeOutput { files_to_add: output_files, - files_to_remove: inputs, + files_to_remove: compacted_inputs, compaction_time_window: Some(compaction_time_window), }) } @@ -558,3 +633,244 @@ impl Compactor for DefaultCompactor { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::sync::Mutex; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use store_api::storage::{FileId, RegionId}; + + use super::*; + use crate::cache::CacheManager; + use crate::compaction::picker::PickerOutput; + use crate::sst::file::FileHandle; + use crate::sst::file_purger::NoopFilePurger; + use crate::sst::version::SstVersion; + use crate::test_util::memtable_util::metadata_for_test; + use crate::test_util::scheduler_util::SchedulerEnv; + + fn dummy_file_meta() -> FileMeta { + FileMeta { + region_id: RegionId::new(1, 1), + file_id: FileId::random(), + file_size: 100, + ..Default::default() + } + } + + fn new_file_handle(meta: FileMeta) -> FileHandle { + FileHandle::new(meta, Arc::new(NoopFilePurger)) + } + + /// Build a minimal [`CompactionRegion`] suitable for tests where the + /// [`SstMerger`] is mocked and never touches the access layer. 
+ async fn new_test_compaction_region() -> CompactionRegion { + let env = SchedulerEnv::new().await; + let metadata = metadata_for_test(); + let manifest_ctx = env.mock_manifest_context(metadata.clone()).await; + CompactionRegion { + region_id: RegionId::new(1, 1), + region_options: RegionOptions::default(), + engine_config: Arc::new(MitoConfig::default()), + region_metadata: metadata.clone(), + cache_manager: Arc::new(CacheManager::default()), + access_layer: env.access_layer.clone(), + manifest_ctx, + current_version: CompactionVersion { + metadata, + options: RegionOptions::default(), + ssts: Arc::new(SstVersion::new()), + compaction_time_window: None, + }, + file_purger: None, + ttl: None, + max_parallelism: 1, + } + } + + /// An [`SstMerger`] that returns pre-configured results per call index. + /// + /// Call 0 gets `results[0]`, call 1 gets `results[1]`, etc. + #[derive(Clone)] + struct MockMerger { + results: Arc>>>>, + call_idx: Arc, + } + + impl MockMerger { + fn new(results: Vec>>) -> Self { + Self { + results: Arc::new(Mutex::new(results)), + call_idx: Arc::new(AtomicUsize::new(0)), + } + } + } + + #[async_trait::async_trait] + impl SstMerger for MockMerger { + async fn merge_single_output( + &self, + _compaction_region: CompactionRegion, + _output: CompactionOutput, + _write_opts: WriteOptions, + ) -> Result> { + let idx = self.call_idx.fetch_add(1, Ordering::SeqCst); + match self.results.lock().unwrap().get(idx) { + Some(Ok(files)) => Ok(files.clone()), + Some(Err(_)) => error::InvalidMetaSnafu { + reason: format!("simulated failure at index {idx}"), + } + .fail(), + None => panic!("MockMerger: no result configured for call index {idx}"), + } + } + } + + #[tokio::test] + async fn test_partial_merge_failure_collects_only_successful_outputs() { + common_telemetry::init_default_ut_logging(); + + let compaction_region = new_test_compaction_region().await; + + // Prepare 3 compaction outputs: output 0 and 2 succeed, output 1 fails. 
+ let input_meta_0 = dummy_file_meta(); + let input_meta_1 = dummy_file_meta(); + let input_meta_2 = dummy_file_meta(); + + let output_meta_0 = vec![dummy_file_meta()]; + let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()]; + + let merger = MockMerger::new(vec![ + Ok(output_meta_0.clone()), + Err(error::InvalidMetaSnafu { + reason: "boom".to_string(), + } + .build()), + Ok(output_meta_2.clone()), + ]); + let compactor = DefaultCompactor::with_merger(merger); + + let picker_output = PickerOutput { + outputs: vec![ + CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(input_meta_0.clone())], + filter_deleted: false, + output_time_range: None, + }, + CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(input_meta_1.clone())], + filter_deleted: false, + output_time_range: None, + }, + CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(input_meta_2.clone())], + filter_deleted: false, + output_time_range: None, + }, + ], + expired_ssts: vec![], + time_window_size: 3600, + max_file_size: None, + }; + + let merge_output = compactor + .merge_ssts(&compaction_region, picker_output) + .await + .unwrap(); + + // Outputs 0 and 2 succeeded (1 + 2 = 3 files added). + assert_eq!(merge_output.files_to_add.len(), 3); + // Only inputs from successful merges should be removed. + assert_eq!(merge_output.files_to_remove.len(), 2); + + let removed_ids: Vec<_> = merge_output + .files_to_remove + .iter() + .map(|f| f.file_id) + .collect(); + assert!(removed_ids.contains(&input_meta_0.file_id)); + assert!(removed_ids.contains(&input_meta_2.file_id)); + // The failed output's input must NOT be removed. 
+ assert!(!removed_ids.contains(&input_meta_1.file_id)); + } + + #[tokio::test] + async fn test_all_outputs_succeed() { + common_telemetry::init_default_ut_logging(); + + let compaction_region = new_test_compaction_region().await; + let input_meta = dummy_file_meta(); + let output_meta = vec![dummy_file_meta()]; + + let merger = MockMerger::new(vec![Ok(output_meta.clone())]); + let compactor = DefaultCompactor::with_merger(merger); + + let picker_output = PickerOutput { + outputs: vec![CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(input_meta.clone())], + filter_deleted: false, + output_time_range: None, + }], + expired_ssts: vec![], + time_window_size: 3600, + max_file_size: None, + }; + + let merge_output = compactor + .merge_ssts(&compaction_region, picker_output) + .await + .unwrap(); + + assert_eq!(merge_output.files_to_add.len(), 1); + assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id); + assert_eq!(merge_output.files_to_remove.len(), 1); + assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id); + } + + #[tokio::test] + async fn test_expired_ssts_always_removed() { + common_telemetry::init_default_ut_logging(); + + let compaction_region = new_test_compaction_region().await; + let input_meta = dummy_file_meta(); + let expired_meta = dummy_file_meta(); + + // The single merge output fails, but expired SSTs should still be removed. 
+ let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu { + reason: "fail".to_string(), + } + .build())]); + let compactor = DefaultCompactor::with_merger(merger); + + let picker_output = PickerOutput { + outputs: vec![CompactionOutput { + output_level: 1, + inputs: vec![new_file_handle(input_meta.clone())], + filter_deleted: false, + output_time_range: None, + }], + expired_ssts: vec![new_file_handle(expired_meta.clone())], + time_window_size: 3600, + max_file_size: None, + }; + + let merge_output = compactor + .merge_ssts(&compaction_region, picker_output) + .await + .unwrap(); + + // No files added (merge failed). + assert!(merge_output.files_to_add.is_empty()); + // Only the expired SST should be in files_to_remove (not the failed merge's input). + assert_eq!(merge_output.files_to_remove.len(), 1); + assert_eq!( + merge_output.files_to_remove[0].file_id, + expired_meta.file_id + ); + } +}